# -*- coding: utf-8 -*-
"""
===============================================================================
Copyright © 2021 Kouadio K.Laurent
This file is part of pyCSAMT.
pyCSAMT is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyCSAMT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with pyCSAMT. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
.. _module-Func-utils::`csamtpy.utils.func_utils`
:synopsis: helper functions
...
Created on Sun Sep 13 09:24:00 2020
@author: @Daniel03
:utils:
* averageData
* concat_array_from_list
* sort_array_data
* transfer_array_ (deprecated)
* interpol_scipy
* _set_depth_to_coeff
* broke_array_to_
* _OlDFUNCNOUSEsearch_fill_data (deprecated)
* _search_ToFill_Data
* straighten_out_list
* take_firstValue_offDepth
* dump_comma
* build_wellData
* compute_azimuth
* build_geochemistry_sample
* _nonelist_checker
* _order_well
* intell_index
* _nonevalue_checker
* _clean_space
* _cross_eraser
* _remove_str_word
* stn_check_split_type
* minimum_parser_to_write_edi
"""
####################### import modules #######################
import os
import shutil
import warnings
import inspect
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
import csamtpy.utils.gis_tools as gis
from csamtpy.utils.decorator import deprecated
# import deprecated
# set logger #
from csamtpy.utils._csamtpylog import csamtpylog
_logger = csamtpylog.get_csamtpy_logger(__name__)
# _logger.setLevel(logging.DEBUG)
try:
import scipy
scipy_version = [int(ss) for ss in scipy.__version__.split('.')]
if scipy_version[0] == 0:
if scipy_version[1] < 14:
warnings.warn('Note: need scipy version 0.14.0 or higher or interpolation '
'might not work.', ImportWarning)
_logger.warning('Note: need scipy version 0.14.0 or higher or interpolation '
'might not work.')
import scipy.interpolate as spi
interp_import = True
except ImportError: # pragma: no cover
warnings.warn('Could not find scipy.interpolate, cannot use method interpolate'
'check installation you can get scipy from scipy.org.')
_logger.warning('Could not find scipy.interpolate, cannot use method interpolate'
'check installation you can get scipy from scipy.org.')
interp_import = False
###################### end import module ###################################
def averageData(np_array, filter_order=0,
                axis_average=0, astype="float32"):
    """
    Average the rows of a 2-D array that share the same value in one column.

    The rows are sorted (stable sort) on column `filter_order`, split into
    groups of identical values of that column, and every group is reduced
    with ``mean(axis=axis_average)``.

    Parameters
    -----------
        * np_array : numpy array
            2-D data array.
        * filter_order : int
            index of the column used to sort and group the rows.
        * axis_average : int
            axis along which every group is averaged; it is also the axis
            along which the group results are concatenated.
            default is axis=0
        * astype : str
            dtype of the returned array.

    Returns
    --------
        numpy array
            averaged data, one entry per distinct value of the filter
            column (in increasing order) when ``axis_average=0``.
            Empty input is returned empty, cast to `astype`.

    :Example:

        >>> import numpy as np
        >>> data = np.array([[1, 2, 10], [3, 1, 20], [5, 2, 30], [7, 1, 40]])
        >>> averageData(np_array=data, filter_order=1).tolist()
        [[5.0, 1.0, 30.0], [3.0, 2.0, 20.0]]
    """
    np_array = np.asarray(np_array)
    if np_array.size == 0:
        # nothing to group: keep the (empty) shape, honour the requested dtype
        return np_array.astype(astype)

    # stable sort so that rows sharing a key keep their original order
    sorted_rows = np_array[
        np_array[:, filter_order].argsort(kind="mergesort")]

    # index of the first row of every group in the sorted array
    _, starts = np.unique(sorted_rows[:, filter_order], return_index=True)
    stops = np.append(starts[1:], sorted_rows.shape[0])

    averaged = []
    for start, stop in zip(starts, stops):
        group_mean = sorted_rows[start:stop, :].mean(axis=axis_average)
        averaged.append(group_mean.reshape((1, group_mean.shape[0])))

    np_out_put = np.concatenate(averaged, axis=axis_average)
    return np_out_put.astype(astype)
def concat_array_from_list(list_of_array, concat_axis=0):
    """
    Concatenate the arrays (or plain lists) held in a list.

    One-dimensional items are first promoted to two dimensions: to a row
    ``(1, n)`` when ``concat_axis == 0``, otherwise to a column ``(n, 1)``.
    ``None`` items are ignored. The input list itself is never modified.

    Parameters
    -----------
        * list_of_array : list
            arrays or lists to join. After promotion, the items must have
            compatible shapes along the axes other than `concat_axis`.
        * concat_axis : int
            axis of concatenation. The default is 0.

    Returns
    -------
        array_like
            the concatenated data. When only one item is given it is
            returned without copy (after the 1-D promotion, if any).

    Raises
    ------
        ValueError
            if the list holds no array at all, or if the shapes are not
            compatible (raised by ``numpy.concatenate``).

    :Example:

        >>> import numpy as np
        >>> first = np.array([1., 2., 3.])
        >>> second = np.array([4., 5., 6.])
        >>> concat_array_from_list([first, second]).tolist()
        [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
        >>> concat_array_from_list([first, second], concat_axis=1).shape
        (3, 2)
    """
    arrays = []
    for elt in list_of_array:
        if elt is None:
            continue
        elt = np.asarray(elt)
        if elt.ndim == 1:
            if concat_axis == 0:
                elt = elt.reshape((1, elt.shape[0]))
            else:
                elt = elt.reshape((elt.shape[0], 1))
        arrays.append(elt)

    if not arrays:
        raise ValueError("concat_array_from_list needs at least one array "
                         "to concatenate.")
    if len(arrays) == 1:
        return arrays[0]
    # a single call avoids re-copying the growing result for every item
    return np.concatenate(arrays, axis=concat_axis)
def sort_array_data(data, sort_order=0,
                    concatenate=False, concat_axis_order=0):
    """
    Sort 2-D array data on one column and optionally concatenate it.

    Parameters
    ----------
        * data : numpy.ndarray, list or dict
            a single 2-D array, a list of 2-D arrays, or a dictionary whose
            values are 2-D arrays.
        * sort_order : int, optional
            index of the column used to sort the rows. The default is 0.
        * concatenate : bool, optional
            for a list or a dict, join all the sorted arrays into one.
            The arrays must then have compatible shapes.
            The *default* is False.
        * concat_axis_order : int, optional
            axis of concatenation. The default is axis=0.

    Returns
    -------
        numpy.ndarray, list or dict
            the sorted data. A list or dict input is updated in place and
            returned, unless `concatenate` is set, in which case the
            concatenated array is returned.

    :Example:

        >>> import numpy as np
        >>> block = np.array([[3, 1], [1, 2], [2, 0]])
        >>> sort_array_data(block).tolist()
        [[1, 2], [2, 0], [3, 1]]
    """
    def _sorted_rows(array):
        # stable sort of the rows on the requested column
        array = np.asarray(array)
        return array[array[:, sort_order].argsort(kind="mergesort")]

    if isinstance(data, list):
        # enumerate gives the position needed to store each sorted array back
        for position, block in enumerate(data):
            data[position] = _sorted_rows(block)
        if concatenate:
            data = concat_array_from_list(list_of_array=data,
                                          concat_axis=concat_axis_order)
    elif isinstance(data, dict):
        # values are replaced, keys are untouched: safe while iterating
        for key in data:
            data[key] = _sorted_rows(data[key])
        if concatenate:
            data = concat_array_from_list(list_of_array=list(data.values()),
                                          concat_axis=concat_axis_order)
    else:
        data = _sorted_rows(data)
    return data
[docs]@deprecated ("Function replaced to another {_search_ToFill_Data}")
def transfer_array_(data, index_key,start_value_depth, end_value_depth,
column_order_selection=0, axis=0):
"""
Collect, from the arrays stored under the keys greater than `index_key`,
the rows lying beyond a start depth, up to an end depth.
Deprecated: superseded by `_search_ToFill_Data`.
Parameters
----------
* data : dict or list
dict of 2-D numpy arrays whose keys can be converted to float, or a
list of 2-D arrays (a dict keyed on the first cell of each array is
then built and the function calls itself).
* index_key : float
only the entries whose key, converted to float, is greater than this
value are scanned.
* start_value_depth : float
depth from which rows are collected. Its absolute value is used.
* end_value_depth : float
depth at which the collection stops. Its absolute value is used.
* column_order_selection : int,
the index of depth column. The default is 0.
* axis : int , optional
axis used to concatenate the collected slices. The default is 0.
Returns
-------
numpy.ndarray or None
the single slice cut when the end depth is met before any other slice
was collected (`flag == 0`), the concatenation of the collected slices
(`flag == 1`), otherwise None.
.. note:: with a list input the result of the recursive call is not
returned, so None is obtained.
:Example:
>>> import numpy as np
>>> sos=abs(np.random.randn(4,3)*4)
>>> sos2=abs(np.random.randn(4,3)*10.8)
>>> print(sos2)
>>> sis1=sort_array_data(data=sos,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> sis2=sort_array_data(data=sos2,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> dico={"18.4":sis1,
... "21.4":sis2}
>>> test=transfer_array_(data=dico, index_key=11.4,
... start_value_depth=-14, end_value_depth=23,
... column_order_selection=1)
>>> print("sis1:",sis1)
>>> print("sis2:",sis2)
>>> print("Finaltest", test)
"""
# depths are compared in absolute value
start_value_depth=abs(start_value_depth)
end_value_depth=abs(end_value_depth)
# comp: row counter used as slice start; flag: which result is returned
# (-1 nothing, 0 single slice, 1 concatenated slices); iter_: number of
# slices already collected; translist_: the collected slices
comp, flag,iter_,translist_=0,-1,0,[]
if type (data)==dict :
for key, value in data.items(): # scan every (key, array) entry
if float(key)> float(index_key): # only keys beyond index_key are used
# deepest (absolute) value of the depth column of this array
maxi_depth=abs(value[:,column_order_selection]).max()
comp=0
for rowline in value : # scan the rows of the array
if abs(rowline[column_order_selection])>start_value_depth : # row is deeper than the start depth
if end_value_depth > maxi_depth:
# this array does not reach the end depth: keep everything from row
# `comp` and restart from the deepest value just collected
transData_=value[comp:,:]
translist_.append(transData_)
start_value_depth=abs(transData_[:,column_order_selection]).max()
comp,iter_=0,iter_+1
flag=1
elif end_value_depth<=maxi_depth:
# position of the first row whose absolute depth equals the end depth
# exactly; list.index raises ValueError when no row matches
indix =abs(value[:,column_order_selection]).tolist().index(end_value_depth)
if iter_==0 :
# nothing collected before: the slice alone is the result
transData_=value[comp:indix,::]
flag=0
break
else :
transData_=value[comp:indix,::]
translist_.append(transData_)
flag=1
if start_value_depth >= end_value_depth:
flag=1
break
else :
comp +=1
if flag ==0 :
return transData_
if flag==1 :
trans_array=concat_array_from_list(list_of_array=translist_, concat_axis=axis)
return trans_array
if type(data)==list: # build a dict keyed on the first cell of every array
dico={}
for ss, value in enumerate( data) :
dico["{0}".format(value[0,0])]=value
# NOTE(review): the result of this recursive call is not returned, so a list
# input yields None, and `axis` is forced to 0 instead of being forwarded.
# Confirm whether this is intended.
transfer_array_(data=dico,index_key=index_key,start_value_depth=start_value_depth,
end_value_depth=end_value_depth,column_order_selection=column_order_selection, axis=0)
def interpol_scipy(x_value, y_value, x_new,
                   kind="linear", plot=False, fill="extrapolate"):
    """
    Interpolate one-dimensional data with ``scipy.interpolate.interp1d``.

    Parameters
    ------------
        * x_value : np.ndarray
            original abscissa values.
        * y_value : np.ndarray
            original ordinate values, same length as `x_value`.
        * x_new : np.ndarray
            abscissa values at which the data must be evaluated.
        * kind : str
            kind of interpolation handed to ``interp1d``,
            e.g. "linear" or "cubic".
        * fill : str
            handed to ``interp1d`` as ``fill_value``. "extrapolate"
            (default) evaluates points lying outside the range of
            `x_value` by extrapolation.
        * plot : Boolean
            set to True to display the original points and the
            interpolated curve.

    Returns
    --------
        np.ndarray
            y_new, interpolated values at `x_new`.

    Raises
    ------
        ImportError
            when ``scipy.interpolate`` could not be imported by the module.

    :Example:

        >>> import numpy as np
        >>> x = np.array([0., 1., 2.])
        >>> y = np.array([0., 2., 4.])
        >>> interpol_scipy(x_value=x, y_value=y,
        ...                x_new=np.array([0.5, 3.0])).tolist()
        [1.0, 6.0]
    """
    # ``spi`` is bound at module level only when scipy.interpolate imported
    # properly; turn the bare NameError into an explicit error otherwise.
    try:
        interp1d = spi.interp1d
    except NameError:
        raise ImportError(
            "scipy.interpolate is required by interpol_scipy; "
            "check your installation, you can get scipy from scipy.org."
        ) from None

    func_ = interp1d(x_value, y_value, kind=kind, fill_value=fill)
    y_new = func_(x_new)

    if plot:
        plt.plot(x_value, y_value, "o", x_new, y_new, "--")
        # one label per plotted line; the second one names the actual kind
        plt.legend(["data", str(kind)], loc="best")
        plt.show()
    return y_new
def _set_depth_to_coeff(data, depth_column_index,coeff=1, depth_axis=0):
"""
Parameters
----------
* data : np.ndarray
must be on array channel .
* depth_column_index : int
index of depth_column.
* depth_axis : int, optional
Precise kind of orientation of depth data(axis =0 or axis=1)
The *default* is 0.
* coeff : float,
the value you want to multiplie depth.
set depth to negative multiply by one.
The *default* is -1.
Returns
-------
data : np.ndarray
new data after set depth according to it value.
:Example:
>>> import numpy as np
>>> np.random.seed(4)
>>> data=np.random.rand(4,3)
>>> data=data*(-1)
>>> print("data\n",data)
>>> data[:,1]=data[:,1]*(-1)
>>> data[data<0]
>>> print("data2\n",data)
"""
if depth_axis==0:
data[:,depth_column_index]=data[:,depth_column_index]*coeff
if depth_axis==1:
data[depth_column_index,:]=data[depth_column_index,:]*coeff
return data
def broke_array_to_(arrayData, keyIndex=0, broken_type="dict"):
    """
    Split a 2-D array into consecutive runs of rows sharing the same key.

    The key of a row is its value in column `keyIndex`. The array is
    expected to be already grouped on that column (e.g. sorted): a new
    piece starts every time the key changes from one row to the next.

    Parameters
    ----------
        * arrayData : np.array
            2-D data array.
        * keyIndex : int
            index of the column holding the key. The default is 0.
        * broken_type : str
            "dict" to get ``{str(key): rows}``, "list" to get the pieces
            in their order of appearance. The default is "dict".

    Returns
    -------
        dict or list
            the pieces of `arrayData` (views, not copies). With "dict",
            a key met in several separate runs keeps the last run only.

    Raises
    ------
        ValueError
            if `broken_type` is neither "dict" nor "list".

    :Example:

        >>> import numpy as np
        >>> data = np.array([[1., 10.], [1., 20.], [2., 30.]])
        >>> sorted(broke_array_to_(data))
        ['1.0', '2.0']
        >>> [piece.shape for piece in broke_array_to_(data, broken_type="list")]
        [(2, 2), (1, 2)]
    """
    if broken_type not in ("dict", "list"):
        raise ValueError(
            "broken_type must be 'dict' or 'list', got {0!r}".format(broken_type))

    arrayData = np.asarray(arrayData)
    keys = arrayData[:, keyIndex]
    n_rows = keys.shape[0]
    if n_rows == 0:
        return {} if broken_type == "dict" else []

    # rows where the key differs from the previous row open a new piece
    cuts = (np.flatnonzero(keys[1:] != keys[:-1]) + 1).tolist()
    starts = [0] + cuts
    stops = cuts + [n_rows]
    pieces = [arrayData[start:stop, ::] for start, stop in zip(starts, stops)]

    if broken_type == "list":
        return pieces
    return {"{0}".format(piece[0, keyIndex]): piece for piece in pieces}
@deprecated('this function is replaced by [_search_ToFill_Data] ')
def _OlDFUNCNOUSEsearch_fill_data(dicoReal, arrayTemp ,
max_value, index_of_depth,axis=0):
"""
Deprecated function , very expensive. Use `_search_ToFill_Data` instead.
Complete the arrays of `dicoReal` that do not reach `max_value` with
deeper rows taken from the reserve array `arrayTemp`.
Parameters
----------
* dicoReal : dict
dictionary of 2-D numpy arrays; keys must be convertible to float
(they are compared with the first column of the reserve arrays).
* arrayTemp : np.ndarray
reserve data. It is split on its first column with `broke_array_to_`
and every piece is sorted on its second column.
* max_value : float
Maximum depth of the survey.
* index_of_depth : int,
index of the depth column in the arrays of `dicoReal`.
* axis : int , optional
axis used when several reserve slices are concatenated.
The default is 0.
Returns
-------
dict
`dicoReal`, updated in place.
.. note:: the function prints intermediate lists to standard output.
"""
# NOTE(review): _all_checker is filled but never read, and keyToSkip is never
# appended to in this body.
_all_checker,keyToSkip=[],[]
# reserve array split on column 0, every piece then sorted on column 1
litemp= broke_array_to_(arrayData=arrayTemp,keyIndex=0,
broken_type="list")
itemp,real,realTem=[],[],[]
for ii in litemp:
temp=sort_array_data(data=ii,sort_order=1,
concatenate=False)
itemp.append(temp)
# real: (key as float, deepest value of the depth column) for every entry
for key , value in sorted(dicoReal.items()):
rowmax=value[:,index_of_depth].max()
real.append((float(key),rowmax))
_all_checker.append(rowmax)
# realTem: entries still to be completed (those already at max_value are dropped)
realTem=real.copy()
# NOTE(review): deleting from realTem while enumerating it skips the element
# that follows every deletion.
for ii , value in enumerate(realTem):
if value[1] == max_value:
del realTem[ii]
if realTem ==[]:
return dicoReal
print("Real : \n",real)
idx,flag=0,0
comp,sp=0,-1
fin_list,endList=[],[]
while idx < len(realTem):
indexKey=realTem[idx][0] # key of the entry being completed
maxKey=realTem[idx][1] # deepest value currently known for that entry
# nexIndex: upper bound (exclusive) for the keys of the usable reserve arrays
if idx==len(realTem)-1:
indic=real.index(realTem[idx])
if realTem[idx][0]==real[-1][0]:
# last entry of `real`: bound taken from the first cell of the last reserve piece
nexIndex=itemp[-1][0][0]
else :
nexIndex=real[indic+1][0]
else :
nexIndex=realTem[idx+1][0]
for ii, array in enumerate (itemp): # scan the reserve pieces
if array[0][0]> indexKey and array[0][0]<nexIndex : # piece key strictly between the two bounds
for index, rowline in enumerate(array) : # scan the rows of the piece
# NOTE(review): the reserve depth is read from column 1, not index_of_depth
if rowline[1] > maxKey :
sp=sp+1
if sp==0 :
# first row deeper than maxKey: keep the piece from that row on
add_array=array[index:,:]
flag=4
else :
sp=-1
pass
if flag==4:
if array[-1][1] < max_value : # last row of the piece is still above max_value
comp=comp+1
if array[0][0]<nexIndex :
endList.append(add_array)
else :
fin_list.append(add_array)
maxKey = array[-1][1]
elif array[-1][1] ==max_value : # last row of the piece is exactly max_value
flag=5
if flag==5 :
if comp==0 :
endList.append(add_array)
flag=0
else :
add_array=array[index:,:]
fin_list.append(add_array)
flag=1
if flag==1 :
arT=concat_array_from_list(list_of_array=fin_list,concat_axis=axis)
endList.append(arT)
if flag==0 :
endList=endList
idx=idx+1
print("keyToSkip\n:",keyToSkip)
# keyToSkip is empty here (see note above), so nothing is removed by this loop
for ii , value in enumerate(realTem):
if value[0] in keyToSkip:
del realTem[ii]
print("Realtem\n",realTem,)
print("endlist : \n",endList)
# append the i-th collected slice to the dicoReal entry of the i-th pending key
for ii , tuple_ in enumerate (realTem) :
realKey, maxValue=tuple_
print(realKey,maxValue)
add_array=endList[ii] # IndexError if fewer slices than pending keys were collected
for key , value in dicoReal.items():
if float(key)==realKey:
# NOTE(review): concatenation is done on axis 0 here, not on `axis`
val=np.concatenate((value,add_array),axis=0)
dicoReal[key]=val
return dicoReal
def _search_ToFill_Data (dicoReal, arrayTemp ,
max_value, index_of_depth,axis=0):
"""
Complete the arrays of `dicoReal` that do not reach `max_value` with deeper
rows taken from the reserve array `arrayTemp`.
Parameters
------------
* dicoReal : dict
dictionary of 2-D numpy arrays. Keys must be convertible to float;
they are compared with the first column of the reserve arrays.
* arrayTemp : np.ndarray
reserve data. It is split on its first column with `broke_array_to_`
and every piece is sorted on its second column.
* max_value : float
Maximum depth of the survey.
* index_of_depth : int,
index of the depth column in the arrays of `dicoReal`.
* axis : int , optional
axis used to append the reserve rows. The default is 0.
Returns
-------
dict
`dicoReal`, updated in place and returned.
:Example:
>>> import numpy as np
>>> np.random.seed(0)
>>> sos=abs(np.random.randn(4,3)*4)
>>> sos2=abs(np.random.randn(4,3)*10.8)
>>> sos3=abs(np.random.randn(5,3)*10.8)
>>> temp3=abs(np.random.rand(4,3)*12.4)
>>> temp2=abs(np.random.rand(4,3)*15.4)
>>> temp1=abs(np.random.rand(4,3)*9.4)
>>> temp4=abs(np.random.rand(5,3)*9.9)
>>> dico,temp={},[]
>>> ff=[sos,sos2,sos3,temp1,temp2,temp3,temp4]
>>> fin=[sort_array_data(data=ii,sort_order =1,
... concatenate=False, concat_axis_order=0) for ii in ff ]
>>> key=[11.9,61.4,102.7]
>>> vat=[214,405,904]
>>> for ii in range(3):
... fin[ii][:,0]=key[ii]
... fin[ii][-1][1]=vat[ii]
>>> tempi=[(19.4,[11,18,50,120]),(28.4,[12,17,403,904]),
... (78.3,[11,8,202,804]),(124.4,[203,403,604,714,904])]
>>> for ss, val in enumerate(tempi) :
... fin[ss+3][:,0]=val[0]
... fin[ss+3][:,1]=np.array(val[1])
>>> for ss, van in enumerate (fin):
... if ss<=3 :
... dico[van[0][0]]=van
... if ss>3 :
... temp.append(van)
>>> arrayTemp=concat_array_from_list(list_of_array=temp,
... concat_axis=0)
>>> sis02=_search_ToFill_Data(dicoReal=dico, arrayTemp=arrayTemp ,
... max_value=904, index_of_depth=1,axis=0)
>>> print(sis02)
"""
# reserve array split on column 0, every piece then sorted on column 1
litemp= broke_array_to_(arrayData=arrayTemp,keyIndex=0,
broken_type="list")
itemp,real=[],[],
for ii in litemp:
temp=sort_array_data(data=ii,sort_order=1,
concatenate=False)
itemp.append(temp)
# real: (key as float, deepest value of the depth column) for every entry.
# NOTE(review): items are sorted on the raw keys; string keys would be ordered
# lexicographically, not numerically. Confirm the key type used by callers.
for key , value in sorted(dicoReal.items()):
rowmax=value[:,index_of_depth].max()
real.append((float(key),rowmax))
# iter_ counts the reserve rows found deeper than the current depth; -1 = none yet
iter_=-1
for ss,(offs, maxDepth) in enumerate( real):
if maxDepth < max_value : # this entry does not reach the survey depth
# nextIndex: upper bound (exclusive) for the keys of the usable reserve pieces
if ss == len(real)-1:
# last entry: the bound comes from the reserve pieces themselves
for num, arTem in enumerate(itemp):
kk=arTem[0][0]
if kk > offs :
nextIndex=kk
else :
break
else :
nextIndex=real[ss+1][0]
for ii, array in enumerate(itemp) : # scan the reserve pieces
if offs < array[0][0] and array[0][0] < nextIndex:
for index, rowline in enumerate(array) : # scan the rows of the piece
# NOTE(review): the reserve depth is read from column 1, not index_of_depth
if rowline[1]> maxDepth :
iter_=iter_+1
if iter_==0 : # first deeper row: keep the piece from that row on
add_array=array[index:,:]
maxDepth=array[:,1].max() # depth now reached with this piece
else :
pass
if iter_==0 and maxDepth<=max_value :
# append the kept rows to the entry whose key matches the current offset
for key, value in dicoReal.items() :
if float(key) ==offs :
new_value=concat_array_from_list(list_of_array=[value,add_array],
concat_axis=axis)
dicoReal[key]=new_value
iter_=-1
else :
pass
return dicoReal
def straighten_out_list(main_list, list_to_straigh):
    """
    Force every value of `main_list` to appear in `list_to_straigh`.

    Each value of `main_list` that is absent from `list_to_straigh` takes
    the place of its closest neighbour, so the length of the list does not
    change. The neighbour is searched in the sorted union of both lists;
    when the two sides are equally distant the lower one is chosen.

    Parameters
    ----------
        * main_list : list
            values which must appear in the straightened list
            (in our case the station list: a list of offsets).
            It does not need to be sorted.
        * list_to_straigh : list
            values to straighten (e.g. calculated offsets).

    Returns
    -------
        * list
            `list_to_straigh`, modified in place and returned.

    .. note:: when the closest neighbour of a missing value is itself
        another missing value, nothing is substituted for it.

    :Example:

        >>> straighten_out_list(main_list=[10, 2],
        ...                     list_to_straigh=[1.0, 2.1, 3.0, 9.8])
        [1.0, 2, 3.0, 10]
    """
    if not list_to_straigh:
        return list_to_straigh

    present = set(np.unique(np.asarray(list_to_straigh)).tolist())
    missing = {sta for sta in main_list if sta not in present}
    if not missing:
        return list_to_straigh

    # sorted union: the neighbours of a missing value are its two
    # adjacent entries in this list
    candidates = sorted(present | missing)
    last = len(candidates) - 1

    # neighbour -> missing value, kept together so that the pairing does
    # not depend on the order of main_list
    substitutions = {}
    for pos, off in enumerate(candidates):
        if off not in missing:
            continue
        if pos == last:
            neighbour = candidates[pos - 1]
        elif pos == 0:
            neighbour = candidates[pos + 1]
        else:
            gap_after = abs(off - candidates[pos + 1])
            gap_before = abs(off - candidates[pos - 1])
            if gap_after >= gap_before:
                neighbour = candidates[pos - 1]
            else:
                neighbour = candidates[pos + 1]
        substitutions[neighbour] = off

    for ii, val in enumerate(list_to_straigh):
        if val in substitutions:
            list_to_straigh[ii] = substitutions[val]
    return list_to_straigh
def take_firstValue_offDepth(data_array,
                             filter_order=1):
    """
    Keep, for every distinct value of one column, the first matching row.

    The rows are sorted (stable sort) on column `filter_order`; the first
    row of every group of identical values is then returned, so that for
    equal values the row that came first in `data_array` is kept.

    Parameters
    ----------
        * data_array : np.array
            2-D array of the data.
        * filter_order : int , optional
            index of the column to filter on. The default is 1.

    Returns
    -------
        array_like
            2-D array with one row per distinct value of the filter
            column, ordered by increasing value. Empty input gives an
            empty result.

    :Example:

        >>> import numpy as np
        >>> data = np.array([[4, 2, 0.1], [8, 2, 0.7], [10, 1, 0.18]])
        >>> take_firstValue_offDepth(data_array=data, filter_order=1).tolist()
        [[10.0, 1.0, 0.18], [4.0, 2.0, 0.1]]
    """
    data_array = np.asarray(data_array)
    # stable sort: rows with the same key keep their original order
    sorted_rows = data_array[
        data_array[:, filter_order].argsort(kind="mergesort")]
    # return_index gives the first occurrence of each value, i.e. the
    # first row of each group in the sorted array
    _, first_rows = np.unique(sorted_rows[:, filter_order],
                              return_index=True)
    return sorted_rows[first_rows]
def dump_comma(input_car, max_value=2, carType='mixed'):
    """
    Turn a comma-separated input string into a tuple of values.

    A trailing comma is dropped and a leading comma is read as a missing
    first value, replaced by "0".

    Parameters
    ----------
        * input_car : str,
            Input character string (must not be empty).
        * max_value : int, optional
            maximum number of values kept. The default is 2.
        * carType : str
            how the text is interpreted (case-insensitive):
            'value', 'val', 'numeric', 'num', 'float' or 'int' evaluate it
            as a Python expression; 'car', 'character', 'ch', 'char',
            'str', 'mix', 'mixed', 'merge', 'mer', 'both', 'num&val' or
            'val&num&' split it on commas and keep strings.
            The *default* is 'mixed'.

    Returns
    -------
        tuple or str
            tuple of at most `max_value` items. A single number is
            completed with 0, e.g. ``(7, 0)``. In string mode a single
            item is returned as a bare string.

    .. note:: with any other `carType` the text is left untouched and is
        therefore broken into single characters.

    :Example:

        >>> dump_comma(input_car=",car,box", max_value=3, carType="str")
        ('0', 'car', 'box')
        >>> dump_comma(input_car="12.5,3,", carType="value")
        (12.5, 3)
    """
    numeric_kinds = ['value', 'val', "numeric",
                     "num", "num", "float", "int"]
    text_kinds = ["car", "character", "ch", "char", "str",
                  "mix", "mixed", "merge", "mer",
                  "both", "num&val", "val&num&"]

    if input_car[-1] == ",":
        input_car = input_car[:-1]
    if input_car[0] == ",":
        input_car = "0," + input_car[1:]

    split_as_text = False
    if carType.lower() in numeric_kinds:
        # SECURITY(review): eval() runs arbitrary Python; this text may come
        # straight from user input. Consider ast.literal_eval.
        input_car = eval(input_car)
    elif carType.lower() in text_kinds:
        input_car = input_car.strip(",").split(",")
        split_as_text = True

    # a lone scalar is padded with 0 so that a pair can always be returned
    inputlist = list(input_car) if np.iterable(input_car) else [input_car, 0]

    if split_as_text and len(inputlist) == 1:
        return inputlist[0]
    return tuple(inputlist[:max_value])
[docs]def build_wellData (add_azimuth=False, utm_zone="49N",
report_path=None,add_geochemistry_sample=False):
"""
Parameters
----------
* add_azimuth : Bool, optional
compute azimuth if add_azimut is set to True. The default is False.
* utm_zone : Str, optional
WGS84 utm_projection. set your zone if add_azimuth is turn to True.
The default is "49N".
* report_path : str, optional
path to save your _well_report. The default is None.
its match the current work directory
* add_geochemistry_sample: bool
add_sample_data.Set to True if you want to add_mannually Geochimistry data.
default is False.
Raises
------
Exception
manage the dimentionaly of ndarrays .
OSError
when report_path is not found in your O.S.
Returns
-------
str
name of location of well .
np.ndarray
WellData , data of build Wells .
np.ndarray
GeolData , data of build geology.
:Example:
>>> import numpy as np
>>> import os, shutil
>>> import warnings,
>>> form _utils.avgpylog import AvgPyLog
>>> well=build_wellData (add_azimuth=True, utm_zone="49N")
>>> print("nameof locations\n:",well[0])
>>> print("CollarData\n:",well[1])
>>> print("GeolData\n:", well[2])
... nameof locations
... Shimen
... CollarData
... [['S01' '477205.6935' '2830978.218' '987.25' '-90' '0.0' 'Shi01'
... 'Wdanxl0']
... ['S18' '477915.4355' '2830555.927' '974.4' '-90' '2.111' 'Shi18'
... 'Wdanxl0']]
... GeolData
... [['S01' '0.0' '240.2' 'granite']
... ['S01' '240.2' '256.4' ' basalte']
... ['S01' '256.4' '580.0' ' granite']
... ['S01' '580.0' '987.25' 'rock']
... ['S18' '0.0' '110.3' 'sand']
... ['S18' '110.3' '520.2' 'agrilite']
... ['S18' '520.2' '631.3' ' granite']
... ['S18' '631.3' '974.4' ' rock']]
... Shimen_wellReports_
"""
reg_lines=[]
wellSites,ftgeo,hole_list,Geolist=[],[],[],[]
text=["Enter the name of Location:",
"well_name :",
"Coordinates (Easting, Northing)_UTM_{0} : ".format(utm_zone),
"Hole Buttom and dip values (Bottom, dip):" ,
"Layers-thickness levels in (meters):",
"Geology-layers or <stratigraphy> names (Top_To_Buttom):",
"{0:-^70}".format(' Report '),
"DH_Hole,DH_Easting, DH_Northing, DH_Buttom,"\
" DH_Dip,DH_Azimuth, DH_PlanDepth,DH_Descrip",
"GeolData :",
"WellData:",
"DH_Hole, DH_From, DH_To, Rock",
"SampleData",
"DH_Hole, DH_From,DH_To, Sample",
"{0:-^70}".format(' InputData '),
]
name_of_location =input("Enter the name of Location:")
reg_lines.append(''.join(text[0]+'{0:>18}'.format(name_of_location)+'\n'))
reg_lines.append('\n')
reg_lines.append(text[13]+'\n')
comp=-1
while 1 :
DH_Hole=input("Enter the well_name :")
if DH_Hole=="" or DH_Hole=="end":
Geol=concat_array_from_list(list_of_array=Geolist,
concat_axis=0)
break
print("Enter the coordinates (Easting, Northing) : ", end="")
DH_East_North=input()
DH_East_North=dump_comma(input_car=DH_East_North,
max_value=2, carType='value')
print("Enter the Hole Bottom value and dip (Bottom, dip):", end='')
dh_botdip=input()
dh_botdip=dump_comma(input_car=dh_botdip,
max_value=2, carType='value')
#check the dip of the well
if float(dh_botdip[1])==0.:
dh_botdip[1]=(90.)
elif float(dh_botdip[0])==0.:
raise Exception(
"The curent bottom has a value 0.0 . Must put the bottom of the well as deep as possible !"
)
hole_list.append(DH_Hole)
wellSites.append((DH_Hole, DH_East_North[0],DH_East_North[1],
dh_botdip[0],dh_botdip[1] ))
#DH_Hole (ID) DH_East DH_North DH_RH DH_Dip DH_Azimuth DH_Top DH_Bottom DH_PlanDepth DH_Decr Mask
reg_lines.append("".join(text[1]+'{0:>7}'.format(DH_Hole))+"\n")
reg_lines.append("".join(text[2])+"\n")
reg_lines.append(",".join(['{0:>14}'.format(str(ii)) for ii in list(DH_East_North)])+"\n")
reg_lines.append("".join(text[3])+"\n")
reg_lines.append(",".join(['{0:>7}'.format(str(ii)) for ii in list(dh_botdip)])+"\n")
comp+=-1
while DH_Hole :
# print("Enter the layer thickness (From_, _To, geology):",end='')
if comp==-1 :
Geol=concat_array_from_list(list_of_array=ftgeo,
concat_axis=0)
ftgeo=[] # initialize temporary list
break
# comp=comp+1
print("Enter the layers-thickness levels in (meters):", end='')
dh_from_in=input()
if dh_from_in=="" or dh_from_in=="end":
break
dh_from=eval(dh_from_in)
dh_from_ar=np.array(dh_from)
dh_from_ar=dh_from_ar.reshape((dh_from_ar.shape[0],1))
# check the last input bottom :
if dh_from_ar[-1] >= float(dh_botdip[0]):
_logger.info("The input bottom of well {{0}}, is {{1}}. It's less last layer thickess: {{2}}."\
"we add maximum bottom at 1.023km depth.".format(DH_Hole,dh_botdip[0],
dh_from_ar[-1]))
dh_botdip[0]=(1023.)
wellSites[-1][3]=dh_botdip[0] # last append of wellSites
#find Dh_to through give dh_from
dh_to=dh_from_ar[1:]
dh_to=np.append(dh_to,dh_botdip[0])
dh_to=dh_to.reshape((dh_to.shape[0],1))
print("Enter the geology-layers names (From _To):",end="")
rock=input()
rock=rock.strip(",").split(",") # strip in the case where ","appear at the end
rock_ar=np.array(rock)
rock_ar=rock_ar.reshape((rock_ar.shape[0],1))
try :
if rock_ar.shape[0]==dh_from_ar.shape[0]:
drill_names=np.full((rock_ar.shape[0],1),DH_Hole)
fromtogeo=np.concatenate((drill_names,dh_from_ar,
dh_to, rock_ar),axis=1)
except IndexError:
_logger.warn("np.ndarry sizeError:Check 'geologie', 'Dh_From', and "\
"'Dh_To' arrays size properly . It seems one size is "\
" too longeR than another. ")
warnings.warn(" All the arrays size must match propertly!")
ftgeo.append(fromtogeo)
comp=-1
reg_lines.append("".join(text[4])+"\n")
reg_lines.append(",".join(['{0:>7}'.format(str(jj)) for jj in list(dh_from)])+"\n")
reg_lines.append("".join(text[5])+"\n")
reg_lines.append(",".join(['{0:>12}'.format(jj) for jj in rock])+"\n") # rock already in list
# reg_lines.append("".join(rock+"\n"))
reg_lines.append("\n")
Geolist.append(Geol)
name_of_location=name_of_location.capitalize()
#set on numpy array
for ii , value in enumerate(wellSites):
value=np.array(list(value))
wellSites[ii]=value
#create a wellsites array
wellSites=concat_array_from_list(wellSites,concat_axis=0)
DH_Hole=wellSites[:,0]
DH_PlanDepth=np.zeros((wellSites.shape[0],),dtype="<U8")
DH_Decr=np.full((wellSites.shape[0],),"Wdanxl0",dtype="<U9")
lenloc=len(name_of_location)
for ii , row in enumerate(DH_Hole):
# in order to keep all the well name location
DH_PlanDepth[ii]=np.array(name_of_location[:-int(lenloc/2)]+row[-2:])
Geol[:,1]=np.array([np.float(ii) for ii in Geol[:,1]])
Geol[:,2]=np.array([np.float(ii) for ii in Geol[:,2]])
if add_azimuth==False:
DH_Azimuth=np.full((wellSites.shape[0]),0)
elif add_azimuth == True :
DH_Azimuth=compute_azimuth(easting = np.array([np.float(ii) for ii in wellSites[:,1]]),
northing =np.array([np.float(ii) for ii in wellSites[:,2]]),
utm_zone=utm_zone)
DH_Azimuth=DH_Azimuth.reshape((DH_Azimuth.shape[0],1))
WellData=np.concatenate((wellSites,DH_Azimuth,
DH_PlanDepth.reshape((DH_PlanDepth.shape[0],1)),
DH_Decr.reshape((DH_Decr.shape[0],1))), axis=1)
GeolData=Geol.copy()
#-----write Report---
reg_lines.append(text[6]+'\n')
# reg_lines.append(text[7]+"\n")
reg_lines.append(text[9]+'\n')
reg_lines.append("".join(['{0:>12}'.format(ss) for ss in text[7].split(",")]) +'\n')
for rowline in WellData :
reg_lines.append(''.join(["{0:>12}".format(ss) for ss in rowline.tolist()])+"\n")
reg_lines.append(text[8]+"\n")
reg_lines.append("".join(['{0:>12}'.format(ss) for ss in text[10].split(",")]) +'\n')
for ii , row in enumerate(GeolData):
reg_lines.append(''.join(["{0:>12}".format(ss) for ss in row.tolist()])+"\n")
if add_geochemistry_sample==True:
SampleData=build_geochemistry_sample()
reg_lines.append(text[11]+'\n')
reg_lines.append("".join(['{0:>12}'.format(ss) for ss in text[12].split(",")]) +'\n')
for ii , row in enumerate(SampleData):
reg_lines.append(''.join(["{0:>12}".format(ss) for ss in row.tolist()])+"\n")
else :
SampleData=None
with open("{0}_wellReport_".format(name_of_location),"w", encoding="utf-8") as fid:
# for ii in reg_lines :
fid.writelines(reg_lines)
fid.close()
#---end write report---
if report_path is None:
report_path=os.getcwd()
elif report_path is not None :
if os.path.exists(report_path):
shutil.move((os.path.join(os.getcwd(),"{0}_wellReport_".\
format(name_of_location))),report_path)
else :
raise OSError ("The path does not match in your O.S .Try to put the right path")
warnings.warn ("ignore","the report_path doesn't match properly.Try to fix it !")
return (name_of_location, WellData , GeolData, SampleData)
def compute_azimuth(easting, northing, utm_zone="49N", extrapolate=False):
    """
    Compute the azimuth (forward bearing) between consecutive stations.

    The UTM coordinates are converted to latitude/longitude with
    `gis.utm_to_ll`, then the bearing from each station to the next one
    is computed with the usual great-circle formula.

    Parameters
    ----------
        * easting : np.ndarray
                Easting values of the coordinates (UTM).
        * northing : np.ndarray
                Northing values of the coordinates (UTM).
        * utm_zone : str, optional
                UTM zone forwarded to `gis.utm_to_ll`. The default is "49N".
        * extrapolate : bool, optional
                When True, the azimuth at the first station is linearly
                extrapolated from the following segments so the output has
                the same size as `easting`/`northing`. The default is False,
                which returns one value per segment (size n - 1).

    Returns
    -------
        np.ndarray
            azimuth in degrees, rounded to 3 decimals.

    Notes
    -----
    Latitude of the second station is now converted to radians before its
    cosine is taken (it was previously used in degrees). With fewer than
    two stations there is no segment: an empty array is returned, or zeros
    of the input size when `extrapolate` is True.

    :Example:

        >>> import numpy as np
        >>> easting=[477205.6935,477261.7258,477336.4355,477373.7903,477448.5,
        ...         477532.5484,477588.5806,477616.5968]
        >>> northing=[2830978.218, 2830944.879,2830900.427, 2830878.202,2830833.75,
        ...          2830783.742,2830750.403,2830733.734]
        >>> test=compute_azimuth(easting=np.array(easting),
        ...                      northing=np.array(northing), utm_zone="49N")
        >>> print(test)
    """
    # ellipsoid identifier forwarded unchanged to gis.utm_to_ll
    reference_ellipsoid = 23
    lat, long = gis.utm_to_ll(reference_ellipsoid=reference_ellipsoid,
                              northing=northing, easting=easting,
                              zone=utm_zone)

    # the conversion is assumed to return decimal degrees (the original
    # code scaled every angle by pi/180) -- work in radians from here on.
    lat_rad = np.radians(np.atleast_1d(np.asarray(lat, dtype=float)))
    lon_rad = np.radians(np.atleast_1d(np.asarray(long, dtype=float)))
    n_points = lat_rad.shape[0]

    if n_points < 2:
        # no segment can be formed from zero or one station
        if extrapolate is True:
            return np.zeros(n_points)
        return np.array([], dtype=float)

    lat_from, lat_to = lat_rad[:-1], lat_rad[1:]
    delta_lon = np.diff(lon_rad)

    xl = (np.cos(lat_from) * np.sin(lat_to)
          - np.sin(lat_from) * np.cos(lat_to) * np.cos(delta_lon))
    yl = np.sin(delta_lon) * np.cos(lat_to)
    azim = np.arctan2(yl, xl)

    if extrapolate is True:
        if azim.size > 1:
            # segments are indexed 1..n-1; index 0 is the first station
            ff = spi.interp1d(x=np.arange(1, azim.size + 1),
                              y=azim, fill_value='extrapolate')
            first_azimuth = float(ff(0))
        else:
            # a single segment cannot be interpolated: reuse its value
            first_azimuth = azim[0]
        azim = np.concatenate(([first_azimuth], azim))

    return np.around(np.degrees(azim), 3)
def build_geochemistry_sample():
    """
    Build the geochemistry sample data interactively from the console.

    For every hole the user is prompted for the hole name, the bottom of
    the sampling, the sampling levels and the sample names. Entering an
    empty name (or one of stop/end/enter/finish/close) ends the session.

    Raises
    ------
        ValueError
            if the sampling levels and the sample names cannot be assembled
            into one array (sizes do not match).

    Returns
    -------
        np.ndarray
            Sample, rows of [hole name, from, to, sample name] as returned
            by `concat_array_from_list` over all the holes entered.

    :Example:

        >>> geoch=build_geochemistry_sample()
        >>> print(geoch)
        ... sampleData
        ... [['S0X4' '0' '254.0' 'PUP']
        ...  ['S0X4' '254' '521.0' 'mg']
        ...  ['S0X4' '521' '625.0' 'tut']
        ...  ['S0X4' '625' '984.0' 'suj']]
    """
    stop_words = ("stop", "end", "enter", "finish", "close")
    SampleList = []

    while True:
        print('Enter Hole Name or <Enter/end> to stop:', end='')
        holeName = input()
        if holeName == "" or holeName.lower() in stop_words:
            break

        # builtin float replaces the removed np.float alias
        samp_buttom = float(input("Enter the buttom of the sampling (m):"))

        print("Enter the sampling levels:", end='')
        samplevel = dump_comma(input_car=input(), max_value=12,
                               carType='value')
        samp_ar = np.array(list(samplevel))
        samp_ar = samp_ar.reshape((samp_ar.shape[0], 1))

        # each level ends where the next one starts; the last one ends
        # at the bottom of the sampling.
        dh_to = np.append(samp_ar[1:], samp_buttom)
        dh_to = dh_to.reshape((dh_to.shape[0], 1))

        print("Enter the samples' names:", end='')
        sampName = dump_comma(input_car=input(), max_value=samp_ar.shape[0],
                              carType='mixed')
        sampName_ar = np.array(list(sampName))
        sampName_ar = sampName_ar.reshape((sampName_ar.shape[0], 1))

        try:
            holes_names = np.full((sampName_ar.shape[0], 1), holeName)
            samfromto = np.concatenate(
                (holes_names, samp_ar, dh_to, sampName_ar), axis=1)
        except Exception as e:
            # log first: nothing placed after a raise would ever run
            _logger.warning("IndexError !, dimentional problem."
                            " please check np.ndarrays.shape.")
            raise ValueError(
                "IndexError!, arrrays sample_DH_From:{0} &DH_To :{1}&"
                " 'Sample':{2} doesn't not match proprerrly.{3}".format(
                    samp_ar.shape, dh_to.shape, sampName_ar.shape, e)
            ) from e

        samP = concat_array_from_list(list_of_array=[samfromto],
                                      concat_axis=0)
        SampleList.append(samP)
        print("\n")

    Sample = concat_array_from_list(list_of_array=SampleList,
                                    concat_axis=0)
    return Sample
def parse_wellData(filename=None, include_azimuth=False,
                   utm_zone="49N"):
    """
    Parse well information (collar, geology and sample tables) from a
    *csv file.

    The file is expected to hold the collar table first, then a row whose
    first cell is "geology" followed by the geology table, then a row whose
    first cell is "sample" followed by the sample table. Each table starts
    with its own header row.

    Parameters
    ----------
        * filename : str, optional
                path to the *csv file. When None, the current working
                directory is searched and the last *.csv file listed
                is used. The default is None.
        * include_azimuth : bool, optional
                compute the azimuth from the collar coordinates with
                `compute_azimuth`; otherwise the column is filled with 0.
        * utm_zone : str, optional
                UTM zone forwarded to `compute_azimuth`. Default is 49N.

    Raises
    ------
        FileNotFoundError
            if no *csv file is available, the header cannot be read, or
            the "geology"/"sample" markers are missing.

    Returns
    -------
        location : str
            name of location: capitalized base name of the file, without
            directory and extension.
        WellData : np.ndarray
            collar data.
        GeoData : np.ndarray
            geology data built by `_order_well`.
        SampleData : np.ndarray
            geochemistry sample data built by `_order_well`.

    :Example:

        >>> parse_=parse_wellData(filename='Drill&GeologydataT.csv')
        >>> print("NameOflocation:", parse_[0])
        >>> print("WellData:", parse_[1])
        >>> print("GeoData:", parse_[2])
        >>> print("Sample:", parse_[3])
    """
    identity = ["DH_Hole (ID)", "DH_East", "DH_North", "DH_Dip",
                "Elevation", 'DH_Azimuth', "DH_Top", "DH_Bottom",
                "DH_PlanDepth", "DH_Decr", "Mask "]

    # ---- locate and read the file
    if filename is None:
        csv_files = [file for file in os.listdir(os.getcwd())
                     if (os.path.isfile(file) and file.endswith(".csv"))]
        if not csv_files:
            _logger.error('The {0} doesnt not match the *csv file'
                          ' You must convert file on *.csv format'.format(
                              csv_files))
            raise FileNotFoundError("The input file is wrong ! only *.csv "
                                    "file  can be parsed.")
        # the last file listed is the one whose content was kept before
        filename = csv_files[-1]

    assert filename.endswith(".csv"), \
        "The input file {0} is not in *.csv format".format(filename)
    with open(filename, 'r', encoding='utf-8') as f:
        data = f.readlines()

    # ---- check the header of the collar table
    try:
        head = data[0].strip("'\ufeff").strip('\n').split(',')[:-1]
        head = _nonevalue_checker(list_of_value=head,
                                  value_to_delete='')
        if len(head) > len(identity):
            raise IndexError("too many columns in the header")
    except Exception as e:
        raise FileNotFoundError(
            "The *csv file does no match the well file", e) from e

    # names are compared without surrounding blanks; a mismatch only warns
    if any(name.strip() != expected.strip()
           for name, expected in zip(head, identity)):
        _logger.error('The {0} doesnt not match the correct file'
                      ' to parse drill data'.format(filename))
        warnings.warn("The input file is wrong ! must "
                      "input the correct file to be parsed.")

    # ---- split every line and cut it at its first empty cell
    rows = []
    for line in data:
        cells = line.rstrip('\n').split(',')
        for ii, cell in enumerate(cells):
            if cell == '':
                cells = cells[:ii]
                break
        rows.append(cells)
    rows[0] = head
    rows = [row for row in rows if row]     # drop the blank lines

    # ---- identify collar, geology and sample sections
    coll, geol, samp = None, None, None
    comp = 0
    for ss, row in enumerate(rows):
        marker = row[0].lower()
        if marker == 'geology':
            coll, comp = rows[:ss], ss
        if marker == "sample":
            geol, samp = rows[comp + 1:ss], rows[ss + 1:]
    if coll is None or geol is None or samp is None:
        raise FileNotFoundError("The *csv file does no match the well file",
                                "missing 'geology' or 'sample' section")

    # ---- build the collar array (first row of each section is a header)
    collar_list = coll[1:]
    collar_ar = np.zeros((len(collar_list), len(identity)), dtype='<U12')
    for ii, row in enumerate(collar_list):
        row = row[:len(identity)]
        # fill this row only; shorter rows keep '' in the trailing columns
        collar_ar[ii, :len(row)] = row

    bottom_ar = collar_ar[:, 7]
    geol_ar = _order_well(data=geol[1:], bottom_value=bottom_ar)
    samp_ar = _order_well(data=samp[1:], bottom_value=bottom_ar)

    # base name without directory and extension (no trailing dot)
    name_of_location = os.path.splitext(os.path.basename(filename))[0]

    # ---- default plan-depth and description when the cells are empty
    lenloc = len(name_of_location)
    for ss in range(collar_ar.shape[0]):
        hole = collar_ar[ss, 0]
        if collar_ar[ss, 8] == '':
            collar_ar[ss, 8] = name_of_location[:-int(lenloc / 2)] + hole[-2:]
        if collar_ar[ss, 9] == '':
            collar_ar[ss, 9] = ("wdanx" + hole[:1]
                                + name_of_location.lower()[:1] + hole[-1:])

    # ---- azimuth column
    if include_azimuth:
        DH_Azimuth = compute_azimuth(
            easting=np.array([float(ii) for ii in collar_ar[:, 1]]),
            northing=np.array([float(ii) for ii in collar_ar[:, 2]]),
            utm_zone=utm_zone, extrapolate=True)
    else:
        DH_Azimuth = np.full((collar_ar.shape[0]), 0)
    collar_ar[:, 5] = DH_Azimuth

    name_of_location = name_of_location.capitalize()
    WellData, GeoData, SampleData = collar_ar, geol_ar, samp_ar
    return (name_of_location, WellData, GeoData, SampleData)
def _nonelist_checker(data, _checker=False ,
list_to_delete=['\n']):
"""
Function to delete a special item on list in data.
Any item you want to delete is acceptable as long as item is on a list.
Parameters
----------
* data : list
container of list. Data must contain others list.
the element to delete should be on list.
* _checker : bool, optional
The default is False.
* list_to_delete : TYPE, optional
The default is ['\n'].
Returns
-------
_occ : int
number of occurence.
num_turn : int
number of turns to elimate the value.
data : list
data safeted exempt of the value we want to delete.
:Example:
>>> import numpy as np
>>> listtest =[['DH_Hole', 'Thick01', 'Thick02', 'Thick03',
... 'Thick04', 'sample02', 'sample03'],
... ['sample04'], [], ['\n'],
... ['S01', '98.62776918', '204.7500461', '420.0266651'], ['prt'],
... ['pup', 'pzs'],[], ['papate04', '\n'],
... ['S02', '174.4293956'], [], ['400.12', '974.8945704'],
... ['pup', 'prt', 'pup', 'pzs', '', '\n'],
... ['saple07'], [], '', ['sample04'], ['\n'],
... [''], [313.9043882], [''], [''], ['2'], [''], ['2'], [''], [''], ['\n'],
... [''], ['968.82'], [], [],[], [''],[ 0.36], [''], ['\n']]
>>> ss=_nonelist_checker(data=listtest, _checker=True,
... list_to_delete=[''])
>>> print(ss)
"""
_occ,num_turn=0,0
if _checker is False:
_occ=0
return _occ, num_turn, data
while _checker is True :
if list_to_delete in data :
for indix , elem_chker in enumerate(data):
if list_to_delete == elem_chker :
_occ=_occ+1
del data[indix]
if data[-1]==list_to_delete:
_occ +=1
# data=data[:-1]
data.pop()
elif list_to_delete not in data :
_checker =False
num_turn+=1
return _occ,num_turn,data
def _order_well(data, **kwargs):
    """
    Reorganize well rows into [DH_Hole, DH_From, DH_To, Rock] arrays.

    Each row holds the hole name, a run of depth values, then the matching
    rock (or sample) names. The split between depths and names is found
    with `intell_index`. DH_To is DH_From shifted by one level, closed by
    the bottom of the well. A warning is issued when a given depth is
    greater than the bottom.

    Parameters
    ----------
        * data : list
                list of rows [hole, depth..., name...].
        * bottom_value : sequence, optional (keyword)
                bottom of each well, indexed like `data`. An empty or
                None entry, or no `bottom_value` at all, falls back to
                the default bottom 1023. (1.023 km).

    Returns
    -------
        np.ndarray
            data arranged to [DH_Hole, DH_From, DH_To, Rock] rows, as
            assembled by `concat_array_from_list`.

    :Example:

        >>> listtest =[['S01', '0.0', '98.62776918', '204.7500461','420.0266651',
        ...             'GRT', 'ATRK', 'GRT', 'ROCK'],
        ...            ['S02', '174.4293956', '313.9043882', '400.12', '974.8945704',
        ...             'GRT', 'ATRK', 'GRT', 'ROCK']]
        >>> ss=_order_well(listtest)
        >>> print(ss)
    """
    default_bottom = 1023.
    this_function_name = inspect.getframeinfo(inspect.currentframe())[2]
    bottomgeo = kwargs.pop("bottom_value", None)

    temp = []
    _logger.info("You pass by {0} function! Thin now , everything is ok. ".format(this_function_name))

    for jj, value in enumerate(data):
        # index where depth values stop and rock/sample names start
        thickness_len, _, _ = intell_index(datalist=value[1:])
        value = np.array(value)
        dh_from = value[1:thickness_len + 1]
        dh_geo = value[thickness_len + 1:]

        # deepest level given: it should not exceed the bottom
        max_given_bottom = np.max(np.array([float(ii) for ii in dh_from]))

        bottom = None if bottomgeo is None else bottomgeo[jj]
        if bottom is None or bottom == "":
            bottom = default_bottom

        if max_given_bottom > float(bottom):
            _logger.warning("value {0} is greater than the Bottom depth {1}".format(max_given_bottom, bottom))
            warnings.warn("Given values have a value greater than the depth !")

        dh_to = np.append(dh_from[1:], bottom)
        dh_hole = np.full((thickness_len), value[0])

        temp.append(np.concatenate((dh_hole.reshape((dh_hole.shape[0], 1)),
                                    dh_from.reshape((dh_from.shape[0], 1)),
                                    dh_to.reshape((dh_to.shape[0], 1)),
                                    dh_geo.reshape((dh_geo.shape[0], 1))), axis=1))

    data = concat_array_from_list(list_of_array=temp, concat_axis=0)
    return data
def intell_index(datalist, assembly_dials=False):
    """
    Find the index separating the leading numeric values of a list from
    the string elements (rock or sample names) that follow them.

    While scanning, a warning is issued for every value smaller than the
    largest one met so far (depths are expected in ascending order).

    Parameters
    ----------
        * datalist : list
                list of elements: values first, then rocks or samples.
        * assembly_dials : bool, optional
                when true, both slices are returned gathered in one list.
                The default is False.

    Returns
    -------
        index : int
            index of breaking up. It is len(datalist) when every
            element is numeric (or the list is empty).
        first_dial : list
            slice of the value part.
        second_dial : list
            slice of the rocks or samples part.

        When `assembly_dials` is true the return is instead
        (index, [first_dial, second_dial]).

    :Example:

        >>> row = ['0.0', '98.62776918', '204.7500461', 'GRT', 'ATRK', 'GRT']
        >>> op = intell_index(datalist=row)
        >>> print(op[0])
        ... 3
        >>> print(op[1])
        ... ['0.0', '98.62776918', '204.7500461']
        >>> print(op[2])
        ... ['GRT', 'ATRK', 'GRT']
    """
    max_ = 0                    # largest value met so far (ascending check)
    indexi = len(datalist)      # default: no string element found

    for ii, value in enumerate(datalist):
        try:
            thick = float(value)
        except (TypeError, ValueError):
            # first element which is not a number: breaking point
            indexi = ii
            break
        if thick >= max_:
            max_ = thick
        else:
            _logger.warning("the input value {0} is less than the previous one."
                            " Please enter value greater than {1}.".format(thick, max_))
            warnings.warn("Value {0} must be greater than the previous value {1}."
                          " Must change on your input data.".format(thick, max_))

    first_dial = datalist[:indexi]
    second_dial = datalist[indexi:]

    if assembly_dials:
        return indexi, [first_dial, second_dial]
    return indexi, first_dial, second_dial
def _nonevalue_checker (list_of_value, value_to_delete=None):
"""
Function similar to _nonelist_checker. the function deletes the specific
value on the list whatever the number of repetition of value_to_delete.
The difference with none list checker is value to delete
may not necessary be on list.
Parameters
----------
* list_of_value : list
list to check.
* value_to_delete : TYPE, optional
specific value to delete. The default is ''.
Returns
-------
list
list_of_value , safe list without the deleted value .
:Example:
>>> import numpy as np
>>> test=['DH_Hole (ID)', 'DH_East', 'DH_North',
... 'DH_Dip', 'Elevation ', 'DH_Azimuth',
... 'DH_Top', 'DH_Bottom', 'DH_PlanDepth', 'DH_Decr',
... 'Mask', '', '', '', '']
>>> test0= _nonevalue_checker (list_of_value=test)
>>> print(test0)
"""
if value_to_delete is None :
value_to_delete =''
if type (list_of_value) is not list :
list_of_value=list(list_of_value)
start_point=1
while start_point ==1 :
if value_to_delete in list_of_value :
[list_of_value.pop(ii) for ii, elm in\
enumerate (list_of_value) if elm==value_to_delete]
elif value_to_delete not in list_of_value :
start_point=0 # not necessary , just for secure the loop.
break # be sure one case or onother , it will break
return list_of_value
def _strip_item(item_to_clean, item=None, multi_space=12):
"""
Function to strip item around string values. if the item to clean is None or
item-to clean is "''", function will return None value
Parameters
----------
* item_to_clean : list or np.ndarray of string
List to strip item.
* cleaner : str , optional
item to clean , it may change according the use. The default is ''.
* multi_space : int, optional
degree of repetition may find around the item. The default is 12.
Returns
-------
list or ndarray
item_to_clean , cleaned item
:Example:
>>> import numpy as np
>>> new_data=_strip_item (item_to_clean=np.array([' ss_data',' pati ']))
>>> print(np.array([' ss_data',' pati ']))
... print(new_data)
"""
if item==None :item = ' '
cleaner =[(''+ ii*'{0}'.format(item)) for ii in range(multi_space)]
if type(item_to_clean ) != list :#or type(item_to_clean ) !=np.ndarray:
if type(item_to_clean ) !=np.ndarray:
item_to_clean=[item_to_clean]
if item_to_clean in cleaner or item_to_clean ==['']:
warnings.warn ('No data found in <item_to_clean :{}> We gonna return None.')
return None
try :
multi_space=int(multi_space)
except :
raise TypeError('argument <multplier> must be'\
' an integer not {0}'.format(type(multi_space)))
for jj, ss in enumerate(item_to_clean) :
for space in cleaner:
if space in ss :
new_ss=ss.strip(space)
item_to_clean[jj]=new_ss
return item_to_clean
def _cross_eraser(data, to_del, deep_cleaner=False):
    """
    Delete from `data` the items present in another list.

    Both lists are first cleaned with `_strip_item`. Every element of
    `data` found in `to_del` is then removed.

    Parameters
    ----------
        * data : list
                main data user wants to filter.
        * to_del : list
                list of items to delete from the main data.
        * deep_cleaner : bool, optional
                compare in lower case, so that capitalized values still
                match. Note that the returned values are then the
                lower-cased ones. The *default* is False.

    Returns
    -------
        list
            data, list erased. It is empty when `_strip_item` finds
            nothing to clean in `data`.

    :Example:

        >>> data =['Z.mwgt','Z.pwgt','Freq',' Tx.Amp','E.mag','   E.phz',
        ...          '   B.mag','   B.phz','   Z.mag', '   Zphz  ']
        >>> data2=['   B.phz','   Z.mag',]
        >>> remain_data =_cross_eraser(data=data, to_del=data2,
        ...                            deep_cleaner=True)
        >>> print(remain_data)
    """
    data = _strip_item(item_to_clean=data)
    to_del = _strip_item(item_to_clean=to_del)

    if data is None:            # nothing to filter
        return []
    if to_del is None:          # nothing to erase
        to_del = []

    if deep_cleaner:
        data = [ii.lower() for ii in data]
        to_del = [jj.lower() for jj in to_del]

    # build the remaining items in one pass: deleting while iterating
    # removed unrelated neighbours of a matched item.
    remaining = [item for item in data if item not in to_del]
    if isinstance(data, list):
        data[:] = remaining     # keep the in-place update of the list
        return data
    return remaining
def _remove_str_word (char, word_to_remove, deep_remove=False):
"""
Small funnction to remove a word present on astring character
whatever the number of times it will repeated.
Parameters
----------
* char : str
may the the str phrases or sentences . main items.
* word_to_remove : str
specific word to remove.
* deep_remove : bool, optional
use the lower case to remove the word even the word is uppercased
of capitalized. The default is False.
Returns
-------
str
char , new_char without the removed word .
:Example:
>>> from csamtpy.utils import func_utils as func
>>> ch ='AMTAVG 7.76: "K1.fld", Dated 99-01-01,AMTAVG, Processed 11 Jul 17 AMTAVG'
>>> ss=func._remove_str_word(char=ch, word_to_remove='AMTAVG', deep_remove=False)
>>> print(ss)
"""
if type(char) is not str : char =str(char)
if type(word_to_remove) is not str : word_to_remove=str(word_to_remove)
if deep_remove == True :
word_to_remove, char =word_to_remove.lower(),char.lower()
if word_to_remove not in char :
return char
while word_to_remove in char :
if word_to_remove not in char :
break
index_wr = char.find(word_to_remove)
remain_len=index_wr+len(word_to_remove)
char=char[:index_wr]+char[remain_len:]
return char
def stn_check_split_type(data_lines):
    """
    Read data lines and check for the presence of a split type
    < ',' or ' ', or any other marks.>

    Only the first third of the lines is inspected. A separator is
    accepted when its number of occurrences there is at least two thirds
    of the total number of lines. Candidates are tried in the order
    ',', ':', ' ', ';'.

    :params data_lines: list of data to parse.
    :type data_lines: list, tuple, np.ndarray or str

    :returns: The split type. None stands for the blank space, and is
                also returned when no separator is detected.
    :rtype: str or None

    :raises TypeError: if `data_lines` is not a list, tuple, ndarray
                or string.

    :Example:

        >>> from csamtpy.utils import func_utils as func
        >>> with open ('K6.stn', 'r', encoding='utf8') as f :
        ...     data= f.readlines()
        >>> print(func.stn_check_split_type(data_lines=data))
    """
    split_type = [',', ':', ' ', ';']

    if isinstance(data_lines, np.ndarray):
        # change the data if it is not made of string elements
        if data_lines.dtype in ['float', 'int', 'complex']:
            data_lines = data_lines.astype('<U12')
        data_lines = data_lines.tolist()
    if isinstance(data_lines, tuple):
        data_lines = list(data_lines)

    if isinstance(data_lines, list):
        # read the first third of the lines as one single string
        head_lines = data_lines[:int(len(data_lines) / 3)]
        text_to_read = ''.join([str(item) for item in head_lines])
    elif isinstance(data_lines, str):
        text_to_read = data_lines
    else:
        raise TypeError("<data_lines> must be a list, a tuple, a np.ndarray"
                        " or a string, not {0}".format(type(data_lines)))

    threshold = 2 * len(data_lines) / 3
    for sep in split_type:
        # membership test: a separator opening the text counts too
        if sep in text_to_read and text_to_read.count(sep) >= threshold:
            if sep == ' ':
                return None     # use None, more conventional
            return sep
    return None
def minimum_parser_to_write_edi(edilines, parser='='):
    """
    Validate edi lines for writing: every string item must hold the
    parser (the equal sign by default) between key and value.
    Dictionaries found in the list are skipped without any check.

    :param edilines: list of items to parse
    :type edilines: list or tuple

    :param parser: mark expected in every string item,
                    default is `=`
    :type parser: str

    :returns: edilines, as a list
    :rtype: list

    :raises TypeError: if `edilines` is neither a list nor a tuple.
    :raises ValueError: if a string item does not contain the parser.
    """
    if isinstance(edilines, tuple):
        edilines = list(edilines)
    elif not isinstance(edilines, list):
        raise TypeError('<Edilines> Must be on list')

    for lines in edilines:
        if isinstance(lines, dict):
            continue
        if lines.find(parser) < 0:
            # a real exception: raising a bare string is not valid
            raise ValueError(
                'None <"{0}"> found on this item<{1}> of the edilines list.'
                ' list can not be parsed.Please put egal between key and'
                ' value '.format(parser, lines))
    return edilines
def round_dipole_length(value, round_value=5.):
    """
    Small function to graduate the dipole length on steps of
    `round_value` (5 to 5 by default).

    The remainder of value by `round_value` decides the direction: below 3
    the value goes down to the previous step, otherwise it goes up to the
    next one. The limit of 3 is a fixed amount; it does not scale with
    `round_value`.

    :param value: value of dipole length
    :type value: float

    :param round_value: step of graduation, default is 5.
    :type round_value: float

    :returns: value of dipole length rounded on the step
    :rtype: float
    """
    mm = value % round_value
    if mm < 3:
        return np.around(value - mm)
    # always move up by one full step, so the result stays on the
    # graduation whatever round_value is.
    return np.around(value - mm + round_value)
# if __name__=="__main__" :
# parse_=parse_wellData(filename='shimenDH.csv', include_azimuth=True,utm_zone='49N')
# print("NameOflocation:\n",parse_[0])
# print("WellData:\n",parse_[1])
# print("GeoData:\n",parse_[2])
# print("Sample:\n",parse_[3])
# data =['Z.mwgt','Z.pwgt','Freq',' Tx.Amp','E.mag',' E.phz',
# ' B.mag',' B.phz',' Z.mag', ' Zphz ']
# data2=[' B.phz',' Z.mag',]
# # cleaner =[(''+ ii*'*') for ii in range(7)]
# print(_cross_eraser(data=data2, to_del=data))
# print(_cross_eraser(data=data, to_del=data2))
# # print(_strip_item(item_to_clean=data, item=' '))
# # # print(cleaner)
# ts ='50.0'
# ch ='AMTAVG 7.76: "K1.fld", Dated 99-01-01,AMTAVG, Processed 11 Jul 17 AMTAVG'
# ss=_remove_str_word(char=ts, word_to_remove='m', deep_remove=False)