# -*- coding: utf-8 -*-
"""
===============================================================================
Copyright © 2021 Kouadio K.Laurent
This file is part of pyCSAMT.
pyCSAMT is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyCSAMT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with pyCSAMT. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
.. _module-Dispatcher::`csamtpy.ff.processing.callffunc`
:synopsis: Typical call files functions
call computations functions , AGSO and dispatch for purposes
Created on Thu Nov 26 20:55:39 2020
@author: @Daniel03
"""
import os,warnings
# import warnings
import numpy as np
import pandas as pd
from csamtpy.utils import exceptions as CSex
from csamtpy.utils._csamtpylog import csamtpylog
from csamtpy.utils import func_utils as func
_logger=csamtpylog.get_csamtpy_logger(__name__)
#*** file from _geocodes folder :AGSO & AGSO.STCODES ******
def _code_strata():
return ['code', 'label', 'name', 'pattern',
'size', 'density', 'thickness', 'color']
def agso_data():
    """
    Load the two geological code tables from the ``_geocodes`` folder.

    ``AGSO.csv`` and ``AGSO_STCODES.csv`` are read from
    ``<os.environ["pyCSAMT"]>/geodrill/_geocodes``. Every line is stripped of
    its newline and split on commas. The first row of the ``AGSO_STCODES``
    table is then replaced by the first row of the ``AGSO`` table.

    .. deprecated:: function will later deprecated

    Returns
    -------
    fagso : list of list of str
        Rows of ``AGSO.csv``.
    fagso_stcodes : list of list of str
        Rows of ``AGSO_STCODES.csv`` (first row taken from ``fagso``).

    Raises
    ------
    CSex.pyCSAMTError_file_handling
        If the ``pyCSAMT`` environment variable is not set or one of the two
        files does not exist.
    """
    def _not_found():
        # Build (do not raise) the error so call sites can chain the cause.
        _logger.warning("Geocodes files < AGS0 & AGS0_STCODES> Not found !")
        return CSex.pyCSAMTError_file_handling(
            'Geocodes files :< AGS0 & AGS0_STCODES> not found.'
            'check the right path to _geocodes folder. ')

    try:
        geocodes_dir = os.path.join(os.environ["pyCSAMT"], 'geodrill',
                                    "_geocodes")
    except KeyError as err:
        raise _not_found() from err

    tables = []
    for filename in ('AGSO.csv', 'AGSO_STCODES.csv'):
        path_to_agsofile = os.path.join(geocodes_dir, filename)
        try:
            with open(path_to_agsofile, 'r', encoding='utf8') as f:
                rows = [line.strip('\n').split(',') for line in f]
        except FileNotFoundError as err:
            raise _not_found() from err
        tables.append(rows)

    fagso, fagso_stcodes = tables
    # Both tables must expose the same header row.
    fagso_stcodes[0] = fagso[0]
    return fagso, fagso_stcodes
def set_stratum_on_dict():
    """
    Put the geocode strata and geocode structures tables into dictionaries.

    Each data row (the header row is skipped) becomes one entry. The key is
    the row's third field with ``' / '``, ``' '``, ``'/'`` and ``'-'``
    replaced by ``'_'``, lower-cased, and with one leading and one trailing
    double quote removed when present. The value maps every name returned by
    ``_code_strata()`` to the field at the same position in the row.

    Returns
    -------
    strata_dict : dict
        Entries built from the first table returned by ``agso_data()``.
    structures_dict : dict
        Entries built from the second table returned by ``agso_data()``.
    """
    code_strata = _code_strata()
    drop_bar = [' / ', ' ', '/', '-']
    strata_dict, structures_dict = {}, {}

    # Read the files once; the tables do not change between iterations.
    strata_table, structures_table = agso_data()

    for table, target in ((strata_table, strata_dict),
                          (structures_table, structures_dict)):
        for elem in table[1:]:
            attr_dict = {attribute: elem[jj]
                         for jj, attribute in enumerate(code_strata)}

            stratum_description = elem[2]
            for db in drop_bar:
                stratum_description = stratum_description.replace(db, '_')
            stratum_description = stratum_description.lower()

            # Strip each quote only where it actually is, so a name quoted on
            # one side only does not lose a real character on the other side.
            if stratum_description.startswith('"'):
                stratum_description = stratum_description[1:]
            if stratum_description.endswith('"'):
                stratum_description = stratum_description[:-1]

            target[stratum_description] = attr_dict

    return strata_dict, structures_dict
#*** end call file from _geocodes folder :AGSO & AGSO.STCODES ******
#***start Avg files extract ****
def _validate_avg_file(avg_dataLines):
    """
    Identify the kind of Zonge AVG file from its first three lines.

    The first three lines are joined into one head string. If it contains
    ``ASTATIC`` the file is reported as type 2. Otherwise the markers
    ``'\\ AMTAVG'``, ``'$ ASPACE'`` and ``'$ XMTR '`` are searched for, and
    the file is reported as type 1 when at least two of them are counted.

    Parameters
    -----------
        * avg_dataLines: list
            list of avgData for all lines.

    Returns
    --------
        int
            type of AVG Zonge file: 1 (_F1, plain file) or 2 (_F2, Astatic
            file).

    Raises
    ------
    CSex.pyCSAMTError_avg_file
        If no data lines are given, or the head matches neither file type.
    """
    remark = ['\\ AMTAVG', '$ ASPACE', '$ XMTR ']

    _logger.info('Check the Avg_Datalinesfile %s', _validate_avg_file.__name__)

    # An identity test against a fresh list literal can never fail, so
    # compare by value to really detect "no data".
    if (avg_dataLines is None or len(avg_dataLines) == 0
            or list(avg_dataLines) == ['']):
        _logger.error('No Zonge Engeneering files exists. No data found on that curent AVG file.'
                      'Check the better path to Avgfiles right file.')
        raise CSex.pyCSAMTError_avg_file('Check the The type of Avg file.')

    filehead = "".join(avg_dataLines[:3])

    # Membership test: the marker counts wherever it sits, including the very
    # start of the head, so a file type is always decided.
    if 'astatic'.upper() in filehead:
        return 2

    # NOTE(review): a marker found at offset 0 is not counted (``find() > 0``).
    # This is kept as-is so the set of accepted files does not change;
    # confirm whether that exclusion is intended.
    count = sum(1 for ss in remark if filehead.find(ss) > 0)
    if count >= 2:
        return 1

    _logger.error('Wrong Avg file. Must respect the typical zonge file.')
    raise CSex.pyCSAMTError_avg_file('Type of AVg file provide is Wrong. No match'
                                     'the typical Zonge engeneering file.')
def straighten_cac2CSfile(data_array, component_column_section=None):
    """
    Split the columns of an Astatic (_F2) AVG data block into two groups.

    Column names are first cleaned with ``func._strip_item``. Columns whose
    name is in the needed-component list are returned as a plain array; all
    other columns are returned as a DataFrame. A ``skp`` column is recognised
    whatever its letter case. When no such column exists, a first column
    filled with the value 2 is prepended to the returned array.

    Parameters
    ------------
        * data_array : ndarray
            data from AVG astatic file
        * component_column_section : list
            astatic file column names, one per column of ``data_array``.

    Returns
    --------
        array_we_need : ndarray
            columns listed as needed, in their original order.
        df_other_comp : pandas.DataFrame
            the remaining columns.

    Raises
    ------
    CSex.pyCSAMTError_inputarguments
        If ``component_column_section`` is None.
    """
    _comp_we_need = ['skp', 'Freq',
                     'Tx.Amp', 'E.mag',
                     'B.mag', 'B.phz',
                     'Z.phz', 'ARes.mag',
                     'E.%err', 'E.perr', 'B.%err',
                     'B.perr', 'Z.perr',
                     'ARes.%err', 'E.phz']

    if component_column_section is None:
        raise CSex.pyCSAMTError_inputarguments('head-section must be on list of string items.')

    # strip column names before comparing them
    component_column_section = func._strip_item(item_to_clean=component_column_section)

    def _is_needed(name):
        # 'skp' is matched case-insensitively so that a column written 'Skp'
        # is kept, in agreement with the presence check further down.
        return name in _comp_we_need or name.lower() == 'skp'

    df = pd.DataFrame(data=data_array, columns=component_column_section)

    not_needed = [ss for ss in component_column_section if not _is_needed(ss)]
    needed = [ss for ss in component_column_section if _is_needed(ss)]

    df_we_need = df.drop(not_needed, axis=1)
    df_we_need.reset_index(drop=True, inplace=True)
    array_we_need = df_we_need.to_numpy()

    df_other_comp = df.drop(needed, axis=1)
    df_other_comp.reset_index(drop=True, inplace=True)

    # add a skp column when the file has none
    if 'skp' not in [ii.lower() for ii in component_column_section]:
        skp_value = 2  # value used by the original author to flag good-quality data
        skp_array = np.full((array_we_need.shape[0], 1), skp_value)
        array_we_need = np.concatenate((skp_array, array_we_need), axis=1)

    return array_we_need, df_other_comp
def truncated_data(data, number_of_reccurence, **kwargs):
    """
    Cut data into consecutive chunks of ``number_of_reccurence`` items.

    The last chunk holds whatever remains and may be shorter.

    Parameters
    ----------
        * data : list, tuple or nd.array
            data to truncate. Objects without a ``shape`` attribute are
            converted to an ndarray first; anything else is sliced as given.
        * number_of_reccurence : int
            chunk length (positive integer), e.g. the number of frequencies
            recorded per station.
        * kwargs :
            accepted for backward compatibility and ignored.

    Returns
    -------
        list
            loc_list, the successive slices of ``data``; empty when ``data``
            is empty.
    """
    if not hasattr(data, 'shape'):
        data = np.array(data)

    value_lengh = data.shape[0]
    # Slicing past the end is safe, so the last chunk needs no special case.
    return [data[start:start + number_of_reccurence]
            for start in range(0, value_lengh, number_of_reccurence)]
def _numbering_station(number_of_station, number_of_freq):
"""
small function to numbering stations .
Prameters
---------
* number_of_station : int
number of station found on the site.
* number_of freq : int
number of frequency found for survey at each station.
Returns
--------
numbsta : str
station numbered
poly_sta : list
multplie station for each frequency for each stations.
"""
numbsta =['S{:02}'.format(ii) for ii in range (number_of_station)]
poly_sta=numbsta*number_of_freq
poly_sta.sort()
return numbsta , poly_sta
def zstar_array_to_nan(zstar_array, nan_value=np.nan, keep_str=False):
    """
    Replace the items that cannot be read as float by ``nan_value``.

    Every item is converted with ``float``; items for which this fails (for
    instance ``'*'``) are replaced by ``nan_value``. The input container is
    updated in place.

    Parameters
    ----------
        * zstar_array : ndarray or list
            values to convert; modified in place.
        * nan_value : float or np.nan type
            replacement value; could be any value either int, float or str.
            The *default* is np.nan.
        * keep_str : bool, optional
            if True, return the updated input itself instead of a float
            array. Forced to True when ``nan_value`` is a str.
            The *default* is False.

    Returns
    -------
        ndarray
            float array of the converted values, or the updated
            ``zstar_array`` itself when ``keep_str`` is (or is forced to) True.
    """
    converted = []
    for kk, value in enumerate(zstar_array):
        try:
            item = float(value)
        except (TypeError, ValueError):
            item = nan_value
        zstar_array[kk] = item
        converted.append(item)

    if isinstance(nan_value, str):
        keep_str = True
    if keep_str:
        return zstar_array

    # Build the result from the converted values, not from the container:
    # a fixed-width string array truncates what is written back into it
    # ('1e-05' stored in a 4-character slot becomes '1e-0').
    return np.array([float(item) for item in converted])
def get_array_from_reffreq(array_loc, freq_array, reffreq_value, stnNames=None):
    """
    Get, for each station, the value recorded at one given frequency.

    Parameters
    ------------
        * array_loc : dict
            maps station name to its array of values, e.g. ``S00: rho_values``.
        * freq_array : array_like
            frequency array of the survey; the position of
            ``reffreq_value`` in it is the position read in every station
            array.
        * reffreq_value : int or float
            frequency at which values are wanted.
        * stnNames : list
            station names, in the order wanted for the output. Names that
            are not keys of ``array_loc`` are skipped.

    Returns
    ---------
        array_like
            one value per selected station; e.g. with ``reffreq_value=1024``
            the value of every station at 1024 Hz. Empty when no station
            name matches.

    Raises
    ------
    CSex.pyCSAMTError_station
        If ``stnNames`` is None.
    ValueError
        If ``reffreq_value`` is not in ``freq_array``.
    """
    if stnNames is None:
        raise CSex.pyCSAMTError_station('You may at least specify '
                                        'the array or list of stations-Names or station id.')

    selected = [array_loc[stn] for stn in stnNames if stn in array_loc]
    if not selected:
        return np.array([])

    # The index is the same for every station: look it up once.
    matches = np.flatnonzero(np.asarray(freq_array) == reffreq_value)
    if matches.size == 0:
        raise ValueError('Reference frequency <{0}> not found in the '
                         'frequency array.'.format(reffreq_value))
    index_rf = int(matches[0])

    return np.array([values[index_rf] for values in selected])
def relocate_on_dict_arrays(data_array, number_of_frequency, station_names=None):
    """
    Put data on a dictionary with one entry per station.

    The data are cut into consecutive chunks of ``number_of_frequency``
    values with ``truncated_data``, and the chunks are paired in order with
    the station names. Given station names are sorted before pairing, so
    make sure their sorted order matches the order of the data. When no name
    is given, names ``S00``, ``S01``, ... are generated in station order.

    :param data_array: values of all stations, station after station
    :type data_array: array_like

    :param number_of_frequency: number of frequencies recorded per station
    :type number_of_frequency: int

    :param station_names: list of station
    :type station_names: list of array_like

    :returns: infos at data stations
    :rtype: dict

    :raises CSex.pyCSAMTError_station: if the number of names given differs
        from ``size of data / number_of_frequency``
    """
    station_count = np.size(data_array) / number_of_frequency

    if station_names is not None:
        if isinstance(station_names, np.ndarray):
            given_count = station_names.size
            station_names = station_names.tolist()
        else:
            station_names = list(station_names)
            given_count = len(station_names)
        # Explicit raise: an assert disappears under ``python -O``.
        if given_count != station_count:
            raise CSex.pyCSAMTError_station(
                'Number of Station provided must be <{0}> not <{1}>'.format(
                    int(station_count), given_count))
        NUM_STN = sorted(station_names)
    else:
        # Generated names are already in station order; sorting them
        # lexically would misplace 'S100' before 'S11'.
        NUM_STN = _numbering_station(number_of_station=int(station_count),
                                     number_of_freq=number_of_frequency)[0]

    LIST_BROK = truncated_data(data=data_array,
                               number_of_reccurence=number_of_frequency)

    return dict(zip(NUM_STN, LIST_BROK))
def dipole_center_position(dipole_position=None):
    """
    Compute the centre position of each dipole from its electrode positions.

    Each returned value is the mean of two consecutive input positions, so
    the result has one item less than the input (and is empty for fewer than
    two positions).

    :param dipole_position: position array at each electrodes.
    :type dipole_position: array_like

    :returns: centered position value array
    :rtype: array_like

    :raises CSex.pyCSAMTError_inputarguments: if ``dipole_position`` is None
    :raises CSex.pyCSAMTError_float: if a position cannot be converted to float
    """
    if dipole_position is None:
        raise CSex.pyCSAMTError_inputarguments('NoneType can not be compute.'
                                               ' Please provide the right values.')
    try:
        dipole_position = np.array([float(pp) for pp in dipole_position])
    except (TypeError, ValueError) as err:
        raise CSex.pyCSAMTError_float('Cannot convert position values. '
                                      'Position must be a number not str.Please check your data!.') from err

    # midpoint of every pair of neighbouring electrodes
    return (dipole_position[1:] + dipole_position[:-1]) / 2
def get_matrix_from_dict(dict_array, flip_freq=False,
                         freq_array=None, transpose=False):
    """
    Build one matrix from a dictionary of per-station arrays.

    The station arrays are combined, in dictionary order, by
    ``func.concat_array_from_list(..., concat_axis=1)``. Rows are assumed to
    be frequencies and columns stations. When `freq_array` is given and runs
    from lowest to highest, `flip_freq` is switched to True. When `flip_freq`
    ends up true the matrix rows are reversed.

    Parameters
    ----------
    dict_array : dict
        stations dictionary, keys are stations names and values are
        stations values on array_like.
    flip_freq : bool, optional
        reverse the rows of the matrix. Default is False.
    freq_array : array, optional
        array of frequency; only its first and last values are compared, to
        detect an ascending order. Default is None.
    transpose : bool, optional
        if True the matrix is transposed, so stations are read as rows and
        frequencies as columns. The default is False.

    Returns
    -------
    ndarray
        matrix of dictionary values.
    bool
        flip_freq, tells whether the rows were reversed.

    Raises
    ------
    CSex.pyCSAMTError_processing
        If ``dict_array`` is not a dict.
    CSex.pyCSAMTError_float
        If ``freq_array`` cannot be converted to float.
    """
    if not isinstance(dict_array, dict):
        raise CSex.pyCSAMTError_processing('Main argument `dict array`'
                                           ' should be dictionnary'
                                           ', not {0}'.format(type(dict_array)))

    if freq_array is not None:
        try:
            freq_array = np.asarray(freq_array, dtype=float)
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_float('Could not convert array to value !') from err

        # ascending frequencies: the data rows must be reversed as well
        if freq_array.size > 1 and freq_array[0] < freq_array[-1]:
            freq_array = freq_array[::-1]
            flip_freq = True

    matrix = func.concat_array_from_list(list(dict_array.values()),
                                         concat_axis=1)

    # Truthiness, not identity: a NumPy boolean must be honoured too.
    if flip_freq:
        matrix = matrix[::-1]
    if transpose:
        matrix = matrix.T

    return matrix, flip_freq
def plot_reference_frequency(reference_array, frequency_array,
                             data_array,
                             station_names=None, **kwargs):
    """
    Plot a reference curve and all station curves on log-log axes.

    Parameters
    ----------
    reference_array : array_like,
        array of average impedance Z_avg; drawn in red.
    frequency_array : array_like
        array of frequency on sites; the x values of every curve.
    data_array : ndarray or dict
        sounding curves, one column per station. A dictionary of station
        arrays is also accepted: its keys are sorted, used as station names,
        and its values are combined with
        ``func.concat_array_from_list(..., concat_axis=1)``.
    station_names : list, optional
        names used as line labels (no legend is drawn). Defaults to ``S00``,
        ``S01``, ... Replaced by the dictionary keys when ``data_array`` is
        a dict.
    kwargs :
        ``ls``, ``marker``, ``ms``, ``lw`` and ``fs`` style the reference
        curve; ``fs`` multiplies the marker size.

    Returns
    -------
    None
        the figure is displayed with ``plt.show()``.
    """
    import matplotlib.pyplot as plt

    ls = kwargs.pop('ls', '-')
    marker = kwargs.pop('marker', 'o')
    ms = kwargs.pop('ms', 0.2)
    lw = kwargs.pop('lw', 2)
    fs = kwargs.pop('fs', 0.8)

    if isinstance(data_array, dict):
        # Sort the keys, then read the values in that order, so names and
        # columns stay aligned.
        station_names = sorted(data_array)
        data_array = func.concat_array_from_list(
            [data_array[name] for name in station_names], concat_axis=1)

    if station_names is None:
        station_names = ['S{0:02}'.format(ii)
                         for ii in range(data_array.shape[1])]

    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.loglog(frequency_array,
              reference_array,
              c='r', ls=ls, lw=lw, ms=ms * fs,
              marker=marker)

    for ii in range(data_array.shape[1]):
        label = station_names[ii] if ii < len(station_names) else None
        ax.loglog(frequency_array, data_array[:, ii], label=label)

    plt.show()
# if __name__=='__main__':
# # print(numfreq, num_sta)
# ch =['2', '7', '9', 'a', 'g', 'e', '*', '9','2', '7', '9', '*', 'g', 'e', 'geth', '9']
# array =np.array(ch)
# test=zstar_array_to_nan(zstar_array=array, keep_str=False)