# -*- coding: utf-8 -*-
# Created on Sun Sep 13 09:24:00 2020
# Author: Kouadio K.Laurent<etanoyau@gmail.com>
# Licence: LGPL
"""
.. _module-Func-utils::`pycsamt.utils.func_utils`
:synopsis: helpers functions
* check_dimensionality
* subprocess_module_installation
* cpath
* smart_format
* make_introspection
* show_quick_edi_stats
* sPath
* averageData
* concat_array_from_list
* sort_array_data
* transfer_array_ (deprecated)
* interpol_scipy
* _set_depth_to_coeff
* broke_array_to_
* _OlDFUNCNOUSEsearch_fill_data (deprecated)
* _search_ToFill_Data
* straighten_out_list
* take_firstValue_offDepth
* dump_comma
* build_wellData
* compute_azimuth
* build_geochemistry_sample
* _nonelist_checker
* _order_well
* intell_index
* _nonevalue_checker
* _clean_space
*_cross_eraser
* _remove_str_word
* stn_check_split_type
* minimum_parser_to_write_edi
* round_dipole_length
* keepmin
* get_closest_value
* geo_length_checker
* fr_en_parser
* convert_csvdata_from_fr_to_en
"""
import os
import sys
import subprocess
import shutil
import warnings
import inspect
import csv
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
import pycsamt.utils.gis_tools as gis
import pycsamt.utils.exceptions as CSex
from pycsamt.utils.decorator import deprecated
from pycsamt.utils._csamtpylog import csamtpylog
_logger = csamtpylog.get_csamtpy_logger(__name__)
_msg= ''.join([
'Note: need scipy version 0.14.0 or higher or interpolation,',
' might not work.']
)
_msg0 = ''.join([
'Could not find scipy.interpolate, cannot use method interpolate'
'check installation you can get scipy from scipy.org.']
)
try:
import scipy
scipy_version = [int(ss) for ss in scipy.__version__.split('.')]
if scipy_version[0] == 0:
if scipy_version[1] < 14:
warnings.warn(_msg, ImportWarning)
_logger.warning(_msg)
import scipy.interpolate as spi
interp_import = True
# pragma: no cover
except ImportError:
warnings.warn(_msg0)
_logger.warning(_msg0)
interp_import = False
[docs]def check_dimensionality(obj, data, z, x):
""" Check dimensionality of data and fix it.
:param obj: Object, can be a class logged or else.
:param data: 2D grid data of ndarray (z, x) dimensions
:param z: array-like should be reduced along the row axis
:param x: arraylike should be reduced along the columns axis.
"""
def reduce_shape(Xshape, x, axis_name =None):
""" Reduce shape to keep the same shape"""
mess ="`{0}` shape({1}) {2} than the data shape `{0}` = ({3})."
ox = len(x)
dsh = Xshape
if len(x) > Xshape :
x = x[: int (Xshape)]
obj._logging.debug(''.join([
f"Resize {axis_name!r}={ox!r} to {Xshape!r}.",
mess.format(axis_name, len(x),'more',Xshape)]))
elif len(x) < Xshape:
Xshape = len(x)
obj._logging.debug(''.join([
f"Resize {axis_name!r}={dsh!r} to {Xshape!r}.",
mess.format(axis_name, len(x),'less', Xshape)]))
return int(Xshape), x
sz0, z = reduce_shape(data.shape[0],
x=z, axis_name ='Z')
sx0, x =reduce_shape (data.shape[1],
x=x, axis_name ='X')
data = data [:sz0, :sx0]
return data , z, x
[docs]def subprocess_module_installation (module, upgrade =True , DEVNULL=False,
action=True, verbose =0, **subpkws):
""" Install or uninstall a module using the subprocess under the hood.
:param module: str, module name
:param upgrade:bool, install the lastest version.
:param verbose:output a message
:param DEVNULL: decline the stdoutput the message in the console
:param action: str, install or uninstall a module
:param subpkws: additional subprocess keywords arguments.
:Example:
>>> from pycsamt.utils.func_utils import subprocess_module_installation
>>> subprocess_module_installation(
'tqdm', action ='install', DEVNULL=True, verbose =1)
>>> subprocess_module_installation(
'tqdm', action ='uninstall', verbose =1)
"""
#implement pip as subprocess
# refer to https://pythongeeks.org/subprocess-in-python/
if not action:
if verbose > 0 :
print("---> No action `install`or `uninstall`"
f" of the module {module!r} performed.")
return action # DO NOTHING
MOD_IMP=False
action_msg ='uninstallation' if action =='uninstall' else 'installation'
if action in ('install', 'uninstall', True) and verbose > 0:
print(f'---> Module {module!r} {action_msg} will take a while,'
' please be patient...')
cmdg =f'<pip install {module}> | <python -m pip install {module}>'\
if action in (True, 'install') else ''.join([
f'<pip uninstall {module} -y> or <pip3 uninstall {module} -y ',
f'or <python -m pip uninstall {module} -y>.'])
upgrade ='--upgrade' if upgrade else ''
if action == 'uninstall':
upgrade= '-y' # Don't ask for confirmation of uninstall deletions.
elif action in ('install', True):
action = 'install'
cmd = ['-m', 'pip', f'{action}', f'{module}', f'{upgrade}']
try:
STDOUT = subprocess.DEVNULL if DEVNULL else None
STDERR= subprocess.STDOUT if DEVNULL else None
subprocess.check_call(
[sys.executable] + cmd, stdout= STDOUT, stderr=STDERR,
**subpkws)
if action in (True, 'install'):
# freeze the dependancies
reqs = subprocess.check_output(
[sys.executable,'-m', 'pip','freeze'])
[r.decode().split('==')[0] for r in reqs.split()]
_logger.info( f"{action_msg.capitalize()} of `{module}` "
"and dependancies was successfully done!")
MOD_IMP=True
except:
_logger.error(f"Failed to {action} the module =`{module}`.")
if verbose > 0 :
print(f'---> Module {module!r} {action_msg} failed. Please use'
f' the following command: {cmdg} to manually do it.')
else :
if verbose > 0:
print(f"{action_msg.capitalize()} of `{module}` "
"and dependancies was successfully done!")
return MOD_IMP
[docs]def make_introspection(Obj , subObj):
""" Make introspection by using the attributes of instance created to
populate the new classes created.
:param Obj: callable
New object to fully inherits of `subObject` attributes
:param subObj: Callable
Instance created.
"""
# make introspection and set the all attributes to self object.
# if Obj attribute has the same name with subObj attribute, then
# Obj attributes get the priority.
for key, value in subObj.__dict__.items():
if not hasattr(Obj, key) and key != ''.join(['__', str(key), '__']):
setattr(Obj, key, value)
[docs]def cpath (savepath=None , dpath= None):
""" Control the existing path and create one of it does not exist.
:param savepath: Pathlike obj, str
:param dpath: str, default pathlike obj
"""
if dpath is None:
file, _= os.path.splitext(os.path.basename(__file__))
dpath = ''.join(['_', file,
'_']) #.replace('.py', '')
if savepath is None :
savepath = os.path.join(os.getcwd(), dpath)
if savepath is not None:
try :
if not os.path.isdir(savepath):
os.mkdir(savepath)# mode =0o666)
except : pass
return savepath
[docs]def show_quick_edi_stats(nedic , nedir, fmtl='~', lenl=77):
""" Format the Edi files and ckeck the number of edifiles
successfully read.
:param nedic: number of input or collected edifiles
:param nedir: number of edifiles read sucessfully
:param fmt: str to format the stats line
:param lenl: length of line denileation."""
def get_obj_len (value):
""" Control if obj is iterable then take its length """
try :
iter(value)
except :pass
else : value =len(value)
return value
nedic = get_obj_len(nedic)
nedir = get_obj_len(nedir)
print(fmtl * lenl )
mesg ='|'.join( ['|{0:<15}{1:^2} {2:<7}',
'{3:<15}{4:^2} {5:<7}',
'{6:<9}{7:^2} {8:<7}%|'])
print(mesg.format('EDI collected','=', nedic, 'EDI success. read',
'=', nedir, 'Rate','=', round ((nedir/nedic) *100, 2),
2))
print(fmtl * lenl )
[docs]def sPath (name_of_path:str):
""" Savepath func. Create a path with `name_of_path`
if path not exists.
:param name_of_path: str, Path-like object. If path does not exist,
`name_of_path` should be created.
"""
try :
savepath = os.path.join(os.getcwd(), name_of_path)
if not os.path.isdir(savepath):
os.mkdir(name_of_path)# mode =0o666)
except :
warnings.warn("The path seems to be existed!")
return
return savepath
[docs]def averageData(np_array, filter_order=0,
axis_average=0, astype="float32"): #array_of_average_array=0
"""
Parameters
-----------
* np_array : numpy array
must be an array data
* filter_order : int
must be the index of the column you want to sort
* axis average : int
axis you want to see data averaged, also
it is the concatenate axis
default is axis=0
* astype*: str ,
is the ndarray dtype array .
change to have an outup arry dtype , you want .
Returns
--------
numpy array
Data averaged array
:Example :
>>> import numpy as np
>>> list8=[[4,2,0.1],[8,2,0.7],[10,1,0.18],[4,3,0.1],
... [7,2,1.2],[10,3,0.5],[10,1,0.5],[8.2,0,1.9],
... [10,7,0.5],[10,1,0.5],
... [2,0,1.4],[5,4,0.5],[10,2,0.7],[7,2,1.078],
... [10,2,3.5],[10,8,1.9]]
>>> np_test=np.array(list8)
>>> ss=averageData(np_array=np_test,filter_order=1,
... axis_average=0, astype="int")
>>> ss
"""
idx,sep_counts=0,0
global_list=[]
#Filter the array
np_array=np_array[np_array[:,filter_order].argsort(kind="mergesort")]
#returns the differents value on the filtersort index of array :
values, counts =np.unique(np_array[:,filter_order], return_counts=True)
# append array with numpy :(values.shape)
# # new_array=np.append(new_array,rowline,axis=0)
for rowline in np_array :
if rowline[filter_order] ==values.max():
new_array=np_array[sep_counts:,:]
temp_array=new_array.mean(axis=axis_average)
temp_array=temp_array.reshape((1,temp_array.shape[0]))
global_list.append(temp_array)
break
elif rowline[filter_order] !=np_array[idx+1,filter_order]:
new_array=np_array[sep_counts:idx+1,:]
temp_array=new_array.mean(axis=axis_average)
temp_array=temp_array.reshape((1,temp_array.shape[0]))
global_list.append(temp_array)
sep_counts=idx+1
idx=idx+1
np_out_put=concat_array_from_list(list_of_array=global_list,
concat_axis=axis_average)
np_out_put=np_out_put.astype(astype)
return np_out_put
[docs]def concat_array_from_list(list_of_array, concat_axis=0):
"""
Small function to concatenate a list with array contents
Parameters
-----------
* list_of_array : list
contains a list for array data. the concatenation is possible
if an index array have the same size
Returns
-------
array_like
numpy concatenated data
:Example:
>>> import numpy as np
>>> np.random.seed(0)
>>> ass=np.random.randn(10)
>>> ass2=np.linspace(0,15,12)
>>> ass=ass.reshape((ass.shape[0],1))
>>> ass2=ass2.reshape((ass2.shape[0],1))
>>> or_list=[ass,ass2]
>>> ss_check_error=concat_array_from_list(list_of_array=or_list,
... concat_axis=0)
>>> secont test :
>>> ass=np.linspace(0,15,14)
>>> ass2=np.random.randn(14)
>>> ass=ass.reshape((ass.shape[0],1))
>>> ass2=ass2.reshape((ass2.shape[0],1))
>>> or_list=[ass,ass2]
>>> ss=concat_array_from_list(list_of_array=or_list, concat_axis=0)
>>> ss=concat_array_from_list(list_of_array=or_list, concat_axis=1)
>>> ss
>>> ss.shape
"""
#first attemp when the len of list is ==1 :
if len(list_of_array)==1:
if type(list_of_array[0])==np.ndarray:
output_array=list_of_array[0]
if output_array.ndim==1:
if concat_axis==0 :
output_array=output_array.reshape((1,output_array.shape[0]))
else :
output_array=output_array.reshape((output_array.shape[0],1))
return output_array
elif type(list_of_array[0])==list:
output_array=np.array(list_of_array[0])
if concat_axis==0 :
output_array=output_array.reshape((1,output_array.shape[0]))
else :
output_array=output_array.reshape((output_array.shape[0],1))
return output_array
# check the size of array in the liste when the len of list is >=2
for ii,elt in enumerate(list_of_array) :
if type(elt)==list:
elt=np.array(elt)
if elt is None :
pass
elif elt.ndim ==1 :
if concat_axis==0 :
elt=elt.reshape((1,elt.shape[0]))
else :
elt=elt.reshape((elt.shape[0],1))
list_of_array[ii]=elt
output_array=list_of_array[0]
for ii in list_of_array[1:]:
output_array=np.concatenate((output_array,ii), axis=concat_axis)
return output_array
[docs]def sort_array_data(data, sort_order =0,
concatenate=False, concat_axis_order=0 ):
"""
Function to sort array data and concatenate
numpy.ndarray
Parameters
----------
* data : numpy.ndarray
must be in simple array , list of array and
dictionary whom the value is numpy.ndarray
* sort_order : int, optional
index of colum to sort data. The default is 0.
* concatenate : Boolean , optional
concatenate all array in the object.
Must be the same dimentional if concatenate is set to True.
The *default* is False.
* concat_axis_order : int, optional
must the axis of concatenation . The default is axis=0.
Returns
-------
numpy.ndarray
data , Either the simple sort data or
array sorted and concatenated .
"""
if type(data)==list :
for ss, val in data :
val=val[val[:,sort_order].argsort(kind="mergesort")]
data[ss]=val
if concatenate:
data=concat_array_from_list(list_of_array=data,
concat_axis=concat_axis_order)
elif type(data)==dict:
for key, value in data.items():
value=value[value[:,sort_order].argsort(kind="mergesort")]
data[key]=value
if concatenate==True :
temp_list=list(data.values())
data=concat_array_from_list(list_of_array=temp_list,
concat_axis=concat_axis_order)
else :
data=data[data[:,sort_order].argsort(kind="mergesort")]
return data
[docs]@deprecated ("Function replaced to another {_search_ToFill_Data}")
def transfer_array_(data, index_key,start_value_depth, end_value_depth,
column_order_selection=0, axis=0):
"""
Parameters
----------
* data : dict
Dictionnary of numpy ndarray .
* index_key : float
key of the dictionnary .
Must be a number of the first column of offset .
* start_value_depth : float
If the depth is not reach must add depth of the closest point.
give the start value which match to the maxi depth of the data :
The *default* is -214.
* end_value_depth : float
Maximum depth of the survey. The default is -904.
* column_order_selection : int,
the index of depth column. The default is 0.
* axis : int , optional
numpy.ndarray axis . The default is 0.
Returns
-------
numpy.ndarray
return the array data we want to top to .
:Example:
>>> import numpy as np
>>> sos=abs(np.random.randn(4,3)*4)
>>> sos2=abs(np.random.randn(4,3)*10.8)
>>> print(sos2)
>>> sis1=sort_array_data(data=sos,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> sis2=sort_array_data(data=sos2,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> dico={"18.4":sis1,
... "21.4":sis2}
>>> test=transfer_array_(data=dico, index_key=11.4,
... start_value_depth=-14, end_value_depth=23,
... column_order_selection=1)
>>> print("sis1:",sis1)
>>> print("sis2:",sis2)
>>> print("Finaltest", test)
"""
start_value_depth=abs(start_value_depth)
end_value_depth=abs(end_value_depth)
comp, flag,iter_,translist_=0,-1,0,[]
# chef the depth colum if negative before enter in loop :
if type (data)==dict :
for key, value in data.items(): # scroll the dictionary
if float(key)> float(index_key): # check the key of dictionnary
#before using its value
maxi_depth=abs(value[:,column_order_selection]).max() # if yes
# calculate the max depth of the value :
comp=0
for rowline in value : # scroll the row of the array dict value
if abs(rowline[column_order_selection])>start_value_depth : # check its
# print(abs(rowline[column_order_selection]))
# value and compare it to section we must start extract (start)
if end_value_depth > maxi_depth:
transData_=value[comp:,:] # if yes transfer data :
# print(transData_)
translist_.append(transData_)
start_value_depth=abs(transData_[:,column_order_selection]).max()
# start_value_depth=maxi_depth
comp,iter_=0,iter_+1
flag=1
elif end_value_depth<=maxi_depth:
indix =abs(value[:,column_order_selection]).tolist().index(end_value_depth)
if iter_==0 :
transData_=value[comp:indix,::]
flag=0
break
else :
transData_=value[comp:indix,::]
translist_.append(transData_)
flag=1
if start_value_depth >= end_value_depth:
flag=1
break
else :
comp +=1
if flag ==0 :
return transData_
if flag==1 :
trans_array=concat_array_from_list(list_of_array=translist_, concat_axis=axis)
return trans_array
if type(data)==list: # create a dictionnary of array value
dico={}
for ss, value in enumerate( data) :
dico["{0}".format(value[0,0])]=value
# call recursive function
transfer_array_(data=dico,index_key=index_key,start_value_depth=start_value_depth,
end_value_depth=end_value_depth,column_order_selection=column_order_selection, axis=0)
[docs]def interpol_scipy (x_value, y_value,x_new,
kind="linear",plot=False, fill="extrapolate"):
"""
function to interpolate data
Parameters
------------
* x_value : np.ndarray
value on array data : original absciss
* y_value : np.ndarray
value on array data : original coordinates (slope)
* x_new : np.ndarray
new value of absciss you want to interpolate data
* kind : str
projection kind :
maybe : "linear", "cubic"
* fill : str
kind of extraolation, if None , *spi will use constraint interpolation
can be "extrapolate" to fill_value.
* plot : Boolean
Set to True to see a wiewer graph
Returns
--------
np.ndarray
y_new ,new function interplolate values .
:Example:
>>> import numpy as np
>>> fill="extrapolate"
>>> x=np.linspace(0,15,10)
>>> y=np.random.randn(10)
>>> x_=np.linspace(0,20,15)
>>> ss=interpol_Scipy(x_value=x, y_value=y, x_new=x_, kind="linear")
>>> ss
"""
func_=spi.interp1d(x_value, y_value, kind=kind,fill_value=fill)
y_new=func_(x_new)
if plot :
plt.plot(x_value, y_value,"o",x_new, y_new,"--")
plt.legend(["data", "linear","cubic"],loc="best")
plt.show()
return y_new
def _set_depth_to_coeff(data, depth_column_index,coeff=1, depth_axis=0):
"""
Parameters
----------
* data : np.ndarray
must be on array channel .
* depth_column_index : int
index of depth_column.
* depth_axis : int, optional
Precise kind of orientation of depth data(axis =0 or axis=1)
The *default* is 0.
* coeff : float,
the value you want to multiplie depth.
set depth to negative multiply by one.
The *default* is -1.
Returns
-------
data : np.ndarray
new data after set depth according to it value.
:Example:
>>> import numpy as np
>>> np.random.seed(4)
>>> data=np.random.rand(4,3)
>>> data=data*(-1)
>>> print("data\n",data)
>>> data[:,1]=data[:,1]*(-1)
>>> data[data<0]
>>> print("data2\n",data)
"""
if depth_axis==0:
data[:,depth_column_index]=data[:,depth_column_index]*coeff
if depth_axis==1:
data[depth_column_index,:]=data[depth_column_index,:]*coeff
return data
[docs]def broke_array_to_(arrayData, keyIndex=0, broken_type="dict"):
"""
broke data array into different value with their same key
Parameters
----------
* arrayData :np.array
data array .
* keyIndex : int
index of column to create dict key
Returns
-------
dict
dico_brok ,dictionnary of array.
"""
# find the max_counts
vcounts_temp,counts_temp=np.unique(arrayData[:,keyIndex], return_counts=True)
vcounts_temp_max=vcounts_temp.max()
# print(vcounts_temp)
# print(vcounts_temp_max)
# print(vcounts_temp.min())
dico_brok={}
lis_brok=[]
index=0
deb=0
for rowlines in arrayData :
if rowlines[0] == vcounts_temp_max:
value=arrayData[index:,::]
if broken_type=="dict":
dico_brok["{0}".format(rowlines[0])]=value
break
elif broken_type=="list":
lis_brok.append(value)
break
elif rowlines[0] !=arrayData[index+1,keyIndex]:
value=arrayData[deb:index+1,::]
if broken_type=="dict":
dico_brok["{0}".format(rowlines[0])]=value
elif broken_type=="list":
lis_brok.append(value)
deb=index+1
index=index+1
if broken_type=="dict":
return dico_brok
elif broken_type=="list":
return lis_brok
@deprecated('this function is replaced by [_search_ToFill_Data] ')
def _OlDFUNCNOUSEsearch_fill_data(dicoReal, arrayTemp ,
max_value, index_of_depth,axis=0):
"""
Deprecated function , very expensive.
Parameters
----------
* data : dict
Dictionnary of numpy ndarray .
* dataReal : dict
dictionnary . must be a dictionnary of real of offset .
* arrayTemp : np.ndarray
must be a numpy array of reserve data , the one , we want to
extract the depth data to fill array
* max_value : float
Maximum depth of the survey.
* index_of_depth : int,
the index of depth column. The *default* is 0.
* axis : int , optional
numpy.ndarray axis . The default is 0.
Returns
-------
array_like
the array data we want to top to .
:Example:
>>> import numpy as np
>>> np.random.seed(0)
>>> sos=abs(np.random.randn(4,3)*4)
>>> sos2=abs(np.random.randn(4,3)*10.8)
>>> sos3=abs(np.randon.rand(8,3)*12.4)
>>> # print(sos2)
>>> sis1=sort_array_data(data=sos,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> sis2=sort_array_data(data=sos2,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> sis3=sort_array_data(data=sos3,sort_order =1,
... concatenate=False, concat_axis_order=0)
>>> dico={"18.4":sis1,
... "21.4":sis2}
>>> test=_search_fill_data(dicoReal=dico, index_key=11.4,
... start_value=10, max_value=23,
... index_of_depth=1)
>>> print("sis1\n:",sis1)
>>> print("sis2\n:",sis2)
>>> print("Finaltest\n", test)
"""
# arange a dictionany :
#from keys : for k in sorted(dico.keys()):
# print(%s: %s",%(k,names[k]))
# from values : for k , v in sorted(dico.items(),key=lambda x:x[1]):
# print(%s: %s",%(k,v))
_all_checker,keyToSkip=[],[]
litemp= broke_array_to_(arrayData=arrayTemp,keyIndex=0,
broken_type="list")
itemp,real,realTem=[],[],[]
for ii in litemp:
temp=sort_array_data(data=ii,sort_order=1,
concatenate=False)
itemp.append(temp)
# print(itemp)
for key , value in sorted(dicoReal.items()):
# print(sorted(dicoReal.items()))
# for ii, rowline in enumerate(value) :
rowmax=value[:,index_of_depth].max()
real.append((float(key),rowmax))
_all_checker.append(rowmax)
#chek if all elements are reach the depth max
# if all(_all_checker)==True : # if one of the depth is the same
# return dicoReal
realTem=real.copy()
for ii , value in enumerate(realTem):
if value[1] == max_value:
del realTem[ii]
#real.pop(ii)
if realTem ==[]:
return dicoReal
print("Real : \n",real)
idx,flag=0,0
comp,sp=0,-1
fin_list,endList=[],[]
while idx < len(realTem):
indexKey=realTem[idx][0] #11,4
maxKey=realTem[idx][1] #214
# if real[idx][1]==max_value : # case where thedepth value of realdico is get
# flag=3
# # elif real[idx][1]==max_value : # case where thedepth value of realdico is get
# # flag=3
# else :
if idx==len(realTem)-1:
indic=real.index(realTem[idx])
# check whether there is data after a delete the offset with depth value reach
if realTem[idx][0]==real[-1][0]:
nexIndex=itemp[-1][0][0]
else :
nexIndex=real[indic+1][0]
else :
nexIndex=realTem[idx+1][0]#61.4
# loop the reserve list :
for ii, array in enumerate (itemp): #itemp is the list of reserve broken list
# if array[0][0]> indexKey and array[0][0]> nexIndex:
# keyToSkip.append(realTem[idx])
# continue
if array[0][0]> indexKey and array[0][0]<nexIndex : #check offset and next offset , if True :
for index, rowline in enumerate(array) : # loop the array now
# tem_depth=rowline[1] # take the value of depth of reserve array for the first row
if rowline[1] > maxKey :# maxKey=214, rowline[1]= :# (max_value=904) if True :
sp=sp+1
if sp==0 :
add_array=array[index:,:]
# num=index
# print("True:\n",add_array)
flag=4
else :
sp=-1
pass
# print(maxtem)
if flag==4:
# maxtem=array[-1][1] # take the maximum depth of the reserve array , last row
if array[-1][1] < max_value : # if the maximum depth not reach
comp=comp+1
if array[0][0]<nexIndex :
endList.append(add_array)
else :
fin_list.append(add_array) # keep the array in temporary list
# maxKey = maxtem # set up new maximum depth
# flag=1
maxKey = array[-1][1]
# print("flag4,comp>1")
# if maxKey <
elif array[-1][1] ==max_value : # the maximum depth is reached
flag=5
if flag==5 :
if comp==0 : # first check is ok
# add_array=array[num:,:] # cut the array
endList.append(add_array)
flag=0
# print("flag5,comp=0")
else :
add_array=array[index:,:] #
fin_list.append(add_array) # list to create one ar
flag=1
# if flag==3 : # in that case is true , save the value of the offset
# # for key , value in dicoReal.items() :
# # if float(key) == real[idx][0]:
# # endList.append(value)
# keyToSkip.append(array[0][0])
if flag==1 :
arT=concat_array_from_list(list_of_array=fin_list,concat_axis=axis)
endList.append(arT)
if flag==0 :
endList=endList
idx=idx+1
print("keyToSkip\n:",keyToSkip)
#delete the the offset which are full depth on the list
# print(keyToSkip)
# print(real,"\n")
for ii , value in enumerate(realTem):
if value[0] in keyToSkip:
del realTem[ii]
#real.pop(ii)
# now we are the list of recoverd depth
# build dictionnary
print("Realtem\n",realTem,)
print("endlist : \n",endList)
# print(keyToSkip)
for ii , tuple_ in enumerate (realTem) : #take the realkey from dico
realKey, maxValue=tuple_
print(realKey,maxValue)
add_array=endList[ii] # take the add_value generated
# print(endList)
for key , value in dicoReal.items(): # search in dictionnary the key
if float(key)==realKey: # and compare it to the key from tuple ...
#... just to have certitude then concatenate
val=np.concatenate((value,add_array),axis=0)
dicoReal[key]=val
# print(ii)
return dicoReal
def _search_ToFill_Data (dicoReal, arrayTemp ,
max_value, index_of_depth,axis=0):
"""
Parameters
------------
* data : dict
Dictionnary of numpy ndarray .
* dicoReal : dict
dictionnary . must be a dictionnary of real of offset .
* arrayTemp : np.ndarray
must be a numpy array of reserve data , the one , we want to
extract the depth data to fill array
* max_value : float
Maximum depth of the survey.
* index_of_depth : int,
the index of depth column. The default is 0.
* axis : int , optional
numpy.ndarray axis . The default is 0.
Returns
-------
dict
dictionnary of offsets filled the array data we want to top to
:Example:
>>> import numpy as np
>>> np.random.seed(0)
>>> sos=abs(np.random.randn(4,3)*4)
>>> sos2=abs(np.random.randn(4,3)*10.8)
>>> sos3=abs(np.random.randn(5,3)*10.8)
>>> temp3=abs(np.random.rand(4,3)*12.4)
>>> temp2=abs(np.random.rand(4,3)*15.4)
>>> temp1=abs(np.random.rand(4,3)*9.4)
>>> temp4=abs(np.random.rand(5,3)*9.9)
>>> dico,temp={},[]
>>> ff=[sos,sos2,sos3,temp1,temp2,temp3,temp4]
>>> fin=[sort_array_data(data=ii,sort_order =1,
... concatenate=False, concat_axis_order=0) for ii in ff ]
>>> key=[11.9,61.4,102.7]
>>> vat=[214,405,904]
>>> for ii in range(3):
... fin[ii][:,0]=key[ii]
... fin[ii][-1][1]=vat[ii]
>>> tempi=[(19.4,[11,18,50,120]),(28.4,[12,17,403,904]),
... (78.3,[11,8,202,804]),(124.4,[203,403,604,714,904])]
>>> for ss, val in enumerate(tempi) :
... fin[ss+3][:,0]=val[0]
... fin[ss+3][:,1]=np.array(val[1])
>>> for ss, van in enumerate (fin):
... if ss<=3 :
... dico[van[0][0]]=van
... if ss>3 :
... temp.append(van)
>>> arrayTemp=concat_array_from_list(list_of_array=temp,
... concat_axis=0)
>>> sis02=_search_ToFill_Data(dicoReal=dico, arrayTemp=arrayTemp ,
... max_value=904, index_of_depth=1,axis=0)
>>> print(sis02)
"""
#Notes :
# arange a dictionany :
#from keys : for k in sorted(dico.keys()):
# print(%s: %s",%(k,names[k]))
# from values : for k , v in sorted(dico.items(),key=lambda x:x[1]):
# print(%s: %s",%(k,v)
# for ii , value in enumerate(realTem):
# if value[0] in keyToSkip:
# del realTem[ii]
# #real.pop(ii)
#------------------------------------
# _all_checker,keyToSkip=[],[]
litemp= broke_array_to_(arrayData=arrayTemp,keyIndex=0,
broken_type="list")
itemp,real=[],[],
# flag=-1
for ii in litemp:
temp=sort_array_data(data=ii,sort_order=1,
concatenate=False)
itemp.append(temp)
# print(itemp)
for key , value in sorted(dicoReal.items()):
# print(sorted(dicoReal.items()))
# for ii, rowline in enumerate(value) :
rowmax=value[:,index_of_depth].max()
real.append((float(key),rowmax))
# _all_checker.append(rowmax)
# print(itemp)
iter_=-1
for ss,(offs, maxDepth) in enumerate( real):
# [(11.4,214),(19.4,120),(61.4,405),(102,904)]
# print(offs)
if maxDepth < max_value : # max_value=904 , maxDepth=214
if ss == len(real)-1:
for num, arTem in enumerate(itemp):
kk=arTem[0][0]
if kk > offs :
nextIndex=kk
else :
break
# nextIndex=itemp[-1][0][0]
else :
nextIndex=real[ss+1][0]
for ii, array in enumerate(itemp) : # [array(28),array(78), ....]
if offs < array[0][0] and array[0][0] < nextIndex:
for index, rowline in enumerate(array) : # scroll array(28)
if rowline[1]> maxDepth : # if rowline > the maxDepth=214
iter_=iter_+1 # first iteration on array (28)
if iter_==0 : # collect the remain info and add to dico
add_array=array[index:,:]
maxDepth=array[:,1].max() # caluclate nnex
# print(add_array)
else :
pass
if iter_==0 and maxDepth<=max_value :
for key, value in dicoReal.items() :
if float(key) ==offs :
new_value=concat_array_from_list(list_of_array=[value,add_array],
concat_axis=axis)
dicoReal[key]=new_value
iter_=-1
# break
else :
pass
return dicoReal
[docs]def straighten_out_list (main_list , list_to_straigh):
"""
Parameters
----------
* main_list : list
list of which the data must absolutely appear into
the straighen list.in our case , it is the station
list : a list of offset
* list_to_straigh : list
list contain the data (offset calculated,
the depth and the resistivity (log10)),
Returns
-------
* list
the straighen list.
some offset have been replaced by the offsets which are not in the
main_list whithout change the lengh of the straighen list.
:Example:
>>> import numpy as np
>>> np.random.seed(14)
>>> ss=np.random.randn(10)*12
>>> ss=ss.tolist()
>>> ss=[round(float(jj),4) for jj in ss]
>>> ss.sort()
>>> red=np.random.randn(7)*12
>>> red=red.tolist()
>>> test=[19, 15.012, 5.5821, 0.7234,3.1,
... 0.7919, 3.445, 4.7398, 5.1, 10.8, 15.51,21]
>>> main=[20., 0.7234, 5, 3.445, 15.51,10.7, 3,5.1]
>>> test.sort()
>>> main.sort()
>>> red=[round(float(ss),1) for ss in red]
>>> print(test)
>>> print(main)
>>> sos=straighten_out_list (main_list=main ,
... list_to_straigh=test)
>>> print("sos:\n",sos)
"""
staIter=np.array(list_to_straigh)# ARRAY
max_X=staIter.max()
valuesIter,counts=np.unique(staIter, return_counts=True)
valuesIter=valuesIter.tolist() # List sorted of straighten value
# print(valuesIter)
misoffs,vamin,tempi=[],[],[]
# min_,flag=1,0
for sta in main_list: # keep the offset not in list to straight
if sta not in valuesIter :
misoffs.append(sta)
if misoffs ==[]:
return list_to_straigh
# inject miss offset value in the temporary list and sorted
#this to choose the closet value we want to substitude the missoffset value
tempi=[ii for ii in valuesIter]
for ii in misoffs :
tempi.append(ii)
tempi=deepcopy(tempi)
tempi.sort(reverse =False)
# print("mainlist:\n",main_list)
# print("tempiSorted:\n",tempi)
# print("missoff:\n",misoffs)
# print("lisofStraight_max:\n",max_X)
# # print(len(misoffs))
# sp=-1
for jj, off in enumerate(tempi): #scroll the create temporary list
for ss, ofi in enumerate(misoffs): # scroll the miss offset list and compare it
# to the temportary list
if off==ofi:
# print(off,ss)
indexof =main_list.index(off) # find the index of "off" value in the mainlist
# in order to calculate the distance between the value to its next value
# or forward value ex :off=3 , backward =0.724 , forward =3.445
# if main_list[indexof]==main_list[-1]: # that's mean the "off" value reaches the end of mainlist
# if tempi[-1]>main_list[indexof]: #
# deltaMain=main_list[indexof]-tempi[-1] # take the diff between
# vamin.append(tempi[-1])
# else :
# deltaMain=main_list[indexof]-tempi[-2]
# else :
# deltaMain=main_list[indexof]-main_list[indexof+1]
# if main_list[indexof+1] != tempi[jj+1] :
if off==tempi[-1]:
# deltaOffneg=abs(off-tempi[jj-1])
vamin.append(tempi[jj-1])
# delta=deltaOffneg
elif off==tempi[0]:
deltaOffpos=abs(off-tempi[jj+1])
vamin.append(tempi[jj+1])
# delta=deltaOffpos
else :
deltaOffpos=abs(off-tempi[jj+1])
deltaOffneg=abs(off-tempi[jj-1])
if deltaOffpos >= deltaOffneg :
# delta=deltaOffneg
vamin.append(tempi[jj-1])
# index=jj-1
else :
# delta=deltaOffpos
# index=jj+1
vamin.append(tempi[jj+1])
# print("vamin:\n",vamin)
# print(len(vamin))
for ii, val in enumerate(list_to_straigh):
for ss, of in enumerate(vamin):
if val==of :
list_to_straigh[ii]=misoffs[ss]
# if list_to_straigh[-1] !=max_X :
return list_to_straigh
[docs]def take_firstValue_offDepth(data_array,
filter_order=1):
"""
Parameters
----------
* data_array : np.array
array of the data .
* filter_order : int , optional
the column you want to filter. The default is 1.
Returns
-------
array_like
return array of the data filtered.
:Example:
>>> import numpy as np
>>> list8=[[4,2,0.1],[8,2,0.7],[10,1,0.18],[4,3,0.1],
... [7,2,1.2],[10,3,0.5],[10,1,0.5],[8.2,0,1.9],
... [10,7,0.5],[10,1,0.5],[2,0,1.4],[5,4,0.5],
... [10,2,0.7],[7,2,1.078],[10,2,3.5],[10,8,1.9]]
>>> test=np.array(list8)
>>> print(np_test)
>>> ss=take_firstValue_offDepth(data_array =np_test, filter_order=1)
>>> ss=averageData(np_array=np_test,filter_order=1,
>>> axis_average=0, astype="int")
>>> print(ss)
"""
listofArray=[]#data_array[0,:]]
data_array=data_array[data_array[:,filter_order].argsort(kind="mergesort")]
# print(data_array,"\n:")
# np_array=np_array[np_array[:,filter_order].argsort(kind="mergesort")]
#returns the differents value on the filtersort index of array :
values, counts =np.unique(data_array[:,filter_order], return_counts=True)
for ii, rowline in enumerate(data_array ):
if rowline[filter_order]==values[-1]:
listofArray.append(data_array[ii])
break
elif rowline[filter_order] !=data_array[ii-1][filter_order]:
listofArray.append(data_array[ii])
array =concat_array_from_list(list_of_array=listofArray, concat_axis=0)
array=array[array[:,filter_order].argsort(kind="mergesort")]
listofArray=[]
return array
[docs]def dump_comma(input_car, max_value=2, carType='mixed'):
"""
Parameters
----------
* input_car : str,
Input character.
* max_value : int, optional
The default is 2.
* carType: str
Type of character , you want to entry
Returns
-------
Tuple of input character
must be return tuple of float value, or string value
.. note:: carType may be as arguments parameters like ['value','val',"numeric",
"num", "num","float","int"] or for pure character like
["car","character","ch","char","str", "mix", "mixed","merge","mer",
"both","num&val","val&num&"]
if not , can not possible to convert to float or integer.
the *defaut* is mixed
:Example:
>>> import numpy as np
>>> ss=dump_comma(input_car=",car,box", max_value=3,
... carType="str")
>>> print(ss)
... ('0', 'car', 'box')
"""
# dump "," at the end of
flag=0
if input_car[-1]==",":
input_car=input_car[:-1]
if input_car[0]==",":
input_car="0,"+ input_car[1:]
if carType.lower() in ['value','val',"numeric",
"num", "num","float","int"]:
input_car=eval(input_car)
elif carType.lower() in ["car","character","ch","char","str",
"mix", "mixed","merge","mer",
"both","num&val","val&num&"]:
input_car=input_car.strip(",").split(",")
flag=1
# elif carType.lower() in ["mix", "mixed","merge","mer",
# "both","num&val","val&num&"]:
# input_car=input_car.split(",")
if np.iterable(input_car)==False :
inputlist=[input_car,0]
# input_car=tuple(inputlist)
elif np.iterable(input_car) is True :
# if flag==1 :
# inputlist=input_car
# # print(inputlist)
# else :
inputlist=list(input_car)
# print("false")
input_car=inputlist[:max_value]
# print(input_car)
if flag==1 :
if len(inputlist)==1 :
return(inputlist[0])
return tuple(input_car)
[docs]def build_wellData (add_azimuth=False, utm_zone="49N",
report_path=None,add_geochemistry_sample=False):
"""
Parameters
----------
* add_azimuth : Bool, optional
compute azimuth if add_azimut is set to True.
The default is False.
* utm_zone : Str, optional
WGS84 utm_projection. set your zone if
add_azimuth is turn to True.
The default is "49N".
* report_path : str, optional
path to save your _well_report. The default is None.
its match the current work directory
* add_geochemistry_sample: bool
add_sample_data.Set to True if you want to add_mannually
Geochimistry data. default is False.
Raises
------
Exception
manage the dimentionaly of ndarrays .
OSError
when report_path is not found in your O.S.
Returns
-------
str
name of location of well .
np.ndarray
WellData , data of build Wells .
np.ndarray
GeolData , data of build geology.
:Example:
>>> import numpy as np
>>> import os, shutil
>>> import warnings,
>>> form _utils.avgpylog import AvgPyLog
>>> well=build_wellData (add_azimuth=True, utm_zone="49N")
>>> print("nameof locations\n:",well[0])
>>> print("CollarData\n:",well[1])
>>> print("GeolData\n:", well[2])
... nameof locations
... Shimen
... CollarData
... [['S01' '477205.6935' '2830978.218' '987.25' '-90' '0.0' 'Shi01'
... 'Wdanxl0']
... ['S18' '477915.4355' '2830555.927' '974.4' '-90' '2.111' 'Shi18'
... 'Wdanxl0']]
... GeolData
... [['S01' '0.0' '240.2' 'granite']
... ['S01' '240.2' '256.4' ' basalte']
... ['S01' '256.4' '580.0' ' granite']
... ['S01' '580.0' '987.25' 'rock']
... ['S18' '0.0' '110.3' 'sand']
... ['S18' '110.3' '520.2' 'agrilite']
... ['S18' '520.2' '631.3' ' granite']
... ['S18' '631.3' '974.4' ' rock']]
... Shimen_wellReports_
"""
reg_lines=[]
wellSites,ftgeo,hole_list,Geolist=[],[],[],[]
text=["Enter the name of Location:",
"well_name :",
"Coordinates (Easting, Northing)_UTM_{0} : ".format(utm_zone),
"Hole Buttom and dip values (Bottom, dip):" ,
"Layers-thickness levels in (meters):",
"Geology-layers or <stratigraphy> names (Top_To_Buttom):",
"{0:-^70}".format(' Report '),
"DH_Hole,DH_Easting, DH_Northing, DH_Buttom,"\
" DH_Dip,DH_Azimuth, DH_PlanDepth,DH_Descrip",
"GeolData :",
"WellData:",
"DH_Hole, DH_From, DH_To, Rock",
"SampleData",
"DH_Hole, DH_From,DH_To, Sample",
"{0:-^70}".format(' InputData '),
]
name_of_location =input("Enter the name of Location:")
reg_lines.append(''.join(text[0]+'{0:>18}'.format(name_of_location)+'\n'))
reg_lines.append('\n')
reg_lines.append(text[13]+'\n')
comp=-1
while 1 :
DH_Hole=input("Enter the well_name :")
if DH_Hole=="" or DH_Hole=="end":
Geol=concat_array_from_list(list_of_array=Geolist,
concat_axis=0)
break
print("Enter the coordinates (Easting, Northing) : ", end="")
DH_East_North=input()
DH_East_North=dump_comma(input_car=DH_East_North,
max_value=2, carType='value')
print("Enter the Hole Bottom value and dip (Bottom, dip):", end='')
dh_botdip=input()
dh_botdip=dump_comma(input_car=dh_botdip,
max_value=2, carType='value')
#check the dip of the well
if float(dh_botdip[1])==0.:
dh_botdip[1]=(90.)
elif float(dh_botdip[0])==0.:
raise Exception(
"The curent bottom has a value 0.0 . "
"Must put the bottom of the well as deep as possible !"
)
hole_list.append(DH_Hole)
wellSites.append((DH_Hole, DH_East_North[0],DH_East_North[1],
dh_botdip[0],dh_botdip[1] ))
#DH_Hole (ID) DH_East DH_North DH_RH DH_Dip DH_Azimuth DH_Top DH_Bottom DH_PlanDepth DH_Decr Mask
reg_lines.append("".join(text[1]+'{0:>7}'.format(DH_Hole))+"\n")
reg_lines.append("".join(text[2])+"\n")
reg_lines.append(",".join(['{0:>14}'.format(str(ii))
for ii in list(DH_East_North)])+"\n")
reg_lines.append("".join(text[3])+"\n")
reg_lines.append(",".join(['{0:>7}'.format(str(ii))
for ii in list(dh_botdip)])+"\n")
comp+=-1
while DH_Hole :
# print("Enter the layer thickness (From_, _To, geology):",end='')
if comp==-1 :
Geol=concat_array_from_list(list_of_array=ftgeo,
concat_axis=0)
ftgeo=[] # initialize temporary list
break
# comp=comp+1
print("Enter the layers-thickness levels in (meters):", end='')
dh_from_in=input()
if dh_from_in=="" or dh_from_in=="end":
break
dh_from=eval(dh_from_in)
dh_from_ar=np.array(dh_from)
dh_from_ar=dh_from_ar.reshape((dh_from_ar.shape[0],1))
# check the last input bottom :
if dh_from_ar[-1] >= float(dh_botdip[0]):
_logger.info("The input bottom of well {0}, is {1}. "
"It's less last layer thickess: {2}."
"we add maximum bottom at 1.023km depth.".format(
DH_Hole,dh_botdip[0],dh_from_ar[-1]))
dh_botdip[0]=(1023.)
wellSites[-1][3]=dh_botdip[0] # last append of wellSites
#find Dh_to through give dh_from
dh_to=dh_from_ar[1:]
dh_to=np.append(dh_to,dh_botdip[0])
dh_to=dh_to.reshape((dh_to.shape[0],1))
print("Enter the geology-layers names (From _To):",end="")
rock=input()
rock=rock.strip(",").split(",") # strip in the case where ","appear at the end
rock_ar=np.array(rock)
rock_ar=rock_ar.reshape((rock_ar.shape[0],1))
try :
if rock_ar.shape[0]==dh_from_ar.shape[0]:
drill_names=np.full((rock_ar.shape[0],1),DH_Hole)
fromtogeo=np.concatenate((drill_names,dh_from_ar,
dh_to, rock_ar),axis=1)
except IndexError:
_logger.warn("np.ndarry sizeError:Check 'geologie', 'Dh_From', and "
"'Dh_To' arrays size properly. It seems one size is "
" too longeR than another. ")
warnings.warn(" All the arrays size must match propertly!")
ftgeo.append(fromtogeo)
comp=-1
reg_lines.append("".join(text[4])+"\n")
reg_lines.append(",".join(['{0:>7}'.format(str(jj))
for jj in list(dh_from)])+"\n")
reg_lines.append("".join(text[5])+"\n")
reg_lines.append(",".join(['{0:>12}'.format(jj)
for jj in rock])+"\n") # rock already in list
# reg_lines.append("".join(rock+"\n"))
reg_lines.append("\n")
Geolist.append(Geol)
name_of_location=name_of_location.capitalize()
#set on numpy array
for ii , value in enumerate(wellSites):
value=np.array(list(value))
wellSites[ii]=value
#create a wellsites array
wellSites=concat_array_from_list(wellSites,concat_axis=0)
DH_Hole=wellSites[:,0]
DH_PlanDepth=np.zeros((wellSites.shape[0],),dtype="<U8")
DH_Decr=np.full((wellSites.shape[0],),"Wdanxl0",dtype="<U9")
lenloc=len(name_of_location)
for ii , row in enumerate(DH_Hole):
# in order to keep all the well name location
DH_PlanDepth[ii]=np.array(name_of_location[:-int(lenloc/2)]+row[-2:])
Geol[:,1]=np.array([np.float(ii) for ii in Geol[:,1]])
Geol[:,2]=np.array([np.float(ii) for ii in Geol[:,2]])
if add_azimuth==False:
DH_Azimuth=np.full((wellSites.shape[0]),0)
elif add_azimuth == True :
DH_Azimuth=compute_azimuth(easting = np.array(
[np.float(ii) for ii in wellSites[:,1]]),
northing =np.array(
[np.float(ii) for ii in wellSites[:,2]]),
utm_zone=utm_zone)
DH_Azimuth=DH_Azimuth.reshape((DH_Azimuth.shape[0],1))
WellData=np.concatenate((wellSites,DH_Azimuth,
DH_PlanDepth.reshape((DH_PlanDepth.shape[0],1)),
DH_Decr.reshape((DH_Decr.shape[0],1))), axis=1)
GeolData=Geol.copy()
#-----write Report---
reg_lines.append(text[6]+'\n')
# reg_lines.append(text[7]+"\n")
reg_lines.append(text[9]+'\n')
reg_lines.append("".join(['{0:>12}'.format(ss)
for ss in text[7].split(",")]) +'\n')
for rowline in WellData :
reg_lines.append(''.join(["{0:>12}".format(ss)
for ss in rowline.tolist()])+"\n")
reg_lines.append(text[8]+"\n")
reg_lines.append("".join(['{0:>12}'.format(ss)
for ss in text[10].split(",")]) +'\n')
for ii , row in enumerate(GeolData):
reg_lines.append(''.join(["{0:>12}".format(ss)
for ss in row.tolist()])+"\n")
if add_geochemistry_sample==True:
SampleData=build_geochemistry_sample()
reg_lines.append(text[11]+'\n')
reg_lines.append("".join(['{0:>12}'.format(ss)
for ss in text[12].split(",")]) +'\n')
for ii , row in enumerate(SampleData):
reg_lines.append(''.join(["{0:>12}".format(ss)
for ss in row.tolist()])+"\n")
else :
SampleData=None
with open("{0}_wellReport_".format(name_of_location),"w") as fid:
# for ii in reg_lines :
fid.writelines(reg_lines)
fid.close()
#---end write report---
if report_path is None:
report_path=os.getcwd()
elif report_path is not None :
if os.path.exists(report_path):
shutil.move((os.path.join(os.getcwd(),"{0}_wellReport_".\
format(name_of_location))),report_path)
else :
raise OSError (
"The path does not exit.Try to put the right path")
warnings.warn (
"ignore","the report_path doesn't match properly.Try to fix it !")
return (name_of_location, WellData , GeolData, SampleData)
[docs]def compute_azimuth(easting, northing, utm_zone="49N", extrapolate=False):
"""
Parameters
----------
* easting : np.ndarray
Easting value of coordinates _UTM_WGS84
* northing : np.ndarray
Northing value of coordinates._UTM_WGS84
* utm_zone : str, optional
the utm_zone . if None try to get is through
gis.get_utm_zone(latitude, longitude).
latitude and longitude must be on degree decimals.
The default is "49N".
* extrapolate : bool ,
for other purpose , user can extrapolate azimuth value,
in order to get the sizesize as
the easting and northing size. The the value will
repositionate at each point data were collected.
Default is False as originally azimuth computation .
Returns
-------
np.ndarray
azimuth.
:Example:
>>> import numpy as np
>>> import gis_tools as gis
>>> easting=[477205.6935,477261.7258,477336.4355,477373.7903,477448.5,
... 477532.5484,477588.5806,477616.5968]
>>> northing=[2830978.218, 2830944.879,2830900.427, 2830878.202,2830833.75,
... 2830783.742,2830750.403,2830733.734]
>>> test=compute_azimuth(easting=np.array(easting),
... northing=np.array(northing), utm_zone="49N")
>>> print(test)
"""
#---**** method to compute azimuth****----
reference_ellipsoid=23
lat,long=gis.utm_to_ll(reference_ellipsoid=reference_ellipsoid,
northing=northing, easting=easting, zone=utm_zone)
#i, idx, ic_=0,0,pi/180
azimuth=0
i,ic_=0,np.pi /180
while i < lat.shape[0]:
xl=np.cos(lat[i]*ic_)*np.sin(lat[i+1]*ic_) - np.sin(lat[i]*ic_)\
*np.cos(lat[i+1]*ic_)*np.cos((long[i+1]-long[i])*ic_)
yl=np.sin((long[i+1]-long[i])*ic_)*np.cos(lat[i+1])
azim=np.arctan2(yl,xl)
azimuth=np.append(azimuth, azim)
i=i+1
if i==lat.shape[0]-1 :
# azimuth.append(0)
break
if extrapolate is True :
# interpolate azimuth to find the azimuth to first station considered to 0.
ff=spi.interp1d(x=np.arange(1,azimuth.size),
y=azimuth[1:], fill_value='extrapolate')
y_new , azim = ff(0),np.ones_like(azimuth)
azim[0], azim[1:] = y_new , azimuth[1:]
else :
azim=azimuth[1:] # substract the zero value added for computation as origin.
#convert to degree : modulo 45degree
azim = np.apply_along_axis(lambda zz : zz * 180/np.pi , 0, azim)
return np.around(azim,3)
[docs]def build_geochemistry_sample():
"""
Build geochemistry_sample_data
Raises
------
Process to build geochemistry sample data manually .
Returns
-------
np.ndarray
Sample ,Geochemistry sample Data.
:Example:
>>> geoch=build_geochemistry_sample()
>>> print(geoch)
... sampleData
... [['S0X4' '0' '254.0' 'PUP']
... ['S0X4' '254' '521.0' 'mg']
... ['S0X4' '521' '625.0' 'tut']
... ['S0X4' '625' '984.0' 'suj']
... ['S0X2' '0' '19.0' 'pup']
... ['S0X2' '19' '425.0' 'hut']
... ['S0X2' '425' '510.0' 'mgt']
... ['S0X2' '510' '923.2' 'pyt']]
"""
tempsamp,SampleList=[],[]
comp=-1
while 1:
print('Enter Hole Name or <Enter/end> to stop:',end='')
holeName=input()
if holeName=="" or holeName.lower() in ["stop","end","enter",
"finish","close"]:
Sample=concat_array_from_list(list_of_array=SampleList,
concat_axis=0)
break
comp=comp+1
samp_buttom=np.float(input("Enter the buttom of the sampling (m):"))
while holeName:
if comp==-1:
samP=concat_array_from_list(list_of_array=tempsamp,concat_axis=0)
tempsamp=[]
break
print("Enter the sampling levels:",end='')
samplevel=input()
samplevel=dump_comma(input_car=samplevel, max_value=12,
carType='value')
samp_ar=np.array(list(samplevel))
samp_ar=samp_ar.reshape((samp_ar.shape[0],1))
dh_to=samp_ar[1:]
dh_to=np.append(dh_to,samp_buttom)
dh_to=dh_to.reshape((dh_to.shape[0],1))
print("Enter the samples' names:",end='')
sampName=input()
sampName=dump_comma(input_car=sampName, max_value=samp_ar.shape[0],
carType='mixed')
sampName_ar=np.array(list(sampName))
sampName_ar=sampName_ar.reshape((sampName_ar.shape[0],1))
try :
holes_names=np.full((sampName_ar.shape[0],1),holeName)
samfromto=np.concatenate((holes_names,samp_ar,dh_to,sampName_ar),axis=1)
except Exception as e :
raise ("IndexError!, arrrays sample_DH_From:{0} &DH_To :{1}&"
" 'Sample':{2} doesn't not match proprerrly.{3}".\
format(samp_ar.shape,dh_to.shape,sampName_ar.shape, e))
warnings.warn("IndexError !, dimentional problem."
" please check np.ndarrays.shape.")
_logger.warn("IndexError !, dimentional problem."
" please check np.ndarrays.shape.")
tempsamp.append(samfromto)
comp=-1
SampleList.append(samP)
print("\n")
return Sample
[docs]def parse_wellData(filename=None, include_azimuth=False,
utm_zone="49N"):
"""
Function to parse well information in*csv file
Parameters
----------
* filename : str, optional
full path to parser file, The default is None.
* include_azimuth: bool ,
Way to compute azimuth automatically
* utm_zone : str,
set coordinate _utm_WGS84. Defaut is 49N
Raises
------
FileNotFoundError
if typical file deoesnt match the *csv file.
Returns
-------
location: str
Name of location .
WellData : np.ndarray
Specificy the collar Data .
GeoData : np.ndarray
specify the geology data .
SampleData : TYPE
geochemistry sample Data.
:Example:
>>> import numpy as np
>>> dir_=r"F:\OneDrive\Python\CodesExercices\ex_avgfiles\modules"
>>> parse_=parse_wellData(filename='Drill&GeologydataT.csv')
>>> print("NameOflocation:\n",parse_[0])
>>> print("WellData:\n",parse_[1])
>>> print("GeoData:\n",parse_[2])
>>> print("Sample:\n",parse_[3])
"""
identity=["DH_Hole (ID)","DH_East","DH_North","DH_Dip",
"Elevation" ,'DH_Azimuth',"DH_Top","DH_Bottom",
"DH_PlanDepth","DH_Decr","Mask "]
car=",".join([ss for ss in identity])
# print(car)
#ckeck the if it is the correct file
_flag=0
if filename is None :
filename=[file for file in os.listdir(os.getcwd()) \
if (os.path.isfile(file)and file.endswith(".csv"))]
# print(filename)
if np.iterable (filename):
for file in filename :
with open (file,"r", encoding="utf-8") as f :
data=f.readlines()
_flag=1
else :
_logger.error('The {0} doesnt not match the *csv file'
' You must convert file on *.csv format'.format(filename))
warnings.error("The input file is wrong ! only *.csv file "
" can be parsed.")
elif filename is not None :
assert filename.endswith(".csv"), "The input file {0} is not in *.csv format"
with open (filename,'r', encoding='utf-8') as f:
data=f.readlines()
_flag=1
if _flag==1 :
try :
# print(data[0])
head=data[0].strip("'\ufeff").strip('\n').split(',')[:-1]
head=_nonevalue_checker(list_of_value=head,
value_to_delete='')
chk=[1 for ii ,value in enumerate( head) if value ==identity[ii]]
# data[0]=head
if not all(chk):
_logger.error('The {0} doesnt not match the correct file'\
' to parse drill data'.format(filename))
warnings.warn("The input file is wrong ! must "\
"input the correct file to be parsed.")
except Exception as e :
raise FileNotFoundError("The *csv file does no match the well file",e)
# process to parse all data
# coll,geol,samp=[],[],[]
for ss, elm in enumerate (data):
elm=elm.split(',')
for ii, value in enumerate(elm):
if value=='' or value=='\n':
elm=elm[:ii]
data[ss]=elm
data[0]=head
[data.pop(jj)for jj, val in enumerate(data) if val==[]]
if data[-1]==[]:
data=data[:-1]
## final check ###
safeData=_nonelist_checker(data=data, _checker=True ,
list_to_delete=['\n'])
data=safeData[2]
# identify collar , geology dans sample data
comp=0
for ss , elm in enumerate(data):
if elm[0].lower()=='geology':
coll=data[:ss]
comp=ss
if elm[0].lower() =="sample":
geol=data[comp+1:ss]
samp=data[ss+1:]
# build numpy data array
collar_list=coll[1:]
# print(collar_list)
collar_ar=np.zeros((len(collar_list), len(identity)),dtype='<U12')
for ii, row in enumerate(collar_ar):
collar_ar[ii:,:len(collar_list[ii])]= np.array(collar_list[ii])
bottom_ar=collar_ar[:,7]
# print(bottom_ar)
geol_list=geol[1:]
geol_ar=_order_well (data=geol_list,bottom_value=bottom_ar)
samp_list=samp[1:]
samp_ar=_order_well (data=samp_list,bottom_value=bottom_ar)
name_of_location =filename[:-3]
# find Description
DH_PlanDepth=collar_ar[:,8]
DH_Decr=collar_ar[:,9]
lenloc=len(name_of_location)
for ss , singleArray in enumerate (DH_PlanDepth):
# print(singleArray)
if singleArray == ''or singleArray is None :
singleArray=name_of_location[:-int(lenloc/2)]+collar_ar[ss,0][-2:]
DH_PlanDepth[ss]=singleArray
if DH_Decr[ss] ==''or DH_Decr[ss] is None :
DH_Decr[ss] = "wdanx"+collar_ar[ss,0][0]+\
name_of_location.lower()[0]+collar_ar[ss,0][-1]
collar_ar[:,8]=DH_PlanDepth
collar_ar[:,9]=DH_Decr
if include_azimuth==False:
DH_Azimuth=np.full((collar_ar.shape[0]),0)
elif include_azimuth== True:
DH_Azimuth=compute_azimuth(easting = np.array([np.float(ii)
for ii in collar_ar[:,1]]),
northing = np.array([np.float(ii)
for ii in collar_ar[:,2]]),
utm_zone=utm_zone, extrapolate=True)
collar_ar[:,5]=DH_Azimuth
name_of_location=name_of_location.capitalize()
WellData,GeoData,SampleData=collar_ar,geol_ar, samp_ar
return (name_of_location, WellData,GeoData,SampleData)
def _nonelist_checker(data, _checker=False ,
list_to_delete=['\n']):
"""
Function to delete a special item on list in data.
Any item you want to delete is acceptable as long as item is on a list.
Parameters
----------
* data : list
container of list. Data must contain others list.
the element to delete should be on list.
* _checker : bool, optional
The default is False.
* list_to_delete : TYPE, optional
The default is ['\n'].
Returns
-------
_occ : int
number of occurence.
num_turn : int
number of turns to elimate the value.
data : list
data safeted exempt of the value we want to delete.
:Example:
>>> import numpy as np
>>> listtest =[['DH_Hole', 'Thick01', 'Thick02', 'Thick03',
... 'Thick04', 'sample02', 'sample03'],
... ['sample04'], [], ['\n'],
... ['S01', '98.62776918', '204.7500461', '420.0266651'], ['prt'],
... ['pup', 'pzs'],[], ['papate04', '\n'],
... ['S02', '174.4293956'], [], ['400.12', '974.8945704'],
... ['pup', 'prt', 'pup', 'pzs', '', '\n'],
... ['saple07'], [], '', ['sample04'], ['\n'],
... [''], [313.9043882], [''], [''], ['2'], [''], ['2'], [''], [''], ['\n'],
... [''], ['968.82'], [], [],[], [''],[ 0.36], [''], ['\n']]
>>> ss=_nonelist_checker(data=listtest, _checker=True,
... list_to_delete=[''])
>>> print(ss)
"""
_occ,num_turn=0,0
if _checker is False:
_occ=0
return _occ, num_turn, data
while _checker is True :
if list_to_delete in data :
for indix , elem_chker in enumerate(data):
if list_to_delete == elem_chker :
_occ=_occ+1
del data[indix]
if data[-1]==list_to_delete:
_occ +=1
# data=data[:-1]
data.pop()
elif list_to_delete not in data :
_checker =False
num_turn+=1
return _occ,num_turn,data
def _order_well (data,**kwargs):
"""
Function to reorganize value , depth rock and depth-sample
the program controls the input depth value and compare it .
with the bottom. It will pay attention that bottom depth must
be greater or egual of any other value. In the same time ,
the program check if value entered are sorted on ascending order .
well must go deep ( less value to great value). Negative values of
depths are not acceptable.
Parameters
----------
* data : list,
data contains list of well thickness and rock description .
* bottom_value : np.ndarray float
value of bottom . it may the basement extrapolation.
default is 1.023 km
Returns
-------
np.ndarray
data, data aranged to [DH_Hole, DH_From, DH_To, Rock.] arrays
:Example:
>>> import numpy as np
>>> listtest =[['DH_Hole', 'Thick01', 'Thick02', 'Thick03',
... 'Thick04','Rock01', 'Rock02', 'Rock03', 'Rock04'],
>>> ['S01', '0.0', '98.62776918', '204.7500461','420.0266651',
... 'GRT', 'ATRK', 'GRT', 'ROCK'],
>>> ['S02', '174.4293956', '313.9043882', '400.12', '974.8945704',
... 'GRT', 'ATRK', 'GRT', 'ROCK']]
>>> print(listtest[1:])
>>> ss=_order_well(listtest[1:])
>>> print(ss)
"""
this_function_name=inspect.getframeinfo(inspect.currentframe())[2]
bottomgeo=kwargs.pop("bottom_value", None)
# bottomsamp=kwargs.pop("sample_bottom_value",None)
# if type(bottomgeo) is np.ndarray :_flag=1
# else : _flag=0
temp=[]
_logger.info ("You pass by {0} function! "
"Thin now , everything is ok. ".format(this_function_name))
for jj, value in enumerate (data):
thickness_len,dial1,dial2=intell_index(datalist=value[1:]) #call intell_index
value=np.array(value)
dh_from=value[1:thickness_len+1]
dh_to =np.zeros((thickness_len,), dtype="<U12")
# ---- check the last value given
max_given_bottom=np.max(np.array([np.float(ii) for ii in dh_from]))
# ----it may be less than bottom value .
dh_geo=value[thickness_len+1:]
dh_to=dh_from[1:]
# if _flag==0 :
if bottomgeo[jj] =="" or bottomgeo[jj]==None :
bottomgeoidx=1023.
if max_given_bottom > bottomgeoidx:
_logger.warn("value {0} is greater than the "
"Bottom depth {1}".format(max_given_bottom ,
bottomgeoidx))
warnings.warn (
"Given values have a value greater than the depth !")
dh_to=np.append(dh_to,bottomgeoidx)
else: #elif :_flag==1 : # check the bottom , any values given
# must be less or egual to depth not greater.
if max_given_bottom> np.float(bottomgeo[jj]):
_logger.warn("value {0} is greater than the Bottom "
"depth {1}".format(max_given_bottom ,
bottomgeo[jj]))
warnings.warn (
"Given values have a value greater than the depth !")
dh_to=np.append(dh_to,bottomgeo[jj])
dh_hole=np.full((thickness_len),value[0])
temp.append(np.concatenate((dh_hole.reshape((dh_hole.shape[0],1)),
dh_from.reshape((dh_from.shape[0],1)),
dh_to.reshape((dh_to.shape[0],1)),
dh_geo.reshape((dh_geo.shape[0],1))),axis=1 ))
data=concat_array_from_list(list_of_array=temp, concat_axis=0)
return data
[docs]def intell_index (datalist,assembly_dials =False):
"""
function to search index to differency value to string element like
geological rocks and geologicals samples. It check that value are sorted
in ascending order.
Parameters
----------
* datalist : list
list of element : may contain value and rocks or sample .
* assembly_dials : list, optional
separate on two list : values and rocks or samples. The default is False.
Returns
-------
index: int
index of breaking up.
first_dial: list ,
first sclice of value part
secund_dial: list ,
second slice of rocks or sample part.
assembly : list
list of first_dial and second_dial
:Example:
>>> import numpy as np
>>> listtest =[['DH_Hole', 'Thick01', 'Thick02', 'Thick03',
... 'Thick04','Rock01', 'Rock02', 'Rock03', 'Rock04'],
... ['S01', '0.0', '98.62776918', '204.7500461','420.0266651','520', 'GRT',
... 'ATRK', 'GRT', 'ROCK','GRANODIORITE'],
... ['S02', '174.4293956', '313.9043882','974.8945704', 'GRT', 'ATRK', 'GRT']]
>>> listtest2=[listtest[1][1:],listtest[2][1:]]
>>> for ii in listtest2 :
>>> op=intell_index(datalist=ii)
>>> print("index:\n",op [0])
>>> print('firstDials :\n',op [1])
>>> print('secondDials:\n',op [2])
"""
# assembly_dials=[]
max_=0 # way to check whether values are in sort (ascending =True) order
# because we go to deep (first value must be less than the next none)
for ii, value in enumerate (datalist):
try :
thick=float(value)
if thick >= max_:
max_=thick
else :
_logger.warning("the input value {0} is less than the previous one."\
" Please enter value greater than {1}.".format(thick, max_) )
warnings.warn("Value {1} must be greater than the previous value {0}."\
" Must change on your input data.".format(thick,max_))
except :
# pass
indexi=ii
break
first_dial=datalist[:indexi]
second_dial =datalist[indexi:]
if assembly_dials:
assembly_dials=[first_dial,second_dial]
return indexi, assembly_dials
return indexi, first_dial, second_dial
def _nonevalue_checker (list_of_value, value_to_delete=None):
"""
Function similar to _nonelist_checker. the function deletes the specific
value on the list whatever the number of repetition of value_to_delete.
The difference with none list checker is value to delete
may not necessary be on list.
Parameters
----------
* list_of_value : list
list to check.
* value_to_delete : TYPE, optional
specific value to delete. The default is ''.
Returns
-------
list
list_of_value , safe list without the deleted value .
:Example:
>>> import numpy as np
>>> test=['DH_Hole (ID)', 'DH_East', 'DH_North',
... 'DH_Dip', 'Elevation ', 'DH_Azimuth',
... 'DH_Top', 'DH_Bottom', 'DH_PlanDepth', 'DH_Decr',
... 'Mask', '', '', '', '']
>>> test0= _nonevalue_checker (list_of_value=test)
>>> print(test0)
"""
if value_to_delete is None :
value_to_delete =''
if type (list_of_value) is not list :
list_of_value=list(list_of_value)
start_point=1
while start_point ==1 :
if value_to_delete in list_of_value :
[list_of_value.pop(ii) for ii, elm in\
enumerate (list_of_value) if elm==value_to_delete]
elif value_to_delete not in list_of_value :
start_point=0 # not necessary , just for secure the loop.
break # be sure one case or onother , it will break
return list_of_value
def _strip_item(item_to_clean, item=None, multi_space=12):
"""
Function to strip item around string values. if the item to clean is None or
item-to clean is "''", function will return None value
Parameters
----------
* item_to_clean : list or np.ndarray of string
List to strip item.
* cleaner : str , optional
item to clean , it may change according the use. The default is ''.
* multi_space : int, optional
degree of repetition may find around the item. The default is 12.
Returns
-------
list or ndarray
item_to_clean , cleaned item
:Example:
>>> import numpy as np
>>> new_data=_strip_item (item_to_clean=np.array([' ss_data',' pati ']))
>>> print(np.array([' ss_data',' pati ']))
... print(new_data)
"""
if item==None :item = ' '
cleaner =[(''+ ii*'{0}'.format(item)) for ii in range(multi_space)]
if type(item_to_clean ) != list :#or type(item_to_clean ) !=np.ndarray:
if type(item_to_clean ) !=np.ndarray:
item_to_clean=[item_to_clean]
if item_to_clean in cleaner or item_to_clean ==['']:
warnings.warn ('No data found in <item_to_clean :{}> We gonna return None.')
return None
try :
multi_space=int(multi_space)
except :
raise TypeError('argument <multplier> must be'\
' an integer not {0}'.format(type(multi_space)))
for jj, ss in enumerate(item_to_clean) :
for space in cleaner:
if space in ss :
new_ss=ss.strip(space)
item_to_clean[jj]=new_ss
return item_to_clean
def _cross_eraser (data , to_del, deep_cleaner =False):
"""
Function to delete some item present in another list. It may cheCk deeper
Parameters
----------
* data : list
Main data user want to filter.
* to_del : list
list of item you want to delete present on the main data.
* deep_cleaner : bool, optional
Way to deeply check. Sometimes the values are uncleaned and
capitalizeed . this way must not find their safety correspondace
then the algorth must clean item and to match all at
the same time before erasing. The *default* is False.
Returns
-------
list
data , list erased.
:Example:
>>> data =['Z.mwgt','Z.pwgt','Freq',' Tx.Amp','E.mag',' E.phz',
... ' B.mag',' B.phz',' Z.mag', ' Zphz ']
>>> data2=[' B.phz',' Z.mag',]
... remain_data =cross_eraser(data=data, to_del=data2,
... deep_cleaner=True)
>>> print(remain_data)
"""
data , to_del=_strip_item(item_to_clean=data), _strip_item(item_to_clean=to_del)
if deep_cleaner :
data, to_del =[ii.lower() for ii in data], [jj.lower() for jj in to_del]
for index, item in enumerate(data):
while item in to_del :
del data[index]
if index==len(data)-1 :
break
return data
def _remove_str_word (char, word_to_remove, deep_remove=False):
"""
Small funnction to remove a word present on astring character
whatever the number of times it will repeated.
Parameters
----------
* char : str
may the the str phrases or sentences . main items.
* word_to_remove : str
specific word to remove.
* deep_remove : bool, optional
use the lower case to remove the word even the word is uppercased
of capitalized. The default is False.
Returns
-------
str
char , new_char without the removed word .
:Example:
>>> from pycsamt.utils import func_utils as func
>>> ch ='AMTAVG 7.76: "K1.fld", Dated 99-01-01,AMTAVG, Processed 11 Jul 17 AMTAVG'
>>> ss=func._remove_str_word(char=ch, word_to_remove='AMTAVG', deep_remove=False)
>>> print(ss)
"""
if type(char) is not str : char =str(char)
if type(word_to_remove) is not str : word_to_remove=str(word_to_remove)
if deep_remove == True :
word_to_remove, char =word_to_remove.lower(),char.lower()
if word_to_remove not in char :
return char
while word_to_remove in char :
if word_to_remove not in char :
break
index_wr = char.find(word_to_remove)
remain_len=index_wr+len(word_to_remove)
char=char[:index_wr]+char[remain_len:]
return char
[docs]def stn_check_split_type(data_lines):
"""
Read data_line and check for data line the presence of
split_type < ',' or ' ', or any other marks.>
Threshold is assume to be third of total data length.
:params data_lines: list of data to parse .
:type data_lines: list
:returns: The split _type
:rtype: str
:Example:
>>> from pycsamt.utils import func_utils as func
>>> path = os.path.join(os.environ["pyCSAMT"],
'csamtpy','data', K6.stn)
>>> with open (path, 'r', encoding='utf8') as f :
... data= f.readlines()
>>> print(func.stn_check_split_type(data_lines=data))
"""
split_type =[',', ':',' ',';' ]
data_to_read =[]
if isinstance(data_lines, np.ndarray): # change the data if data is not dtype string elements.
if data_lines.dtype in ['float', 'int', 'complex']:
data_lines=data_lines.astype('<U12')
data_lines= data_lines.tolist()
if isinstance(data_lines, list):
for ii, item in enumerate(data_lines[:int(len(data_lines)/3)]):
data_to_read.append(item)
data_to_read=[''.join([str(item) for item in data_to_read])] # be sure the list is str item .
elif isinstance(data_lines, str): data_to_read=[str(data_lines)]
for jj, sep in enumerate(split_type) :
if data_to_read[0].find(sep) > 0 :
if data_to_read[0].count(sep) >= 2 * len(data_lines)/3:
if sep == ' ': return None # use None more conventional
else : return sep
[docs]def minimum_parser_to_write_edi (edilines, parser = '='):
"""
This fonction validates edifile for writing , string with egal.
We assume that dictionnary in list will be for definemeasurment
E and H fied.
:param edilines: list of item to parse
:type edilines: list
:param parser: the egal is use to parser edifile .
can be changed, default is `=`
:type parser: str
"""
if isinstance(edilines,list):
if isinstance(edilines , tuple) : edilines =list(edilines)
else :raise TypeError('<Edilines> Must be on list')
for ii, lines in enumerate(edilines) :
if isinstance(lines, dict):continue
elif lines.find('=') <0 :
raise 'None <"="> found on this item<{0}> of the edilines list. list can not'\
' be parsed.Please put egal between key and value '.format(edilines[ii])
return edilines
[docs]def round_dipole_length(value, round_value =5.):
"""
small function to graduate dipole length 5 to 5. Goes to be reality and
simple computation .
:param value: value of dipole length
:type value: float
:returns: value of dipole length rounded 5 to 5
:rtype: float
"""
mm = value % round_value
if mm < 3 :return np.around(value - mm)
elif mm >= 3 and mm < 7 :return np.around(value -mm +round_value)
else:return np.around(value - mm +10.)
[docs]def keepmin(array):
""" Keep the minimum in array and array value"""
os= np.where(array==array.min())[0]
if len(os)>1 :
os= os[0]
return int(os), array[int(os)]
[docs]def get_closest_value (values_range, input_value):
"""
Function to select the closest values when input values is
not in the values range. We assume that value types are arrays.
If the same value is repeated, should take the first index and
the value at that index.
:param values_range: values to get
:type values_range: array_like
:param input_value: specific value
:type input_value: float,
:returns: the closest value and its index
:rtye: float
"""
values_range = np.array(values_range)
# if all element less than zero than convert
#in put value to negative value
if np.all(values_range <0) :
if input_value >0 :
input_value *=-1 # for depth select purpose
if input_value < values_range.min():
print('--> ! Input value ={0} is out the range,'
' min value = {1}. Value should be reset to ={1}.'.
format(input_value, values_range.min())
)
warnings.warn('Input value ={0} is out '
'the range ! min value = {1}'.
format(input_value, values_range.min())
)
_logger.debug('Input value ={0} is out '
'the range ! min value = {1}'.
format(input_value, values_range.min())
)
input_value = values_range.min()
elif input_value > values_range.max():
warnings.warn('Input value ={0} is out '
'the range ! max value = {1}'.
format(input_value, values_range.max())
)
_logger.debug('Input value ={0} is out '
'the range ! max value = {1}'.
format(input_value, values_range.max()))
input_value = values_range.max()
print('!--> Input value ={0} is out '
'the range , min value = {1}. Value'
' should be reset to ={1}.'.format(
input_value, values_range.max()))
if input_value in values_range :
indexes,*_= np.where(values_range==input_value)
if len(indexes)==1 :
index =int(indexes )
# mean element is repeated then take the first index
elif len(indexes)>1 :
index = int(indexes)[0]
value = values_range[index]
return value , index
elif values_range.min() < input_value < values_range.max():
# values_range = sorted(values_range)
for ii, xx in enumerate(values_range):
if xx > input_value :
#compute distance :
# make the diffence between distances
d0 = abs(input_value-xx)
# need to take the short
d1= abs(values_range[ii-1]-input_value)
if d0 < d1 :
return xx , ii
elif d0> d1 :
return values_range[ii-1], ii-1
[docs]def geo_length_checker(main_param,
optional_param,
force =False,
param_names =('input_resistivities',
'input_layers'),
**kws):
"""
Geo checker is a function to check the differents length
of different geoparams.
The length of optional params should depend of the length of main params.
Therefore if the length of optional params is larger than the length of
the main params, the length of optional params will be reduced to
the length of main params.Otherwise if the length of optional params
is shorther than the length of the main params, will filled it either
with "None" if dtype param is string or 0. is float or 0 if integer.
If `force` is set ``True``, shoud raise errors if the main params and
the optional params have are not the same length.
Parameters
------------
* main_param : array_like, list
main parameter that must took
its length as reference length
* optional params : array_like, list
optional params, whom length depend
to the length of main params
* param_names : tuple or str
names of main params and optional params
so to generate error if exits.
* fill_value: str, float, optional
Default value to fill thearray in the case where
the length of optional param is
less than the length of the main param .If None ,
will fill according to array dtype
Returns
--------
array_like
optional param truncated according to the man params
"""
add_v =kws.pop('fill_value', None)
if isinstance(main_param, (str, float, int)):
main_param = np.array([main_param])
if isinstance(optional_param, (str, float, int)):
optional_param = np.array([optional_param])
if isinstance(main_param, (list, tuple)):
main_param =np.array(main_param)
if isinstance(optional_param, (list, tuple)):
optional_param =np.array(optional_param)
mes=''
if len(optional_param) > len(main_param):
mes ="".join(["---> Note ! {0} will be truncated ",
"to length = {1}as the same length of {2} ."])
warnings.warn(mes.format(param_names[1],
len(main_param),param_names[0] ))
optional_param= optional_param[:len(main_param)]
if force is True :
mess =''.join(['--> `force` argument is set ``True``,',
' Can not truncate {0} = {1} to fit the ',
'length of {2} = {3}.'])
raise CSex.pyCSAMTError_parameter_number(
mess.format(param_names[1],
len(param_names[1]),
param_names[0],
len(param_names[0])))
elif len(optional_param) < len(main_param) :
if force is True :
mess =''.join([ '--> `force` argument is set ``True``,',
' Can not fill the value of {0} ',
'to match the length of {1} = {2}.'])
raise CSex.pyCSAMTError_parameter_number(
mess.format(param_names[1], param_names[0],
len(param_names[0])))
if add_v is not None :
# repeat the value to add
add_v =[add_v for vv in range(
len(main_param)-len(optional_param))]
add_v = np.array(add_v)
if add_v is None :
if optional_param.dtype not in [ 'float', 'int'] :
add_v =['None' for i in range(
len(main_param)-len(optional_param))]
else :
for type_param, fill_value in zip(
[ 'float', 'int'],[ 0., 0] ):
if type_param == optional_param.dtype :
add_v =[fill_value for i in range(
len(main_param)-len(optional_param))]
mes =''.join(["--> Length of {0} is ={1} which ",
"length of {2} is ={3}. We'll add {4}",
"to fill {5} value."
])
warnings.warn(mes.format(param_names[1],
len(optional_param),
param_names[0],
len(main_param), add_v[0], param_names[1]))
optional_param= optional_param.tolist()
optional_param.extend(add_v)
return np.array(optional_param)
[docs]def fr_en_parser (f, delimiter =':'):
""" Parse the translated data file.
:param f: translation file to parse
:param delimiter: str, delimiter
:return: generator obj, composed of a list of
french and english Input translation.
:Example:
>>> file_to_parse = 'pme.parserf.md'
>>> path_pme_data = r'C:/Users\Administrator\Desktop\__elodata
>>> data =list(BS.fr_en_parser(
os.path.join(path_pme_data, file_to_parse)))
"""
is_file = os.path.isfile (f)
if not is_file:
raise IOError(f'Input {f} is not a file. Please check your file.')
with open(f, 'r', encoding ='utf8') as ft:
data = ft.readlines()
for row in data :
if row in ( '\n', ' '):
continue
fr, en = row.strip().split(delimiter)
yield([fr, en])
[docs]def convert_csvdata_from_fr_to_en(csv_fn, pf, destfile = 'pme.en.csv',
savepath =None, delimiter =':'):
""" Translate variable data from french csva data to english with
varibale parser file.
:param csv_fn: data collected in csv format
:param pf: parser file
:param destfile: str, Destination file, outputfile
:param savepath: [Path-Like object, save data to a path
:Example:
# to execute this script, we need to import the two modules below
>>> import os
>>> import csv
>>> path_pme_data = r'C:/Users\Administrator\Desktop\__elodata
>>> datalist=convert_csvdata_from_fr_to_en(
os.path.join( path_pme_data, _enuv2.csv') ,
os.path.join(path_pme_data, pme.parserf.md')
savefile = 'pme.en.cv')
"""
# read the parser file and separed english from french
parser_data = list(fr_en_parser (pf,delimiter) )
with open (csv_fn, 'r', encoding ='utf8') as csv_f :
csv_reader = csv.reader(csv_f)
csv_data =[ row for row in csv_reader]
# get the index of the last substring row
ix = csv_data [0].index ('Industry_type')
# separateblock from two
csv_1b = [row [:ix +1] for row in csv_data]
csv_2b =[row [ix+1:] for row in csv_data ]
# make a copy of csv_1b
csv_1bb= deepcopy(csv_1b)
for ii, rowline in enumerate( csv_1bb[3:]) : # skip the first two rows
for jj , row in enumerate(rowline):
for (fr_v, en_v) in parser_data:
# remove the space from french parser part
# this could reduce the mistyping error
fr_v= fr_v.replace(
' ', '').replace('(', '').replace(
')', '').replace('\\', '').lower()
# go for reading the half of the sentence
row = row.lower().replace(
' ', '').replace('(', '').replace(
')', '').replace('\\', '')
if row.find(fr_v[: int(len(fr_v)/2)]) >=0:
csv_1bb[3:][ii][jj] = en_v
# once translation is done, concatenate list
new_csv_list = [r1 + r2 for r1, r2 in zip(csv_1bb,csv_2b )]
# now write the new scv file
if destfile is None:
destfile = f'{os.path.basename(csv_fn)}_to.en'
destfile.replace('.csv', '')
with open(f'{destfile}.csv', 'w', newline ='',encoding ='utf8') as csvf:
csv_writer = csv.writer(csvf, delimiter=',')
csv_writer.writerows(new_csv_list)
# for row in new_csv_list:
# csv_writer.writerow(row)
savepath = cpath(savepath , '__pme')
try :
shutil.move (f'{destfile}.csv', savepath)
except:pass
return new_csv_list
# if __name__=="__main__" :
# parse_=parse_wellData(filename='shimenDH.csv',
# include_azimuth=True,utm_zone='49N')
# print("NameOflocation:\n",parse_[0])
# print("WellData:\n",parse_[1])
# print("GeoData:\n",parse_[2])
# print("Sample:\n",parse_[3])
# data =['Z.mwgt','Z.pwgt','Freq',' Tx.Amp','E.mag',' E.phz',
# ' B.mag',' B.phz',' Z.mag', ' Zphz ']
# data2=[' B.phz',' Z.mag',]
# # cleaner =[(''+ ii*'*') for ii in range(7)]
# print(_cross_eraser(data=data2, to_del=data))
# print(_cross_eraser(data=data, to_del=data2))
# # print(_strip_item(item_to_clean=data, item=' '))
# # # print(cleaner)
# ts ='50.0'
# ch ='AMTAVG 7.76: "K1.fld", Dated 99-01-01,AMTAVG, Processed 11 Jul 17 AMTAVG'
# ss=_remove_str_word(char=ts, word_to_remove='m', deep_remove=False)