# -*- coding: utf-8 -*-
# Created on Wed Nov 18 16:45:35 2020
# Author: Kouadio K.Laurent<etanoyau@gmail.com>
# Licence: LGPL
"""
.. _module-avg:: `pycsamt.ff.core.avg`
:synopsis: Module to deal with AVG files. Each section of an AVG file is a class.
AVG files from the Zonge Engineering company come in two types. The module reads
the main file type as Type 1. The other type of AVG file is rewritten to
the main type. The module can also export the file for other uses.
"""
import os
# import re
import warnings
import shutil
import numpy as np
import pandas as pd
from datetime import (datetime, timezone)
from pycsamt.utils import _p as infOS
from pycsamt.utils.decorator import deprecated
from pycsamt.ff.site import Site
from pycsamt.utils import zcalculator as Zcc
from pycsamt.utils._csamtpylog import csamtpylog
from pycsamt.utils import avg_utils as cfunc
from pycsamt.utils import func_utils as func
from pycsamt.utils import exceptions as CSex
_logger=csamtpylog.get_csamtpy_logger(__name__)
[docs]class Avg (object):
"""
A super class of all content of an AVG Zonge file.
Deals with Zonge Engineering AVG files.

Arguments
----------
**data_fn**: str
path to AVG file

================ =========== =======================================
Attributes       Type        Explanation
================ =========== =======================================
Header           class       container of Header information
Data_section     class       container of all Data information
Skip_flag        str         value to judge the quality of data;
                             can be filled automatically. If no skp
                             value is provided, the program will
                             judge the quality of the provided data
                             and assign it a score.
================ =========== =======================================

More attributes can be added by inputting a keyword dictionary.

:Example:
>>> avg_obj = Avg(data_fn=path_to_avg_file)
>>> stn = avg_obj.Data_section.Station.names
>>> freq = avg_obj.Data_section.Frequency.value
"""
encodage ='utf8'
def __init__(self, data_fn=None, **kwargs):
    """
    Build the section containers and, when a path is given, read the file.

    :param data_fn: path to the AVG file. When it is not None the file
        is read immediately through `_read_avg_file`.
    :type data_fn: str
    :param kwargs: `profile_fn` is kept as the station profile path;
        any other keyword is attached to the instance as an attribute.
    """
    owner_name = self.__class__.__name__
    self._logging = csamtpylog.get_csamtpy_logger(owner_name)
    self.data_fn = data_fn

    # one container per section of the AVG file
    self.Header = Header()
    self.Data_section = Data()
    self.Skip_flag = Skip_flag()

    self.profile_fn = kwargs.pop('profile_fn', None)

    # column labels of the plain AVG file (F1)
    self._f1_labels = [
        'skp', 'Station', ' Freq', 'Comp', ' Amps', 'Emag',
        'Ephz', 'Hmag', 'Hphz', 'Resistivity', 'Phase', '%Emag',
        'sEphz', '%Hmag', 'sHphz', '%Rho', 'sPhz',
    ]
    # column labels of the ASTATIC file (F2)
    self._f2_labels = [
        'skp', 'Freq', 'Tx.Amp', 'E.mag', 'E.phz', 'B.mag',
        'B.phz', 'Z.mag', 'Z.phz', 'ARes.mag', 'SRes', 'E.wgt',
        'B.wgt', 'E.%err', 'E.perr', 'B.%err', 'B.perr', 'Z.%err',
        'Z.perr', 'ARes.%err',
    ]
    self._zmark = '$'
    # Zonge component name -> J-format polarization name
    self.EM_zonge2j = {
        'ExHx': 'RXX',
        'ExHy': 'RXY',
        'EyHx': 'RYX',
        'EyHy': 'RYY',
    }

    # remaining keywords become plain attributes
    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)

    if self.data_fn is not None:
        self._read_avg_file()
def _read_avg_file(self, data_fn=None):
    """
    Read an AVG file and dispatch its content to the section objects.

    The file type is the number returned by `cfunc._validate_avg_file`:
    1 is handled as the plain AVG file and 2 as the ASTATIC file. Header
    lines are handed to `self.Header`, the numeric table to
    `self.Data_section` and the quality flag to `self.Skip_flag`.

    Parameters
    ------------
        * data_fn: str
            path to AVG filename. When None, `self.data_fn` is used.

    Raises
    -------
        CSex.pyCSAMTError_avg_file
            if no path is set, if an expected marker line (`skp`, `$Rx`
            or the component labels) is missing, if the ASTATIC station
            column and the data rows do not have the same length, or if
            the file type number is neither 1 nor 2.
        CSex.pyCSAMTError_AVG
            if the path is not an existing file or does not end with
            `avg` / `AVG`.
    """
    if data_fn is not None:
        self.data_fn = data_fn
    if self.data_fn is None:
        raise CSex.pyCSAMTError_avg_file(
            'No AVG file found.Please check '
            'your pathfile :<{0}>'.format(os.getcwd()))
    if not os.path.isfile(self.data_fn):
        # Exceptions do not interpolate %-style arguments the way logging
        # calls do, so the message is formatted before raising.
        raise CSex.pyCSAMTError_AVG(
            'could not find {0}'.format(self.data_fn))

    self._logging.info(
        'reading and checking the AVG file %s', self.data_fn)

    if not self.data_fn.endswith(('avg', 'AVG')):
        warnings.warn(
            '<Get more infos about AVG> :{0}'.format(infOS.notion.AVG))
        raise CSex.pyCSAMTError_AVG(
            'file provided is not an AVGfile.Please check your file!')

    with open(self.data_fn, 'r', encoding='utf8') as fid:
        avg_data_file = fid.readlines()

    avg_file_type_num = cfunc._validate_avg_file(
        avg_dataLines=avg_data_file)

    # ------------- main AVG file : F1 ---------------------------------
    if avg_file_type_num == 1:
        # The header is everything above the column-label line, which is
        # recognised by the word 'skp'.
        label_idx = next(
            (idx for idx, line in enumerate(avg_data_file)
             if 'skp' in line), None)
        if label_idx is None:
            raise CSex.pyCSAMTError_avg_file(
                "Could not find the 'skp' column-label line in <{0}>."
                .format(os.path.basename(self.data_fn)))
        header_lines = avg_data_file[:label_idx]

        self.Header.HardwareInfos.set_harware_infos(
            [line.strip() for line in header_lines
             if line.startswith('\\')])
        # Tx and Rx each receive their own list of the '$' lines.
        self.Header.Tx.set_transmitter_properties(
            Tx_data=[line.strip() for line in header_lines
                     if line.startswith('$')])
        self.Header.Rx.set_receiver_properties(
            Rx_data=[line.strip() for line in header_lines
                     if line.startswith('$')])

        # NOTE(review): data rows are taken from the sixth line onwards,
        # independently of where 'skp' was found -- confirm this holds
        # for every supported F1 layout.
        data_to_array = np.array(
            [datum.strip().split() for datum in avg_data_file[5:]])

        # first column of the first row carries the skip flag
        self.Skip_flag.setandget_skip_flag(skip_flag=data_to_array[0, 0])
        # drop the skp column (0) and the component column (3)
        data_to_array = np.delete(
            data_to_array, np.array([0, 3]), axis=1)
        self.Data_section._avg_set_components_from_data(
            data_array=data_to_array, data_type=avg_file_type_num)

    # ------------- ASTATIC file : F2 ----------------------------------
    elif avg_file_type_num == 2:
        self.Header.HardwareInfos.set_harware_infos(
            zonge_hardw_infos=[line.strip() for line in avg_data_file
                               if line.startswith('\\')])
        self.Header.Tx.set_transmitter_properties(
            Tx_data=[line.strip() for line in avg_data_file
                     if line.startswith('$Tx')])

        # Lines above the first '$Rx' describe the survey; the first
        # '$Rx' line and everything below hold receiver blocks and data.
        rx_idx = next(
            (idx for idx, line in enumerate(avg_data_file)
             if line.startswith('$Rx')), None)
        if rx_idx is None:
            raise CSex.pyCSAMTError_avg_file(
                "Could not find any '$Rx' line in <{0}>."
                .format(os.path.basename(self.data_fn)))
        survey_lines = avg_data_file[:rx_idx]
        receiver_lines = avg_data_file[rx_idx:]

        self.Header.SurveyAnnotation.set_survey_annotations_infos(
            survey_annotations_data=survey_lines)
        self.Header.SurveyConfiguration.set_survey_configuration_infos(
            survey_config_data=survey_lines)

        data_rows, rx_infos, comp_receiv_info = [], [], []
        comp_sect = None
        for item in receiver_lines:
            # data rows are the lines that are neither the component
            # labels ('Z.mag') nor a receiver property ('$Rx')
            if item.find('Z.mag') < 0 and item.find('$Rx') < 0:
                fields = item.strip().split(',')
                for kk, value in enumerate(fields):
                    try:
                        fields[kk] = float(value)
                    except ValueError:
                        # '*' means "no value"; any other
                        # non-numeric text is kept unchanged
                        if value.strip() == '*':
                            fields[kk] = np.nan
                # Blank lines split into [''] and are dropped here.
                # Filtering while building the list avoids popping from
                # a list that is being iterated, which skips neighbours.
                if fields != ['']:
                    data_rows.append(fields)
            if '$Rx' in item:
                comp_receiv_info.append(item)
            if '$Rx.GdpStn' in item:
                rx_infos.append(item.strip().split('=')[-1])
            if item.find('ARes.mag') > 0 and item.find('SRes') > 0:
                # keep the component labels of the data columns
                comp_sect = item.strip().split(',')

        if comp_sect is None:
            raise CSex.pyCSAMTError_avg_file(
                'Could not find the component label line '
                "('ARes.mag', 'SRes') in <{0}>."
                .format(os.path.basename(self.data_fn)))

        self.Header.Rx.set_receiver_properties(Rx_data=comp_receiv_info)

        data_to_array, other_comps = cfunc.straighten_cac2CSfile(
            data_array=np.array(data_rows),
            component_column_section=comp_sect)

        # one station value per (station, frequency) pair, sorted so
        # that it lines up with the rows of `data_to_array`
        unique_freqs = np.unique(data_to_array[:, 1])
        station_array_for_data = np.array(
            sorted(float(stn) for stn in rx_infos * unique_freqs.size))

        if station_array_for_data.shape[0] != data_to_array.shape[0]:
            _logger.error('Trouble occured when reading Astatic file. '
                          'It seems the frequency are not the '
                          'same length along all stations.')
            warnings.warn_explicit(
                'It seems something wrong while reading'
                ' Astatic file.', category=DeprecationWarning,
                filename=func.__name__, lineno=1)
            raise CSex.pyCSAMTError_avg_file(
                'File might be corrupted. number of frequency must'
                ' be the same along along all stations. ')

        # replace the first column by the station column
        data_to_array = np.delete(data_to_array, 0, axis=1)
        data_to_array = np.concatenate(
            (station_array_for_data.reshape(-1, 1), data_to_array),
            axis=1)

        self.Data_section._avg_set_components_from_data(
            data_array=data_to_array,
            data_type=avg_file_type_num,
            add_astatic_data=other_comps)
        # no problem met so far: flag the data as good quality ('2')
        self.Skip_flag.setandget_skip_flag(skip_flag='2')

    else:
        raise CSex.pyCSAMTError_avg_file(
            'The number provided doesnt not match Avgfile.')
def avg_write_2_to_1(self, data_fn=None, savepath=None):
    """
    Rewrite an ASTATIC AVG file (F2) into the main AVG layout (F1).

    The output is named after the input (last four characters of the
    base name removed, `_2_to_1.avg` appended). It is written to the
    current working directory and then moved to `savepath` when one is
    given.

    :param data_fn: ASTATIC FILE
    :type data_fn: str

    :param savepath: path to save your rewritten file. When it is an
        existing directory, a file of the same name already present in
        it is overwritten.
    :type savepath: str

    :raises CSex.pyCSAMTError_avg_file: if no path is set, the path is
        not a file, or the file is validated as type 1 (not ASTATIC).
    :raises CSex.pyCSAMTError_AVG: if the path does not end with `avg`
        or `.AVG`.
    """
    if data_fn is not None:
        self.data_fn = data_fn
    if self.data_fn is None:
        raise CSex.pyCSAMTError_avg_file(
            'No file detected. check your right path .')
    if not os.path.isfile(self.data_fn):
        raise CSex.pyCSAMTError_avg_file(
            'No file detected. Please check your right path. ')

    # -- check whether the file is an astatic file
    if not (self.data_fn.endswith('avg')
            or self.data_fn.endswith('.AVG')):
        raise CSex.pyCSAMTError_AVG(
            'No avg file detected ! Please check your file !')
    with open(self.data_fn, 'r', encoding='utf8') as fid:
        avg_data_file = fid.readlines()
    self.checker = cfunc._validate_avg_file(avg_data_file)
    if self.checker == 1:
        raise CSex.pyCSAMTError_avg_file(
            'Input Avgfile < {0}> is not Zonge ASTATIC file. '
            'Please put the right file !'.format(
                os.path.basename(self.data_fn)))

    write_lines = ['\\']
    self._read_avg_file(data_fn=self.data_fn)

    # ---- hardware header: 'key:value,' items on the first line
    for key, values in self.Header.HardwareInfos.sconfig_dict.items():
        if str(key).lower() == 'astatic' or key == '':
            continue
        if isinstance(values, list):
            values = ''.join(values)
        write_lines.append(''.join([key, ':', values, ',']))
    # add copyleft.
    write_lines.append(
        self.Header.HardwareInfos._writer_copyleft
        + ': {0}-{1}'.format(datetime.now(), timezone.utc) + '\n')

    # ---- configuration part
    write_lines.append(''.join([
        self._zmark, '{0:<7}'.format(' ASPACE'), '=',
        '{0:>7}'.format(self.Header.Rx.rxLength),
        self.Header.Rx.unit]) + '\n')
    write_lines.append(''.join([
        self._zmark, '{0:<7}'.format(' XMTR'), '=',
        '{0:>7}'.format(self.Header.Tx.txGdpStn)]) + '\n')

    # ---- column labels and separator line
    # The labels are formatted into a new string: `self._f1_labels`
    # itself must not be overwritten by the padded versions.
    label_formats = ('{:^3}', '{:^7}', '{:^7}', '{:^7}', '{:^6}',
                     '{:^12}', '{:^8}', '{:^12}', '{:^9}', '{:^12}',
                     '{:^9}', '{:^7}', '{:^7}', '{:^7}', '{:^7}',
                     '{:^7}', '{:^7}')
    write_lines.append(''.join(
        fmt.format(label)
        for fmt, label in zip(label_formats, self._f1_labels)) + '\n')
    write_lines.append('\\')
    dash_widths = (1, 5, 5, 5, 4, 10, 6, 10, 7, 10, 7, 5, 5, 5, 5, 5, 5)
    write_lines.append(
        '++'.join(width * '-' for width in dash_widths) + '+\n')

    # ---- data block
    section = self.Data_section
    stn_names = section.Station.names

    def stacked(component):
        """Concatenate the per-station arrays of one component."""
        return np.concatenate(
            [component.loc[name] for name in stn_names], axis=0)

    stn = stacked(section.Station)
    freq = stacked(section.Frequency)
    amps = stacked(section.Amps)
    emag = stacked(section.Emag)
    ephz = stacked(section.Ephz)
    bmag = stacked(section.Hmag)
    bphz = stacked(section.Hphz)
    rho = stacked(section.Resistivity)
    phase = stacked(section.Phase)
    pcemag = stacked(section.pcEmag)
    sephz = stacked(section.sEphz)
    pchmag = stacked(section.pcHmag)
    shphz = stacked(section.sHphz)
    pcrho = stacked(section.pcRho)
    sphase = stacked(section.sPhz)

    # the skip flag and the measured component are the same on all rows
    n_rows = stn.shape[0]
    skpf = np.full((n_rows,), self.Skip_flag.skip_flag)
    comps = np.full((n_rows,), self.Header.Rx.rxComps)

    # (format, column) pairs in output order
    columns = (
        ('{0:^3}', skpf), ('{0:^7}', stn), ('{0:>7}', freq),
        ('{0:^7}', comps), ('{0:<6}', amps), ('{:12.4e}', emag),
        ('{:>8}', ephz), ('{:12.4e}', bmag), ('{:>9}', bphz),
        ('{:12.4e}', rho), ('{:>9}', phase), ('{:>7}', pcemag),
        ('{:>7}', sephz), ('{:>7}', pchmag), ('{:>7}', shphz),
        ('{:>7}', pcrho), ('{:>7}', sphase),
    )
    # Zonge writes '*' for "no value". The replacement is applied to
    # the data rows only, so header text that happens to contain
    # 'nan' is left untouched.
    zonge_star = '{:>3}'.format('*')
    for ii in range(n_rows):
        row = ''.join(fmt.format(values[ii]) for fmt, values in columns)
        write_lines.append(row.replace('nan', zonge_star) + '\n')

    # ---- write, then move to savepath
    out_name = ''.join(
        [os.path.basename(self.data_fn)[:-4], '_2_to_1.avg'])
    with open(out_name, 'w', encoding='utf8') as fw:
        fw.writelines(write_lines)

    if savepath is not None:
        # Moving onto the full file path lets a second run overwrite
        # the previous output; moving into the directory would raise.
        if os.path.isdir(savepath):
            destination = os.path.join(savepath, out_name)
        else:
            destination = savepath
        shutil.move(out_name, destination)

    print('---> Your <{0}> is rewritten to <{1}> successfully. <-----'
          .format(os.path.basename(self.data_fn), out_name))
def avg_to_jfile(self, avg_data_fn=None, profile_fn=None,
                 j_extension='dat', utm_zone='49N', **kws):
    """
    Write one A.G. Jones (J) file per station from an AVG file.

    Both the plain and the ASTATIC AVG files are accepted, since the
    file is read through `_read_avg_file`. Each J file is written to the
    current working directory and then moved to `savepath`.

    Parameters
    -----------
        * avg_data_fn:str
            pathLike , path to your avg file
        * profile_fn: str
            pathLike, path to your profile/station file .
        * j_extension: str
            Extension type you want to export file .
            *Default* is ".dat"
        * utm_zone: str
            add if station_profile are not referenced yet.
            later , it would be removed .
        * savepath: str
            pathLike , path to save your outpufile. When None, a
            `_outputJ_` folder is created in the current directory.
        * writeInfos: bool
            write the informations of your input file ,
            export informations into Jfile,*Default* is False.
            `write_info` is accepted as an alias.
        * survey_name: str
            survey_area. When None, the project area read from the
            AVG header is used.

    Raises
    -------
        CSex.pyCSAMTError_avg_file
            if no AVG path is available or if the receiver component
            has no J polarization equivalent.
        CSex.pyCSAMTError_station
            if no station profile file is available.
    """
    #################################################
    from pycsamt.ff.core.j import J_collection
    ##################################################

    savepath = kws.pop('savepath', None)
    # 'writeInfos' is the historical keyword. 'write_info' is the name
    # the documentation used, so both are honoured.
    write_info = kws.pop('writeInfos', kws.pop('write_info', False))
    survey_name = kws.pop('survey_name', None)

    if j_extension is not None and '.' not in j_extension:
        j_extension = '.' + j_extension

    self._logging.info('write Jformat file')

    if avg_data_fn is not None:
        self.data_fn = avg_data_fn
    if self.data_fn is None:
        raise CSex.pyCSAMTError_avg_file(
            "Could not find any path to read ."
            "Please provide your right AVG file.")
    if profile_fn is not None:
        self.profile_fn = profile_fn
    if self.profile_fn is None:
        raise CSex.pyCSAMTError_station(
            'Need absolutely station file.'
            ' please provide your ".STN" file.')

    # ---> read the avg file and the station profile
    self._read_avg_file(data_fn=self.data_fn)
    site_obj = Site(data_fn=self.profile_fn, utm_zone=utm_zone)

    station_list = sorted(self.Data_section.Station.names)

    # ---> build jnames with survey name
    if survey_name is None:
        survey_name = self.Header.SurveyAnnotation.project_area
    j_obj = J_collection()
    jstnnames = j_obj.J.jname(number_of_sites=len(station_list),
                              survey_name=survey_name)

    # ---> map the Zonge component onto the J polarization
    rx_comps = self.Header.Rx.rxComps
    polarization = next(
        (jname for zname, jname in self.EM_zonge2j.items()
         if rx_comps == zname), None)
    if polarization is None:
        # fail with a clear message instead of an unbound `jcomp` later
        raise CSex.pyCSAMTError_avg_file(
            'Receiver component <{0}> has no J-format equivalent. '
            'Expected one of {1}.'.format(
                rx_comps, sorted(self.EM_zonge2j)))
    jcomp = j_obj.J.jMode(polarization_type=polarization)

    # ---> number of periods and period values
    jnperiod = self.Data_section.Frequency.numfreq
    jperiod = 1 / self.Data_section.Frequency.value

    # ---> data section
    app_rho_obj = self.Data_section.Resistivity.loc
    phase_obj = self.Data_section.Phase.loc
    emag_obj = self.Data_section.Emag.loc
    hmag_obj = self.Data_section.Hmag.loc
    c_var_emag_obj = self.Data_section.pcEmag.loc
    c_var_hmag_obj = self.Data_section.pcHmag.loc
    c_var_app_rho_obj = self.Data_section.pcRho.loc
    std_phase_obj = self.Data_section.sPhz.loc

    # ---> resistivity bounds: rho +/- sigma_rho (sigma computed once)
    japp_rho_max, japp_rho_min = {}, {}
    for key in app_rho_obj:
        sigma_rho = Zcc.compute_sigmas_e_h_and_sigma_rho(
            pc_emag=c_var_emag_obj[key],
            pc_hmag=c_var_hmag_obj[key],
            pc_app_rho=c_var_app_rho_obj[key],
            app_rho=app_rho_obj[key],
            emag=emag_obj[key],
            hmag=hmag_obj[key])[-1]
        japp_rho_max[key] = app_rho_obj[key] + sigma_rho
        japp_rho_min[key] = app_rho_obj[key] - sigma_rho

    # ---> phase bounds
    # NOTE(review): unlike the phase conversion below, these bounds are
    # not scaled by 1e-3 before the radian-to-degree conversion --
    # confirm the intended unit.
    jphase_max, jphase_min = {}, {}
    for key in phase_obj:
        phase_spread = std_phase_obj[key] / 100
        jphase_max[key] = 180 * (phase_obj[key] + phase_spread) / np.pi
        jphase_min[key] = 180 * (phase_obj[key] - phase_spread) / np.pi

    # convert phase millirad value in degree
    phase_obj = {key: 180 * phase * 1e-3 / np.pi
                 for key, phase in phase_obj.items()}

    # weights are assumed to be 1 for both rho and phase
    jwapp_rho = {stn: np.ones((jnperiod,), dtype='float64')
                 for stn in station_list}
    jwpha = {stn: np.ones((jnperiod,), dtype='float64')
             for stn in station_list}

    # -----> start writing Jfiles <-----
    if survey_name is None:
        code_name = 'ks{0:02}-{1}'
    else:
        code_name = survey_name[:-3] + '{0:02}-{1}'
    codespace = 5 * ' '

    if savepath is None:
        # create a folder in the current work directory
        savepath = os.path.join(os.getcwd(), '_outputJ_')
        try:
            if not os.path.isdir(savepath):
                os.mkdir(savepath)
        except OSError:
            warnings.warn("It seems the path already exists !")

    # ---> Zonge header rendered as J comment lines. The result does
    # not depend on the station, so it is built once. Rendering is best
    # effort: lines built before a failure are kept and a warning is
    # issued.
    info_lines = []
    if write_info:
        try:
            for zongeinfo in (
                    self.Header.HardwareInfos.sconfig_dict,
                    self.Header.SurveyAnnotation.sconfig_dict,
                    self.Header.SurveyConfiguration.sconfig_dict,
                    self.Header.Tx.sconfig_dict,
                    self.Header.Rx.sconfig_dict):
                lineinfo = [j_obj.J._comment_mark]
                counter_break = 0
                for keyinfo, valueinfo in zongeinfo.items():
                    if isinstance(valueinfo, list):
                        valueinfo = ''.join(
                            str(ss) for ss in valueinfo
                            if ss is not None)
                    # skip missing values before any string test
                    if valueinfo is None:
                        continue
                    if isinstance(valueinfo, str) \
                            and 'None' in valueinfo:
                        valueinfo = valueinfo.replace('None', '*')
                    lineinfo.append(
                        "{0}{1:^2}{2}".format(keyinfo, ':', valueinfo))
                    lineinfo.append('{0:3^}'.format(','))
                    counter_break += 1
                    # break the comment line every 7 items
                    if counter_break == 7:
                        lineinfo.append('\n#')
                        counter_break = 0
                info_lines.append(''.join(lineinfo) + '\n')
        except (AttributeError, TypeError, ValueError) as err:
            warnings.warn(
                'Could not write all the input file informations '
                'into the Jfile: {0}'.format(err))

    for ii, stn in enumerate(station_list):
        # -- > write Head j
        write_jlines = [''.join([
            j_obj.J._comment_mark,
            '{0:} :'.format(j_obj.J.jinfo.progvers),
            code_name.format(ii, stn), codespace,
            datetime.now().strftime(j_obj.J.jfmtime),
            codespace, 'RECS']) + '\n']
        write_jlines.extend(info_lines)

        # --- > write jlabels : azimuth, latitude, longitude, elevation
        for label, fmt, value in (
                (j_obj.J._jlabels[0], "{0:>13.1f}", site_obj.azim[stn]),
                (j_obj.J._jlabels[1], "{0:>13.5f}", site_obj.lat[stn]),
                (j_obj.J._jlabels[2], "{0:>13.5f}", site_obj.lon[stn]),
                (j_obj.J._jlabels[3], "{0:>13.1f}", site_obj.elev[stn])):
            write_jlines.append(''.join(
                ['{0:<13}'.format(label), '=', fmt.format(value), '\n']))

        # ---> write jnames , components and nperiods
        write_jlines.append(''.join([
            '{0}'.format(jstnnames[ii]),
            '{0:>7.1f}'.format(site_obj.azim[stn]), '\n']))
        write_jlines.append('{0}'.format(jcomp) + '\n')
        write_jlines.append(' {0}'.format(jnperiod) + '\n')

        # ---> one row per period
        for jj in range(jnperiod):
            write_jlines.append(''.join([
                '{0:<12.3e}'.format(jperiod[jj]),
                '{0:^12.3e}'.format(app_rho_obj[stn][jj]),
                '{0:^12.2e}'.format(phase_obj[stn][jj]),
                '{:^12.3e}'.format(japp_rho_max[stn][jj]),
                '{:^12.3e}'.format(japp_rho_min[stn][jj]),
                '{:^12.2e}'.format(jphase_max[stn][jj]),
                '{:^12.2e}'.format(jphase_min[stn][jj]),
                '{:^7.2f}'.format(jwapp_rho[stn][jj]),
                '{:^7.2f}'.format(jwpha[stn][jj]),
            ]) + '\n')

        j_fname = '{0}{1}'.format(stn, j_extension)
        with open(j_fname, 'w', encoding='utf8') as fj:
            fj.writelines(write_jlines)

        # Moving onto the full file path lets a second run overwrite
        # the previous output; moving into the directory would raise.
        if os.path.isdir(savepath):
            destination = os.path.join(savepath, j_fname)
        else:
            destination = savepath
        shutil.move(j_fname, destination)

    print('-' * 77)
    print('---> {0} J-files have been rewritten.\n---> see path:<{1}> '
          .format(len(station_list), savepath))
    print('-' * 77)
[docs] @deprecated("Use `pycsamt.ff.core.CSAMT.avg2edi ` instead.")
def avg_to_edifile (self, data_fn =None , profile_fn =None ,
savepath =None , apply_filter =None,
reference_frequency =None, number_of_points=7.,
dipole_length=50.,number_of_skin_depth=3., **kwargs):
"""
Method to write avg file to SEG-EDIfile.Convert both files.Astatic
or plainty avg file .if ASTATIC file is provided , will add the filter
and filter values .if avg file is not astatic file , user an apply
filter by setting filter to "tma, ama, or flma".Once apply ,
edifiles will exported by computing resistivities filtered
Parameters
----------
* data_fn : str
full path to avgfile
* savepath : str
outdir to store edifiles if None ,
is your current work directory
* profile_fn: str
full path to station _profile file
* apply_filter: str
add the name of filter to process the
avg file exported in edifiles.
can be [TMA | AMA| FLMA]
TMA - Trimming Moving Average
AMA - Adaptative Moving avarage ,
FLMA - Fixed dipoleLength moving
average (number of point=7)
Dipolelength willbe computed automatically
Returns
--------
obj, edi_obj
edi outputfiles or edi filtered file.
Holdings additionals parameters
==================== ========== =====================================
Optional Type Description
==================== ========== =====================================
reference_frequency float frequency at clean data.Default is
computed automatically
number_of_points=7. int number of point to for weighted
window,. *Default* is 7.
dipole_length float length of dipole in meter. For
CSAMT, *Default* is 50.
number_of_skin_depth float number of skin_depth can be 1 to 10
when using filter AMA.
*Default* is 3.
==================== ========== =====================================
:Example:
>>> from pycsamt.ff.core import avg
>>> avg_obj= avg.Avg()
>>> avg_obj.avg_to_edifile(data_fn= os.path.join(
... path_to_avgfile, avgfile) ,
... profile_fn = os.path.join(
... path_to_avgfile, station_profile_file),
... savepath =save_edipath,
... apply_filter=None )
"""
#######################################################################
from pycsamt.ff.core.edi import (Edi, Hmeasurement, Emeasurement)
from pycsamt.ff.processing import corr
#######################################################################
utm_zone = kwargs.pop('utm_zone', None)
if data_fn is not None : self.data_fn = data_fn
if self.data_fn is None :
raise CSex.pyCSAMTError_avg_file(
"Could not find any path to read ."
"Please provide your right AVG file.")
# export to savepath
if savepath is None : # create a folder in your current work directory
try :
savepath = os.path.join(os.getcwd(), '_outputAVG2EDI_')
if not os.path.isdir(savepath):
os.mkdir(savepath)# mode =0o666)
except :
warnings.warn("It seems the path already exists !")
if profile_fn is not None : self.profile_fn = profile_fn
if self.profile_fn is None :
warnings.warn ('No station profile file will detected.'
' Be aware sure , we will set location longitude ,'
' latitude and elevation to <0.>')
# read avg file
self._read_avg_file(data_fn=self.data_fn)
if self.profile_fn is not None :
site_obj = Site(data_fn=self.profile_fn, utm_zone=utm_zone)
#---> get the list of stations
head_dataid_list = sorted(self.Data_section.Station.names)
#-- compute dipole lenght between station close
dipole_length= self.Data_section.Station.loc[head_dataid_list [1]][0]-\
self.Data_section.Station.loc[head_dataid_list[0]][0]
#--> nfrequency =
nfreq = self.Data_section.Frequency.value.size
#-------------------------------------------------------------------
#---> compute error propagation , phase and resistivity
app_rho_obj = self.Data_section.Resistivity.loc
phase_obj = self.Data_section.Phase.loc
emag_obj =self.Data_section.Emag.loc
hmag_obj =self.Data_section.Hmag.loc
c_var_emag_obj = self.Data_section.pcEmag.loc
c_var_hmag_obj = self.Data_section.pcHmag.loc
c_var_app_rho_obj =self.Data_section.pcRho.loc
std_phase_obj =self.Data_section.sPhz.loc
#self.Header.HardwareInfos.astatic_version
# maker to check whether is plainty avg file or Astatic file
AS_flag =0
if self.Header.HardwareInfos.numfilterfreq is not None :
# self.Header.HardwareInfos.numfilterfreq is not None :
print('---> Reading Zonge `ASTATIC` file !')
AS_rho =self.Data_section.Resistivity.loc_Sres
AS_flag =1
# create 2empty dicts
error_propag_rho, error_propag_phs =[{} for ii in range(2)]
for key, value in self.Data_section.Resistivity.loc.items():
error_propag_rho[key]= app_rho_obj[key] + 1* \
Zcc.compute_sigmas_e_h_and_sigma_rho(
pc_emag= c_var_emag_obj[key],
pc_hmag= c_var_hmag_obj[key] ,
pc_app_rho=c_var_app_rho_obj[key],
app_rho= app_rho_obj[key],
emag= emag_obj[key],
hmag=hmag_obj[key])[-1]
for key, value in self.Data_section.Phase.loc.items():
# std = sPhz / 100 #converted in radians
error_propag_phs [key]= 180 *(
(std_phase_obj[key]/100)/1000)/np.pi
#convert phase millirad value in degree :
phase_obj ={key: 180 * phase * 1e-3/np.pi
for key, phase in phase_obj.items()}
#---------------------------------------------------------------
if apply_filter is not None :
try :
apply_filter= apply_filter.lower()
except:
print("---> ErrorType: Filters' names must a str of"
" `tma` `flma` or `ama`. not {0}".format(
type(apply_filter)))
warnings.warn("TypeError ,Filters' names must be a str of"
" `tma` ,`flma` or `ama`. not {0}".format(
type(apply_filter)))
apply_filter=None
else :
print('{0:-^77}'.format('Filter Infos'))
print('** {0:<27} {1} {2}'.format(
'Filter applied', '=', apply_filter.upper()))
if apply_filter in ['ama', 'tma', 'flma']:
# create processing_obj , unique obj for all filters
corr_obj= corr.shifting(data_fn = self.data_fn,
profile_fn=self.profile_fn)
if apply_filter =='tma':
self._logging.info (
'Computing rho with Trimming moving average (TMA) filter.')
# corr_obj= corr.shifting()
res_TMA= corr_obj.TMA(number_of_TMA_points =number_of_points ,
reference_freq=reference_frequency,
)
reference_frequency =corr_obj.referencefreq
print('** {0:<27} {1} {2}'.format('TMA filter points',
'=', number_of_points))
print('** {0:<27} {1} {2} Hz.'.format('Reference frequency',
'=', reference_frequency))
elif apply_filter =='flma':
self._logging.info (
'Computing rho with fixed-length moving average (FLMA) filter.')
res_FLMA = corr_obj.FLMA(number_of_dipole=number_of_points,
reference_freq=reference_frequency)
dipolelength= corr_obj.dipolelength
reference_frequency =corr_obj.referencefreq
print('** {0:<27} {1} {2}'.format('FLMA filter points',
'=', number_of_points))
print('** {0:<27} {1} {2} Hz.'.format('Reference frequency',
'=', reference_frequency))
print('** {0:<27} {1} {2} m.'.format('Dipole length ',
'=', dipolelength))
elif apply_filter == 'ama':
self._logging.info (
'Computing rho with adaptative moving average (AMA) filter.')
res_AMA = corr_obj.AMA(number_of_skin_depth=number_of_skin_depth,
reference_freq=reference_frequency)
dipolelength= corr_obj.dipolelength
reference_frequency =corr_obj.referencefreq
print('** {0:<27} {1} {2}'.format('Number of skin depths',
'=', int(number_of_skin_depth)))
print('** {0:<27} {1} {2} Hz.'.format('Reference frequency',
'=', reference_frequency))
print('** {0:<27} {1} {2} m.'.format('Dipole length ',
'=', dipolelength))
elif apply_filter is not None and apply_filter not in ['tma, ama, flma']:
warnings.warn('filter %s provided is UNrecognizing.'
' Pogram worsk currently with : TMA or FLMA ,'
' Please provided the right filters'
' for computing.'% apply_filter)
raise CSex.pyCSAMTError_processing(
'Filters provided is not acceptable.'
' Recognized filters are "TMA","AMA" AND "FLMA"')
if apply_filter is None :
if self.Header.HardwareInfos.numfilterfreq is None\
and self.Header.HardwareInfos.freq_filter is None :
avgfilter, descritipfilter = '', ""
elif self.Header.HardwareInfos.astatic_version is not None :
avgfilter ="{0}.{1}.Filter =TMA for 5pts.".format(
'Astatic',self.Header.HardwareInfos.astatic_version )
descritipfilter = self.Header.HardwareInfos.sconfig_dict['']
else : avgfilter, descritipfilter = '', ""
if apply_filter is not None :
msf=''
if apply_filter =='flma':
descritipfilter = ' {0} dipoles for FLMA filter'\
' at {1} hertz with {2} m dipole length'.format(
int(number_of_points), reference_frequency, dipolelength)
msf='dip'
elif apply_filter =='tma':
descritipfilter = ' {0} points for TMA filter at {1} hertz'.format(
int(number_of_points), reference_frequency)
msf='pts'
elif apply_filter =='ama':
descritipfilter = ' {0} skin depths for AMA filter at {1} hertz'.format(
int(number_of_skin_depth), reference_frequency)
number_of_points=number_of_skin_depth
msf='sdepths'
avgfilter ="{0}.{1}.Filter ={2} for {3} {4}.".format(
'pycsamt','v1.0.01', apply_filter.upper(),
int(number_of_points) , msf)
if self.profile_fn is None :
#---> set to 0. lon , lat and elev if profile _fn is not provided
#--> create dictionnary of zero value of lat and lon
import copy
#head_edi_lon = np.full((len(head_dataid_list),), 0., dtype=np.float)
head_edi_lon = {stn: value for stn , value in zip (
head_dataid_list,
np.zeros_like(len(head_dataid_list), dtype=np.float))}
head_edi_lat = copy.deepcopy(head_edi_lon)
head_edi_elev = copy.deepcopy(head_edi_lon)
elif self.profile_fn is not None :
head_edi_lon = site_obj.lon
head_edi_lat = site_obj.lat
head_edi_elev = site_obj.elev
#------------------------START SETTING EDI ATTRIBUTE ----------------
# from pycsamt.utils import gis_tools as gis
# for stn in head_dataid_list : #loop for all station or dataid
for ii, stn in enumerate(head_dataid_list):
#create Ediobj for eac datalist
edi_obj =Edi()
#====> set edi_obj Header attributes
edi_obj.Head.dataid= head_dataid_list[ii] # ii mean for all the list
edi_obj.Head.acqby = 'Zonge Engineering'
edi_obj.Head.acqdate = self.Header.HardwareInfos.dated
edi_obj.Head.fileby = "AMTAVG_v.{0}".format(
self.Header.HardwareInfos.version)
if self.Header.SurveyAnnotation.project_name is None :
self.Header.SurveyAnnotation.__setattr__(
'project_name ',os.path.basename(self.data_fn))
edi_obj.Head.loc = os.path.basename(self.data_fn)[:-4]
edi_obj.Head.filedate = self.Header.HardwareInfos.processed
edi_obj.Head.prospect = self.Header.SurveyAnnotation.contractor_name
edi_obj.Head.county = self.Header.SurveyAnnotation.project_area
edi_obj.Head.lat = head_edi_lat[stn]
edi_obj.Head.long = head_edi_lon[stn]
edi_obj.Head.elev = round(head_edi_elev[stn],2)
edi_obj.Head.maxsect =1000
#=====> set edi_obj_info .
# edi_obj.Info.maxinfo = 999
edi_obj.Info.filter =self.Header.HardwareInfos.freq_filter
edi_obj.Info.Source.__setattr__(
'project', self.Header.SurveyAnnotation.project_name)
edi_obj.Info.Source.__setattr__('survey', head_dataid_list[ii])
edi_obj.Info.Source.__setattr__('sitename', head_dataid_list[ii])
edi_obj.Info.Processing.__setattr__('processedby', 'pyCSAMT' )
edi_obj.Info.Processing.ProcessingSoftware.__setattr__(
'name', edi_obj.Head.fileby )
#====> definemeas
edi_obj.DefineMeasurement.maxchan =4
edi_obj.DefineMeasurement.maxrun = len(head_dataid_list)
edi_obj.DefineMeasurement.__setattr__('reftype' ,'CARTesien')
edi_obj.DefineMeasurement.__setattr__('reflat',edi_obj.Head.lat )
edi_obj.DefineMeasurement.__setattr__('reflong', edi_obj.Head.long)
edi_obj.DefineMeasurement.__setattr__('refelev',round(edi_obj.Head.elev,2))
# creating xxmeas object
codeID_dec = '{0}'.format((ii+1)/edi_obj.Head.maxsect)
# codeID= '{0:04}{1}'.format(ii * 10 + 1 , codeID_dec[1:] )
edi_obj.DefineMeasurement.__setattr__(
'meas_ex',
Emeasurement(**{'id':'{0:04}{1}'.format(ii * 10 + 1 ,
codeID_dec[1:] ),
'chtype':'EX',
'x': -(dipole_length/2),
'y': 0.,
'x2':dipole_length/2 ,
'y2':0,
'acqchan': '0.00',
'filter':avgfilter,
}))
edi_obj.DefineMeasurement.__setattr__(
'meas_ey',
Emeasurement(**{'id':'{0:04}{1}'.format(ii * 10 + 2 ,
codeID_dec[1:]),
'chtype':'EY',
'x':0.,
'y': -(dipole_length/2),
'x2':0.,
'y2':dipole_length/2 , 'acqchan': '0.00',
'filter':avgfilter}))
edi_obj.DefineMeasurement.__setattr__(
'meas_hx',
Hmeasurement(**{'id':'{0:04}{1}'.format(ii * 10 + 3 ,
codeID_dec[1:] ),
'chtype':'HX',
'x':0.,
'y': 0.,
'x2':0.,
'y2':0. , 'acqchan': '0.00',
'filter':avgfilter}))
edi_obj.DefineMeasurement.__setattr__(
'meas_hy',
Hmeasurement(**{'id':'{0:04}{1}'.format(ii * 10 + 4 ,
codeID_dec[1:]),
'chtype':'HY',
'x':0.,
'y': 0.,
'x2':0.,
'y2':0. , 'acqchan': '0.00',
'filter':avgfilter}))
#====> EMAPSECT
edi_obj.MTEMAP.sectid = stn
edi_obj.MTEMAP.__setattr__('nfreq', nfreq)
edi_obj.MTEMAP.__setattr__('maxblks', 64)
edi_obj.MTEMAP.ndipole= len(head_dataid_list) - 1
edi_obj.MTEMAP.type = descritipfilter
edi_obj.MTEMAP.__setattr__('hx', '{0:04}{1}'.format(
ii * 10 + 3 , codeID_dec[1:] ))
edi_obj.MTEMAP.__setattr__('hy', '{0:04}{1}'.format(
ii * 10 + 4 , codeID_dec[1:] ))
edi_obj.MTEMAP.__setattr__('chksum', 64 * nfreq)
# Frequency blocks , impendance and resistivity blocs
edi_obj.Z.freq = self.Data_section.Frequency.value
# set phase and resistitivity including error propagation
# compute error propagation
#--> initialize ndarray(nfreq, 2, 2)
res_array = np.zeros((nfreq, 2,2 ), dtype = np.float)
res_array_err = np.zeros((nfreq, 2,2 ), dtype = np.float)
phs_array = np.zeros((nfreq, 2,2 ), dtype = np.float)
phs_array_err = np.zeros((nfreq, 2,2 ), dtype = np.float)
#dictionnary of components . we set only component into XY .
res_array [:, 0 , 1 ] = app_rho_obj[stn]
res_array_err [:, 0 , 1] = error_propag_rho [stn]
phs_array[: , 0, 1] = phase_obj[stn]
phs_array_err [:, 0, 1] = error_propag_phs[stn]
if AS_flag ==1 :
res_as_array = np.zeros((nfreq, 2,2 ), dtype = np.float)
res_as_array [:, 0, 1] = AS_rho [stn]
if apply_filter is not None :
fres_array = np.zeros((nfreq, 2,2 ), dtype = np.float)
if apply_filter.lower() =='tma' :
fres_array [: , 0 , 1] = res_TMA[stn]
elif apply_filter.lower() =='flma' :
fres_array [: , 0 , 1] = res_FLMA[stn]
elif apply_filter.lower() =='ama' :
fres_array [: , 0 , 1] = res_AMA[stn]
#---> computing z with resistivities , phase by using propagrations errors
edi_obj.Z.set_res_phase(res_array = res_array, phase_array=phs_array,
freq= edi_obj.Z.freq,
res_err_array=res_array_err,
phase_err_array=phs_array_err)
# write edifiles ...
if AS_flag ==1 :
edi_obj.write_edifile(savepath = savepath,
add_filter_array =res_as_array )
elif apply_filter is not None:
edi_obj.write_edifile(savepath = savepath,
add_filter_array = fres_array )
else : edi_obj.write_edifile(savepath = savepath)
print('-'*77)
print('---> {0} Edi-files have been rewritten.\n---> see path:<{1}> '.\
format(len(head_dataid_list), savepath))
print('-'*77)
class SurveyAnnotation(object):
    """
    Class for survey annotations.

    Arguments
    -----------
        **survey_annotations_data**: list , str
            list of annotation lines or path to your annotation file.
            Filled automatically.

    ======================  =========  =====================================
    Attributes              Type       Explanation
    ======================  =========  =====================================
    project_name            str        project name
    project_area            str        project area
    custumer_name           str        company name
    contractor_name         str        contractor name
    project_label           str        label or number
    acqdate                 str        data acquisition date
    sconfig_dict            dict       survey annotation dictionnary
    ======================  =========  =====================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> avg_obj= Avg(data_fn=path)
        >>> surv_area= avg_obj.Header.SurveyAnnotation.project_area
        ... survey_area
        ... avg_obj.Header.SurveyAnnotation.sconfig_dict['Job.By']
    """

    # (keyword searched in the lower-cased key, attribute to set,
    #  True when an empty value must not overwrite the current one)
    _ANNOTATION_FIELDS = (
        ('job.name', 'project_name', True),
        ('job.area', 'project_area', False),
        ('job.for', 'custumer_name', True),
        ('job.by', 'contractor_name', True),
        ('job.number', 'project_label', True),
        ('job.date', 'acqdate', True),
    )

    def __init__(self, survey_annotations_data=None, **kwargs):
        self.zon_serv_annotations = survey_annotations_data
        self.project_name = 'CSAMT_survey'
        self.project_area = None
        # NOTE(review): the trailing double quote looks like a typo; it is
        # kept so that the default value written by callers does not change.
        self.custumer_name = 'Zonge Engineering"'
        self.contractor_name = 'Zonge'
        self.project_label = 'pyCSAMT'
        self.acqdate = '{0}-{1}'.format(datetime.now(), timezone.utc)

        self.sconfig_dict = self._build_sconfig_dict()

        # Extra keywords become attributes and extra dictionary entries,
        # unless they collide with a name defined on the class itself.
        for key, value in kwargs.items():
            if key not in SurveyAnnotation.__dict__:
                setattr(self, key, value)
                self.sconfig_dict[key] = value

    def _build_sconfig_dict(self):
        """Return the `Job.*` dictionary built from the current attributes."""
        return {'Job.Name': self.project_name,
                'Job.Area': self.project_area,
                'Job.For': self.custumer_name,
                'Job.By': self.contractor_name,
                'Job.Number': self.project_label,
                'Job.Date': self.acqdate,
                }

    def set_survey_annotations_infos(self, survey_annotations_data=None):
        """
        Method to set _survey annotations informations .

        Each line is split on '='; the text before the first '=' is searched
        (case-insensitively) for the `Job.*` keywords and the text between
        the first and the second '=' is stored as the value. Lines without
        '=' are ignored. `sconfig_dict` is rebuilt afterwards.

        Parameters
        ----------
            * survey_annotations_data : list or path-like str
                    container of survey annotations infos, or path to a
                    file holding them. When None, the value given to the
                    constructor is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if no annotation data is available at all.
            CSex.pyCSAMTError_Header
                if the data is neither a list nor a readable file.
        """
        _logger.info('Reading and setting zonge survey Annotations infos !')

        if survey_annotations_data is not None:
            self.zon_serv_annotations = survey_annotations_data
        if self.zon_serv_annotations is None:
            # The exception used to be built but never raised.
            raise CSex.pyCSAMTError_inputarguments(
                'No survey_annotations informations found!')

        if not isinstance(self.zon_serv_annotations, list):
            try:
                if os.path.isfile(self.zon_serv_annotations):
                    with open(self.zon_serv_annotations, 'r',
                              encoding='utf8') as fid:
                        self.zon_serv_annotations = fid.readlines()
            except Exception as error:
                raise CSex.pyCSAMTError_Header(
                    "Please check your annotation data ! "
                    "Must be be a list or file ") from error

        for line in self.zon_serv_annotations:
            fields = line.strip().split('=')
            if len(fields) < 2:
                # no "key=value" pair: nothing to read on this line
                continue
            key, value = fields[0].lower(), fields[1]
            for keyword, attribute, skip_empty in self._ANNOTATION_FIELDS:
                if keyword in key and not (skip_empty and value == ''):
                    setattr(self, attribute, value)

        self.sconfig_dict = self._build_sconfig_dict()
class SurveyConfiguration(object):
    """
    Class for survey Survey configuration.

    Arguments
    ---------
        **survey_config_data** : list or path-like str
            configuration lines, or path to a file holding them.

    =================  ==========  ============================================
    Attributes         Type        Explanation
    =================  ==========  ============================================
    surveyType         str         survey type (CSAMT, TEM, CR, TDIP)
    surveyArray        str         array type(for CSAMT:Scalar, Vector, Tensor)
    lineName           str         line label
    lineNumber         str         line number
    azimuth            str         line azimuth ( deg E of N)
    stnGdpBeg          str         first GDP station number
    stnGdpInc          str         GDP station number increment
    stnBeg             str         possibly rescaled first station number
    stnInc             str         possibly rescaled station number increment
    stnLeft            str         rescaled station number on left edge of
                                   pseudosection plot
    stnRight           str         rescaled station number on right edge of
                                   pseudosection plot
    unitLength         str         length units (m,ft)
    unitEmag           str         field units (nV/m,nV/Am)
    unitHfield         str         field units (pT,pT/A)
    unitPhase          str         units (mrad,deg)
    sconfig_dict       dict        surveyconfiguration dict elmts
    =================  ==========  ============================================

    Values read from the configuration lines are stored as the raw text
    found after '=' (no numeric conversion is performed here).

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core import Avg
        >>> avg_obj= Avg(data_fn=path)
        >>> surv_nameLine= avg_obj.Header.SurveyConfiguration.lineName
        ... surv_nameLine
        ... avg_obj.Header.SurveyConfiguration.sconfig_dict['Stn.Right']
    """

    # (keyword searched in the lower-cased key, attribute to set,
    #  True when an empty value must not overwrite the current one)
    _CONFIG_FIELDS = (
        ('survey.type', 'surveyType', True),
        ('survey.array', 'surveyArray', True),
        ('line.name', 'lineName', True),
        ('line.number', 'lineNumber', True),
        ('line.azimuth', 'azimuth', True),
        ('stn.gdpbeg', 'stnGdpBeg', False),
        ('stn.gdpinc', 'stnGdpInc', False),
        ('stn.beg', 'stnBeg', False),
        ('stn.inc', 'stnInc', False),
        ('stn.left', 'stnLeft', False),
        ('stn.right', 'stnRight', False),
        # historical spelling, kept for backward compatibility
        ('stn.length', 'unitLength', False),
        # spelling matching the 'Unit.Length' key of `sconfig_dict`
        ('unit.length', 'unitLength', True),
        ('unit.e', 'unitEmag', True),
        ('unit.b', 'unitHfield', True),
        ('unit.phase', 'unitPhase', True),
    )

    def __init__(self, survey_config_data=None, **kwargs):
        self.zon_surv_config = survey_config_data
        self.surveyType = 'CSAMT, TEM, CR, TDIP'
        self.surveyArray = 'CSAMT: Scalar, Vector, Tensor'
        self.lineName = 'pyCSAMTline'
        self.lineNumber = 'float:00'
        self.azimuth = ' 0 deg E of N'
        self.stnGdpBeg = None
        self.stnGdpInc = None
        self.stnBeg = None
        self.stnInc = None
        self.stnLeft = None
        self.stnRight = None
        self.unitLength = 'm,ft'
        self.unitEmag = 'nV/mA or micoV/kmA'
        self.unitHfield = 'pT, picoT/A'
        self.unitPhase = 'mrad,deg'

        self.sconfig_dict = self._build_sconfig_dict()

        # Extra keywords become attributes and extra dictionary entries,
        # unless they collide with a name defined on the class itself.
        for key, value in kwargs.items():
            if key not in SurveyConfiguration.__dict__:
                setattr(self, key, value)
                self.sconfig_dict[key] = value

    def _build_sconfig_dict(self):
        """Return the configuration dictionary built from the attributes."""
        return {'Survey.Type': self.surveyType,
                'Survey.Array': self.surveyArray,
                'Line.Name': self.lineName,
                'Line.Number': self.lineNumber,
                'Line.Azimuth': self.azimuth,
                'Stn.GdpBeg': self.stnGdpBeg,
                'Stn.GdpInc': self.stnGdpInc,
                'Stn.Beg': self.stnBeg,
                'Stn.Inc': self.stnInc,
                'Stn.Left': self.stnLeft,
                'Stn.Right': self.stnRight,
                'Unit.Length': self.unitLength,
                'Unit.E': self.unitEmag,
                'Unit.B': self.unitHfield,
                'Unit.Phase': self.unitPhase,
                }

    def set_survey_configuration_infos(self, survey_config_data=None):
        """
        Method to set _survey configurations informations .

        Each line is split on '='; the text before the first '=' is searched
        (case-insensitively) for the configuration keywords and the text
        between the first and the second '=' is stored as the value. Lines
        without '=' are ignored. `sconfig_dict` is rebuilt afterwards.

        Parameters
        ----------
            * survey_config_data : list or pathLike str
                    container of survey configurations infos. When None,
                    the value given to the constructor is used.

        Raises
        ------
            CSex.pyCSAMTError_Header
                if no configuration data is available, or if the data is
                neither a list nor a readable file.
        """
        _logger.info('Reading and setting survey configurations informations!')

        if survey_config_data is not None:
            self.zon_surv_config = survey_config_data
        if self.zon_surv_config is None:
            raise CSex.pyCSAMTError_Header(
                "No Survey configuration data found."
                " Please check your configuration file.")

        if not isinstance(self.zon_surv_config, list):
            try:
                if os.path.isfile(self.zon_surv_config):
                    with open(self.zon_surv_config, 'r',
                              encoding='utf8') as fid:
                        self.zon_surv_config = fid.readlines()
            except Exception as error:
                raise CSex.pyCSAMTError_Header(
                    "your configuration data must be either a list or a file."
                    ) from error

        for line in self.zon_surv_config:
            fields = line.strip().split('=')
            if len(fields) < 2:
                # no "key=value" pair: nothing to read on this line
                continue
            key, value = fields[0].lower(), fields[1]
            for keyword, attribute, skip_empty in self._CONFIG_FIELDS:
                if keyword in key and not (skip_empty and value == ''):
                    setattr(self, attribute, value)

        self.sconfig_dict = self._build_sconfig_dict()
class TransmitterProperties(object):
    """
    Class for transmitter properties.

    The constructor only takes optional keyword arguments; the values are
    read from the AVG header lines by :meth:`set_transmitter_properties`.

    =================  ==========  ============================================
    Attributes         Type        Explanation
    =================  ==========  ============================================
    txType             str         source type (for NSAMT /CSAMT:Natural,
                                   Bipole,Loop)
    txGdpStn           float       transmitter ID from GDP Tx field
                                   (GDP stn #) (alias = XMTR)
    txStn              float       rescaled Tx ID
                                   (rescaled client stn #)
    txCenter           str         transmitter center-point easting,
                                   northing,elevation
                                   (aliases = TxCX, TxCY); not parsed here
    txHPR              list        transmitter orientation
                                   heading, pitch, roll
                                   (Tx heading alias = TxBrg)
                                   (heading = bipole azimuth, pitch=0, roll=0
                                   for z+up or 180 for z positive down)
    txLength           float       transmitter bipole length or square
                                   loop width
    sconfig_dict       dict        params config dictionnary
    =================  ==========  ============================================

    Until a value is parsed, each attribute holds a descriptive
    placeholder string.

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core import Avg
        >>> path= os.path.join(os.environ['pyCSAMT'],
        ...                       'pycsamt', data, 'LCS01.AVG')
        >>> avg_obj= Avg(data_fn=path)
        >>> avg_obj.Header.Tx.txGdpStn
        >>> avg_obj.Header.Tx.txStn
        >>> avg_obj.Header.Tx.sconfig_dict['Tx.Type']
    """

    def __init__(self, **kwargs):
        self.txType = 'NSAMT /CSAMT: Natural, Bipole,Loop'
        self.txGdpStn = 'XMTR '
        self.txStn = 'Tx ID'
        self.txCenter = 'TxCX, TxCY'
        self.txHPR = ('bipole azimuth, pitch=0, roll=0 for z+up '
                      'or 180 for z positive down')
        self.txLength = 'positive float, length units'

        # Must exist before the keyword loop below writes into it.
        self.sconfig_dict = self._build_sconfig_dict()

        # Extra keywords become attributes and extra dictionary entries,
        # unless they collide with a name defined on the class itself.
        for key, value in kwargs.items():
            if key not in TransmitterProperties.__dict__:
                setattr(self, key, value)
                self.sconfig_dict[key] = value

    def _build_sconfig_dict(self):
        """Return the `Tx.*` dictionary built from the current attributes."""
        return {'Tx.Type': self.txType,
                'Tx.GdpStn': self.txGdpStn,
                'Tx.Stn': self.txStn,
                'Tx.Center': self.txCenter,
                'Tx.HPR': self.txHPR,
                'Tx.Length': self.txLength,
                }

    def set_transmitter_properties(self, Tx_data=None):
        """
        Method to set_Tx_properties.

        For every line the text after the last '=' is taken as the value.
        `sconfig_dict` is rebuilt afterwards, whether or not data was given.

        :param Tx_data: list of Tx-infos from AVG filename
        :type Tx_data: list

        :raises ValueError: if a numeric field cannot be converted to float.
        """
        if Tx_data is None:
            # A missing list has always been non-fatal here: the defaults are
            # kept. The condition is now reported instead of being silent.
            warnings.warn('No Tx-proprerties found !')
        else:
            for tx_infos in Tx_data:
                value = tx_infos.strip().split('=')[-1]
                if '$ XMTR' in tx_infos or 'Tx.GdpStn' in tx_infos:
                    self.txGdpStn = float(value)
                if '$Tx.Stn' in tx_infos:
                    self.txStn = float(value)
                if 'Tx.Type' in tx_infos:
                    self.txType = value
                if 'Tx.HPR' in tx_infos:
                    self.txHPR = [float(ss) for ss in value.split(',')]
                if 'Tx.Length' in tx_infos:
                    # "<number> <unit>": the number is the token before the
                    # unit; a bare "<number>" is accepted as well.
                    tokens = value.split()
                    number = tokens[-2] if len(tokens) > 1 else tokens[-1]
                    self.txLength = float(number)

        # --> set configuration -dictionnary
        self.sconfig_dict = self._build_sconfig_dict()
class ReceiverProperties(object):
    """
    Class for receiver properties.

    Arguments
    ----------
        **Rx_data** : list or str
            Filled automatically or path to your
            receiver properties file.

    =================  ==========  ============================================
    Attributes         Type        Explanation
    =================  ==========  ============================================
    rxGdpStn           float       receiver GDP station number
                                   (alias = Station)
    rxStn              float       receiver client station number
    rxHPR              list        EM component heading, pitch, roll
                                   (floats, Rx heading alias = RxBrg, ExAzm)
                                   (heading = x component azimuth in degrees
                                   east of north, pitch = x component
                                   orientation in degrees. up from
                                   horizontal, pitch = z cmp rotation about
                                   x cmp, 0=z+up, 180 = z+ down)
    rxLength           float       E-field dipole length or loop widths
                                   (alias = aspace)
    unit               str         unit suffix found after the length, if any
    rxComps            str         EM component/impedance label(ExHx, ExHy,
                                   EyHx, EyHy, Zxx, Zxy, Zyx, Zyy, Zvec, Zdet)
    sconfig_dict       dict        parameters dict config
    =================  ==========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core import Avg
        >>> path= os.path.join(os.environ['pyCSAMT'],
        ...                       'pycsamt', data, 'LCS01.AVG')
        >>> avg_obj= Avg(data_fn=path)
        ... avg_obj.Header.Rx.rxGdpStn
        ... avg_obj.Header.Rx.rxStn
        ... avg_obj.Header.Rx.sconfig_dict['Rx.Length']
    """

    def __init__(self, Rx_data=None, **kwargs):
        self.rx_data = Rx_data
        self.rxGdpStn = None
        self.rxStn = None
        self.rxHPR = None
        self.rxLength = None
        self.rxComps = 'ExHy'
        self.unit = None

        self.sconfig_dict = self._build_sconfig_dict()

        # Extra keywords become attributes and extra dictionary entries,
        # unless they collide with a name defined on the class itself.
        for key, value in kwargs.items():
            if key not in ReceiverProperties.__dict__:
                setattr(self, key, value)
                self.sconfig_dict[key] = value

    def _build_sconfig_dict(self):
        """Return the `Rx.*` dictionary built from the current attributes."""
        return {'Rx.GdpStn': self.rxGdpStn,
                'Rx.Stn': self.rxStn,
                'Rx.HPR': self.rxHPR,
                'Rx.Length': self.rxLength,
                'Rx.Cmp': self.rxComps,
                }

    @staticmethod
    def _split_length_and_unit(text):
        """Split '<number>[ ]<letters>' into (float number, unit string)."""
        number = text.strip()
        unit = ''
        # peel the alphabetic unit ('m', 'ft', ...) off the end of the value
        while number and number[-1].isalpha():
            unit = number[-1] + unit
            number = number[:-1]
        return float(number), unit

    def set_receiver_properties(self, Rx_data=None):
        """
        Methods to set Receivers - properties infos from AVG files.

        For every line the text after the last '=' is taken as the value.
        `sconfig_dict` is rebuilt afterwards.

        Parameters
        ----------
            * Rx_data : list or path-like str, optional
                    container infos of receivers. When None, the value
                    given to the constructor is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if no receiver data is available at all.
            CSex.pyCSAMTError_Header
                if the data is neither a list nor a readable file.
            ValueError
                if a numeric field cannot be converted to float.
        """
        if Rx_data is not None:
            self.rx_data = Rx_data
        if self.rx_data is None:
            # The exception used to be built but never raised.
            raise CSex.pyCSAMTError_inputarguments(
                'No Zonge Rx-proprerties found !')

        if not isinstance(self.rx_data, list):
            try:
                if os.path.isfile(self.rx_data):
                    with open(self.rx_data, 'r', encoding='utf-8') as frx:
                        self.rx_data = frx.readlines()
            except Exception as error:
                raise CSex.pyCSAMTError_Header(
                    'Argument provided for rx_data are wrong !'
                    ' Must be a list of file.') from error

        for rx_infos in self.rx_data:
            value = rx_infos.strip().split('=')[-1]
            if '$Rx.Stn' in rx_infos:
                self.rxStn = float(value)
            if 'Rx.Cmp' in rx_infos:
                self.rxComps = value
            if 'Rx.HPR' in rx_infos:
                self.rxHPR = [float(ss) for ss in value.split(',')]
            if ('$ ASPACE' in rx_infos) or ('Rx.Length' in rx_infos):
                # the file sometimes carries the unit ('m' or 'ft') after the
                # number; it has to be removed before the float conversion
                self.rxLength, unit = self._split_length_and_unit(value)
                if unit:
                    self.unit = unit
            if '$Rx.GdpStn' in rx_infos:
                self.rxGdpStn = float(value)

        self.sconfig_dict = self._build_sconfig_dict()
class Skip_flag(object):
    """
    skip flag values

    - 0. drop data
    - 1. skip data
    - 2. good quality of data

    Arguments
    ----------
        **skip_flag** : str or int ,
                skip_flag value set automatically .
                some astatic file doesnt mention the number of
                skip_flag .Default is '2'.

    :Example:

        >>> from pycsamt.ff.core import Avg
        >>> path= os.path.join(os.environ['pyCSAMT'],
        ...                       'pycsamt', data, 'LCS01.AVG')
        >>> avg_obj= Avg(data_fn=path)
        >>> avg_obj.Skip_flag.skip_flag
        >>> avg_obj.Skip_flag.get_skip_flag
    """

    def __init__(self, skip_flag=None, **kwargs):
        self.skip_flag = skip_flag
        # description of `skip_flag`; filled by `setandget_skip_flag`
        self.get_skip_flag = None
        self.skip_flag_dict = {'2': 'Good data quality',
                               '1': 'Data skipped',
                               '0': 'Data rejected',
                               '*': 'No data obtained'}
        for key, value in kwargs.items():
            setattr(self, key, value)

    def setandget_skip_flag(self, skip_flag=None):
        """
        simple method to set and get skip_flag.

        When `skip_flag` is given it is validated and stored as a string;
        `get_skip_flag` is then set to the description matching the stored
        flag. If the stored flag matches no known key, `get_skip_flag` is
        left unchanged.

        Parameters
        ----------
            * skip_flag: str or int
                one of '2', '1', '0', '*' (integers are accepted).

        Raises
        ------
            CSex.pyCSAMTError_config_file
                if `skip_flag` is not one of the recognised flags.
        """
        if skip_flag is not None:
            # convert first so that integer flags (2, 1, 0) are accepted
            flag = str(skip_flag)
            if flag not in self.skip_flag_dict:
                raise CSex.pyCSAMTError_config_file(
                    'Wrong Input ! skip flag must be'
                    ' among : {0}'.format(list(self.skip_flag_dict.keys())))
            self.skip_flag = flag

        # str() lets an integer flag given to the constructor match as well
        description = self.skip_flag_dict.get(str(self.skip_flag))
        if description is not None:
            self.get_skip_flag = description
class ZongeHardware(object):
    """
    Some features of zonge AMTAVG software.

    Arguments
    ---------
        **zonge_hardw_infos** : list or str ,
            filled automatically or path to your
            hardware setting file.

    =================  ==========  =========================================
    Attributes         Type        Explanation
    =================  ==========  =========================================
    version            str         version of AMTAVG software
    dated              str         date of building Avg file
    processed          str         date of data preprocessing
    from_fld_file      str         parent of avg file
    astatic_version    str         zonge astactic sofware version
    update_data        str         date were data were updated
    numfilterfreq      str         number of point that TMA used to filter
    freq_filter        str         value of TMA filter frequency
    end_astatic_file   str         end file description; only set once a
                                   TMA line has been parsed
    sconfig_dict       dict        hardware config dictionnary
    =================  ==========  =========================================

    More can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core import Avg
        >>> path= os.path.join(os.environ['pyCSAMT'],
        ...                       'pycsamt', data, 'LCS01.AVG')
        >>> avg_obj= Avg(data_fn=path)
        >>> avg_obj.Header.HardwareInfos.freq_filter
        >>> avg_obj.Header.HardwareInfos.sconfig_dict['AMTAVG']
    """

    def __init__(self, zonge_hardw_infos=None, **kwargs):
        self.zonge_hardw_infos = zonge_hardw_infos
        self.version = '7.76'
        self.from_fld_file = None
        self.dated = None
        self.processed = None
        self.astatic_version = 'v3.60'
        self.update_data = None
        self.numfilterfreq = None
        self.freq_filter = None

        self.sconfig_dict = self._build_sconfig_dict()
        self._writer_copyleft = '@by_pyCSAMT'

        # Extra keywords become attributes unless they collide with a name
        # defined on the class itself.
        for key, value in kwargs.items():
            if key not in ZongeHardware.__dict__:
                setattr(self, key, value)

    def _build_sconfig_dict(self):
        """Return the hardware dictionary built from the current attributes."""
        return {'AMTAVG': [self.version, self.from_fld_file],
                'Dated': self.dated,
                'Processed': self.processed,
                'ASTATIC': [self.astatic_version, self.update_data],
                "": '{0} TMA Filter at {1} hertz'.format(
                    self.numfilterfreq, self.freq_filter)}

    def set_harware_infos(self, zonge_hardw_infos=None):
        """
        Method to set Harwares infos.

        Lines containing 'AMTAVG', 'ASTATIC' or 'TMA' (checked in that
        order, first match wins) are parsed; `sconfig_dict` is rebuilt
        afterwards.

        Parameters
        ----------
            * zonge_hardw_infos : list or path-like str
                    Hardware informations collected. When None, the value
                    given to the constructor is used.

        Raises
        ------
            CSex.pyCSAMTError_Header
                if no information is available, or if the data is neither
                a list nor a readable file.
        """
        _logger.info('Reading and setting Harwware informations !')

        if zonge_hardw_infos is not None:
            self.zonge_hardw_infos = zonge_hardw_infos
        if self.zonge_hardw_infos is None:
            raise CSex.pyCSAMTError_Header(
                'No informations from hardware found !')

        if not isinstance(self.zonge_hardw_infos, list):
            try:
                with open(self.zonge_hardw_infos, 'r',
                          encoding='utf8') as fzh:
                    self.zonge_hardw_infos = fzh.readlines()
            except Exception as error:
                raise CSex.pyCSAMTError_Header(
                    "Harware - infos must be either a list or file !"
                    ) from error

        for infos in self.zonge_hardw_infos:
            if 'AMTAVG' in infos:
                parts = infos.strip().split(',')
                self.processed = func._remove_str_word(
                    char=parts[-1], word_to_remove='Processed')
                self.dated = func._remove_str_word(
                    char=parts[-2], word_to_remove='Dated')
                self.from_fld_file = parts[0].split(':')[-1]
                self.version = parts[0].split()[2]
            elif 'ASTATIC' in infos:
                tokens = infos.strip().split()
                self.astatic_version = tokens[2]
                self.update_data = tokens[-1]
            elif 'TMA' in infos:
                self.end_astatic_file = infos.strip().strip('\\')
                tokens = infos.strip().split()
                # Tokens are kept as text: the former all-or-nothing float
                # conversion could never succeed because the token holding
                # 'TMA' is not a number.
                if len(tokens) >= 2:
                    self.numfilterfreq = tokens[1]
                    self.freq_filter = tokens[-2]

        self.sconfig_dict = self._build_sconfig_dict()
[docs]class Data (object):
"""
AVG Data informations , Container of all data infos .
Arguments
---------
**data_array** : np.ndarray
Data collected form the site, It contains all the informations
during survey. It could read a user own data build
by classifying data according the main AVG file
================== ============== =======================================
Attributes Type Explanation
================== ============== =======================================
Station (ndarray, 1) station length infos . unit : m
Frequency (ndarray, 1) frequency using during survey ,unit : Hz
Amps (ndarray, 1) current induction values using during
survey , unit : Amp
Emag (ndarray, 1) Electric field magnitude(Unit.nV/m,nV/Am)
Ephz (ndarray, 1) Electric field phase (unit. mrad)
Hmag (ndarray, 1) Magnetic field magnitude (Unit.pT,pT/A)
Hphz (ndarray, 1) Magnetic field phase (unit. mrad)
Resistivity (ndarray, 1) Cagnard apparent Resistivity
magnitude (unit.ohm.m)
Phase (ndarray, 1) Impedance phase (unit. mrad)
Z (ndarray, 1) Impedance magnitude (unit. km/s)
pcEmag (ndarray, 1) relative |E| error (%)
pcHmag (ndarray, 1) relative |H| error (%)
sEphz (ndarray, 1) phase(E) error (unit.mrad)
sHphz (ndarray, 1) phase(H) error (unit.mrad)
sPhz (ndarray, 1) phase(Z) error (unit.mrad)
pcRho (ndarray, 1) relative apparent resistivity
(rho) error (%)
added_astatic_data (ndarray, 1) static_corrected-apparent
resistivity (unit.ohm.m)
set_to_degree bool phase angle unit: if set to True,
phase values will be in degrees. Make
sure the data are converted to radians
beforehand, not left in mrad.
================== ============== =======================================
.. note:: When data come straight from the AVG file, they are set
respecting the units provided by the file. Users must take this into
account for data processing: a little conversion is needed, and a
special function can be used for it. The `normalized_station_value`
parameter normalizes station values: if set to True, stations are
named starting from 0 to n.
...
More attributes can be added by inputing a key word dictionary
:Example:
>>> from pycsamt.ff.core import Avg
>>> DATA=Data(data_array=avg_data )
>>> stn=DATA.Station.name
>>> freq=DATA.Frequency.value
"""
def __init__(self, data_array=None, **kwargs):
    """
    Create one container per AVG data column and optionally load data.

    Parameters
    ----------
        * data_array : np.ndarray, optional
            data recovered from the avg file. When given,
            `_avg_set_components_from_data` is called right away.
        * normalized_station_value : bool, optional keyword
            stored as `normalized_station_value`. Default is False.
        * set_phase_angle_to_deg : bool, optional keyword
            stored as `set_to_degree`. Default is False.

    Any other keyword is set as an attribute of the instance, after the
    defaults below, so it may override them.
    """
    self._logging = csamtpylog.get_csamtpy_logger(self.__class__.__name__)
    self._data_array = data_array

    # one container object per column of the AVG data block
    self.Station = Station()
    self.Frequency = Frequency()
    self.Amps = Amps()
    self.Emag = Emag()
    self.Ephz = Ephz()
    self.Hmag = Hmag()
    self.Hphz = Hphz()
    self.Resistivity = Resistivity()
    self.Phase = Phase()
    self.Z = Z_Tensor()
    self.pcEmag = pcEmag()
    self.sEphz = sEphz()
    self.pcHmag = pcHmag()
    self.sHphz = sHphz()
    self.pcRho = pcRho()
    self.sPhz = sPhz()

    self.added_astatic_data = None
    self._number_of_frequencies = None
    # `_avg_set_components_from_data` reads and writes `_number_of_stations`;
    # the historical `_num_of_stations` name is kept for backward
    # compatibility.
    self._number_of_stations = None
    self._num_of_stations = None

    self.normalized_station_value = kwargs.pop(
        'normalized_station_value', False)
    self.set_to_degree = kwargs.pop('set_phase_angle_to_deg', False)
    # AVG file type; `_avg_set_components_from_data` only accepts 1 or 2
    self._f = None

    for key, value in kwargs.items():
        setattr(self, key, value)

    if self._data_array is not None:
        self._avg_set_components_from_data()
def _avg_set_components_from_data (self, data_array=None ,data_type=None,
add_astatic_data =None ):
"""
Method to set
avg_data_components from Avgfile
Parameters
----------
* data_array : np.ndarray
data recovered from avg file.
It must be on numpy array. The default is None.
* data_type : int, optional
show the type of Avg file,
must be 1 for plainty file 2 for Astatic file.
*Default* is None.
* add_astatic_data : pd.coreDataFrame , optional
added infos from astatic file.
*Default* is None.
"""
if data_array is not None :
self._data_array =data_array
if self._data_array is None :
raise CSex.pyCSAMTError_AvgData('No Data from avgfile to read.')
if data_type is not None :
self._f =data_type
if self._f not in [1,2]:
raise CSex.pyCSAMTError_AvgData(' May check your avg Datafile.')
number_of_freq_array , rep = \
np.unique(self._data_array[:,1], return_counts=True)
number_of_station_array , rep = \
np.unique(self._data_array[:,0], return_counts=True)
self._number_of_frequencies= number_of_freq_array.size
self._number_of_stations = number_of_station_array.size
# print(self.numfreq, self.numstation)
if add_astatic_data is not None :
added_astatic_data=add_astatic_data
pdf_Z= added_astatic_data.drop('SRes', axis=1)
pdf_Z.reset_index(drop=True, inplace =True )
self.Z._read_zAS_data(
z_abs_array =pdf_Z ,
number_of_frequencies= self._number_of_frequencies,
number_of_stations=self._number_of_stations )
#--> set asticatic
self.added_astatic_data=added_astatic_data['SRes'].to_numpy()
def zstar_array_to_nan(zstar_array, nan_value=np.nan, keep_str=False):
    """
    Replace every item that cannot be converted to float by `nan_value`.

    The input is updated in place, item by item, as before.

    Parameters
    ----------
        * zstar_array : ndarray
            array contain unfloat converter value.
            the unconverter value can be a '*'
        * nan_value : float or np.nan type
            nan_value could be any value either int, float or str.
            The default is np.nan.
        * keep_str : bool, optional
            keep the str item on your array. Forced to True when
            `nan_value` is a str.

    Returns
    -------
        array_like
            `zstar_array` itself when str items are kept, otherwise a new
            float ndarray.
    """
    converted = []
    for kk, value in enumerate(zstar_array):
        try:
            item = float(value)
        except (TypeError, ValueError, OverflowError):
            item = nan_value
        zstar_array[kk] = item
        converted.append(item)

    if keep_str or isinstance(nan_value, str):
        return zstar_array
    # Build the result from the converted values rather than re-reading the
    # input: a fixed-width string array would truncate what was written back.
    return np.array([float(item) for item in converted])
# try to change the dtype of data if dtype of the dataset is not float
if self._data_array.dtype not in ['float', 'int']:
data_transitory =[zstar_array_to_nan(zstar_array=rowline)
for dd, rowline in enumerate(self._data_array)]
self._data_array=func.concat_array_from_list(
list_of_array=data_transitory)
# populate classes .
#cut of special freq_array and stations
# for more certaintly , any value will come must be check whether its
# possible to convert , if not value
# will set to np.nan
self.Station._set_station_(station_data_array=zstar_array_to_nan (
zstar_array=self._data_array[:,0]),
normalized_station_value=self.normalized_station_value )
self.Frequency._set_frequency(freq_array=zstar_array_to_nan (
zstar_array=self._data_array[:,1]))
self.Amps._read_amps_data(amps_array=zstar_array_to_nan (
zstar_array=self._data_array[:,2]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# Amps(amps_array=self._data_array[:,2])
self.Emag._read_emag_data(e_mag_array=zstar_array_to_nan (
zstar_array=self._data_array[:,3]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
# Emag(e_mag_array =self._data_array[:,3])
self.Ephz._read_ephz_data(e_phz_array=zstar_array_to_nan (
zstar_array=self._data_array[:, 4]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations,
to_degree =self.set_to_degree )
# Ephz(e_phz_array=self._data_array[:, 4])
self.Hmag._read_hmag_data(h_mag_array=zstar_array_to_nan (
zstar_array=self._data_array[:,5]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
# Hmag (h_mag_array =self._data_array[:,5])
self.Hphz._read_hphz_data(h_phz_array=zstar_array_to_nan (
zstar_array=self._data_array[:, 6]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations,
to_degree =self.set_to_degree )
# Hphz(h_phz_array=self._data_array[:,6])
if self._f==1 :
# Resistivity(res_array=self._data[:,7])
self.Resistivity._read_res_data(res_array =zstar_array_to_nan (
zstar_array=self._data_array[:,7]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# Phase (phase_array=self._data[:8])
self.Phase._read_phase_data(phase_array=zstar_array_to_nan (
zstar_array=self._data_array[:,8]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
# sPhz(s_phz_data=self._data_array[:-1])
self.sPhz._read_sPhz_data(sPhase_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-1]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# pcRho(pc_rho_data=self._data[:,-2])
self.pcRho._read_pcRes_data(pcRes_array =zstar_array_to_nan (
zstar_array=self._data_array[:,-2]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
if self._f==2 :
# Resistivity(res_array=self._data[:,8])
self.Resistivity._read_res_data(res_array =zstar_array_to_nan (
zstar_array=self._data_array[:,8]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations,
Sres=self.added_astatic_data)
# Phase (phase_array=self._data[:7])
self.Phase._read_phase_data(phase_array=zstar_array_to_nan (
zstar_array=self._data_array[:,7]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
# sPhz(s_phz_data=self._data_array[:-2])
self.sPhz._read_sPhz_data(sPhase_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-2]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# pcRho(pc_rho_data=self._data[:,-1])
self.pcRho._read_pcRes_data(pcRes_array =zstar_array_to_nan (
zstar_array=self._data_array[:,-1]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# pcEmag(pc_emag_data=self._data[:,-6])
self.pcEmag._read_pcEmag_data(pc_e_mag_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-6]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# sEphz(s_phz_data=self._data[:,-5])
self.sEphz._read_sEphz_data (sEphz_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-5]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
# pcHmag(pc_hmag_data =self._data[:, -4])
self.pcHmag._read_pcHmag_data(pc_h_mag_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-4]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations)
# sHphz(s_hphz_data=self._data[:,-3])
self.sHphz._read_sHphz_data (sHphz_array=zstar_array_to_nan (
zstar_array=self._data_array[:,-3]),
number_of_frequencies=self._number_of_frequencies,
number_of_stations=self._number_of_stations )
class Station(object):
    """
    Station information read from the station column of an AVG file.

    Arguments
    -----------
        **station_data_array** : array_like
            station values, one entry per data row.
        **normalized_station_value** : bool
            if True, `_set_station_` replaces every station value by
            ``abs(value) - min``. *Default* is False.
        **rename_station** : list or ndarray
            user-defined station names. The historical misspelt keyword
            ``rename_sation`` is still accepted.
        **unit_of_distance_measured** : str
            one of 'm', 'km' or 'ft'. *Default* is 'm'.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     station values (unique values when each
                                   station is repeated)
    max               float        largest station value
    min               float        smallest station value
    names             list/array   station names ('S00', 'S01', ... unless
                                   `rename_station` is provided)
    length            float        abs(max - min)
    num_of_station    int          number of items in `value`
    unit              str          'm', 'km' or 'ft'
    loc               dict         station name -> station value(s), e.g.
                                   loc['S01']
    ================  ===========  ============================================

    :Example:

        >>> from pycsamt.ff.core.avg import Station
        >>> station = Station()
        >>> station._set_station_(station_data_array=[0., 50., 100.])
        >>> print(station.loc['S01'])
    """

    def __init__(self, station_data_array=None, **kwargs):
        self._logging = csamtpylog.get_csamtpy_logger(self.__class__.__name__)
        self.station_data_array = station_data_array
        self.normalized_station_value = kwargs.pop(
            'normalized_station_value', False)
        # 'rename_sation' is the original (misspelt) key; keep honouring it
        # so existing callers do not break.
        self.rename_station = kwargs.pop(
            'rename_station', kwargs.pop('rename_sation', None))
        self.unit = kwargs.pop('unit_of_distance_measured', 'm')

        self.names = None
        self.value = None
        self.max = None
        self.min = None
        self.last_station_name = None
        self.length = None

    def _set_station_(self, station_data_array=None, rename_station=None,
                      normalized_station_value=False):
        """
        Build the station values, names and the `loc` dictionary.

        Parameters
        ----------
            * station_data_array : array_like, optional
                station value of each data row. When None, the array
                given to the constructor is used.
            * rename_station : list or ndarray, optional
                station names; must hold as many items as `value`.
                *Default* is None.
            * normalized_station_value : bool, optional
                replace the station values by ``abs(value) - min``.
                Any non-None value (including the default False)
                overrides the constructor setting.

        Raises
        ------
            CSex.pyCSAMTError_station
                if no station data is available, if the unit is not one
                of 'm', 'km', 'ft', or if `rename_station` does not
                match the number of stations.
        """
        if station_data_array is not None:
            self.station_data_array = station_data_array
        if self.station_data_array is None:
            raise CSex.pyCSAMTError_station('No stations data to read .')
        # builtin float: the `np.float` alias was removed in NumPy 1.24.
        self.station_data_array = np.array(
            [float(kk) for kk in self.station_data_array])
        if self.station_data_array.size == 0:
            raise CSex.pyCSAMTError_station('No stations data to read .')

        unique_stations, repsta = np.unique(self.station_data_array,
                                            return_counts=True)
        # NOTE(review): counts returned by np.unique are always >= 1, so
        # this test never fires; it was presumably meant to verify that
        # every station is repeated the same number of times. Kept as is
        # to avoid rejecting files that are accepted today.
        if not np.all(repsta, axis=0):
            raise CSex.pyCSAMTError_station(
                'Stations provided must be the same length of reccurency.')

        # one row per station: keep the rows; otherwise keep unique values.
        repeated = repsta[0] != 1
        self.value = np.array(
            [float(ss) for ss in
             (unique_stations if repeated else self.station_data_array)])

        # --- unit handling -- default is 'm' ---
        if self.unit not in ('km', 'ft', 'm'):
            self._logging.warning(
                'Station units provided is incorect.Try "km" or "ft." Default is "m."')
            raise CSex.pyCSAMTError_station(
                'Unit provided <{0}> doesnt not match correct units.'
                'acceptable units are : "m", "km" or "ft"'.format(self.unit))
        if self.unit == 'km':
            self.value = self.value / 1e3
            self.station_data_array = self.station_data_array / 1e3
        elif self.unit == 'ft':
            self.value = self.value / 3.280839895
            self.station_data_array = self.station_data_array / 3.280839895

        self.min, self.max = self.value.min(), self.value.max()
        self.num_of_station = self.value.size
        self.length = np.abs(self.max - self.min)

        if normalized_station_value is not None:
            self.normalized_station_value = normalized_station_value
        if self.normalized_station_value:
            self.value = np.abs(self.value) - self.min
            self.station_data_array = (
                np.abs(self.station_data_array) - self.min)

        # the case where the user provides its own names
        if rename_station is not None:
            self.rename_station = rename_station
        if self.rename_station is not None:
            if isinstance(self.rename_station, (list, tuple)):
                self.rename_station = np.array(self.rename_station)
            if self.value.size != self.rename_station.size:
                raise CSex.pyCSAMTError_station(
                    'Stations rename array must have the same length as'
                    ' the aray_data provided. lenght or stations_data is :<{0}>'
                    ' '.format(self.value.size))
            self.names = self.rename_station
        else:
            self.names = ['S{:02}'.format(ii)
                          for ii in range(self.value.size)]

        # ---> map every station name to its value(s)
        if not repeated:
            self.loc = dict(zip(self.names, self.value))
        else:
            self.names.sort()
            station_list_truncated = Zcc.truncated_data(
                data=self.station_data_array,
                number_of_reccurence=repsta[0])
            self.loc = dict(zip(self.names, station_list_truncated))
class Frequency(object):
    """
    Frequency information - frequency at which data was measured (Hz).

    Arguments
    ---------
        **freq_array** : array_like
            frequency data array, one entry per data row.
        **normalize_freq_betw** : list or tuple
            ``(start, end, nfreq)`` specification used by
            `normalize_frequency`; items are cast to int.

    ======================  ==========  =======================================
    Attributes              Type        Explanation
    ======================  ==========  =======================================
    value                   nd.array    frequencies of the first station
                                        (or the normalized frequencies)
    max                     float       highest frequency of `value`
    min                     float       lowest frequency of `value`
    numfreq                 int         number of distinct frequencies
    loc                     dict        station name -> frequencies, e.g.
                                        loc['S01']
    normalize_freq_betw     list        (start, end, nfreq): `nfreq` values
                                        10**x with x evenly spaced between
                                        `start` and `end`, e.g. (0, 4, 17).
                                        *Default* is None
    ======================  ==========  =======================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   "data", "avg", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> freq_obj =avg_obj.Data_section.Frequency.loc['S07']
        >>> print(freq_obj)
    """

    def __init__(self, freq_array=None,
                 normalize_freq_betw=None, **kwargs):
        self._freq_array = freq_array
        # `normalize_freq_betw` is a named parameter, so it can never be
        # found in **kwargs: store it directly.
        self.normalize_freq_betw = normalize_freq_betw
        self.value = None
        self.min = None
        self.max = None
        self.numfreq = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self._freq_array is not None:
            self._set_frequency()

    def _set_frequency(self, freq_array=None, normalize_freq_betw=None):
        """
        Method to set frequency according to each station.

        Parameters
        ----------
            * freq_array : array_like
                frequency of each data row. When None, the array given
                to the constructor is used.
            * normalize_freq_betw : list
                (start, end, nfreq) specification; when provided,
                `value` is replaced by the output of
                `normalize_frequency`. The *default* is None.

        Raises
        ------
            CSex.pyCSAMTError_frequency
                if no frequency data is available.
        """
        if freq_array is not None:
            self._freq_array = freq_array
        if self._freq_array is None:
            raise CSex.pyCSAMTError_frequency('No Frequency Data found.')
        # builtin float: the `np.float` alias was removed in NumPy 1.24.
        self._freq_array = np.array([float(kk) for kk in self._freq_array])

        # --> read frequency from AVG data file :
        vacount_freq, freq_repeat = np.unique(self._freq_array,
                                              return_counts=True)
        self.numfreq = vacount_freq.size
        # NOTE(review): counts are always >= 1 so this never fires;
        # presumably an "all counts equal" check was intended.
        if not np.all(freq_repeat, axis=0):
            raise CSex.pyCSAMTError_frequency(
                'Problem occured when reading frequency data.'
                'All frequency on Avgfile Must be the same length. ')

        truncated_freq = Zcc.truncated_data(
            data=self._freq_array, number_of_reccurence=self.numfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=freq_repeat[0],
            number_of_freq=self.numfreq)
        self.loc = dict(zip(name, truncated_freq))
        self.value = self.loc[name[0]]

        if normalize_freq_betw is not None:
            # the original called the attribute `self.normalize_freq_betw`
            # (a list or None), which is not callable.
            self.value = self.normalize_frequency(normalize_freq_betw)
        self.max, self.min = self.value.max(), self.value.min()

    def normalize_frequency(self, normalize_freq_betw=None):
        """
        Build a sorted, log-spaced frequency array.

        .. deprecated:: waist_method will replaced on cs module.

        Parameters
        ----------
            * normalize_freq_betw : list or tuple, optional
                (start, end, nfreq). When None, the value stored on the
                instance is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if no specification is available, if it holds fewer than
                three items, or if the items cannot be cast to int.

        Returns
        -------
            array_like
                ``10 ** np.linspace(start, end, nfreq)`` sorted in
                ascending order.
        """
        if normalize_freq_betw is not None:
            self.normalize_freq_betw = normalize_freq_betw
        if self.normalize_freq_betw is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Input value of frequency is wrong. Must be"
                " a list of integers.")
        try:
            self.normalize_freq_betw = [
                int(ss) for ss in self.normalize_freq_betw]
        except (TypeError, ValueError) as error:
            # the class never defined `self._logging`; use the module logger.
            _logger.warning(
                "Input interpolated arguments are wrong! arguments "
                "type must a list of integers. ")
            raise CSex.pyCSAMTError_inputarguments(
                "Input value of frequency is wrong. Must be"
                " a list of integers.") from error
        if len(self.normalize_freq_betw) < 3:
            raise CSex.pyCSAMTError_inputarguments(
                "Input value of frequency is wrong. Must be"
                " a list of integers.")

        start, end, nfreq = self.normalize_freq_betw[:3]
        freq_new = np.power(10, np.linspace(start, end, nfreq))
        # put the values in ascending order if start > end.
        freq_new.sort()
        return freq_new
class Comp(object):
    """
    Component measured.

    Arguments
    ----------
        **comp_name** : str
            name of the component.
        **component_name** : str
            keyword argument; new component type that replaces
            `comp_name`. Must be one of `component_type`:

                - ExHy      Zxy (Ex/Hy)
                - EyHx      Zyx (Ey/Hx)
                - EyHy      Zyy (Ey/Hy)
                - ExHx      Zxx (Ex/Hx)

    =================  ==========  ============================================
    Attributes         Type        Explanation
    =================  ==========  ============================================
    name               str         name of the component
    new_comp           str         value of `component_name`, or None
    component_type     list        accepted component names
    =================  ==========  ============================================

    :Example:

        >>> from pycsamt.ff.core.avg import Comp
        >>> component=Comp('ExHy', component_name='EyHx')
        >>> print(component.name)
    """
    component_type = ['ExHy', "EyHx", 'EyHy', 'ExHx']

    def __init__(self, comp_name=None, **kwargs):
        """
        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if `component_name` is given but is not a string or is
                not listed in `component_type`.
        """
        self.name = comp_name
        self.new_comp = kwargs.pop('component_name', None)
        if self.new_comp is None:
            return

        if not isinstance(self.new_comp, str):
            warnings.warn(
                "Components must be a string : a list of arguments below:"
                "{0}".format(self.component_type))
            # the original built this exception but never raised it.
            raise CSex.pyCSAMTError_inputarguments(
                "Component type is wrong. must a string."
                "list of components:{0}".format(self.component_type))

        if self.new_comp not in self.component_type:
            # the class never defined `self._logging`; use the module logger.
            _logger.warning('Component provided is wrong !')
            raise CSex.pyCSAMTError_inputarguments(
                "Component provide as new component is wrong"
                "list of components:{0}".format(self.component_type))

        self.name = self.new_comp
class Amps(object):
    """
    Square-Wave current (amps)

    Arguments
    ----------
        **amps_array** : array_like
            current values, one entry per data row.
        **number_of_stations** : int
            number of survey stations.
        **number_of_frequencies** : int
            number of frequencies recorded at each station.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     sorted unique values of the Amps column
    max               float        largest current value, unit: amp (A)
    min               float        smallest current value, unit: amp (A)
    loc               dict         station name -> current values, e.g.
                                   loc['S07']
    ================  ===========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                      'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> amps_obj =avg_obj.Data_section.Amps.loc['S00']
        >>> print(amps_obj)
    """

    def __init__(self, amps_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._amps_array = amps_array
        # both counts are named parameters and therefore never end up in
        # **kwargs: store them directly.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.value = None
        self.min = None
        self.max = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self._amps_array is not None:
            self._read_amps_data()

    def _read_amps_data(self, amps_array=None, number_of_frequencies=None,
                        number_of_stations=None):
        """
        Method to read and arange data according each station /amps values .

        Parameters
        ----------
            * amps_array : ndarray, optional
                current values. When None, the array stored on the
                instance is used.
            * number_of_frequencies : int, optional
                number of frequencies for survey. When None, the value
                stored on the instance is used.
            * number_of_stations : int, optional
                number of stations. When None, the value stored on the
                instance is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if the data or one of the two counts is missing.
            CSex.pyCSAMTError_value
                if the counts or the data cannot be cast to numbers.
        """
        if amps_array is not None:
            self._amps_array = amps_array
        if self._amps_array is None:
            raise CSex.pyCSAMTError_inputarguments('No Ampers data !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        try:
            # builtin int/float: the `np.int`/`np.float` aliases were
            # removed in NumPy 1.24.
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self._amps_array = np.array(
                [float(cc) for cc in self._amps_array])
        except (TypeError, ValueError) as error:
            raise CSex.pyCSAMTError_value(
                "Values provided for the amp are wrong."
                " must be float or integer .") from error

        vcounts_amps = np.unique(self._amps_array)
        self.value = vcounts_amps
        self.max, self.min = vcounts_amps.max(), vcounts_amps.min()

        truncated_amps = Zcc.truncated_data(
            data=self._amps_array, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_amps))
class Emag(object):
    """
    E-field magnitude (microVolts/(kiloMeter*Amp))

    Arguments
    ----------
        **e_mag_array** : array_like
            E-field magnitude values, one entry per data row.
        **number_of_frequencies** : int
            number of frequencies recorded at each station.
        **number_of_stations** : int
            number of survey stations.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     sorted unique values of the Emag column
    max               float        largest E-field magnitude
    min               float        smallest E-field magnitude
    loc               dict         station name -> E-field magnitudes, e.g.
                                   loc['S00']
    ================  ===========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                      'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> emag_obj =avg_obj.Data_section.Emag.loc['S02']
        >>> print(emag_obj)
    """

    def __init__(self, e_mag_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self.e_mag_array = e_mag_array
        # both counts are named parameters and therefore never end up in
        # **kwargs: store them directly.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.value = None
        self.min = None
        self.max = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self.e_mag_array is not None:
            self._read_emag_data()

    def _read_emag_data(self, e_mag_array=None, number_of_frequencies=None,
                        number_of_stations=None):
        """
        Method to read and arange data according each
        station /e_field magnitude values .

        Parameters
        ----------
            * e_mag_array : ndarray, optional
                E-field magnitude values. When None, the array stored
                on the instance is used.
            * number_of_frequencies : int, optional
                number of frequencies for survey. When None, the value
                stored on the instance is used.
            * number_of_stations : int, optional
                number of stations. When None, the value stored on the
                instance is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if the data or one of the two counts is missing.
            CSex.pyCSAMTError_value
                if the counts or the data cannot be cast to numbers.
        """
        if e_mag_array is not None:
            self.e_mag_array = e_mag_array
        if self.e_mag_array is None:
            raise CSex.pyCSAMTError_inputarguments('No E-Field data found !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        try:
            # builtin int/float: the `np.int`/`np.float` aliases were
            # removed in NumPy 1.24.
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self.e_mag_array = np.array(
                [float(cc) for cc in self.e_mag_array])
        except (TypeError, ValueError) as error:
            # message fixed: it used to mention "the amp" (copy-paste).
            raise CSex.pyCSAMTError_value(
                "Values provided for the E-Field are wrong."
                " must be float or integer .") from error

        vcounts_e_mag = np.unique(self.e_mag_array)
        self.value = vcounts_e_mag
        self.max, self.min = vcounts_e_mag.max(), vcounts_e_mag.min()

        truncated_e_mag = Zcc.truncated_data(
            data=self.e_mag_array, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_e_mag))
class Ephz(object):
    """
    E-field phase (milliRadians)

    Arguments
    ----------
        **e_phz_array** : array_like
            E-field phase values, one entry per data row.
        **number_of_frequencies** : int
            number of frequencies recorded at each station.
        **number_of_stations** : int
            number of survey stations.
        **to_degree** : bool
            keyword argument; if True the values are multiplied by
            180/pi when read. *Default* is False.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     sorted unique values of the Ephz column
    max               float        largest E-field phase value
    min               float        smallest E-field phase value
    loc               dict         station name -> E-field phases, e.g.
                                   loc['S46']
    ================  ===========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                      'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> ephz_obj =avg_obj.Data_section.Ephz.loc['S46']
        >>> print(ephz_obj)
    """

    def __init__(self, e_phz_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self.e_phz_array = e_phz_array
        # both counts are named parameters and therefore never end up in
        # **kwargs: store them directly.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)
        self.value = None
        self.min = None
        self.max = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self.e_phz_array is not None:
            self._read_ephz_data()

    def _read_ephz_data(self, e_phz_array=None, number_of_frequencies=None,
                        number_of_stations=None, **kwargs):
        """
        Method to read and arange data according each
        station /E-phase angles values .

        Parameters
        ----------
            * e_phz_array : ndarray, optional
                E-field phase values. When None, the array stored on
                the instance is used.
            * number_of_frequencies : int, optional
                number of frequencies for survey. When None, the value
                stored on the instance is used.
            * number_of_stations : int, optional
                number of stations. When None, the value stored on the
                instance is used.
            * to_degree : bool
                keyword argument; a truthy value switches the instance
                to degree output.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if the data or one of the two counts is missing.
            CSex.pyCSAMTError_value
                if the counts or the data cannot be cast to numbers.
        """
        if kwargs.pop("to_degree", False):
            self.to_degree = True

        if e_phz_array is not None:
            self.e_phz_array = e_phz_array
        if self.e_phz_array is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No E-phase data found !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        try:
            # builtin int/float: the `np.int`/`np.float` aliases were
            # removed in NumPy 1.24.
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self.e_phz_array = np.array(
                [float(cc) for cc in self.e_phz_array])
        except (TypeError, ValueError) as error:
            raise CSex.pyCSAMTError_value(
                "Values provided for the E-phase are wrong."
                " must be float or integer .") from error

        if self.to_degree:
            # NOTE(review): the factor 180/pi converts radians, while the
            # class header announces milliradians - confirm the unit.
            self.e_phz_array = self.e_phz_array * 180 / np.pi

        vcounts_e_phz = np.unique(self.e_phz_array)
        self.value = vcounts_e_phz
        self.max, self.min = vcounts_e_phz.max(), vcounts_e_phz.min()

        truncated_e_phz = Zcc.truncated_data(
            data=self.e_phz_array, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_e_phz))
class Hmag(object):
    """
    H-field magnitude (picoTesla/Amp)
    (milliGammas/Amp)

    Arguments
    ----------
        **h_mag_array** : array_like
            H/B-field magnitude values, one entry per data row.
        **number_of_frequencies** : int
            number of frequencies recorded at each station.
        **number_of_stations** : int
            number of survey stations.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     sorted unique values of the Hmag column
    max               float        largest H-field magnitude
    min               float        smallest H-field magnitude
    loc               dict         station name -> H-field magnitudes, e.g.
                                   loc['S05']
    ================  ===========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                      'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> hmag_obj =avg_obj.Data_section.Hmag.loc['S05']
        >>> print(hmag_obj)
    """

    def __init__(self, h_mag_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self.h_mag_array = h_mag_array
        # both counts are named parameters and therefore never end up in
        # **kwargs: store them directly.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.value = None
        self.min = None
        self.max = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self.h_mag_array is not None:
            self._read_hmag_data()

    def _read_hmag_data(self, h_mag_array=None, number_of_frequencies=None,
                        number_of_stations=None):
        """
        Method to read and arange data according
        each station /B_field magnitude values .

        Parameters
        ----------
            * h_mag_array : ndarray, optional
                B-field magnitude values. When None, the array stored
                on the instance is used.
            * number_of_frequencies : int, optional
                number of frequencies for survey. When None, the value
                stored on the instance is used.
            * number_of_stations : int, optional
                number of stations. When None, the value stored on the
                instance is used.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if the data or one of the two counts is missing.
            CSex.pyCSAMTError_value
                if the counts or the data cannot be cast to numbers.
        """
        if h_mag_array is not None:
            self.h_mag_array = h_mag_array
        if self.h_mag_array is None:
            raise CSex.pyCSAMTError_inputarguments('No B-Field data found !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        try:
            # builtin int/float: the `np.int`/`np.float` aliases were
            # removed in NumPy 1.24.
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self.h_mag_array = np.array(
                [float(cc) for cc in self.h_mag_array])
        except (TypeError, ValueError) as error:
            raise CSex.pyCSAMTError_value(
                "Values provided for the B-Field are wrong."
                " must be float or integer .") from error

        vcounts_h_mag = np.unique(self.h_mag_array)
        self.value = vcounts_h_mag
        self.max, self.min = vcounts_h_mag.max(), vcounts_h_mag.min()

        truncated_h_mag = Zcc.truncated_data(
            data=self.h_mag_array, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_h_mag))
class Hphz(object):
    """
    H-field phase (milliRadians)

    Arguments
    ---------
        **h_phz_array** : array_like
            H-field phase values, one entry per data row.
        **number_of_frequencies** : int
            number of frequencies recorded at each station.
        **number_of_stations** : int
            number of survey stations.
        **to_degree** : bool
            keyword argument; if True the values are multiplied by
            180/pi when read. *Default* is False.

    ================  ===========  ============================================
    Attributes        Type         Explanation
    ================  ===========  ============================================
    value             nd.array     sorted unique values of the Hphz column
    max               float        largest H-field phase value
    min               float        smallest H-field phase value
    loc               dict         station name -> H-field phases, e.g.
                                   loc['S43']
    ================  ===========  ============================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                      'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> hphz_obj =avg_obj.Data_section.Hphz.loc['S43']
        >>> print(hphz_obj)
    """

    def __init__(self, h_phz_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self.h_phz_array = h_phz_array
        # both counts are named parameters and therefore never end up in
        # **kwargs: store them directly.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)
        self.value = None
        self.min = None
        self.max = None

        for key, item in kwargs.items():
            setattr(self, key, item)

        if self.h_phz_array is not None:
            self._read_hphz_data()

    def _read_hphz_data(self, h_phz_array=None, number_of_frequencies=None,
                        number_of_stations=None, **kwargs):
        """
        Method to read and arange data according each
        station /H-phase angles values .

        Parameters
        ----------
            * h_phz_array : ndarray, optional
                H-field phase values. When None, the array stored on
                the instance is used.
            * number_of_frequencies : int, optional
                number of frequencies for survey. When None, the value
                stored on the instance is used.
            * number_of_stations : int, optional
                number of stations. When None, the value stored on the
                instance is used.
            * to_degree : bool
                keyword argument; a truthy value switches the instance
                to degree output.

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                if the data or one of the two counts is missing.
            CSex.pyCSAMTError_value
                if the counts or the data cannot be cast to numbers.
        """
        if kwargs.pop("to_degree", False):
            self.to_degree = True

        if h_phz_array is not None:
            self.h_phz_array = h_phz_array
        if self.h_phz_array is None:
            # message fixed: it used to mention "E-phase" (copy-paste).
            raise CSex.pyCSAMTError_inputarguments('No B-phase data found !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        try:
            # builtin int/float: the `np.int`/`np.float` aliases were
            # removed in NumPy 1.24.
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self.h_phz_array = np.array(
                [float(cc) for cc in self.h_phz_array])
        except (TypeError, ValueError) as error:
            # message fixed: it used to mention "E-phase" (copy-paste).
            raise CSex.pyCSAMTError_value(
                "Values provided for the B-phase are wrong."
                " must be float or integer .") from error

        if self.to_degree:
            # NOTE(review): the factor 180/pi converts radians, while the
            # class header announces milliradians - confirm the unit.
            self.h_phz_array = self.h_phz_array * 180 / np.pi

        vcounts_h_phz, repeat_hphz = np.unique(
            self.h_phz_array, return_counts=True)
        # NOTE(review): counts are always >= 1 so this never fires;
        # presumably an "all counts equal" check was intended.
        if not np.all(repeat_hphz):
            raise CSex.pyCSAMTError_value(
                'Values of B-phases provided must have the same length'
                ' for each stations. ')
        self.value = vcounts_h_phz
        self.max, self.min = vcounts_h_phz.max(), vcounts_h_phz.min()

        truncated_h_phz = Zcc.truncated_data(
            data=self.h_phz_array, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_h_phz))
class Resistivity(object):
    """
    Apparent resistivity (ohm.m) based on the Cagniard calculation.

    .. seealso:: Cagniard, L., 1953, Basic theory of the magnetotelluric
        method of geophysical prospecting: Geophysics, 18, 605-635,
        *doi*: 10.1190/1.1437915.

    Arguments
    ----------
    **res_array** : array_like
        data array of apparent resistivity values.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.
    **added_astatic_rho_array** : array_like, optional keyword
        Zonge Astatic resistivity values, stored in `Sres`.

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique resistivity values - ohm.m
    max               float        highest resistivity value
    min               float        lowest resistivity value
    mean              float        average value of rho
    loc               dict         rho values at each station,
                                   e.g. loc['S13'] holds the rho data
                                   of station S13
    Sres              ndarray      Astatic corrected values
    ================  ===========  ========================================

    When `Sres` is available, `max`, `min` and `mean` become tuples
    ``(rho, astatic rho)`` and `vSres`, `mean_Sres`, `loc_Sres` are set.

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> rho_obj =avg_obj.Data_section.Resistivity.loc['S00']
        >>> print(rho_obj)
    """

    def __init__(self, res_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self.res_array = res_array
        # The counts are named parameters, so they never reach **kwargs;
        # read them directly instead of popping them from kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.Sres = kwargs.pop('added_astatic_rho_array', None)

        self.value = None
        self.min = None
        self.max = None
        self.mean = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self.res_array is not None:
            self._read_res_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_res_data(self, res_array=None, number_of_frequencies=None,
                       number_of_stations=None, Sres=None, **kwargs):
        """
        Read the resistivity values and arrange them station by station.

        Parameters
        ----------
        * res_array : array_like
            Resistivity calculated on the field.
            The *default* is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.
        * Sres : array_like, optional
            Zonge Astatic rho calculated.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no resistivity data or one of the counts is missing.
        pyCSAMTError_value
            if counts or resistivities cannot be converted to numbers.
        pyCSAMTError_rho
            if the astatic array and the resistivity array differ in size.
        """
        if res_array is not None:
            self.res_array = res_array
        if self.res_array is None:
            raise CSex.pyCSAMTError_inputarguments('No Resistivity data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        if Sres is not None:
            self.Sres = Sres

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self.res_array = np.array([float(cc) for cc in self.res_array])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for the Resistivities are wrong."
                " must be float or integer .") from err

        if self.Sres is not None:
            # Convert after res_array so the size comparison always works
            # on ndarrays, whatever container the caller supplied.
            self.Sres = np.array([float(res) for res in self.Sres])
            if self.res_array.shape[0] != self.Sres.size:
                raise CSex.pyCSAMTError_rho(
                    'Resistivity calculated & '
                    'Astatic_array must get the same length!.')

        unique_res = np.unique(self.res_array)
        self.value = unique_res
        self.max, self.min = unique_res.max(), unique_res.min()
        self.mean = self.res_array.mean()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_res = Zcc.truncated_data(
            data=self.res_array, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_res)}

        if self.Sres is not None:
            unique_sres = np.unique(self.Sres)
            self.vSres, self.mean_Sres = unique_sres, self.Sres.mean()
            self.max = (unique_res.max(), unique_sres.max())
            self.min = (unique_res.min(), unique_sres.min())
            self.mean = (self.mean, self.mean_Sres)
            truncated_sres = Zcc.truncated_data(
                data=self.Sres, number_of_reccurence=self.nfreq)
            self.loc_Sres = {key: value
                             for key, value in zip(name, truncated_sres)}
class Phase(object):
    """
    Impedance phase in milliradians. It can be calculated using the Ephz
    object and the Hphz object:

    * **Phase** = E-phase - H-phase

    Arguments
    ----------
    **phase_array** : ndarray
        array of phase values got on the field
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.
    **to_degree** : bool
        put the Phase value on degree .

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the phase column
    max               float        maximum phase value
    min               float        minimum phase value
    loc               dict         phase values at each station,
                                   e.g. loc['S00'] holds the phase data
                                   of station S00
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> phs_obj =avg_obj.Data_section.Phase.loc['S00']
        >>> print(phs_obj)
    """

    def __init__(self, phase_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._phase_array = phase_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)

        self.value = None
        self.min = None
        self.max = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._phase_array is not None:
            # The reader of this class is _read_phase_data.
            self._read_phase_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_phase_data(self, phase_array=None, number_of_frequencies=None,
                         number_of_stations=None, **kwargs):
        """
        Read the phase values obtained on the sites and arrange them
        station by station. Zonge keeps the phase values in mrad.

        Parameters
        ------------
        * phase_array : ndarray, optional
            phase data array on the field.
            The *default* is None.
        * number_of_frequencies : int
            number of frequencies used. Required.
        * number_of_stations : int
            number of the stations on the site. Required.
        * to_degree : bool, optional keyword
            when True, switch the instance to degree output.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no phase data or one of the counts is missing.
        pyCSAMTError_value
            if counts or phases cannot be converted to numbers.
        """
        to_degree = kwargs.pop("to_degree", False)
        if to_degree:
            self.to_degree = to_degree

        if phase_array is not None:
            self._phase_array = phase_array
        if self._phase_array is None:
            raise CSex.pyCSAMTError_inputarguments('No Phase data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self._phase_array = np.array(
                [float(cc) for cc in self._phase_array])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for the Phase are wrong."
                " must be float or integer .") from err

        if self.to_degree:
            # NOTE(review): this is a radian -> degree factor although the
            # data are documented as mrad - confirm the intended unit.
            self._phase_array = self._phase_array * 180 / np.pi

        unique_phz = np.unique(self._phase_array)
        self.value = unique_phz
        self.max, self.min = unique_phz.max(), unique_phz.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_phz = Zcc.truncated_data(
            data=self._phase_array, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_phz)}
class Z_Tensor(object):
    """
    Impedance Tensor Z calculation.

    The class can recompute the apparent resistivity rho and the phase
    from the impedance tensor Z, and the rho/phase errors from the Z error.

    .. seealso::
        Zonge, K.L. and Hughes, L.J., 1991, Controlled source audio-frequency
        magnetotellurics, in Electromagnetic Methods in Applied Geophysics,
        ed. Nabighian, M.N., Vol. 2, Society of Exploration Geophysicists,
        pp. 713-809.

    Arguments
    ----------
    **z_array** : ndarray
        impedance values.
    **phase_array** : ndarray
        phase values.
    **freq** : ndarray
        frequency values.
    **z_error** : ndarray
        impedance error values.

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    z                 ndarray      impedance tensor
    phase             ndarray      phase values
    freq              ndarray      frequency values
    z_error           ndarray      impedance error
    rho               ndarray      resistivity
    zAS               DataFrame    Zonge Astatic data
    z_mwgt            ndarray      'Z.mwgt' / 'E.wgt' astatic column
    z_pwgt            ndarray      'Z.pwgt' / 'B.wgt' astatic column
    loc               dict         Z magnitude at each station,
                                   e.g. loc['S00'] holds the data
                                   of station S00
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Z_Tensor
        >>> z_obj = Z_Tensor()
        >>> rho, phase = z_obj.z_and_zerr_2rhophi(z_array=z, freq=freq)
    """

    def __init__(self, z_array=None, phase_array=None, freq=None,
                 z_error=None, **kwargs):
        self._logging = csamtpylog.get_csamtpy_logger(self.__class__.__name__)
        self._z = z_array
        self._phase = phase_array
        self._z_err = z_error
        # Keep the frequencies handed to the constructor.
        self._freq = freq
        self._rho = None
        self.loc = None
        self.zAS = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        self.z_pwgt = None
        self.z_mwgt = None
        # 'z_megt' is the historical misspelling of 'z_mwgt'; kept so code
        # reading the old attribute keeps working.
        self.z_megt = None
        self.z_as_labels = ['Z.mag', 'Z.mwgt', 'Z.pwgt',
                            'E.wgt', 'B.wgt', 'Z.%err']

    @staticmethod
    def _to_float_array(values):
        """Return `values` as a float ndarray (numeric arrays keep shape)."""
        arr = np.asarray(values)
        if arr.dtype.kind in ('f', 'i', 'u'):
            return arr.astype(float)
        return np.array([float(item) for item in values])

    @property
    def freq(self):
        return self._freq

    @freq.setter
    def freq(self, freq_array):
        # Numeric input must be stored too, not only converted strings.
        try:
            self._freq = self._to_float_array(freq_array)
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_frequency(
                'wrong Input frequencies values.must be a float number. '
            ) from err

    @property
    def phase(self):
        return self._phase

    @phase.setter
    def phase(self, phz_array):
        try:
            self._phase = self._to_float_array(phz_array)
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_Phase(
                'Arguments phase values must be int, or float .') from err

    @property
    def z_error(self):
        return self._z_err

    @z_error.setter
    def z_error(self, z_error_):
        try:
            self._z_err = self._to_float_array(z_error_)
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_Z(
                "Error z input values are incorrects.") from err

    @property
    def z(self):
        return self._z

    @z.setter
    def z(self, zz_array):
        zz_array = np.asarray(zz_array)
        # String arrays have dtype kind 'U'/'S'; comparing the dtype with
        # the literal 'str' never matches a real string array.
        if zz_array.dtype.kind in ('U', 'S'):
            try:
                zz_array = np.array([float(zz) for zz in zz_array])
            except (TypeError, ValueError) as err:
                raise CSex.pyCSAMTError_Z(
                    'z_impedance value must be a complex_number.') from err
        # ---> real magnitudes: build Z from the stored freq and phase.
        if zz_array.dtype.kind in ('f', 'i', 'u'):
            self._z = Zcc.rhophi2z(phase=self._phase, freq=self._freq,
                                   z_array=zz_array)[-1]
        else:
            self._z = zz_array

    def z_and_zerr_2rhophi(self, z_array=None, freq=None):
        """
        Compute resistivity and phase from the impedance values.

        Parameters
        ----------
        * z_array : ndarray of complex
            Impedance tensor complex numbers.
            The *default* is None (use the stored `z`).
        * freq : ndarray
            frequency values.
            The *default* is None (use the stored `freq`).

        Returns
        -------
        ndarray
            resistivity values, ``|Z|**2 * 0.2 / freq``.
        ndarray
            phase angle values in degree.

        Raises
        ------
        pyCSAMTError_Z
            if `z` or `freq` is not available.
        """
        if z_array is not None:
            self.z = z_array
        if freq is not None:
            self.freq = freq
        if self.z is None or self.freq is None:
            raise CSex.pyCSAMTError_Z(
                'None values can not be computed. Check values !')

        self.rho = np.apply_along_axis(
            lambda valz: np.abs(valz)**2 * (0.2 / self.freq), 0, self.z)
        # -- z is a complex number, so the angle comes from np.angle
        # NOTE(review): phase is stored in degree here, whereas
        # rhophi2rhoph_errors reads `phase` as mrad - confirm usage.
        self.phase = np.rad2deg(np.angle(self.z))
        return self.rho, self.phase

    @property
    def rho(self):
        return self._rho

    @rho.setter
    def rho(self, res_array):
        try:
            self._rho = self._to_float_array(res_array)
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_rho(
                'Resistivities values must be float '
                'or integer , not a None type !') from err

    def rhophi2rhoph_errors(self, res_array=None, phase_array=None,
                            z_error=None, freq=None):
        """
        Compute the phase and resistivity errors from res_array,
        phase_array and the z error.

        Parameters
        ----------
        * res_array : ndarray
            resistivity values in ohm.m
        * phase_array : ndarray
            phase angle values in mradians (divided by 1e3 before use)
        * z_error : ndarray
            impedance Tensor error
        * freq : ndarray
            frequency numbers in Hz

        Returns
        --------
        ndarrays
            resistivity errors and phase errors values.

        Raises
        ------
        pyCSAMTError_Z
            if rho, phase or freq is missing, or rho and freq differ
            in size.
        """
        self._logging.info('compute rho_phi to resistivities and phases_errors')
        if res_array is not None:
            self.rho = res_array
        if z_error is not None:
            self.z_error = z_error
        if phase_array is not None:
            self.phase = phase_array
        if freq is not None:
            self.freq = freq

        if self.rho is None or self.phase is None or self.freq is None:
            self._logging.warning(
                'NoneType can not be computed. please check your data arrays.')
            raise CSex.pyCSAMTError_Z(
                'could note compute a Nonetype number. please check numbers.')

        if self.rho.size != self.freq.size:
            raise CSex.pyCSAMTError_Z(
                'Resistivity , freq and phase_array must be the same size. ')

        # rhophi2z returns a sequence: items 1 and 2 are used as the real
        # and imaginary parts, the last item as Z itself.
        zz_comp = Zcc.rhophi2z(phase=self.phase / 1e3, freq=self.freq,
                               resistivity=self.rho)
        self.z = zz_comp[-1]

        res_err, phase_err = Zcc.z_error2r_phi_error(z_real=zz_comp[1],
                                                     z_imag=zz_comp[2],
                                                     error=self.z_error)
        return res_err, phase_err

    def _read_zAS_data(self, z_abs_array=None, number_of_frequencies=None,
                       number_of_stations=None):
        """
        Set Z values straightforwardly from a Zonge Astatic file.

        Parameters
        -----------
        * z_abs_array : pd.DataFrame
            added values from astatic files; the columns listed in
            `z_as_labels` are read.
        * number_of_frequencies : int
            number of frequencies used during the survey. Required.
        * number_of_stations : int
            number of stations. Required.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no astatic data or one of the counts is missing.
        pyCSAMTError_value
            if the counts or the 'Z.mag' column cannot be converted
            (including a missing 'Z.mag' column).

        .. note::
            appropriate converter should be
                - 1Hz --> 6.28140704,
                - deg-rad --> theta(deg) *np.pi/180
                - mu vaccum permitivity with scipy.constants.epsilon_0
        """
        self._logging.info('Transfering Astatic Z data .')
        if z_abs_array is not None:
            self.zAS = z_abs_array
        if self.zAS is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No zonge Astatic data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        zabs_array = None
        if isinstance(self.zAS, pd.DataFrame):
            columns = list(self.zAS.columns)
            for wgt in columns:
                if wgt in ('Z.mwgt', 'E.wgt'):
                    self.z_mwgt = self.zAS[wgt].to_numpy()
                if wgt in ('Z.pwgt', 'B.wgt'):
                    # 'Z.pwgt' keeps priority when present; otherwise use
                    # the column at hand instead of raising KeyError.
                    source = 'Z.pwgt' if 'Z.pwgt' in columns else wgt
                    self.z_pwgt = self.zAS[source].to_numpy()
                if wgt == 'Z.%err':
                    self._z_err = self.zAS[wgt].to_numpy()
                if wgt == 'Z.mag':
                    zabs_array = self.zAS[wgt].to_numpy()

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases. A missing 'Z.mag' column (None) ends
        # in the same error as unreadable values.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            zabs_array = np.array([float(cc) for cc in zabs_array])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for Z-astatic are wrong."
                " check your Zonge AVG file .") from err

        self.max, self.min = zabs_array.max(), zabs_array.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_zz = Zcc.truncated_data(
            data=zabs_array, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_zz)}
class pcEmag(object):
    """
    Statistical variation of magnitude values from averaged data blocks.
    Standard Deviation / Average Emag (percent)

    Arguments
    ----------
    **pc_e_mag_array** : ndarray
        data array of statistical variation of Emag value on the field.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the pcEmag
                                   column of the avg file
    max               float        maximum % value of pcEmag
    min               float        minimum % value of pcEmag
    loc               dict         pcEmag values at each station,
                                   e.g. loc['S05'] holds the data
                                   of station S05
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> pcemag_obj =avg_obj.Data_section.pcEmag.loc['S05']
        >>> print(pcemag_obj)
    """

    def __init__(self, pc_e_mag_array=None,
                 number_of_frequencies=None, number_of_stations=None,
                 **kwargs):
        self._pcEmag = pc_e_mag_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies

        self.value = None
        self.min = None
        self.max = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._pcEmag is not None:
            self._read_pcEmag_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_pcEmag_data(self, pc_e_mag_array=None,
                          number_of_frequencies=None,
                          number_of_stations=None):
        """
        Read the statistical variation of the E-Field magnitude and
        arrange it station by station.

        Parameters
        ----------
        * pc_e_mag_array : ndarray, optional
            E-Field std variation observed at each station.
            The default is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no data or one of the counts is missing.
        pyCSAMTError_value
            if counts or data cannot be converted to numbers.
        """
        if pc_e_mag_array is not None:
            self._pcEmag = pc_e_mag_array
        if self._pcEmag is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No E-mag statistical variation data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self._pcEmag = np.array([float(cc) for cc in self._pcEmag])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for the E-mag stat.variation are wrong."
                " must be float or integer .") from err

        unique_emag = np.unique(self._pcEmag)
        self.value = unique_emag
        self.max, self.min = unique_emag.max(), unique_emag.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_pcEmag = Zcc.truncated_data(
            data=self._pcEmag, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_pcEmag)}
class sEphz(object):
    """
    Statistical variation of the data blocks averaged for this data point.
    100 * Standard Deviation of Ephz values (milliradians)

    Arguments
    ----------
    **sEphz_array** : ndarray
        data array of statistical variation of E-phase value on the field.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.
    **to_degree** : bool
        put the values on degree .

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the sEphz
                                   column of the avg file
    max               float        maximum value of sEphz
    min               float        minimum value of sEphz
    loc               dict         sEphz values at each station,
                                   e.g. loc['S44'] holds the data
                                   of station S44
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> sephz_obj =avg_obj.Data_section.sEphz.loc['S44']
        >>> print(sephz_obj)
    """

    def __init__(self, sEphz_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._sEphz = sEphz_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)

        self.value = None
        self.min = None
        self.max = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._sEphz is not None:
            self._read_sEphz_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_sEphz_data(self, sEphz_array=None, number_of_frequencies=None,
                         number_of_stations=None, **kwargs):
        """
        Read the statistical variation of the E-Field phase and arrange
        it station by station.

        Parameters
        ----------
        * sEphz_array : ndarray
            E-Field phase variation observed.
            The default is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.
        * to_degree : bool, optional keyword
            when True, switch the instance to degree output.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no data or one of the counts is missing.
        pyCSAMTError_value
            if the data cannot be converted to float.
        """
        to_degree = kwargs.pop("to_degree", False)
        if to_degree:
            self.to_degree = to_degree

        if sEphz_array is not None:
            self._sEphz = sEphz_array
        if self._sEphz is None:
            # This class holds the E-field phase variation, so the message
            # names the E-field (it used to be copied from the B-field one).
            raise CSex.pyCSAMTError_inputarguments(
                'No stat.variation E-field-phase data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int: the np.int alias no longer exists in recent NumPy.
        self.nfreq = int(self.nfreq)
        self.number_of_stations = int(self.number_of_stations)

        try:
            self._sEphz = np.array([float(ss) for ss in self._sEphz])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for computing are wrong."
                " must be float or integer .") from err

        if self.to_degree:
            # NOTE(review): radian -> degree factor although the class is
            # documented in milliradians - confirm the intended unit.
            self._sEphz = self._sEphz * 180 / np.pi

        unique_sEphz = np.unique(self._sEphz)
        self.value = unique_sEphz
        self.max, self.min = unique_sEphz.max(), unique_sEphz.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_sEphz = Zcc.truncated_data(
            data=self._sEphz, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_sEphz)}
class pcHmag(object):
    """
    Statistical variation of magnitude values from averaged data blocks.
    Standard Deviation / Average Hmag (percent)

    Arguments
    ----------
    **pc_h_mag_array** : ndarray
        data array of statistical variation of Hmag value on the field.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the pcHmag
                                   column of the avg file
    max               float        maximum % value of pcHmag
    min               float        minimum % value of pcHmag
    loc               dict         pcHmag values at each station,
                                   e.g. loc['S05'] holds the data
                                   of station S05
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> pchmag_obj =avg_obj.Data_section.pcHmag.loc['S05']
        >>> print(pchmag_obj)
    """

    def __init__(self, pc_h_mag_array=None,
                 number_of_frequencies=None, number_of_stations=None,
                 **kwargs):
        self._pcHmag = pc_h_mag_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies

        self.value = None
        self.min = None
        self.max = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._pcHmag is not None:
            self._read_pcHmag_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_pcHmag_data(self, pc_h_mag_array=None,
                          number_of_frequencies=None,
                          number_of_stations=None):
        """
        Read the statistical variation of the B-Field magnitude and
        arrange it station by station.

        Parameters
        ----------
        * pc_h_mag_array : ndarray, optional
            B-Field std variation observed at each station.
            The default is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no data or one of the counts is missing.
        pyCSAMTError_value
            if counts or data cannot be converted to numbers.
        """
        if pc_h_mag_array is not None:
            self._pcHmag = pc_h_mag_array
        if self._pcHmag is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No B-mag statistical variation data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self._pcHmag = np.array([float(cc) for cc in self._pcHmag])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for the B-mag stat.variation are wrong."
                " must be float or integer .") from err

        unique_hmag = np.unique(self._pcHmag)
        self.value = unique_hmag
        self.max, self.min = unique_hmag.max(), unique_hmag.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_pcHmag = Zcc.truncated_data(
            data=self._pcHmag, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_pcHmag)}
class sHphz(object):
    """
    Statistical variation of the data blocks averaged for this data point.
    100 * Standard Deviation of Hphz values (milliradians)

    Arguments
    ----------
    **shphz_array** : ndarray
        data array of statistical variation of H-phase value on the field.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.
    **to_degree** : bool
        put the values on degree .

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the sHphz
                                   column of the avg file
    max               float        maximum value of sHphz
    min               float        minimum value of sHphz
    loc               dict         sHphz values at each station,
                                   e.g. loc['S43'] holds the data
                                   of station S43
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> shphz_obj =avg_obj.Data_section.sHphz.loc['S43']
        >>> print(shphz_obj)
    """

    def __init__(self, shphz_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._sHphz = shphz_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)

        self.value = None
        self.min = None
        self.max = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._sHphz is not None:
            self._read_sHphz_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_sHphz_data(self, sHphz_array=None, number_of_frequencies=None,
                         number_of_stations=None, **kwargs):
        """
        Read the statistical variation of the B-Field phase and arrange
        it station by station.

        Parameters
        ----------
        * sHphz_array : ndarray
            B-Field phase variation observed.
            The default is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.
        * to_degree : bool, optional keyword
            when True, switch the instance to degree output.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no data or one of the counts is missing.
        pyCSAMTError_value
            if the data cannot be converted to float.
        """
        to_degree = kwargs.pop("to_degree", False)
        if to_degree:
            self.to_degree = to_degree

        if sHphz_array is not None:
            self._sHphz = sHphz_array
        if self._sHphz is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No stat.variation B-field-phase data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int: the np.int alias no longer exists in recent NumPy.
        self.nfreq = int(self.nfreq)
        self.number_of_stations = int(self.number_of_stations)

        try:
            self._sHphz = np.array([float(ss) for ss in self._sHphz])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for computing are wrong."
                " must be float or integer .") from err

        if self.to_degree:
            # NOTE(review): radian -> degree factor although the class is
            # documented in milliradians - confirm the intended unit.
            self._sHphz = self._sHphz * 180 / np.pi

        unique_shphz = np.unique(self._sHphz)
        self.value = unique_shphz
        self.max, self.min = unique_shphz.max(), unique_shphz.min()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_shphz = Zcc.truncated_data(
            data=self._sHphz, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_shphz)}
class pcRho(object):
    """
    Statistical variation of magnitude values from averaged data blocks.
    Standard Deviation / Average Rho (percent)

    Arguments
    ----------
    **pcRes_array** : ndarray
        statistical variation of the averaged rho.
    **number_of_frequencies** : int
        number of frequencies recorded at each station.
    **number_of_stations** : int
        number of stations.

    ================  ===========  ========================================
    Attributes        Type         Explanation
    ================  ===========  ========================================
    value             ndarray      sorted unique values of the pcRho
                                   column of the avg file
    max               float        maximum % value of pcRho
    min               float        minimum % value of pcRho
    mean              float        average value of pcRho
    loc               dict         pcRho values at each station,
                                   e.g. loc['S07'] holds the data
                                   of station S07
    ================  ===========  ========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import Avg
        >>> path=os.path.join(os.environ["pyCSAMT"],
        ...                   'pycsamt', "data", "K1.AVG")
        >>> avg_obj=Avg(path)
        >>> pcrho_obj =avg_obj.Data_section.pcRho.loc['S00']
        >>> print(pcrho_obj)
    """

    def __init__(self, pcRes_array=None,
                 number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._pcRes = pcRes_array
        # The counts are named parameters, so they never reach **kwargs.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.Sres = kwargs.pop('added_astatic_rho_array', None)

        self.value = None
        self.min = None
        self.max = None
        self.mean = None
        self.loc = None

        for key in list(kwargs.keys()):
            setattr(self, key, kwargs[key])

        if self._pcRes is not None:
            self._read_pcRes_data(
                number_of_frequencies=self.nfreq,
                number_of_stations=self.number_of_stations)

    def _read_pcRes_data(self, pcRes_array=None, number_of_frequencies=None,
                         number_of_stations=None, Sres=None, **kwargs):
        """
        Read the statistical variation of the resistivity and arrange it
        station by station.

        Parameters
        ----------
        * pcRes_array : ndarray
            statistical variation of the resistivity.
            The default is None.
        * number_of_frequencies : int
            number of frequencies for survey. Required.
        * number_of_stations : int
            number of stations. Required.
        * Sres : optional
            accepted for signature compatibility; not used here.

        Raises
        ------
        pyCSAMTError_inputarguments
            if no data or one of the counts is missing.
        pyCSAMTError_value
            if counts or data cannot be converted to numbers.
        """
        if pcRes_array is not None:
            self._pcRes = pcRes_array
        if self._pcRes is None:
            raise CSex.pyCSAMTError_inputarguments('No Resistivity data found !')

        if number_of_frequencies is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")
        self.nfreq = number_of_frequencies

        if number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')
        self.number_of_stations = number_of_stations

        # builtin int/float: the np.int/np.float aliases no longer exist
        # in recent NumPy releases.
        try:
            self.nfreq = int(self.nfreq)
            self.number_of_stations = int(self.number_of_stations)
            self._pcRes = np.array([float(cc) for cc in self._pcRes])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for the Resistivities are wrong."
                " must be float or integer .") from err

        unique_pcres = np.unique(self._pcRes)
        self.value = unique_pcres
        self.max, self.min = unique_pcres.max(), unique_pcres.min()
        self.mean = self._pcRes.mean()

        # Split the flat column into per-station chunks of nfreq values.
        truncated_pcres = Zcc.truncated_data(
            data=self._pcRes, number_of_reccurence=self.nfreq)
        name, polyname = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = {key: value for key, value in zip(name, truncated_pcres)}
class sPhz(object):
    """
    Statistical variation of the data blocks averaged for this data point.
    100 * Standard Deviation of Phase values (milliradians)

    Arguments
    ----------
        **sPhase_array** : array_like
                sPhz column values; every item must be convertible to float.
        **number_of_frequencies** : int
                number of frequencies of the survey.
        **number_of_stations** : int
                number of stations of the survey.
        **to_degree** : bool
                keyword argument; when True the values are multiplied
                by 180/pi while reading. Default is False.

    Holds the following informations:

    ==================  ==========  ==========================================
    Attributes          Type        Explanation
    ==================  ==========  ==========================================
    value               nd.array    sorted unique values of the sPhz column
    max                 float       maximum sPhz value
    min                 float       minimum sPhz value
    nfreq               int         number of frequencies
    number_of_stations  int         number of stations
    to_degree           bool        whether values are scaled by 180/pi
    loc                 dict        sPhz data per station, keyed by the
                                    station names produced by
                                    `Zcc._numbering_station`,
                                    e:g - loc['S07'] show the sPhz data
                                    of station S07. Only set once data
                                    have been read.
    ==================  ==========  ==========================================

    More attributes can be added by inputing a key word dictionary

    :Example:

        >>> from pycsamt.ff.core.avg import sPhz
        >>> sphz_obj = sPhz(sPhase_array=sphz_column,
        ...                 number_of_frequencies=nfreq,
        ...                 number_of_stations=nstations)
        >>> print(sphz_obj.max, sphz_obj.min)
    """

    def __init__(self, sPhase_array=None, number_of_frequencies=None,
                 number_of_stations=None, **kwargs):
        self._sPhs = sPhase_array
        # Both counts are named parameters, so they never show up in
        # ``kwargs``; read them from the parameters themselves.
        self.number_of_stations = number_of_stations
        self.nfreq = number_of_frequencies
        self.to_degree = kwargs.pop('to_degree', False)

        self.value = None
        self.min = None
        self.max = None

        # Any remaining keyword becomes an instance attribute.
        for key, val in kwargs.items():
            setattr(self, key, val)

        if self._sPhs is not None:
            self._read_sPhz_data()

    def _read_sPhz_data(self, sPhase_array=None, number_of_frequencies=None,
                        number_of_stations=None, **kwargs):
        """
        Read the sPhz data and arrange them station by station.

        Every argument left to None falls back on the value already
        stored on the instance.

        Parameters
        ------------
            * sPhase_array : array_like, optional
                    sPhz values. The default is None.
            * number_of_frequencies : int, optional
                    number of frequencies for survey.
                    The default is None.
            * number_of_stations : int, optional
                    number of stations.
                    The default is None.
            * to_degree : bool
                    keyword argument; a true value switches `self.to_degree`
                    on (a false value leaves it unchanged).

        Raises
        ------
            CSex.pyCSAMTError_inputarguments
                    if the data, the number of frequencies or the number
                    of stations is known neither from the arguments nor
                    from the instance.
            CSex.pyCSAMTError_value
                    if an item of the data cannot be converted to float.
        """
        to_degree = kwargs.pop("to_degree", False)
        if to_degree:
            self.to_degree = to_degree

        if sPhase_array is not None:
            self._sPhs = sPhase_array
        if self._sPhs is None:
            raise CSex.pyCSAMTError_inputarguments(
                'No stat.variation Impedance Z -phase data found !')

        if number_of_frequencies is not None:
            self.nfreq = number_of_frequencies
        if self.nfreq is None:
            raise CSex.pyCSAMTError_inputarguments(
                "Please specify the number of frequency !")

        if number_of_stations is not None:
            self.number_of_stations = number_of_stations
        if self.number_of_stations is None:
            raise CSex.pyCSAMTError_inputarguments(
                'Please specify the number of stations')

        # Builtin ``int`` replaces the ``np.int`` alias, which modern NumPy
        # no longer provides.
        self.nfreq = int(self.nfreq)
        self.number_of_stations = int(self.number_of_stations)

        # Make sure every item is numeric and store the data as an ndarray.
        try:
            self._sPhs = np.array([float(ss) for ss in self._sPhs])
        except (TypeError, ValueError) as err:
            raise CSex.pyCSAMTError_value(
                "Values provided for computing are wrong."
                " must be float or integer .") from err

        if self.to_degree:
            # NOTE(review): 180/pi is the radian -> degree factor while the
            # class docstring describes the column as milliradians -- confirm
            # the expected units.
            # NOTE(review): the converted array is stored back on the
            # instance, so calling this method again without a new array
            # applies the factor a second time.
            self._sPhs = self._sPhs * 180 / np.pi

        # Sorted unique values give the extrema of the column.
        unique_sphz = np.unique(self._sPhs)
        self.value = unique_sphz
        self.max, self.min = unique_sphz.max(), unique_sphz.min()

        # Split the column with the project helper and index the pieces by
        # station name for easy access.
        truncated_sphz = Zcc.truncated_data(
            data=self._sPhs, number_of_reccurence=self.nfreq)
        name, _ = Zcc._numbering_station(
            number_of_station=self.number_of_stations,
            number_of_freq=self.nfreq)
        self.loc = dict(zip(name, truncated_sphz))