# -*- coding: utf-8 -*-
"""
===============================================================================
Copyright © 2021 Kouadio K.Laurent
This file is part of pyCSAMT.
pyCSAMT is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyCSAMT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with pyCSAMT. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
.. _module-geodrill::`pycsamt.geodrill.geoCore.geodrill`
:synopsis: Deal with geological strata , geological structural info and
geological dataBase. Module to deal with inversion file ,and
geologica data and Borehole data Can read , create an output file
directly use by Oasis Montaj of Geosoft corporation and Goldern Software
Can also build borehode data andgenerate a report of survey location
Deal with geostrata module to incorporate conventional
geological codes , pattern and colors.
Created on Sat Sep 19 12:37:42 2020
@author: Daniel03
"""
from __future__ import division
import os
import copy
import warnings
import datetime
import numpy as np
import pandas as pd
import scipy as sp
import pycsamt.utils.exceptions as CSex
from pycsamt.modeling import occam2d
# from pycsamt.utils.avg_utils import ReadFile as rfi
from pycsamt.ff.core.cs import Profile
from pycsamt.utils import func_utils as func
from pycsamt.utils import plot_utils as punc
from pycsamt.viewer.mpldecorator import geoplot2d
from pycsamt.utils.decorator import deprecated
from pycsamt.utils import Agso
from pycsamt.geodrill.geoCore import structural as STRL
from pycsamt.geodrill.geoDB.sql_recorder import GeoDataBase
try :
from pycsamt.utils._csamtpylog import csamtpylog
_logger=csamtpylog.get_csamtpy_logger(__name__)
except :
pass
try :
import scipy.stats as spSTAT
scipy_version = [int(vers) for vers in sp.__version__.split('.')]
if scipy_version [0] == 1 :
if scipy_version [1] < 4 :
warnings.warn('Note: need scipy version 1.4.0 or more . '
'It may probably get a trouble when import "stats" attribute'
'under such version.It may probably move from ', ImportWarning)
_logger.warning('Note: need scipy version 0.14.0 or higher'
' or for stats.linearegress. Under such version'
'it might not work.')
# from sp import stats
stats_import =True
except :
warnings.warn('Could not find scipy.stats, cannot use method linearregression'
'check installation you can get scipy from scipy.org.')
_logger.warning('Could not find scipy.stats, cannot use method linearegress'
'check installation you can get scipy from scipy.org.')
stats_import =False
[docs]class Geodrill (object):
"""
Class to manage data from Occam2D and Model so to create section of each
sites and it depth.
Each station constitues an attribute framed by two closest
point of station offsets from model resistivities.
Deal with True resistivities get on the survey or with others firms.
In fact , input truth resistivities values into our model , produce an accuracy
underground map.The challenge to build pseudolog allow to know how layers are
disposal in underground so to emphasize the large conductive zone especially
in the case of groundwater exploration. Program works in combinaison with
geophysic data especially Occam 2D inversion data, and geological data.
Actually the program deal only with Occam 2D inversion files or Bo Yang
(x,y,z) files. We intend to extend later with other external softares but
can generate output directly see use with Golder sofware('surfer'). If user
has a golder software installed on its computer , It can use output files
generated here to produce 2D map so to compare both maps to see how far
is the difference between Model map and detail-sequences map )
"pseudosequences model" could match better the reality of underground).
Details sequences map is most closest to the reality When {step descent}
parameter is not too small at all. Indeed True geological data allow
to harmonize the value of resistivity produced by Occam2D model so to force
the pogramm to make a correlation between data fromtruth layers and the model
values.
Arguments
----------
**model_fn** : str,
full path to Occam model file .
**iter_fn** : str,
full path to occam iteration file
**data_fn** : str,
full path to occam_data file
**input_resistivities** : array_like,
Truth values of resistivities
**step_descent**: float,
step to enforce the model resistivities to
keep the truth layers values as reference
Step to cut out data and to force resistivites
calcualted to match the reference data as input
resistivities if not provided the step will be
20% of D0I
**input_layers** : array_like,
True input_layers names : geological
informations of encountered layers
============= =========== ===============================================
Attributes Type Explanation
============= =========== ===============================================
doi str depth of investigation might
be float or str like "1km" =1000
depth_scale str scale of imaging depth can be
`km` or `m`. Default is`m`
lc_AD_curves dict customize line color of average curve
and details sequences
elevation array_like elevation of survey area
iter2dat_fn str full path to Bo Yang (x, y, z) model_file
bln_file str full path to station location file additional
file issue from Bo Yang (x, y, z file)
============= =========== ===============================================
.. note:: In this module, all resistivites are in ohm.meter not in log10.
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import Geodrill
>>> path =os.path.join(os.environ ['pyCSAMT'],
... 'data', 'occam2D')
>>> geo_obj = Geodrill( input_resistivities=[300, 500,
... 1000, 2000, 4000, 6000],
... input_layers =['alluvium',
... 'amphibolite','altered rock',
... 'augen gneiss',
... 'granite'],
... mesh_fn=os.path.join(path, 'Occam2DMesh')
... iter_fn = os.path.join(path, 'ITER17.iter'),
... model_fn =os.path.join(path, 'Occam2DModel') ,
... data_fn =os.path.join(path, 'OccamDataFile.dat'),
... doi='1km',
... step_descent=200.,
... )
>>> geo_obj.geo_build_strata_logs()
>>> geo_obj.geo_name_S01
>>> geo_obj.geo_name_S00
>>> geo_obj.geo_name_S46
>>> geo_obj.geo_d
>>> geo_obj.geo_drr
>>> geo_obj.geo_daver
>>> geo_obj.geo_depth
>>> geo_obj.geo_dstep_descent
"""
geo_rocks_properties={
"basement rocks" : [1e99,1e6 ],
"igneous rocks": [1e6, 1e3],
"duricrust" : [5.1e3 , 5.1e2],
"gravel/sand" : [1e4 , 7.943e0],
"conglomerate" : [1e4 , 8.913e1],
"dolomite/limestone" : [1e5 , 1e3],
"permafrost" : [1e5 , 4.169e2],
"metamorphic rocks" : [5.1e2 , 1e1],
"tills" : [8.1e2 , 8.512e1],
"standstone conglomerate" : [1e4 , 8.318e1],
"lignite/coal": [7.762e2 , 1e1],
"shale" : [5.012e1 , 3.20e1],
"clay" : [1e2 , 5.012e1],
"saprolite" : [6.310e2 , 3.020e1],
"sedimentary rocks": [1e4 , 1e0],
"fresh water" : [3.1e2 ,1e0],
"salt water" : [1e0 , 1.41e0],
"massive sulphide" : [1e0 , 1e-2],
"sea water" : [1.231e-1 ,1e-1],
"ore minerals" : [1e0 , 1e-4],
"graphite" : [3.1623e-2, 3.162e-3]
}
geo_params =['model_x_nodes', 'model_z_nodes', 'model_res', 'station_names',
'station_location', 'elevation']
geoparams =['geo_offsets', 'geo_depth', 'geo_res', 'geo_name',
'geo_location', 'geo_elevation']
def __init__(self, iter2dat_fn=None , model_fn =None , data_fn=None ,
iter_fn=None , mesh_fn=None , bln_fn =None , **kwargs):
self._logging = csamtpylog.get_csamtpy_logger(self.__class__.__name__)
self.iter2dat_fn = iter2dat_fn
self.data_fn =data_fn
self.iter_fn =iter_fn
self.mesh_fn =mesh_fn
self.model_fn =model_fn
self.bln_fn =bln_fn
self.input_layers =kwargs.pop('input_layers', None)
self.input_resistivities =kwargs.pop('input_resistivities', None)
self.doi =kwargs.pop('doi', '1km')
self.elevation =kwargs.pop('elevation', None)
self.etacoeff = kwargs.pop('etaCoef', 5)
self.step_descent =kwargs.pop('step_descent', None)
self.model_res =None
for key in self.geo_params :
setattr(self, key, None)
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
if self.model_fn is not None and self.data_fn is not None\
or self.iter2dat_fn is not None :
self.set_geodata()
[docs] def set_geodata(self,iter2dat_fn =None , model_fn=None , data_fn=None ,
iter_fn=None , mesh_fn=None, **kwargs):
"""
Readmodel data and collected from each site its value from surface to depth
1. Read with Occam 2D outputs files
:Example:
>>> form pycsamt.geodrill import Geodrill
>>> path_occam2d = os.path.join(os.environ ['pyCSAMT'],
... 'data', 'occam2D')
>>> path_i2d =os.path.join(os.environ ['pyCSAMT'],
... 'data', '_iter2dat_2')
>>> geo_obj =Geodrill(
... model_fn = os.path.join(path_occam2d,'Occam2DModel'),
... mesh_fn = os.path.join(path_occam2d, 'Occam2DMesh'),
... iter_fn = os.path.join(path_occam2d, 'ITER17.iter'),
... data_fn = os.path.join(path_occam2d, 'OccamDataFile.dat'))
2. Read with only iter2dat file and bln file
:Example:
>>> form pycsamt.geodrill import Geodrill
>>> geo_obj =Geodrill(iter2dat_fn = os.path.join(path_i2d, 'K1.iter.dat'),
... bln_fn = os.path.join(path_i2d, 'K1.bln'),)
>>> geo_model_off = geo_obj.model_x_nodes
>>> geo_model_res= geo_obj.model_res
>>> geoS01 = geo_obj.geo_name_S01
>>> geoS00= geo_obj.geo_name_S00
>>> geoS46=geo_obj.geo_name_S46
"""
print('{0:-^77}'.format('GeoDrill *Data* info'))
def frame_each_site_into_3_offsets(site_name, model_res,
model_offset, site_offset):
"""
Fonction to frame each site into 3 offsets and put array
:param site_names: name of site , eg S01
:type site_names: str
:param model_res: ndarray(depth, station_offset) resistivity
:type model_res: ndarray(depth, station_offset)
:param model_offset: the offset generated by mesh
:type model_offset: array_like
:site _offset: offset of that site : 15.0 m
:site _offset: float
"""
# fist check if site offset in in model offset
model_offset=np.array([np.around(ss, 2) for ss in model_offset])
site_offset =round(site_offset,2)
if site_offset in model_offset :
# get new index
new_index = int(np.where(model_offset==site_offset)[0])
new_off = model_offset[new_index]
elif site_offset not in model_offset :
if site_offset > model_offset.max (): # the case where
#offset is much larger than the mesh (very rare case )
new_index = int( np.where(model_offset==model_offset.max())[0])
new_off = model_offset.max ()
else :
for ii, di in enumerate(model_offset):
if di > site_offset : # if value of model is
#greater than check the closest site offset
dx0 = round(abs(site_offset -di ),2) # compute the
#distance betewn di and offset (dxmax )
# try co compute the previous and the next point distance
# use try because sometimes , previous point is none
#if model value is at index 0
# if model value at index 0 , then dx0 = dmin
try : dxmin =abs(model_offset[ii-1]-site_offset)
except : dxmin =round(dx0 , 2)
if dx0 <= dx0 :
new_off , new_index = di, ii
break
elif dxmin < dx0 : # take the preious
new_off, new_index = model_offset[ii-1], ii-1
break
# now get the list of resistivity between this index
if new_off == model_offset[0] : # the first site index then take ii+1, ii+2
dm0 = np.concatenate ((model_res[:,new_index].reshape(
model_res[:, new_index].shape[0], 1),
model_res[:,new_index+1].reshape(
model_res[:, new_index+1].shape[0], 1),
model_res[:,new_index +2 ].reshape(
model_res[:, new_index+2].shape[0], 1)),
axis =1)
elif new_off == model_offset[-1] :
dm0 = np.concatenate ((model_res[:,new_index-2].reshape(
model_res[:, new_index-2].shape[0], 1),
model_res[:,new_index-1].reshape(
model_res[:, new_index-1].shape[0], 1),
model_res[:,new_index ].reshape(
model_res[:, new_index].shape[0], 1)),
axis =1)
else :
dm0 = np.concatenate ((model_res[:,new_index-1].reshape(
model_res[:, new_index-1].shape[0], 1),
model_res[:,new_index ].reshape(
model_res[:, new_index].shape[0], 1),
model_res[:,new_index +1].reshape(
model_res[:, new_index+1].shape[0], 1)),
axis =1)
return site_name, dm0
# ----> attributes statements
if model_fn is not None : self.model_fn =model_fn
if data_fn is not None : self.data_fn =data_fn
if iter_fn is not None : self.iter_fn =iter_fn
if mesh_fn is not None : self.mesh_fn =mesh_fn
if iter2dat_fn is not None : self.iter2dat_fn = iter2dat_fn
# then assert all input files
if self.iter2dat_fn is not None :
self._logging.info ('Read Geodata from Bo Yang Iter2Dat files ')
if self.bln_fn is None :
mess =''.join(['No (*bln) station file found. Need sites',
' names and sites locations.',
' Use mode "Iter2Dat" :',
'<from pycsamt.modeling.occam2d import Iter2Dat>',
' : to write a *bln file ',
' use a Profile module :',
' < from pycsamt.ff.core.cs import Profile > .',
' You can also use occam2D output files ',
'like model, mesh, data, and iteration files.'])
warnings.warn('! Error reading *.bln flile !' + mess)
self._logging.error(mess)
raise CSex.pyCSAMTError_geodrill_inputarguments(
'! Error reading *.bln flile ! No (*bln) station file found.'
' Need sites names and sites locations.')
i2d_obj = occam2d.Iter2Dat(iter2dat_fn=self.iter2dat_fn,
bln_fn = self.bln_fn)
self.model_x_nodes= i2d_obj.model_x_nodes
self.model_z_nodes = i2d_obj.model_z_nodes
self.model_res = i2d_obj.model_res
self.station_names = i2d_obj.station_names
self.station_location = i2d_obj.station_location
else :
self._logging.info ('Read Geodata from Occam2D files ')
# check if all attributes are not None
for attr , ins in zip([
'model_fn', 'iter_fn', 'data_fn', 'mesh_fn'],
[self.model_fn, self.iter_fn, self.data_fn, self.mesh_fn]):
if ins ==None :
msg =' !No {0} file found ! Please provide the Occam 2D {0} file'.\
format(str(attr).replace('_fn', ''))
self._logging.error(msg)
warnings.warn(msg)
raise CSex.pyCSAMTError_plot_geoinputargument(msg)
# Recreate object and get ncessaries attributes
model_obj= occam2d.Model(model_fn =self.model_fn,
iter_fn =self.iter_fn,
mesh_fn =self.mesh_fn )
data_obj =occam2d.Data(data_fn= self.data_fn)
# then get important attributes and re
self.model_x_nodes =model_obj.model_station_offsets
self.model_z_nodes =model_obj.model_depth_offsets
self.model_res = model_obj.model_resistivity
# get from data sites names and station location and elevation
self.station_names =data_obj.data_sites
self.station_location = data_obj.data_offsets
#get rms and roughness value
self.model_rms = model_obj.model_rms
self.model_roughness =model_obj.model_roughness
# build for each site an array of three values framed into the main sites
# get the the offset of the site
# build a dictionnary of geoname
#------Mange DOI (investigation depth) --------
# check the depth of investigation and ge the new_nodel matrix
if self.doi is None : self.doi = 1000.
elif self.doi is not None :
self.doi =punc.depth_of_investigation(self.doi) # return meter value
# for consistency let check how the depth is ranged :
#Depth is minimum to max depth , if not flip data
if self.model_z_nodes [0] > self.model_z_nodes[-1]:
self.model_z_nodes= self.model_z_nodes[::-1]
# chech the doi data and compare to max depth data
if self.doi > self.model_z_nodes.max ():
mess='Maximum Input doi value = {0} m is larger'\
' than model depth = {1} m. Investigation'\
' depth "doi" will be resetting at = "{1}"m.'.\
format(self.doi, self.model_z_nodes.max())
# resseting doi to maximum value in model depth
self.doi = self.model_z_nodes.max()
warnings.warn(mess)
self._logging.debug (mess)
if self.doi in self.model_z_nodes:
dep_index = int(np.where(self.model_z_nodes== self.doi)[0])
if self.doi == self.model_z_nodes[-1]: # if doi is the max depth ,
self.__setattr__('geo_depth', self.model_z_nodes)
# resize model _resistivity
elif self.doi < self.model_z_nodes[-1]:
self.model_res = np.resize(self.model_res, (
dep_index +1 , self.model_res.shape[1]))
# resize the large depth to doi max
self.__setattr__('geo_depth', self.model_z_nodes[:dep_index+1])
print('---> resetting model doi to = {0} m depth !'.format(self.doi))
else :
for index, dep in enumerate(self.model_z_nodes) :
# seek the depth index that match better the input
if dep > self.doi : #doi 1014>1000 :index=23,
dep_index = index-1 # get the previous index dep_index = 22
self.model_res = np.resize(self.model_res, (
index, self.model_res.shape[1]))
self.doi = self.model_z_nodes[dep_index]
print('---> resetting model doi to ={0} m depth !'.format(
self.doi))
self.__setattr__('geo_depth', self.model_z_nodes[:index])
break
# build geo_station_names and geodict rho
self.geo_d ={}
if isinstance(self.station_names , (tuple, np.ndarray)):
self.station_names=self.station_names.tolist()
for name in self.station_names :
site_offset_value = self.station_location[
self.station_names.index(name)]
geo_name , geodat = frame_each_site_into_3_offsets(
site_name= name,
model_res = self.model_res,
model_offset=self.model_x_nodes,
site_offset=site_offset_value)
self.__setattr__('geo_name_{0}'.format(geo_name), geodat)
# convert resistivities value from log10 rho to ohm meter
self.geo_d[name]= np.power(10, getattr(
self, 'geo_name_{0}'.format(geo_name))) #
# self.geo_dict[name]= getattr(self, 'geo_name_{0}'.format(geo_name))
[docs] def geo_replace_rho (self, input_resistivity_range =None,
input_layers=None, **kwargs ):
"""
Allow ro replace the calculated resistivities from model
to real resistivites obtained on survey area with other companies.
The accuracy depend the significant values of input resistivites.
More input resitivities are,more accuracy in the design of underground model
geostratigrapyhy model should be.
:param input_resistivity_range: an array of resistivity on the site
:type input_resistivity_range: array_like
:param input_layers: layers_names , eg .`granite`, `fault`, `river `
:param input_layers: list or arrays
:param depth_range: array _of depth of specific layer
:type depth_range: array _like
Example of table of geoinformation collected on the field somewhere
==================== ==================== ===========================
Structure Rho mean value (Ω.m) Rho range (Ω.m)
==================== ==================== ===========================
Granite 2000 1000-3000
Fault fracture zone 120 60-180
River water 68 66-70
==================== ==================== ===========================
"""
if self.iter2dat_fn is None or self.model_fn is None :
self.set_geodata() # loead attribute
def find_and_replace_rho(rowlines, value_range):
"""
Small function replacement function, the model's resistivities are
replaced by the true value of the survey. T higher the replacement is,
the more prominent the specificities of the layers become.
Parameters
------------
* rowlines : array_like ,
array of resistivities values
* value_range : array_like
array of resistivities from survey area.
Returns
---------
array_like
rowlines , new data range
"""
# for consistency convert to float value
try :
value_range= np.array([float(ss) for ss in value_range])
except :
raise CSex.pyCSAMTError_geodrill_inputarguments(
'Value provided as resistivity '
'range must be an array of float number.')
tem=[] # loop the value range
for mm in value_range : # substract rowlines from
op = rowlines-mm #value range to seek the minimum close
op=np.array([abs(aj) for aj in op]) # keep absolute value for difference
tem.append(op) # keep it on temporray list
ts = func.concat_array_from_list(tem) # concat list to axis =0 order
for ii in range(len(rowlines)): # loop the rowlines line now
us=ts[:,ii].argmin() # keep minimum index
rowlines[ii]=value_range[us] # change the resistivity values
return rowlines
def ascertain_input_layers(input_layers, input_rho):
"""
Function to assert the length of input layers and the
input resistivities to avoid miscomputation .
Parameters
-----------
* input_layers : array_like |list,
list of input
* input_rho : list ,
layer list of resistivities provided
Returns
-------
list
new list of input layers
"""
# for consistency put on string if user provide a digit
ilay =[str(ly) for ly in input_layers]
if len(input_rho) ==len(ilay):
return ilay
#get the last value of resistivities to find
elif len(input_rho) > len(ilay): #the structres names ans structures
sec_res = input_rho[len(ilay):] # sresistivities
geos =Geodrill.get_structure(resistivities_range=sec_res)
if len(geos)>1 : tm = 's'
else :tm =''
print('---> !We added other {0} geological '
'structure{1}. You may ignore it.'.format(len(geos), tm))
ilay.extend(geos) # then , extend the list
return ilay
elif len(ilay) > len(input_rho):
ilay = ilay[:len(input_rho)] # truncated the list
return ilay
# if range of resistivity is provided then use it for average
if input_resistivity_range is not None :
self.input_resistivities = input_resistivity_range
if self.input_resistivities is None and self.input_layers is not None :
warnings.warn(
' !Without any input resistivities values,'
' we can not set only your layer names.'
' Bring more details about your layers by adding their '
'corresponding resistivities values.')
# print('-->!')
#abort the input layers by renitializing to NoneType values
self.input_layers=None
auto_mess='' # keep automatic message
if self.input_resistivities is None :
if self.input_layers is None :
n_layers =7. # number of slices layer juged by the program 7
else : n_layers =len(self.input_layers)
mess="".join([
"Resistivity range is not provided . Sites Depth will",
" be cout out into {0} slices as possible layers ",
" below the site. If the number of slices doesnt",
" suit the purpose , please change the number ",
" of slices using argument <input_layers> ",
"to provided the real layer's names."])
warnings.warn(mess.format(int( n_layers)))
self._logging.info(mess.format(int( n_layers)))
# get the model resistivities minimum and maximum from selected doi,
# it is much betterthan to select min max into the
#global resistivities models.
minmax=[(res_values.min(), res_values.max())
for stn, res_values in self.geo_d.items()] #
maxres= max([mm[1] for mm in minmax])
minres= min([mm[0] for mm in minmax])
self.input_resistivities =np.linspace(
minres, maxres, int( n_layers))
self.input_resistivities = np.around(
self.input_resistivities,2)
auto_mess ='Automatic'
if isinstance (self.input_resistivities , (tuple, list)):
self.input_resistivities = np.array(self.input_resistivities )
elif isinstance(self.input_resistivities , (float, int, str)):
try :
self.input_resistivities = float(self.input_resistivities )
except :
raise CSex.pyCSAMTError_plot_geoinputargument(
'Can not converted value <%s> into float number.'\
' Please provide a float number.'%self.input_resistivities )
# Display infos
print('**{0:<37} {1} {2}'.format('{0} Layers sliced'.format(
auto_mess ), '=' , len(self.input_resistivities )))
print('**{0:<37} {1} {2} (Ω.m)'.format('{0} Rho range'.format(
auto_mess ),'=' , tuple(self.input_resistivities ) ))
print('**{0:<37} {1} {2} {3}'.format(
' Minimum rho ','=' ,self.input_resistivities.min(), 'Ω.m' ))
print('**{0:<37} {1} {2} {3}'.format(' Maximum rho ',
'=' ,
self.input_resistivities.max(),
'Ω.m' ))
# so to get the structures for each input resistivities
# ascertain unput layers first if no misleading value is inputted
if self.input_layers is not None :
# rebuild new list of layer by adding
#necessary strutures or substracting unecessaries structures
formations= ascertain_input_layers(
input_layers= self.input_layers,
input_rho= self.input_resistivities)
# self.depth_range =depth_range
else : formations = Geodrill.get_structure(self.input_resistivities)
self.input_layers=formations
# self.__setattr__('geo_dict_rho', nOne)
self.geo_drr=copy.deepcopy(self.geo_d)
tem =[] # dictionnary opf replacement rho
for key , geovalue in self.geo_drr.items() :
for ii, rowlines in enumerate(geovalue):
out_aver =find_and_replace_rho(rowlines= rowlines,
value_range= self.input_resistivities)
tem.append(out_aver)
self.geo_drr[key]=func.concat_array_from_list(list_of_array=tem)
tem=[]
[docs] @staticmethod
def get_structure (resistivities_range):
"""
function to get according the range of resistivities values ,
the corresponding associated geological rocks
The list of electrical properties of rocks is not exhaustive ,
can be fill by others
:param resistivities_range: array of input_resistivities
:type resistivities_range: array_like,
:returns: the list of geological structures
form go_electrical _rocks properties
:rtype: list
"""
# only single value provided than put on list.
if isinstance(resistivities_range, (float, str, int)):
try :
resistivities_range=[float(resistivities_range)]
except :
raise CSex.pyCSAMTError_geodrill_inputarguments(
'Can not convert <%s> to float number !Input resistivity '
'must be a float number not <%s>!'% (resistivities_range,
type(resistivities_range)))
# for consistency , check again input values
if isinstance(resistivities_range, (tuple, list, np.ndarray)):
try : resistivities_range =np.array(
[float(ss) for ss in resistivities_range])
except :
raise CSex.pyCSAMTError_geodrill_inputarguments(
'Input argument provided is wrong. '
'Please check your resistivities '
' range, values must be a float number.')
# in fact append the idde of resistivities values located
# so to be sure that the resistivities will match exactly the
# layer found in geo_electricl_property of rocks
geo_structures=[]
f=False
for resr in resistivities_range :
f=False
for rocks, eprops in Geodrill.geo_rocks_properties.items():
if eprops[-1]<resr < eprops[0] :
f=True
geo_structures.append(rocks.capitalize())
break
if f==False :
geo_structures.append('*! Struture not found')
return geo_structures
[docs] @staticmethod
def get_average_rho(data_array, transpose =False ):
"""
Function to average rho to one point to onother .
It show the lowest point and the maximum point averaged . Function
averaged rho value between local maximum and local minima values .
if data values of station are located on columnlines , set transpose
to True then rotate the matrix to find minima and maxima locals value
then calculated averaged rho after will return matrix transpose
as the same shape as inputted . .Defaut is **False** .
:param data_array: data of resistivities collected at the site point
:type data_array: ndarray
"""
if data_array.dtype not in ['float', 'int']:
try :
data_array= data_array.astype('float64')
except :
warnings.warn("It seems somethings wrong"
" happened during data conversion to float values.")
raise CSex.pyCSAMTError_inputarguments(
'Could not convert value'
' to float numbers , Please check your data!')
if transpose is True :
data_array =data_array.T
exem =[punc.average_rho_with_locals_minmax(rowlines)
for rowlines in data_array ]
new_data = func.concat_array_from_list(exem)
if transpose is True :
new_data = new_data.T
return new_data
[docs] def geo_build_strata_logs (self, input_resistivities=None,
input_layers =None,
step_descent = None, **kwargs):
""""
Read resistivits data got on survey area and build geological strata block.
If constrained_electrical_properties_of_rocks is False , will build a log
by considering the conventional electrical property of rocks
can be find through this link.
Parameters
------------
* input_resistivity :array_like
an array of resistivity on the site
* step_descent : float
depth value to averaged rho . Must be smaller as possible
if None , it take the 2% times the investigation depth
* input_layers : list or array
layers_names , eg `granite`, `fault`, `river`
Returns
---------
obj ,
Build stratigraphy log obj
A Sample of electrical_properties_of_rocks is below
========================= =================== =======================
Rocks Max Rho Min Rho (ohm-m)
========================= =================== =======================
igneous rocks 10^6 10^3
duricrust 5.10^3 5.10^2
gravel and sand 10^4 10^2.90(800)
conglomerate 10^4 10^1.95(90)
dolomite/limestone 10^5 10^3
permafrost 10^5 10^2.62(750)
metamorphic rocks 5.10^2 10.^1
tills 8.10^2 10^1.93(85)
standstone conglomerate 10^4 10^1.92 (80)
lignite/coal 10^2.89(790) 10^1
shales 10^1.7(50) 10^1.48(30)
clays 10^2 10^1.7(50)
saprolite 10^2.08(120) 10^1.48(30)
sedimentary rocks 10^4 10^0
fresh water 3.10^2 10^0
salt water 10^0 10^-0.15
massive sulfide 10^0 10^-2
sea water 10^-0.09(0.8) 10^-1
ore minerals 10^0 10^-4
Graphite 10^-2.5 10^-3.5
========================= =================== =======================
.. seealso:: https://www.eoas.ubc.ca/ubcgif/iag/foundations/properties/resistivity.htm
list is not exhaustive and depend of the geological formations of
survey area.
.. note:: list is not Exhaustive, use the data base script to populate
most of goeological electrical properties.
"""
etaCoef = kwargs.pop('etaCoef', None)
def get_conductive_zone (dep_array, rho_array, step_in_deep):
"""
Get a conductive zone is important for many purposes .In the case
of groundwater exploration for instance, sometimes in deeper,
because of heterogeneities of structures in underground , it is
much difficult to take some minima rho as a conductive area or
conductiv productive veification drill point. To be sure that this
point is really among a good saturated zone, it is better to get
averaged rho in some distance and to see how the layer resistivity
value goes on on this range, whether it's an effective conductive
zone or overlapping zone. In addition, fixing resistivities values
at specific distance depth allow us to detect the unsatured
zone as weel as the satured zone at the set of investigation depth
imaged. This technique allow us to build a pseudo specific strata
that could match the zone .
Parameters
----------
* dep_array : array_like
the imaged depth
* step_in_deep : float,
value to averaged resistivities in deeper
* rho_array : array_like ,
the resistivities at the sites depth {Top to bottom}
Returns
--------
array_like
rho average for each station dep_averaged for each station
.. note:: Much larger is the step , the accuracy becomes weak ,
it must be as possible 1/20 of investigation depth (doi)
:Example:
>>> import numpy as np
>>> pseudo_depth = np.arange(0, 1201.,100)
>>> print(pseudo_depth)
... dep_aver= dep(dep_array=pseudo_depth, value=100)
... print(dep_aver)
... print(pseudo_depth.shape)
... print(dep_aver.shape)
"""
step_in_deep =punc.depth_of_investigation(step_in_deep)
v,r , dm, rm=[[] for i in range(4)]
if isinstance(step_in_deep, (str, int)):
try :
step_in_deep =float(step_in_deep)
except :
raise CSex.pyCSAMTError_plot_geoinputargument(
'Could not convert depth value ={} to float.'
' Please check your value.'.format(step_in_deep))
if step_in_deep< dep_array.min():
raise CSex.pyCSAMTError_plot_geoinputargument(
'Value provided ={0} m is less than the minimum'
' depth ={1} m.'.format(step_in_deep, dep_array.min()))
if step_in_deep > dep_array.max():
raise CSex.pyCSAMTError_plot_geoinputargument(
'Value provided is = {0} m is greater than '
'maximum depth ={1}m.'.format(step_in_deep, dep_array.max()))
_init_depth =step_in_deep
for index , depth in enumerate(dep_array):
if depth <= step_in_deep : # value less than step descent
v.append(depth)
r.append(rho_array[index])
if depth > step_in_deep :
if v !=[]:
# rebuild resistivities values with rho averaged
dm.append(np.repeat(np.array(v).mean(), len(v)))
rm.append(np.repeat(np.array(r).mean(), len(r)))
step_in_deep += _init_depth
v=[depth] # initialise new list by adding
r=[rho_array[index]] #the index value greater one
if depth ==dep_array[-1]:
# it length last value ==1, means is the last value of depth
if len(v)==1 :
dm.append(dep_array[index])
rm.append(rho_array[index])
elif len(v) !=1 : # averaged the reamin rho values
dm.append(np.repeat(np.array(v).mean(), len(v)))
rm.append(np.repeat(np.array(r).mean(), len(r)))
return np.hstack(tuple(rm)),np.hstack(tuple(dm))
self.__setattr__('qc', .0) # set quality control in the data
#------ STATEMENT OF INPUT ARGUMENTS------------------
# check data files if provided (Dont need to check all )
if input_resistivities is not None :
self.input_resistivities= input_resistivities
if self.model_fn is None or self.iter2dat_fn is None :
self.geo_replace_rho()
if input_layers is not None :
# rebuild input_layers
self.input_layers = ascertain_layers_with_its_resistivities(
real_layer_names= input_layers,
real_layer_resistivities=self.input_resistivities)
# if self.etacoeff is not None :
# self.step_descent = self.doi / int(self.etacoeff)
if etaCoef is not None :
self.etacoeff = etaCoef
if step_descent is not None :
self.step_descent = step_descent
if self.step_descent is None :
self.step_descent = self.doi / int(self.etacoeff)
if self.step_descent > self.doi :
self._logging.info('Step descent is = {0} much larger than doi ={1}.'
'Value is ressetting to 20% of DOI = {2}m.'
.format(round(self.step_descent,3),
round(self.doi,3),
.2 * self.doi ))
self.step_descent = .2 * self.doi
# recompute a new block coefficient if step descent is given
if self.step_descent is not None :
self.etacoeff = int(self.doi/ self.step_descent)
# ---------------get the geodictionnary from from geodata average ----
self.geo_daver ={}
for stn, vrho in self.geo_d.items():
# transpose data so to read rowlines S01,S02 etc
self.geo_daver[stn] = Geodrill.get_average_rho( data_array= vrho,
transpose =True)
self.qc += .25
#----- set attribute of geo_dstep_descent -----------------------------
self.geo_dstep_descent={}
for stn , geo_sd_values in self.geo_d.items():
# for ii in range(3) :
mv= [get_conductive_zone(dep_array= self.geo_depth,
rho_array=geo_sd_values[:,ii],
step_in_deep= self.step_descent)[0]
for ii in range(3)] # concat three array
self.geo_dstep_descent[stn]=func.concat_array_from_list(
mv, concat_axis=1)
#----build a dict of pseudosequences layer and resistivities ----------
# build at dictionnary of strata from resistivities
self._logging.info ('Build the pseudosequences of strata.')
# if constrained_electrical_properties_of_rocks is False :
# build layer according to geo_drr (geo_replacedrho)
self.geo_dpseudo_sequence, self.geo_dpseudo_sequence_rho =[
{} for ii in range(2)]
sc=[]
self.__setattr__('geo_secure_pseudo_sequence', None)
for stn , georr in self.geo_drr.items():
# for jj in range(3):
if stn =='S00' :
# first station id framed into three started at the left
svm = punc.build_resistivity_barplot(depth_values=self.geo_depth,
res_values=georr[:,0])
elif stn == self.station_names[-1] :
svm = punc.build_resistivity_barplot(depth_values=self.geo_depth,
res_values=georr[:,2])
else :
svm = punc.build_resistivity_barplot(depth_values=self.geo_depth,
res_values=georr[:,1])
sc.append(svm[-1])
self.geo_dpseudo_sequence[stn]= svm[0]
self.geo_dpseudo_sequence_rho[stn]=svm[1]
self.geo_secure_pseudo_sequence = np.array(sc)
#---------------- Quality control --------------------------------.---
mess =' Your data pass safety the Quality Control !'
if len(self.geo_secure_pseudo_sequence) == len(self.station_location):
self.qc +=.25
if np.all(self.geo_secure_pseudo_sequence== self.doi ):
self.qc += .25
else :
warnings.warn('Data provided are inconsistencies ,'
' Please you may try to undestand carefuuly the code.')
self._logging.debug('Data provided are inconsistencies ,'
' Please you may try to understand carefully the code.')
if self.qc ==1. :
print('---> {}.'.format(mess))
print('**{0:<37} {1} {2} {3}'.format(' QC flux rate','=' ,
100. * self.qc, '%' ))
#-rewrite info with real layers resistivities
# if provided and layers names ---
# self.input_layers, _ , _= pycsamt.geodrill.get_geo_formation_properties(
# structures_resistivities= self.input_resistivities,)
#-------------------Print Info ----------------------------------------
print('**{0:<37} {1} {2}'.format(' Number of layers','=' ,
len(self.input_layers) ))
print('**{0:<37} {1} {2} {3}'.format(' Step descent','=' ,
round(self.step_descent,3), 'm' ))
print('**{0:<37} {1} {2}'.format(' Block coefficient','=' ,
int(self.etacoeff)))
print('-'*77)
print('{0:<25}{1:<25} {2:<25}'.format('Structure', 'Rho mean value (Ω.m)',
'Rho range (Ω.m)'))
print('-'*77)
for ij , (ires, geos) in enumerate(zip (
self.input_resistivities,self.input_layers )):
if len(self.input_resistivities ) >1 :
if ij==0 :
rho_rg = '{0}-{1}'.format(
ires, self.input_resistivities [ij+1])
rho_mean = np.around(
(ires+ self.input_resistivities [ij+1])/2,2)
elif ires == self.input_resistivities[-1]:
rho_rg = '{0}-{1}'.format(self.input_resistivities[ij-1],
self.input_resistivities[ij])
rho_mean =np.around( (self.input_resistivities[ij-1]+\
self.input_resistivities[ij])/2,2)
else :
rho_rg = '{0}-{1}'.format(self.input_resistivities[ij-1],
self.input_resistivities[ij+1])
rho_mean = np.around( (self.input_resistivities[ij-1]+\
self.input_resistivities[ij+1])/2,2)
if len(self.input_resistivities)==1:
rho_rg = '{0}'.format(ires)
rho_mean = ires
print('{0:<25}{1:<25} {2:>25}'.format(geos, rho_mean,rho_rg ))
print('-'*77)
[docs] def to_golden_software(self, input_resistivities=None, input_layers =None,
step_descent = None, filename =None,
savepath =None , **kwargs ):
"""
Output average files, rehoreplaced files, spseudosequennces
files and station locations files will generate 3 outputs for
Golden software plots
1. One model for rho averaged (*_aver), transitory data betwen
the calculated rho and true rho.
2. second for rho value replaced . Replaced calcualted model
structures resistivities
to their closest resistivities as resistivities reference from
input resistivities.
3. the most important files: the cut out resistivities value
with step descent (._sd) show most dominant stratigraphy sequences .
4. the station location file (.bln)
Plot the 3 files in Golden software to see transition from model
calculation to truthsequence detail models which is most closest to reality.
Parameters
------------
* input_resistivities : array_like
Truth values of resistivities
* filename :str
name of output file
* step_descent : float,
Step to cut out data and to force resistivites calcualted
to match the reference data as input resistivities
if not provided the step will be 20% of D0I.
* input_layers : array_like
True input_layers names ( geological informations of
encountered layers )
* savepath : str,
full path to the savepath ,
if None , will create folder name to savepath
Returns
----------
obj , str
golden software outputfiles `_rr.dat`, `_aver.dat`, `_sd.dat`.
Holding a followings informations:
=================== ================ ================================
Optional params Type Explanation
=================== ================ ================================
elevation array_like elevation of survey area
to_negative_depth bool export deth in negative
value or positive
default is "negative".
scale str scale to export data .Must be
*m* or `km`. *default* is **m**
etaCoef int number of sperated model blocks
=================== ================ ================================
"""
elevation = kwargs.pop('elevation', None)
write_negative_depth=kwargs.pop('to_negative_depth', True)
etaCoef = kwargs.pop('etaCoef', None)
scale =kwargs.pop('scale', 'm')
if scale is None : scale= 'm'
# rescale data
if scale =='m':
df =1.
elif scale =='km':
df =1000.
else : df =1.
if input_layers is not None : self.input_layers = input_layers
if input_resistivities is not None :
self.input_resistivities = input_resistivities
if etaCoef is not None : self.etacoeff = etaCoef
if step_descent is not None : self.step_descent= step_descent
if elevation is not None : self.elevation = elevation
if self.elevation is None :
mess='!Elevation is not provided. We gonna set to 0.'
print('-->'+mess)
self._logging.debug(mess)
if self.station_location is not None :
self.elevation = np.repeat(0., len(self.station_location))
elif self.elevation is not None :
self.elevation = geo_length_checker(main_param= self.station_location,
optional_param = self.elevation,
param_names = ('station location', 'elevation'), fill_value=0.)
# now read files
if self.input_resistivities is not None :
self.geo_build_strata_logs()
else :
mess ="".join(["Need ABSOLUTELY an input resistivities get"
" on the field or other companies ",
" Input resistivities MUST provided in order"
" to take data as reference resistivities.",
" please provided a least",
" a truth resitivities of one layer."])
warnings.warn(mess)
self._logging.error(mess)
raise CSex.pyCSAMTError_plot_geoinputargument(
"Can not write details sequences log files "
"! Please provided at least one"
" truth layer resistivity.")
matrix_rhoaver ,matrix_rhorr,\
matrix_rho_stepdescent =[[]for i in range(3)]
for site in self.station_names :
#station id to read is the first index =0
if site ==self.station_names[0]:
index =0
if site ==self.station_names [-1] :
index =2 # station id to read is the 2index
else : index =1 # read th middle array
matrix_rhorr.append(self.geo_drr[site][:, index])
matrix_rhoaver.append(self.geo_daver[site][:, index])
matrix_rho_stepdescent.append(
self.geo_dstep_descent[site][:, index ])
# build a matrix of all data collected
matrix_rhorr= func.concat_array_from_list(
matrix_rhorr, concat_axis=1)
matrix_rhoaver= func.concat_array_from_list(
matrix_rhoaver, concat_axis=1)
matrix_rho_stepdescent= func.concat_array_from_list(
matrix_rho_stepdescent, concat_axis=1)
# create empty list to collect infos of read values
write_averlines , write_rrlines, write_stepdescent_lines ,\
write_blnfiles =[[]for i in range(4)]
# writes multilines in the same times
if write_negative_depth is True : self.geo_depth *=-1
# scalled station location and geo_depth
self.geo_depth /=df
self.station_location /=df
for writes_lines , matrix in zip ( [
write_averlines , write_rrlines, write_stepdescent_lines],
[matrix_rhoaver, matrix_rhorr, matrix_rho_stepdescent]):
for ii in range(len(self.geo_depth)) :
for jj in range(len(self.station_names)):
writes_lines.append(''.join([
'{0:>15.7f}'.format(self.station_location[jj]),
'{0:>15.7f}'.format(self.geo_depth[ii]),
'{0:>15.7f}'.format(matrix[ii,jj]),
'\n',
]))
# create fileame if not provided
if filename is None :
filename = '.{0}'.format(datetime.datetime.now().month)
else :
filename ='{0}.{1}'.format(os.path.basename(
filename), datetime.datetime.now().month)
for ikey, stn in enumerate(self.station_names) :
write_blnfiles .append(''.join([
'{0:<7.6f},'.format(
self.station_location[ikey]),
'{0:<7.6f},'.format(
self.elevation[ikey]),
'{0:<4}'.format(stn), '\n']))
if savepath is None : # create a folder in your current work directory
try :
savepath = os.path.join(os.getcwd(), '_outputGeoSD_')
if not os.path.isdir(savepath):
os.mkdir(savepath)# mode =0o666)
except :
warnings.warn("It seems the path already exists !")
#writes files
for ii , (tfiles , wfiles) in enumerate(zip(
['_aver', '_rr', '_sd', '_yb'],
[write_averlines, write_rrlines,
write_stepdescent_lines, write_blnfiles])):
if ii == 3 : mm='bln'
else :mm='dat'
with open(''.join([filename,'{0}.{1}'.format(tfiles, mm)]),
'w') as fw :
fw.writelines(wfiles)
#savefile inot it savepath
if savepath is not None :
import shutil
try :
for jj, file in enumerate( [
filename + '_aver.dat', filename +'_rr.dat',
filename +'_sd.dat', filename + '_yb.bln']):
shutil.move(os.path.join(os.getcwd(),file) ,
os.path.join(savepath , file))
except :
warnings.warn("It seems the files already exists !")
filenames =[filename + '_aver.dat', filename +'_rr.dat',
filename +'_sd.dat',
filename + '_yb.bln']
print('---> geo output files {0}, {1}, {2} & {3}'
' have been successfully written to <{4}>.'.format(
*filenames, savepath))
[docs] def to_oasis_montaj (self, model_fn=None , iter_fn=None ,profile_fn =None,
mesh_fn=None , data_fn=None , filename=None,
savepath =None , **kwargs) :
"""
write output to oasis montaj when station loocation and profile
coordinates are provided . We assune before using this method , you are
already the coordinates files at disposal(*stn), if not use the method
`to_golden_software`.Coordinated files are Easting Northing value
not in degree decimals . It uses occam 2D outputfiles or Bo Yang
iter2Dat file also add profile XY coordinates (utm_zone ).
Parameters
------------
* model_fn : str
full path to Occam2D model file
* mesh_fn : str
full path to Occam2D mesh file
* data_fn : str
full path to Occam2D data file
* iter_fn : str
full path to Occam2D iteration file
OR Bo Yang (x, y, z ) files
* iter2dat_fn : str
full path to iter2dat file
(see _occam2d module to see which file is it. or call
occam2d.Iter2Dat.__doc__)
* bln : str
full path to satation location file
profile files (Easting , Northing Coordinates) ,
"see :ref:module-cs"
* profile_fn: str
full path to station profile file . You can
useProfile module to rewrite _coordinate files
* write_negative_depth: bool
output negative depth.
*Default* is True
* scalled_east_north: tuple
scalled the easting and northing.
Substract or add value to easting
or northing values. first `index1`
equal easting and `index2` equal
Nothing.*Default* is (0,0).
Returns
--------
obj , str
oasis outputfiles `.xlxs`, `.csv`
Holding a followings informations:
================= ============== ====================================
Optional params Type Explanation
================= ============== ====================================
filename str New name of output file .
*Default* is None
normalize_depth bool Set the depth ega spacing depth .
In fact for oasis montaj ,depth
must be equidistant the same
spacing when image in deeper.
*default* is True. if false ,
will generate depth as Occam2D
mesh z nodes.
easting array_like Easting UTM coordinates .Profile
*Stn* file is provided,
no need to input. *default* is None
northing array_like Northing coordinates . If station
coordinates is provided from *stn
file .It will detect automatically.
*default* is None.
elevation array_like Elevation area . *Default* is None.
output_s_XY bool Scalled coordinates output files.
*Default* is True
input_rho array_like input resistivities of True
geological formations (optional)
if input resistivities is provided
will generated output step descent
files (_sd), roughness_rho (_rr),
rho_averaged (_aver). *Default* is
output the main resistivty model.
input_layers array_like list of true names of layers(opt.)
step_descent float Step to cut out data and to
force resistivites calcualted
to match the reference data as
input resistivities if not provided
the step will be 20% of D0I.(opt)
writeType str Writer format *.csv or *.xlsx .
*Default* is *.xlsx
add_header bool Add head on exported sheet.
set False to mask heads.
*Default* is True.
csv_separateType str Indicated for csv exported files.
The type of comma delimited.
*Defaut* is ','
to_log10 bool if True : will ouput all
resistivities data to log10 values
etaCoef int number to separate the model blocks
================= ============== ====================================
"""
def create_model_matrix (station_names, geo_dict_rho, transpose =True):
"""
simple function to generate matrix from geo_dictionnary data
main site is frames into3 stations
Parameters
----------
* station _names: list
name of stations
* geo_dict_rho: dict
dictionnary of model_resistivities
can be a step _descent dict ,
geo_average_or geo_replace rho
* transpose: bool
if transpose is True , depth will be a xoffset
and and station names as y offsets
Returns
--------
ndarray
model_rho_matrix , matrix , depth & station locations
"""
model_rho_matrix =[]
for site in station_names :
if site ==station_names[0]:
index =0
if site ==station_names [-1] : index =2
else : index =1 # read th middle array
model_rho_matrix.append(geo_dict_rho[site][:, index])
model_rho_matrix= func.concat_array_from_list(model_rho_matrix ,
concat_axis=1)
if transpose is True : model_rho_matrix= model_rho_matrix.T
return model_rho_matrix
print('-'*77)
normalize_depth =kwargs.pop('normalize_depth', True )
iter2dat_fn =kwargs.pop('iter2dat_fn', None)
bln_fn =kwargs.pop('bln_fn', None)
easting =kwargs.pop('easting', None)
northing =kwargs.pop('northing', None)
elevation = kwargs.pop('elevation', None)
scalled_east_north =kwargs.pop('scalled_east_north', (0,0))
output_sprofile =kwargs.pop('output_s_XY', True)
write_negative_depth=kwargs.pop('to_negative_depth', True)
to_log10 = kwargs.pop('to_log10', True)
etaCoef = kwargs.pop('etaCoef', None)
# write -externals files : Step descent , average rho
input_rho =kwargs.pop('input_resistivities', None)
input_layers =kwargs.pop('input_layers', None)
step_descent =kwargs.pop('step_descent', None)
if input_rho is not None : self.input_resistivities = input_rho
if self.input_resistivities is not None :
write_external_files=True
else : write_external_files = False
# write sheet
writeType =kwargs.pop('writeType', 'xlsx')
csvsetHeader =kwargs.pop('add_header',True)
csvsep =kwargs.pop('csv_separateType',',')
writeIndex=kwargs.pop('write_index_on_sheet',False)
# create filename if not provided
if filename is None and profile_fn is not None :
filename = os.path.basename(profile_fn)
if filename is None :
filename = '.{0}'.format(datetime.datetime.now().month)
else:
filename ='{0}.{1}'.format(os.path.basename(filename),
datetime.datetime.now().month)
# statements of arguments ----
for mattar, mfile in zip(['model_fn', 'mesh_fn', 'data_fn',
'iter_fn', 'iter2dat_fn', 'bln_fn' ],
[model_fn, mesh_fn, data_fn, iter_fn,
iter2dat_fn, bln_fn ]):
if mfile is not None :
setattr(self, mattar, mfile)
# --Buil profile object ----------------------------
if elevation is not None : self.elevation = elevation
profile_obj = Profile ()
profile_obj.read_stnprofile(profile_fn , easting=easting,
northing =northing ,
elevation = self.elevation)
profile_obj.straighten_profileline ( reajust=scalled_east_north ,
output =output_sprofile)
print('** {0:<37} {1} {2} {3}'.format('Dipole length','=',
profile_obj.dipole_length,
'm.' ))
profile_angle, geo_electric_strike = profile_obj.get_profile_angle()
print('** {0:<37} {1} {2} {3}'.format(' Profile angle ','=',
round(profile_angle, 2),
'deg N.E' ))
print('** {0:<37} {1} {2} {3}'.format('Geo_electric strike','=',
round(geo_electric_strike , 2),
'deg N.E' ))
# In the case where elevation is
#contain on the profile stn file , get elevation
if profile_obj.elev is not None : self.elevation = profile_obj.elev
# build section Infos
#Stations Easting_X_m Northing_Y_m Elev_H_m
#x_m , Norm_h_m offsets_m DOI_max_m
info_list = ['station', 'easting_X_m', 'northing_Y_m', 'elev_H_m',
'position_x_m', 'nivelize_h_m', 'offset_m', 'doi']
#normalH
nivelize_elevation = np.around(
profile_obj.elev - profile_obj.elev.min(), 2)
# stn position
if write_negative_depth is True : doi =-1* self.doi
else : doi = self.doi
infos_oasis_montaj = func.concat_array_from_list(list_of_array= [
self.station_names ,
profile_obj.east ,
profile_obj.north ,
profile_obj.elev ,
profile_obj.stn_position ,
nivelize_elevation ,
self.station_location ,
np.full((self.station_location.shape[0],), doi),
],
concat_axis=1)
#---Build main data for oasis ------------------------------------
# manage the depth info and create matrix of depth
spacing_depth =abs(self.geo_depth.max()-self.geo_depth.min())/ (
len(self.geo_depth)-1)
print('** {0:<37} {1} {2} {3}'.format('Spacing depth','~=', round(
spacing_depth,2), 'm.' ))
if normalize_depth is True :
# get the step_deth
geo_depth = np.around(np.linspace(self.geo_depth [0],
self.geo_depth[-1],
len(self.geo_depth )))
spacing_depth =abs(self.geo_depth.max()- self.geo_depth.min())/ (
len(self.geo_depth)-1)
print('---> Depth normalized !')
print('---> {0:<37} {1} {2} {3}.'.format('new spacing depth',
'=', round(spacing_depth),
'm' ))
else :
print('---> UNnormalized depth!')
# build a mtrix of depth
depth_oasis_montaj = np.repeat(geo_depth.reshape(geo_depth.shape[0], 1),
len(self.station_location), axis =1 )
depth_oasis_montaj = depth_oasis_montaj .T # Transpose the depth
if write_negative_depth is True :depth_oasis_montaj *= -1
# build data
model_oasis_montaj = create_model_matrix (
station_names = self.station_names,
geo_dict_rho= self.geo_d)
if to_log10: np.log10( model_oasis_montaj )
data_oasis = np.concatenate((infos_oasis_montaj,
depth_oasis_montaj,
model_oasis_montaj), axis =1)
# buidd pandas columns
depres_pandas_columns = info_list + ['dep_{0}'.format(int(dd))
for dd in geo_depth] +\
['res_{0}'.format(int(dd)) for dd in geo_depth]
oas_pandas = pd.DataFrame(data=data_oasis,
columns=depres_pandas_columns)
fex=0 # flag to write external files
if write_external_files is True :
if input_layers is not None : self.input_layers = input_layers
if step_descent is not None : self.step_descent = step_descent
if etaCoef is not None : self.etacoeff =etaCoef
if self.input_resistivities is None :
mess = ''.join(['! No input resistivities provided! ',
'Could not write external geo-files.',
' If you expect to write external GEO ',
'{step_descent - roughness and average_rho} ',
' files, you ABSOLUTELY need to provided at',
' least a truth resistivity values',
' of some geological formation of the area.'])
print(punc.fmt_text(mess,fmt='+'))
warnings.warn(mess)
self._logging.debug (mess)
if self.input_resistivities is not None :
self.geo_build_strata_logs(
input_resistivities= self.input_resistivities,
input_layers=self.input_layers,
step_descent=self.step_descent,
etaCoef =self.etacoeff)
# now build matrix of all external files
model_step_descent = create_model_matrix (
station_names = self.station_names,
geo_dict_rho= self.geo_dstep_descent)
model_geo_roughness = create_model_matrix (
station_names = self.station_names,
geo_dict_rho= self.geo_drr)
model_average_rho = create_model_matrix (
station_names = self.station_names,
geo_dict_rho= self.geo_daver)
if to_log10 :
model_step_descent= np.log10(model_step_descent)
model_geo_roughness=np.log10(model_geo_roughness)
model_average_rho =np.log10( model_average_rho)
data_oas_stepDescent = np.concatenate((infos_oasis_montaj,
depth_oasis_montaj,
model_step_descent), axis =1)
data_oas_roughness = np.concatenate((infos_oasis_montaj,
depth_oasis_montaj,
model_geo_roughness), axis =1)
data_oas_averageRho = np.concatenate((infos_oasis_montaj,
depth_oasis_montaj,
model_average_rho), axis =1)
# create pandas dataFrame
STD_pandas = pd.DataFrame(data=data_oas_stepDescent,
columns=depres_pandas_columns)
ROUGH_pandas = pd.DataFrame(data=data_oas_roughness,
columns=depres_pandas_columns)
AVER_pandas = pd.DataFrame(data=data_oas_averageRho,
columns=depres_pandas_columns)
fex =1 # everything where ok then take one
# write files
# if f== 1 :
# create filename
if writeType.lower().find('exc')>= 0 or writeType.lower().find('xls')>=0 :
writeType = '.xlsx'
if fex ==1 : # write all files into one worksheet
with pd.ExcelWriter(''.join([
filename + '.main._cor_oas',writeType])) as writer :
for sname , df_ in zip([
'.main.', '_sd', '_rr', '_aver'],
[oas_pandas, STD_pandas,ROUGH_pandas, AVER_pandas ]):
df_.to_excel(writer,sheet_name=sname, index =writeIndex)
fex =2
else :
oas_pandas.to_excel(''.join([
filename,'.main._cor_oas', writeType]),
sheet_name=filename +'main_cor_oas', index=writeIndex)
elif writeType.lower().find('csv')>= 0 :
writeType ='.csv'
oas_pandas.to_csv(''.join([filename,'.main._cor_oas',writeType]),
header=csvsetHeader,index =writeIndex,sep=csvsep)
if fex==1 :
for fnx, _df in zip( ['_sd', '_rr', '_aver'] ,
[STD_pandas,ROUGH_pandas, AVER_pandas ]):
_df.to_csv(''.join([filename + fnx,'_cor_oas', writeType]),
header=csvsetHeader,index =writeIndex,sep=csvsep)
else :
mess = 'The type you provide is wrong.'\
' Support only *.xlsx and *.csv format'
self._logging.error (mess)
warnings.warn (mess)
raise CSex.pyCSAMTError_geodrill_inputarguments(
'wrong format ={0} !'\
' Could not write geo file to oasis.'\
' Support only *.xlsx or *.csv format ')
# export to savepath
if savepath is None : # create a folder in your current work directory
try :
savepath = os.path.join(os.getcwd(), '_output2Oasis_')
if not os.path.isdir(savepath):
os.mkdir(savepath)# mode =0o666)
except :
warnings.warn("It seems the path already exists !")
if savepath is not None :
import shutil
try :
if fex ==2 :
shutil.move(os.path.join(os.getcwd(),''.join(
[filename,'.main._cor_oas', writeType])) ,
os.path.join(savepath , ''.join([
filename,'.main._cor_oas', writeType])))
else :
for jj, file in enumerate(
['.main.', '_sd', '_rr', '_aver']):
if fex ==0 :
if jj ==1 :
break # dont continue, other files dont exists
shutil.move(os.path.join(os.getcwd(),
''.join([filename, file,
'_cor_oas', writeType])) ,
os.path.join(savepath ,
''.join([filename, file,
'_cor_oas', writeType])))
except :
warnings.warn("It seems the files already exists !")
# write Infos
print('** {0:<37} {1} {2} '.format('number of stations',
'=', len(self.station_names)))
print('** {0:<37} {1} {2} {3}'.format('minimum offset',
'=', self.station_location.min(),
'm' ))
print('** {0:<37} {1} {2} {3}'.format('maximum offset',
'=', self.station_location.max(),
'm' ))
print('** {0:<37} {1} {2} {3}'.format('maximum depth',
'=', geo_depth.max(), 'm' ))
print('** {0:<37} {1} {2} {3}'.format('spacing depth ',
'=', round(spacing_depth,2),
'm' ))
if self.elevation is None or np.all(self.elevation ==0.) :
print('---> Elevation no added !')
else :
print('** {0:<37} {1} {2} {3}'.format('minumum elevation',
'=', self.elevation.min(),
'm' ))
print('** {0:<37} {1} {2} {3}'.format('maximum elevation ',
'=', self.elevation.max(),
'm' ))
print('** {0:<37} {1} {2} {3}'.format('minumum resistivity value','=',
round(model_oasis_montaj.min(), 2),
'Ω.m' ))
print('** {0:<37} {1} {2} {3}'.format('maximum resistivity value','=',
round(model_oasis_montaj.max(), 2),
'Ω.m' ))
print('** {0:<37} {1} {2} '.format('Lowest station','=',
self.station_names[
int(np.where(
nivelize_elevation==nivelize_elevation.min(
))[0])] ))
print('** {0:<37} {1} {2}'.format('Highest station','=',
self.station_names[
int(np.where(
nivelize_elevation==nivelize_elevation.max())[0])]))
print('** {0:<37} {1} {2} {3}'.format('Altitude gap','=',
nivelize_elevation.max(), 'm' ))
print('** {0:<37} {1} {2}'.format('Number of running ','=',
len(depres_pandas_columns)
* len(self.station_location)))
if fex ==0 or fex==2:
print('---> geo output file {0}, '
' has been successfully written to <{1}>.'.
format(''.join([filename, '.main._cor_oas',
writeType]), savepath))
elif fex ==1 : #read external files
filenames =[ filename + file +'_cor_oas'+ writeType for file ,
exten in zip(['.main.', '_sd', '_rr', '_aver'],
[writeType for i in range(4)])]
print('---> geo output files {0}, {1}, {2} '
' & {3} have been successfully '\
'written to <{4}>.'.format(*filenames, savepath))
print('-'*77)
[docs]class Geosurface :
"""
Read Multidata from oasis montaj output files generated by `geodrill`
module . Class to Build a depth surface map for imaging .
Arguments
-----------
**path**: string
path to oasis ouput files , frequently the files generated by
`geodrill`modules.
Holding a followings informations:
============== ============= ============================================
KeyWords Type Description
============== ============= ============================================
depth _values array_like Values of depth for imaging .
can be a list of depth like [38,912]
fileformat str output format . actually geosurface can
only generate ouput in `csv` and `xls`.
*default* is `csv`.
savepath str full path to sve directory . If none
will create an a new directory
============== ============= ============================================
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import Geosurface
>>> gs_obj = geosurface (path =os.path.join(os.environ['pyCSAMT'],
... 'geodrill', 'data',
... InputOas),
... depth_values = [28, 100])
>>> gs_obj.write_file()
"""
geo_surface_format=["csv", "xlsx", "json","html","sql"]
read_dico = {
".csv":pd.read_csv,
".xlsx":pd.read_excel,
".json":pd.read_json,
".html":pd.read_json,
".sql" : pd.read_sql
}
def __init__(self , path=None, **kwargs):
self._logging =csamtpylog.get_csamtpy_logger(self.__class__.__name__)
self.path=path
self.depth_values =kwargs.pop('depth_values', None)
self.export_format =kwargs.pop ('fileformat', 'xlsx')
self.savepath =kwargs.pop('savepath', None)
if self.path is not None :
self._validate_oasis_path()
def _validate_oasis_path(self, path =None ) :
"""
Validate oasis montaj files and get extension files
:param path: full path to oasis montaj files
:type path: str
"""
if path is not None : self.path =path
if self.path is not None :
if os.path.isdir(self.path): # get the list of files in folder
# get file and be sure that format exist in
self.oasis_data = [os.path.join(self.path, file)
for file in os.listdir(self.path)
if ( file.split('.')[-1]
in self.geo_surface_format)]
# print(self.oasis_data)
if self.oasis_data is None or len(self.oasis_data)==0 :
mess ='No files detected!. Please provided'\
' right path to oasis models files.'
warnings.warn(mess)
self._logging.error(mess)
raise CSex.pyCSAMTError_inputarguments(mess)
elif self.path is None :
raise CSex.pyCSAMTError_inputarguments(
'No path found ! Please provided a right path.')
# get extension file
extension_files =[os.path.splitext(pathfile)[1]
for pathfile in self.oasis_data]
self.extension_file =extension_files
if self.extension_file.replace('.','') not in self.geo_surface_format :
mess = 'Unacceptable format = {0}. Could read format ={1}'.\
format(self.extension_file, tuple(self.geo_surface_format))
self._logging.warning(mess)
raise CSex.pyCSAMTError_file_handling(mess)
@property
def extension_file (self):
return self._extension_file
@extension_file.setter
def extension_file(self, get_extension):
# put on array and use np.unique to get the most frequent format
if isinstance(get_extension, (list, tuple)):
get_extension = np.array(get_extension)
ex , ex_count =np.unique(get_extension , return_counts= True)
self._extension_file = ex[int(np.where (ex_count== ex_count.max())[0])]
[docs] def read_oasis_files (self, path =None ):
"""
Method to get depth spacing , station info data infos,
Each line becomes it own attributes components
of info values are `Stations`, `Easting_X_m`,`Northing_Y_m`,
`v_H_m`,`x_m` `Norm_h_m`,`sets_m` `DOI_max_m`.
:param path: full path to the oasis files .
:type path: str
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import Geosurface
>>> path = r'F:/__main__csamt__\oasis data\OASISWORKS\all_data'
>>> geo_surface_obj = Geosurface( path =path )
>>> geo_surface_obj.read_oasis_files()
>>> geofilenames = geo_surface_obj.filenames
... note::To get the values of line K1_cor_oas "K1_cor_oas.csv
do ``k1_obj = geo_surface_obj.K1_cor_oas``
"""
print('{0:-^77}'.format('GeoSurface * Data * info'))
def get_depth_values(df ):
"""
get specific depth values from geomodel dataframe
Parameters
----------
* df : pandas.Core.DataFrame
Dataframe pandas
Returns
--------
info_names: list
header names info
depth_values: array_like
spacing depth offsets
depth_spacing: float
value of spacing depth
"""
info_names = [name.lower() for name in df.columns if not
(name.find('dep_')>=0 or name.find('res_')>= 0) ]
# The depth spacing value is the same like the res
depth_values = np.array([float(name.replace('dep_', '')) for
name in df.columns
if name.find('dep_')>=0 ])
return info_names, depth_values , np.abs(depth_values.max() -
depth_values.min())/ (
len(depth_values)-1)
def get_infohead_index(fnames, info_matrix , name, dtype ='float'):
"""
If oasis file is litthel messy about the head , better way to avoid
miscomputation
if to get the real index of colums names you are seeking so to
get corresponding values
Parameters
-----------
* fnames: list
head of info names except the depth matrix
(dep_) and ressitivity matrix (res_)
* name: str
name of head user are looking for
* info_matrix: ndarray
ndarray array of oasis montag info .
start by ]station] to end normally [doi max]
"""
for head in fnames :
if head.find(name)>=0 :
name =head
index = fnames.index(name)
break
if dtype is not None :
indexmatrix = info_matrix[:, index].astype(dtype)
return indexmatrix
if path is not None : self.path = path
if self.path is not None :
self._validate_oasis_path()
# initiliase gloabal dico to takes all files with key as survey lines
self.global_dico = {}
for keys, vitems in self.read_dico.items():
if keys == self.extension_file :
for files in self.oasis_data :
self.global_dico [os.path.basename(files).\
replace('{0}'.format(self.extension_file), '')] =\
vitems(files)
self.info_depthvalues = { filename : get_depth_values(df_) for filename ,
df_ in self.global_dico.items()}
# get the input filenames and set attributes
self.filenames = list(self.global_dico.keys())
# set main attributes for selects key
for oasnames in self.filenames :
# convert dataframe to numpy : numpy very fast
tem_array = self.global_dico[oasnames].to_numpy()
# get the length (of head infos)
len_info_names = len(self.info_depthvalues[oasnames][0])
# get the length of depth infos
len_depth_offset = len(self.info_depthvalues[oasnames][1])
# now set temporary attributes for each limes
self.__setattr__(oasnames, (tem_array[::, :len_info_names],
tem_array[::, len_info_names:len_info_names +len_depth_offset],
tem_array[::, len_info_names +len_depth_offset:]) )
# initiliase dictionary to hold
self.geos_profile_strike_angles ={}
for gskeys,gsvalues in self.info_depthvalues.items():
print('** ----- {0} : --|>{1:<37} :'.format('file',gskeys))
print('** {0:<37} {1} {2} {3}'.format('depth spacing ','=',
gsvalues[2], 'm' ))
print('** {0:<37} {1} {2} {3}'.format('maximum depth','=',
gsvalues[1].max(), 'm' ))
# compute profile ange and geolectrike strike
easting = get_infohead_index(fnames=gsvalues[0],
info_matrix= getattr(self, gskeys)[0],
name='east')
northing= get_infohead_index(fnames=gsvalues[0],
info_matrix= getattr(self, gskeys)[0],
name='north')
try :
gstrike , profile_angle,_ = geostrike.\
compute_geoelectric_strike(easting=easting ,
northing = northing)
print('** {0:<37} {1} {2} {3}'.format('profile angle ','=',
profile_angle,
'degrees E of N.' ))
print('** {0:<37} {1} {2} {3}'.format('geoelectric strike','=',
gstrike,
'degrees E of N.' ))
except :
warnings.warn('Trouble occurs when computing profile'\
' angle and geo electric strike.')
self._logging.debug('Trouble occurs when computing profile'\
' angle and geo electric strike.')
else :
self.geos_profile_strike_angles[gskeys]= (profile_angle, gstrike)
# pass when something wrong
[docs] def get_depth_surfaces(self, path =None , depth_values =None):
"""
get the depth surfaces for multi-lines and build the numpy
corresponding array at that depth .
:param depth_values: array of depth
:type depth_values: float or array_like
:param path: full path to oasis outputfiles .
:type path: str
"""
if path is not None :
self.path = path
if self.path is not None : self.read_oasis_files()
if depth_values is not None : self.depth_values= depth_values
if self.depth_values is None :
raise CSex.pyCSAMTError_plot_geoinputargument(
'NoneType could not be computed. Please provided'
' right values !')
if isinstance(self.depth_values, (float, str)):
try :
self.depth_values = float(self.depth_values)
except :
raise CSex.pyCSAMTError_float('Could not convert {0} to float'.\
format(self.depth_values))
else : self.depth_values =np.array([self.depth_values])
elif isinstance(self.depth_values , (list, tuple, np.ndarray)):
self.depth_values = np.array(self.depth_values)
try :
self.depth_values = self.depth_values.astype('float')
except : raise CSex.pyCSAMTError_float(
'Could not convert values to float!')
def get_single_surface_from_one_line(site_name , depth_value) :
"""
Fonction to get single surface from depth value fron one line .
:param depth value: depth value to image
:type depth value: float
:param site_name: str name of site
:type site_name: str
"""
# get attributes array : depth and resistivity
#get depth index from info_depthvalues dictionnary (key :
# values = info_names, depth_values, depth spacing )
new_depth_value, index = get_closest_value(
values_range= self.info_depthvalues[site_name][1],
input_value= depth_value)
# now get especially array or depth and resistivy at that value
# from attribute name value attr = infomatrix ,
#depth matrix and res matrix
depth = getattr(self, site_name)[1][:, index]
res = getattr(self, site_name)[2][:, index]
return new_depth_value , depth , res
# for loop to concat array
self.geosurface_dico ={} # dict to keep all surface matrix
tem=[]
for dvalue in self.depth_values :
for names in self.filenames :
new_depth_value, sdepth , sres =\
get_single_surface_from_one_line(site_name=names ,
depth_value = dvalue)
#build matrix array info depth res
globalmatrix =np.concatenate((getattr(self, names)[0],
sdepth.reshape(sdepth.shape[0], 1),
sres.reshape(sres.shape[0], 1)),
axis =1 )
tem.append(globalmatrix)
self.geosurface_dico[new_depth_value] =tem
tem=[]
for gkey, gvalue in self.geosurface_dico.items():
# buid the array of each depth
self.geosurface_dico[gkey]=func.concat_array_from_list(gvalue)
# create pandas dataframe
for hnames, hvalues in self.info_depthvalues.items() :
# Generally for heads from oasis are the same for all lines
# the let get one:
headnames = self.info_depthvalues[hnames][0] # break
break
for gnkey, gnvalue in self.geosurface_dico.items():
columnames = headnames + [ 'dep_{0}'.format(int(gnkey))] + \
[ 'rho_{0}'.format(int(gnkey))]
self.geosurface_dico[gnkey] = pd.DataFrame(data=gnvalue,
columns = columnames)
[docs] def write_file(self, path =None, depth_values =None,
fileformat='.csv', **kwargs):
"""
Write output files. Output files are `.xlsx` or `.csv` .
:param path: full path to `geodrill` ouput files
:type path: str
:param depth_value: depth values for imaging
:type depth_value: float or array_like
:param fileformat:`xlsx` or `csv` are actually the acceptable formats.
:type fileformat: str
"""
savepath = kwargs.pop('savepath', None)
writeIndex= kwargs.pop('write_index', False)
csvsetHeader =kwargs.pop('add_header',True)
csvsep =kwargs.pop('csv_separateType',',')
if savepath is not None : self.savepath =savepath
if path is not None : self.path = path
if depth_values is not None : self.depth_values =depth_values
if fileformat is not None : self.export_format = fileformat.lower()
if self.export_format is not None :
self.export_format= self.export_format.lower() # for consistency
if self.export_format.find('exc')>=0 or \
self.export_format.find('xls')>=0 :
self.export_format = 'xlsx'
elif self.export_format.find('csv')>=0 :
self.export_format ='csv'
else :
mess=''.join(['-->Sorry! Geosurface actually does not support the {0} ',
'format provided. Only support `xlsx` or `csv` format.',
' Please provided the rigth format !'])
self._logging.error(mess.format(self.export_format))
warnings.warn(mess.format(self.export_format))
raise CSex.pyCSAMTError_file_handling(
'Format provided = {0} is wrong ! '
'Could output only `csv` or `xlsx`.'.
format(self.export_format))
if self.depth_values is None :
warnings.warn(
'Need to specify the value of depth for imaging !')
self._logging.warn(
'Need to specify the value of depth for imaging !')
raise CSex.pyCSAMTError_inputarguments(
'! Need to specify the depth value for imaging.')
if self.path is None :
warnings.warn(
'Need to provide the rigth path `geodrill` model output files.')
self._logging.warning(
'Need to provide the rigth path `geodrill` model output files.')
raise CSex.pyCSAMTError_inputarguments(
'No path detected ! Please provide a right path to `geodrill`\
oasis montaj outputfiles ')
self.get_depth_surfaces() # call methods to create
# get values from dictionnary and write file
#make output filename
filenames =[]
filename =''.join([file.replace('_cor_oas', '')
for file in self.filenames])
if self.export_format =='xlsx':
with pd.ExcelWriter(''.join([filename+
'_gs{0}.'.format(
datetime.datetime.now().month ),
self.export_format])) as writer :
for file , df_ in self.geosurface_dico.items():
df_.to_excel(writer,
sheet_name='dep_{0}'.format(file),
index =writeIndex)
filenames.append(''.join(
[filename+
'_gs{0}.'.format(datetime.datetime.now().month ),
self.export_format]))
if self.export_format =='csv':
for file , df_ in self.geosurface_dico.items():
df_.to_csv(''.join([filename + str(file),
'_gs{0}.'.format(
datetime.datetime.now().month),
self.export_format]),
header=csvsetHeader,
index =writeIndex,sep=csvsep)
filenames.append(
''.join([filename + str(file) ,
'_gs{0}.'.format(datetime.datetime.now().month ),
self.export_format]))
# export to savepath
if self.savepath is None : # create a folder in your current work directory
try :
self.savepath = os.path.join(os.getcwd(), '_outputGS_')
if not os.path.isdir(self.savepath):
os.mkdir(self.savepath)# mode =0o666)
except :
warnings.warn("It seems the path already exists !")
#savefile inot it savepath
if self.savepath is not None :
import shutil
try :
for nfile in filenames :
shutil.move(os.path.join(
os.getcwd(), nfile),
os.path.join(self.savepath , nfile))
except :
warnings.warn("It seems files already exist !")
# fmt =''.join(['{0}'.format(ifile) for ifile in filenames])
print('---> Geosurfaces outputfiles :{0} : have been successfully '\
'written to <{1}>.'.format(','.join(
['{0}'.format(ifile) for ifile in filenames]),
self.savepath))
print('-'*77)
[docs]class geostrike :
"""
Class to deal with computation with profile angle and geo_electrical strike.
Compute Profile angle and strike angle
Need to import scipy.stats as one module . Sometimes import scipy
differently with stats may not work .
either `import scipy.stats` rather than `import scipy as sp` to use :
`sp.stats.linregress` .
"""
[docs] @staticmethod
def compute_profile_angle (easting=None, northing=None):
"""
Essentially dedicated to compute geoprofile angle.
Parameters
-----------
* easting : array_like
easting coordiantes values
* northing : array_like
northing coordinates values
Returns
---------
float
profile_angle
float
geo_electric_strike
str
message of return
"""
_logger.info(
'Computing profile angle from Easting and Nothing coordinates.')
if easting is None or northing is None :
raise CSex.pyCSAMTError_inputarguments(
'NoneType can not be computed !')
# use the one with the lower standard deviation
try :
easting = easting.astype('float')
northing = northing.astype('float')
except :
raise CSex.pyCSAMTError_float(
'Could not convert input argument to float!')
if stats_import is True :
profile1 = spSTAT.linregress(easting, northing)
profile2 =spSTAT.linregress(northing, easting)
else :
warnings.warn(
'Could not find scipy.stats, cannot use method linearRegression '
'check installation you can get scipy from scipy.org.')
_logger.warning(
'Could not find scipy.stats, cannot use method lineaRegress '
'check installation you can get scipy from scipy.org.')
raise ImportError(
'Could not find scipy.stats, cannot use method lineaRegress '
'check installation you can get scipy from scipy.org.')
profile_line = profile1[:2]
# if the profile is rather E=E(N),
# the parameters have to converted into N=N(E) form:
if profile2[4] < profile1[4]:
profile_line = (1. / profile2[0], -profile2[1] / profile2[0])
# if self.profile_angle is None:
profile_angle = (90 - (np.arctan(profile_line[0]) * 180 / np.pi)) % 180
# otherwise: # have 90 degree ambiguity in
#strike determination# choose strike which offers larger
# angle with profile if profile azimuth is in [0,90].
return np.around(
profile_angle,2) , 'Profile angle is {0:+.2f} degrees E of N'.format(
profile_angle)
[docs] @staticmethod
def compute_geoelectric_strike (profile_angle = None , easting =None,
northing=None, **kws):
"""
Compute geoelectric strike
Parameters
-------------
* profile_angle : float
If not provided , will comput with
easting and northing coordinates
* easting : array_like
Easting coordiantes values
* northing : array_like
Northing coordinates values
* geo_strike : float
strike value , if provided, will
recomputed geo_electric strike .
Returns
--------
float
profile_angle in degree E of N
float
geo_electric_strike in degrees E of N
str
message of return
"""
gstrike =kws.pop('geo_strike', None)
if profile_angle is None and easting is None and northing is None :
mess =''.join(['None type detected for profile angle , easting and'\
' northing. NoneType can not be computed',
' will check whether geostrike is provided .'])
_logger.debug(mess)
if gstrike is None :
_logger.warning(
'NoneType found.Could not compute geo-electrike strike!')
raise CSex.pyCSAMTError_inputarguments(
'NoneType found. Could not compute geo-electrike strike!')
if profile_angle is None :
if easting is not None and northing is not None :
profile_angle ,_ = geostrike.compute_profile_angle(easting, northing)
if gstrike is None :
if 0<= profile_angle < 90 :
geo_electric_strike = profile_angle + 90
elif 90<=profile_angle < 180 :
geo_electric_strike = profile_angle -90
elif 180 <= profile_angle <270 :
geo_electric_strike = - profile_angle +90
else :
geo_electric_strike = - profile_angle -90
geo_electric_strike %= 180
if gstrike is not None : # recomputed geo_electrike strike
if 0 <= profile_angle < 90:
if np.abs(profile_angle - gstrike) < 45:
geo_electric_strike = gstrike+ 90
elif 90 <= profile_angle < 135:
if profile_angle - gstrike < 45:
geo_electric_strike = gstrike - 90
else:
if profile_angle - gstrike >= 135:
geo_electric_strike = gstrike+ 90
geo_electric_strike %= 180 # keep value of
#geoelectrike strike less than 180 degree
geo_electric_strike =np.floor(geo_electric_strike)
return geo_electric_strike, profile_angle ,\
'Profile angle is {0:+.2f} degrees E of N'.format(geo_electric_strike)
[docs]class Drill(object):
"""
This class is focus on well logs . How to generate well Log for Oasis:
Arguments
-----------
**well_filename** : string ,
The well filename. 02 options is set :
1rst option is to build well data manually and the program will
generate a report. 2nd option is to send to
the program a typical file type to be parsed . the programm parses
only the typical well datafile. If None , the program will
redirect to build mannually option .
**auto** : bool
option to automatically well data . set to True
if you want to build manually a well data .
*default* is False
==================== ========== =========================================
Key Words/Attributes Type Description
==================== ========== =========================================
utm_zone str utm WGS84 zone. should be N or S.
*default* is 49N .
compute_azimuth bool if no azimuth is provided.
set to True to letprogram to compute
azimuth .*Default* is False.
Drill_dip float The dip of drill hole.*default* is 90
Drill_buttom float The average bottom of drill ,
can be filled during the well
buiding . *default* is None
mask int the mask of DrillHole(DH) data.
*Default * is 1.
==================== ========== =========================================
================== =======================================================
Methods Description
================== =======================================================
_collar build _collar data *return* collar log dataframe
format
dhGeology build DH log geology *return* geology log dataframe.
dhSample build DH Geochemistry-Strutural sample, *return* Sample
log dataframe
dhSurveyElevAz build DH Elevation & Azimuth logs.*return * Elevation
& Azimuth dataframes
writeDHDATA output log :* return * the right log to output for
Oasis Montaj
================== =======================================================
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import Drill
>>> parser_file ='nbleDH.csv'
>>> drill_obj=Drill(well_filename=os.path.join(os.environ['pyCSAMT'],
... 'data', 'drill_example_files',parser_file),
... build_manually_welldata=False)
>>> scollar=drill._collar(DH_Top=None)
>>> sgeo=drill.dhGeology()
>>> ssam=drill.dhSample()
>>> selevaz=drill.dhSurveyElevAz( add_elevation=None,
... add_azimuth=None)
>>> swrite=drill.writeDHData(data2write ="*",
savepath =None)
"""
def __init__(self, well_filename=None , auto=True, **kwargs):
self._logging=csamtpylog.get_csamtpy_logger(self.__class__.__name__)
self.wfilename=well_filename
self.auto=auto
self.mask=kwargs.pop("mask",1)
self.utm_zone=kwargs.pop("utm_zone","49N")
self.compute_azimuth=kwargs.pop("compute_azimuth",False)
self.dip =kwargs.pop("Drill_dip",90)
self.buttom=kwargs.pop("Drill_buttom", None)
self.savepath =kwargs.pop('savepath', None )
self.easts=None
self.norths= None
self.wellnames= None
self._f=None
#populate attribute later
self.wdico={"DH_Hole" :None,
"DH_East":None,
"DH_North":None,
"Mask": None,
"DH_RH":None,
'DH_From':None ,
"DH_To": None ,
"Rock": None ,
"DH_Azimuth":None ,
'DH_Top':None,
'DH_Bottom':None,
'DH_PlanDepth':None,
'DH_Decr':None,
'Sample':None,
'DH_Dip': None,
'Elevation':None,
'DH_RL':None,
}
if self.auto is False and self.wfilename is None :
self.daTA=func.build_wellData (add_azimuth=self.compute_azimuth,
utm_zone=self.utm_zone,
report_path = self.savepath,
)
self.wdata=self.daTA[1]
self.wdico["DH_East"] = self.wdata[:,1]
self.wdico["DH_North"] = self.wdata[:,2]
self.wdico["DH_Hole"] = self.wdata[:,0]
self.wdico['DH_Dip'] = self.wdata[:,4]
self.wdico['DH_Bottom'] = self.wdata[:,3]
self.wdico['DH_Decr'] = self.wdata[:,7]
self.wdico['DH_PlanDepth'] = self.wdata[:,6]
self.wdico["DH_Azimuth"] = self.wdata[:,5]
self._f=0
elif self.wfilename is not None :
self.daTA=func.parse_wellData(filename=self.wfilename,
include_azimuth=False,
utm_zone=self.utm_zone)
self.wdata=self.daTA[1]
self.wdico.__setitem__("DH_East", self.wdata[:,1])
self.wdico.__setitem__("DH_North", self.wdata[:,2])
self.wdico.__setitem__("DH_Hole", self.wdata[:,0])
self.wdico.__setitem__('DH_Dip', self.wdata[:,3])
self.wdico.__setitem__('DH_PlanDepth', self.wdata[:,8])
self.wdico.__setitem__("DH_Azimuth", self.wdata[:,5])
self.wdico.__setitem__('DH_Decr', self.wdata[:,9])
self.wdico.__setitem__('DH_Bottom', self.wdata[:,7])
self._f=1
#set Mask and set dr_rh
self.mask=np.full((self.wdata.shape[0]),self.mask,dtype='<U12')
# print(self.mask.shape)
self.wdico.__setitem__("Mask", self.mask)
self.dh_rh=np.zeros((self.wdata.shape[0]))
self.wdico.__setitem__("DH_RH", self.dh_rh)
for keys in kwargs.keys():
self.__setattr__(keys, kwargs[keys])
def _collar(self, DH_Top=None,add_elevation =None ):
"""
Method to build Collar Data
Parameters
----------
* DH_Top : np.ndarray ,
it's the Top of data for each Hole Name.
ndaray (number of DH , 1)
*Default* is None.
Returns
-------
pd.DataFrme
collar Drillhole log
"""
if DH_Top is None :
DH_Top=np.zeros((self.wdata.shape[0]))
elif type(DH_Top) is float or type(DH_Top) is int :
DH_Top=np.full((self.wdata.shape[0]),DH_Top,dtype='<U12')
elif DH_Top is not None :
if type(DH_Top)==list:
DH_Top=np.array(DH_Top)
assert DH_Top.shape[0]==self.wdata.shape[0],'the input DH_Top '\
'shape doesnt match. The convenience '\
' shape is %d.'%self.wdata.shape[0]
# print(DH_Top)
self.wdico.__setitem__('DH_Top',DH_Top)
if self._f == 0 :
if add_elevation is None :
#No topography is added , set to 0
add_elevation=np.full((len(self.wdico['DH_East']),1),0,
dtype='<U12')
elif add_elevation is not None :
if type(add_elevation ) is list :
add_elevation =np.array(add_elevation)
assert add_elevation.shape[0]==\
self.wdico['DH_East'].shape[0],"INDEXERROR:"\
" The the current dimention of Elevation data is {0}.It's must be"\
" the size {1}.".format(
add_elevation.shape[0],self.wdico['DH_East'].shape[0])
self.wdico.__setitem__("Elevation", add_elevation)
elif self._f == 1 :
if add_elevation is not None:
if type(add_elevation ) is list :
add_elevation =np.array(add_elevation)
try :
np.concat((add_elevation,self.wdico['DH_East']))
except Exception :
mess =''.join([
'SIZEERROR! Try to set the elevation dimentional as ',
'same size like the collar data'])
self._logging.error(mess)
warnings.warn(mess)
elif add_elevation is None :
add_elevation=self.daTA [1][:,4]
self.wdico.__setitem__("Elevation", add_elevation)
collarKeys=["DH_Hole", "DH_East", "DH_North", "DH_RH",
"DH_Dip", "Elevation", "DH_Azimuth","DH_Top", "DH_Bottom",
"DH_PlanDepth", "DH_Decr", "Mask"]
# print(self.wdico)
collar=self.wdico[collarKeys[0]]
collar=collar.reshape((collar.shape[0],1))
for ss, collk in enumerate(collarKeys[1:]):
# print(collk)
for key , value in self.wdico.items():
if key == collk :
value=value.reshape((value.shape[0],1))
collar=np.concatenate((collar,value), axis=1)
self.coLLAR=pd.DataFrame(data=collar, columns=collarKeys)
return self.coLLAR
[docs] def dhGeology (self, dh_geomask=None):
"""
Method to build geology drillhole log. The name of input rock must
feell exaction accordinag to a convention AGSO file . If not sure
for the name of rock and Description and label. You may consult
the geocode folder before building the well_filename. If the entirely
rock name is given , program will search on the AGSO file the corresponding
Label and code . If the rock name is founc then it will take
its CODE else it will generate exception.
Parameters
----------
* dh_geomask : np.ndarray, optional
geology mask. send mask value can take exactly
the np.ndarray(num_of_geology set ,). The better way
to set geology maskis to fill on the wellfilename.
if not , programm will take the general mask value.
The *default* is None.
Returns
-------
pd.DataFrame
geology drillhole log.
"""
geolKeys=["DH_Hole","DH_From", "DH_To","Rock", "Sample",
"East", "DH_North", "DH_RH", "Mask"]
wgeo=self.daTA[2]
# print(wgeo)
self.wdico.__setitem__('DH_From', wgeo[:,1])
self.wdico.__setitem__('DH_To', wgeo[:,2])
self.wdico.__setitem__("Rock",wgeo[:,3])
dhgeopseudosamp=np.zeros((wgeo.shape[0]))
###### FIND AGSO MODULE #######
#Try to check the name of rocks and their acronym
geoelm=Agso._agso_on_dict_(set_agsoDataFrame=False,
return_orientation="SERIES")
# #extract elem with their acronym
geolemDico_AGSO={key:value for key , value in \
zip (geoelm["CODE"],geoelm['__DESCRIPTION'])}
# elemgeo_AGSO=sorted(geolemDico.items())
for ii, elm in enumerate (self.wdico['Rock']):
if elm.upper() in geolemDico_AGSO.keys():
pass
elif elm.upper() not in geolemDico_AGSO.keys():
if elm.lower() in geolemDico_AGSO.values():
for key, values in geolemDico_AGSO.items():
if elm.lower() == values :
self.wdico['Rock'][ii]=key
else :
mess=''.join(['The Geological Name ({0})'
' given in is wrong'.format(elm),
'Please provide a right name the right Name.',
'Please consult the AGSO file in _geocodes folder',
'without changing anything.'])
self._logging.warn(mess)
warnings.warn(mess)
######END AGS0 ########
self.dh_geoleast=np.zeros((wgeo.shape[0]))
self.dh_geol_norths=np.zeros((wgeo.shape[0]))
for ss , value in enumerate(self.dh_geoleast):
for indix, val in enumerate(self.wdico["DH_East"]):
if wgeo[:,0][ss] in self.wdico["DH_Hole"]:
value=val
self.dh_geoleast[ss] =value
self.dh_geol_norths[ss]=self.wdico["DH_North"][indix]
dhgeopseudosamp=np.zeros((wgeo.shape[0]))
if dh_geomask == None :
dh_geomask =self.mask[0]
maskgeo= np.full((wgeo.shape[0]),dh_geomask,dtype='<U12')
dhrhgeo=np.array([ -1* np.float(ii) for ii in self.wdico['DH_From']])
dhGeol=np.concatenate((wgeo[:,0].reshape(wgeo[:,0].shape[0],1),
self.wdico['DH_From'].reshape((
self.wdico['DH_From'].shape[0],1)),
self.wdico['DH_To'].reshape((
self.wdico['DH_To'].shape[0],1)),
self.wdico['Rock'].reshape((
self.wdico['Rock'].shape[0],1)),
dhgeopseudosamp.reshape((
dhgeopseudosamp.shape[0],1)),
self.dh_geoleast.reshape((
self.dh_geoleast.shape[0],1)),
self.dh_geol_norths.reshape((
self.dh_geol_norths.shape[0],1)),
dhrhgeo.reshape((dhrhgeo.shape[0],1)),
maskgeo.reshape((maskgeo.shape[0],1))),axis=1)
self.geoDHDATA=pd.DataFrame(data=dhGeol, columns=geolKeys)
return self.geoDHDATA
[docs] def dhSample (self,path_to_agso_codefile=None, dh_sampmask=None):
"""
Method to build Sample log. This method focuses on the sample obtained
during the DH trip.it may georeferenced as the well_filename needed.
A main thing is to set the AGSO_STCODES file. AGSO_STCODES is the
conventional code of structurals sample. If you have an own AGSO_STCODES ,
you may provide the path * kwargs=path_to_ags_codefile * .
the program will read and generate logs according to the DESCRIPTION
and STCODES figured. if None, the program will take it STCODES and set
the samplelogs. When you set the Sample code aor sample name ,
make sur that the name match the same name on STCODES. If not ,
program will raises an error.
Parameters
----------
* path_to_agso_codefile : str, optional
path to conventional
AGSO_STRUCTURAL CODES.
The *default* is None.
* dh_sampmask : np.ndarray, optional
Structural mask. The default is None.
Returns
-------
pd.DataFrame
Sample DH log.
"""
sampKeys=["DH_Hole","DH_From", "DH_To","Rock", "Sample",
"East", "DH_North", "DH_RH", "Mask"]
wsamp=self.daTA[3]
# print(wgeo)
if wsamp is None :
self.sampleDHDATA = None
return # mean no geochemistry sample is provided
self.wdico.__setitem__('DH_From', wsamp[:,1])
self.wdico.__setitem__('DH_To', wsamp[:,2])
self.wdico.__setitem__("Sample",wsamp[:,3])
dhsampseudorock=np.zeros((wsamp.shape[0]))
###### FIND AGSO MODULE (AGSO_STCODES) #######
#Try to check the name of sample and their acronym
if path_to_agso_codefile is None:
path_to_agso_codefile=os.path.join(os.path.abspath('.'), 'pycsamt',
'geodrill','_geocodes' )
# os.path.join(os.environ ['pyCSAMT'],
# 'geodrill','_geocodes' )
agsofilename=[file for file in os.listdir(path_to_agso_codefile)
if file =='AGSO_STCODES.csv' ]
if agsofilename is not None :
sampelm=Agso._agso_on_dict_(set_agsoDataFrame=True,
return_orientation="series",
agso_codefile=os.path.join(
path_to_agso_codefile,
agsofilename[0]))
elif agsofilename is None :
self._logging.warning(
'None AGSO_STCODES.csv file is found.'
' Please provide the right path ')
warnings.warn(
'None AGSO_STCODES.csv file is found. '
'Please provide the right path ')
elif path_to_agso_codefile is not None :
#os.chdir(os.path.dirname(path_to_agso_codefile))
sampelm=Agso._agso_on_dict_(set_agsoDataFrame=True,
return_orientation="series",
agso_codefile=path_to_agso_codefile)
# #extrcat elem with their acronym
sampelmDico_AGSO={key:value for key , value in \
zip (sampelm["CODE"],sampelm['__DESCRIPTION'])}
# elemgeo_AGSO=sorted(geolemDico.items())
for ii, elm in enumerate (self.wdico['Sample']):
if elm.lower() in sampelmDico_AGSO.keys():
pass
elif elm.lower() not in sampelmDico_AGSO.keys():
if elm in sampelmDico_AGSO.values():
for key, values in sampelmDico_AGSO.items():
if elm == values :
self.wdico['Sample'][ii]=key
else :
mess=''.join([
'The Sample Name({0}) given in is wrong'.format(elm),
'Please provide a right name the right Name.',
'Please consult the AGSO_STCODES.csvfile in _geocodes folder',
'without changing anything.'])
self._logging.warn(mess)
warnings.warn(mess)
######END AGS0_STCODES ########
dh_sampeast=np.zeros((wsamp.shape[0]))
dh_sampnorths=np.zeros((wsamp.shape[0]))
for ss , value in enumerate(dh_sampeast):
for indix, val in enumerate(self.wdico["DH_East"]):
if wsamp[:,0][ss] in self.wdico["DH_Hole"]:
value=val
dh_sampeast[ss] =value
dh_sampnorths[ss]=self.wdico["DH_North"][indix]
dhsampseudorock=np.zeros((wsamp.shape[0]))
if dh_sampmask == None :
dh_sampmask =self.mask[0]
masksamp= np.full((wsamp.shape[0]),dh_sampmask,dtype='<U12')
dhrhsamp=np.array([ -1* np.float(ii) for ii in self.wdico['DH_From']])
dhSample=np.concatenate((wsamp[:,0].reshape(wsamp[:,0].shape[0],1),
self.wdico['DH_From'].reshape(
(self.wdico['DH_From'].shape[0],1)),
self.wdico['DH_To'].reshape(
(self.wdico['DH_To'].shape[0],1)),
dhsampseudorock.reshape(
(dhsampseudorock.shape[0],1)),
self.wdico['Sample'].reshape(
(self.wdico['Sample'].shape[0],1)),
dh_sampeast.reshape(
(dh_sampeast.shape[0],1)),
dh_sampnorths.reshape(
(dh_sampnorths.shape[0],1)),
dhrhsamp.reshape((dhrhsamp.shape[0],1)),
masksamp.reshape((masksamp.shape[0],1))),axis=1)
self.sampleDHDATA=pd.DataFrame(data=dhSample, columns=sampKeys)
return self.sampleDHDATA
[docs] def dhSurveyElevAz(self, add_elevation=None, add_azimuth=None, **kwargs):
"""
Method to build Elevation & Azimuth DH logs. if add_elevation and .
add_azimuth are set . The programm will ignore the computated azimuth,
and it will replace to the new azimuth provided . all elevation will
be ignore and set by the new elevation . *kwargs arguments
{add_elevation , add-azimuth } must match the same size like the
number of Drillholes . Each one must be on ndarray(num_of_holes, 1).
Parameters
----------
* add_elevation : np.nadarray , optional
elevation data (num_of_holes, 1)
The *default* is None.
* add_azimuth : np.ndarray , optional
azimuth data (num_of_holes,1).
The *default* is None.
* DH_RL :np.float or np.ndarray(num_of_hole,1),
if not provided , it's set to 0. means No topography is added'.
Returns
-------
pd.Dataframe
Elevation DH log .
pd.DataFrame
Azimuth DH log.
"""
dh_rl=kwargs.pop("DH_RL",None)
# sizep=self.wdico['DH_East'].shape[0]
if self._f == 0 :
if add_elevation is None :
#No topography is added , set to 0
add_elevation=np.full((len(self.wdico['DH_East']),1),0,
dtype='<U12')
elif add_elevation is not None :
if type(add_elevation ) is list :
add_elevation =np.array(add_elevation)
assert add_elevation.shape[0]==self.wdico[
'DH_East'].shape[0],"INDEXERROR:"\
" The the current dimention of Elevation data is {0}.It's must be"\
" the size {1}.".format(
add_elevation.shape[0],self.wdico['DH_East'].shape[0])
self.wdico.__setitem__("Elevation", add_elevation)
elif self._f == 1 :
if add_elevation is not None:
if type(add_elevation ) is list :
add_elevation =np.array(add_elevation)
try :
np.concat((add_elevation,self.wdico['DH_East']))
except :
mess= ''.join([
'SIZEERROR! Try to set the elevation dimentional. ',
'same like the collar data '])
self._logging.error(mess)
warnings.warn(mess)
elif add_elevation is None :
add_elevation=self.daTA [1][:,4]
self.wdico.__setitem__("Elevation", add_elevation)
#set DH_RL
if dh_rl is not None :
if type (dh_rl) is list :
dh_rl=np.array (dh_rl)
assert dh_rl.shape[0]==self.data.shape[0]," DH_RL data size is out"\
" of the range.Must be {0}".format(self.data.shape[0])
self.wdico.__setitem__("DH_RL",dh_rl)
elif dh_rl is None :
#if None set DH_RL to None :
self.wdico.__setitem__("DH_RL",np.full(
(self.daTA[1].shape[0]),0,dtype='<U12'))
#set azimuth
if add_azimuth is not None :
if type(add_azimuth) ==list :
add_azimuth=np.array(add_azimuth)
assert add_azimuth.shape[0]==self.data.shape[0]," Azimuth data size is out"\
" of the range.Must be {0}".format(self.data.shape[0])
self.wdico.__setitem__("DH_Azimuth",add_azimuth)
elif add_azimuth is None :
pass
elevazKeys=['DH_Hole', 'Depth','DH_East',
'DH_North','Elevation','DH_RL','DH_Dip']
self.wdico.__setitem__("DH_RL",np.full(
(self.daTA[1].shape[0]),0,dtype='<U12'))
# add Hole and Depth
surveyELEV =np.concatenate((self.wdico['DH_Hole'].reshape(
(self.wdico['DH_Hole'].shape[0],1)),
self.wdico["DH_Bottom"].reshape(
(self.wdico["DH_Bottom"].shape[0],1))),
axis=1)
surveyAZIM=np.concatenate((self.wdico['DH_Hole'].reshape(
(self.wdico['DH_Hole'].shape[0],1)),
self.wdico["DH_Bottom"].reshape(
(self.wdico["DH_Bottom"].shape[0],1))),
axis=1)
for ss , elm in enumerate (elevazKeys[2:]):
for key, values in self.wdico.items():
if elm==key :
values=values.reshape((values.shape[0],1))
if elm =='DH_RL'or elm=='DH_Dip':
# print(values)
surveyAZIM=np.concatenate((surveyAZIM,values),axis=1)
elif elm=='Elevation':
surveyELEV =np.concatenate((surveyELEV,values),axis=1)
else:
surveyAZIM=np.concatenate((surveyAZIM,values),axis=1)
if ss < elevazKeys.index('Elevation')-1:
surveyELEV =np.concatenate((surveyELEV,values),axis=1)
self.surveyDHELEV=pd.DataFrame(
data=surveyELEV, columns=elevazKeys[:5])
# pop the elevation elm on the list
[elevazKeys.pop(ii) for ii, elm in
enumerate(elevazKeys) if elm=='Elevation']
self.surveyDHAZIM=pd.DataFrame(data=surveyAZIM,
columns=elevazKeys)
return (self.surveyDHELEV, self.surveyDHAZIM)
[docs] def writeDHData (self, data2write =None ,**kwargs):
"""
Method to write allDH logs. It depends to the users to sort which data
want to export and which format. the program support only two format
(.xlsx and .csv) if one is set , it will ouptput the convenience format.
Users can give a list of the name of log he want to export.
Program is dynamic and flexible. It tolerates quite symbols number to
extract data logs.
Parameters
----------
* data2write : str or list , optional
the search key. The default is None.
* datafn :str
savepath to exported file
*Default* is current work directory.
* write_index_on_sheet : bool,
choice to write the sheet with pandas.Dataframe index.
* writeType : str ,
file type . its may *.csv or *.xlsx .
*Default* is *.xlsx
* add_header : bool,
add head on exported sheet. set False to mask heads.
*Default* is True.
* csv_separateType : str ,
Indicated for csv exported files ,
the type of comma delimited . defaut is ','.
"""
savepath =kwargs.pop("savepath",None )
writeIndex=kwargs.pop('write_index_on_sheet',False)
writeType =kwargs.pop('writeType', 'xlsx')
# csvencoding =kwargs.pop('encoding','utf-8')
csvsetHeader =kwargs.pop('add_header',True)
csvsep =kwargs.pop('csv_separateType',',')
wDATA ={"collar": self._collar,
"geology": self.dhGeology,
'sample':self.dhSample,
'elevazim':self.dhSurveyElevAz}
_all=['5',"all","__all__",'CollGeoSampElevAz','CGSAZ','cgsaz',
['Collar','Geology','Sample','Elevation','Azimuth'],
'colgeosamelevaz','alldata','*']
df_collar=wDATA['collar']()
df_geology=wDATA['geology']()
df_sample=wDATA['sample']()
df_elevation,df_azimuth=wDATA['elevazim']()
# for df_ in [df_collar, df_geology, df_sample,
# df_elevation,df_azimuth]:
# df_.set_index(setIndex) # this is unnecessary
_dHDico ={'collar': [['1','c'], df_collar],
'geology':[['2','g'],df_geology],
'sample': [['3','s'],df_sample],
'survey_elevation':[['4','elev', 'topo','topography','e'],
df_elevation],
'survey_azimuth': [['5','-1','azim','a'],df_azimuth]}
# skip the sample building geochemistry doesnt exists
if self.sampleDHDATA is None :
data2write =['1','2','4','5']
if data2write is None or data2write in _all : # write all
with pd.ExcelWriter(''.join([self.daTA[0][:-1],'.xlsx'])) as writer :
for keys, df_ in _dHDico.items():
df_[1].to_excel(writer,sheet_name=keys, index =writeIndex)
elif data2write is not None :
if type(data2write) is not list:
data2write=str(data2write)
try :
if writeType in ['xlsx','.xlsx', 'excell',
'Excell','excel','Excel','*.xlsx']:
for keys, df in _dHDico.items():
if data2write ==keys or data2write.lower(
) in keys or data2write in df[0]:
df[1].to_excel('.'.join(
[self.daTA[0][:-1],'xlsx']),
sheet_name=keys,index =writeIndex)
elif writeType in ['csv','.csv', 'comma delimited','*.csv',
'comma-separated-value',
'comma seperated value',
'comsepval']:
# print('passed')
for keys, df_ in _dHDico.items():
if data2write == keys or data2write.lower(
) in keys or data2write in df_[0]:
df_[1].to_csv(''.join(
[self.daTA[0][:-1],'.csv']),
header=csvsetHeader,
index =writeIndex,sep=csvsep)
except Exception as error :
self._logging.error (
'The type you provide as WriteType argument is wrong.'
' Support only *.xlsx and *.csv format',error)
warnings.warn (
'Argument writeType support only [xlsx or csv] format.'
' Must change your *.{0} format'.format(writeType))
elif type(data2write) is list :
data2write=[str(elm) for elm in data2write] # check the string format
with pd.ExcelWriter(''.join(
[self.daTA[0][:-1],'xlsx'])) as writer :
for ii, df in enumerate (data2write):
for keys, df__ in _dHDico.items():
if df.lower() in keys or df in df__[0] :
df__[1].to_excel(
writer,sheet_name=keys, index =writeIndex)
else :
self._logging.error (
'The key you provide as agrument of data2write is wrong. '
'the data2write argument should be either [collar, geology,'
' sample, elevation, azimuth] or all (*). ')
warnings.warn (
'Wrong format of input data2write ! Argument dataType is str,'
' or list of string element choosen among [collar, geology,'
'sample, elevation, azimuth] or all (*),'
' not {0}'.format(data2write))
# export to savepath
if savepath is not None : self.savepath = savepath
# create a folder in your current work directory
if self.savepath is None :
try :
self.savepath = os.path.join(os.getcwd(), '_outputDH_')
if not os.path.isdir(self.savepath):
os.mkdir(self.savepath)# mode =0o666)
except :
warnings.warn("It seems the path already exists !")
if self.savepath is not None :
import shutil
if writeType in ['csv','.csv', 'comma delimited',
'comma-separated-value','comma sperated value',
'comsepval']:
shutil.move ( os.path.join(os.getcwd(),
''.join(
[self.daTA[0][:-1],'csv'])),
self.savepath)
print('---> Borehole output <{0}> has been written to {1}.'.\
format(os.path.basename(
''.join([self.daTA[0][:-1],'.csv'])), self.savepath))
elif writeType in ['xlsx','.xlsx', 'excell','Excell','excel','Excel']:
shutil.move (os.path.join(os.getcwd(),
'.'.join([self.daTA[0][:-1],'xlsx'])),
self.savepath)
print('---> Borehole output <{0}> has been written to {1}.'.\
format(os.path.basename(
''.join([self.daTA[0][:-1],'xlsx'])), self.savepath))
#------Usefull functions --------------------------
[docs]def get_closest_value (values_range, input_value):
"""
Fonction to get closest values when input values is not in the values range
we assume that values are single on array. if the same value is repeated
will take the first index and the value at that index
:param values_range: values to get
:type values_range: array_like
:param input_value: specific value
:type input_value: float,
:returns: the closest value and its index
:rtye: float
"""
values_range = np.array(values_range)
if np.all(values_range <0) : # if all element less than zero than convert
#inoput value to negative value
if input_value >0 : input_value *=-1 # for depth select purpose
if input_value < values_range.min():
print('--> ! Input value ={0} is out the range, min value = {1}. Value'\
' should be reset to ={1}.'.format(input_value, values_range.min()))
warnings.warn('Input value ={0} is out the range ! min value = {1}'.\
format(input_value, values_range.min()))
_logger.debug('Input value ={0} is out the range ! min value = {1}'.\
format(input_value, values_range.min()))
input_value = values_range.min()
elif input_value > values_range.max():
warnings.warn('Input value ={0} is out the range ! max value = {1}'.\
format(input_value, values_range.max()))
_logger.debug('Input value ={0} is out the range ! max value = {1}'.\
format(input_value, values_range.max()))
input_value = values_range.max()
print('!--> Input value ={0} is out the range , min value = {1}. Value'\
' should be reset to ={1}.'.format(input_value, values_range.max()))
if input_value in values_range :
indexes,*_= np.where(values_range==input_value)
if len(indexes)==1 :
index =int(indexes )
elif len(indexes)>1 : # mean element is repeated then take the first index
index = int(indexes)[0]
value = values_range[index]
return value , index
elif values_range.min() < input_value < values_range.max():
# values_range = sorted(values_range)
for ii, xx in enumerate(values_range):
if xx > input_value :
#compute distance :
d0 = abs(input_value-xx) # make the diffence between distances
d1= abs(values_range[ii-1]-input_value) # nad take the short
if d0 < d1 :
return xx , ii
elif d0> d1 :
return values_range[ii-1], ii-1
[docs]def ascertain_layers_with_its_resistivities(real_layer_names ,
real_layer_resistivities):
"""
Assert the length of of real resistivites with their corresponding layers.
If length of resistivities of larger that the layer's names, then will add
layer that match the best the remained resistivities. If the length
of layer is larger than resistivities , to avoid miscomputation , will
cut out this more layer and work only the length of resistivities provided.
Parameters
----------
* real_layer_names: array_like , list
list of input layer names as real
layers names encountered in area
* real_layer_resistivities :array_like , list
list of resistivities get on survey area
Returns
--------
list
real_layer_names, new list of input layers
"""
# for consistency put on string if user provide a digit
real_layer_names =[str(ly) for ly in real_layer_names]
if len(real_layer_resistivities) ==len(real_layer_names):
return real_layer_names
elif len(real_layer_resistivities) > len(real_layer_names):
# get the last value of resistivities to find the structres
# names ans structures sresistivities
sec_res = real_layer_resistivities[len(real_layer_names):]
# get the name of structure as possible
geos =Geodrill.get_structure(resistivities_range=sec_res)
if len(geos)>1 : tm = 's'
else :tm =''
print('---> !We added other {0} geological struture{1}.'\
' You may ignore it.'.format(len(geos), tm))
real_layer_names.extend(geos)
return real_layer_names
elif len(real_layer_names) > len(real_layer_resistivities):
real_layer_names = real_layer_names[:len(real_layer_resistivities)]
return real_layer_names
[docs]def geo_length_checker(main_param, optional_param, force =False,
param_names =('input_resistivities', 'input_layers'), **kws):
"""
Geo checker is a function to check differents length of different params.
the length of optional params depend of the length of main params . if
the length of optional params is larger than the length of main
params, the length of optional params will be reduce to the length of
main params .If the optional params length is shorther than the length of
main params, will filled it either with "None" if dtype param is string
of 0.if dtype params is float or 0 if integer.if Force is set True ,
it will absolutely check if the main params and the optional params have
the same length. if not the case , will generate an error .
Parameters
------------
* main_param : array_like, list
main parameter that must took
its length as reference length
* optional params : array_like, list
optional params, whom length depend
to the length of main params
* param_names : tuple or str
names of main params and optional params
so to generate error if exits.
* fill_value: str, float, optional
Default value to fill thearray in the case where
the length of optional param is
less than the length of the main param .If None ,
will fill according to array dtype
Returns
--------
array_like
optional param truncated according to the man params
"""
add_v =kws.pop('fill_value', None)
if isinstance(main_param, (str, float, int)):
main_param = np.array([main_param])
if isinstance(optional_param, (str, float, int)):
optional_param = np.array([optional_param])
if isinstance(main_param, (list, tuple)):
main_param =np.array(main_param)
if isinstance(optional_param, (list, tuple)):
optional_param =np.array(optional_param)
mes=''
if len(optional_param) > len(main_param):
mes ="---> Note ! {0} will be truncated to length = {1} '\
as the same length of {2} .".\
format(param_names[1], len(main_param),param_names[0] )
warnings.warn(mes)
optional_param= optional_param[:len(main_param)]
if force is True :
mess = '--> force argument is set <True>, Can not truncate {0}'\
' = {1} to match the length of {2} = {3}.'\
.format(param_names[1], len(param_names[1]), param_names[0],
len(param_names[0]))
raise CSex.pyCSAMTError_parameter_number(mess)
elif len(optional_param) < len(main_param) :
if force is True :
mess = '--> force argument is set <True>, Can not fill value of {0} '\
'to match the length of {1} = {2}.'\
.format(param_names[1], param_names[0], len(param_names[0]))
raise CSex.pyCSAMTError_parameter_number(mess)
if add_v is not None :
add_v =[add_v for vv in range(
len(main_param)-len(optional_param))] # repeat the value to add
add_v = np.array(add_v)
if add_v is None :
if optional_param.dtype not in [ 'float', 'int'] :
add_v =['None' for i in range(
len(main_param)-len(optional_param))]
else :
for type_param, fill_value in zip([ 'float', 'int'],[ 0., 0] ):
if type_param == optional_param.dtype :
add_v =[fill_value for i in range(
len(main_param)-len(optional_param))]
mes ="--> !Note Length of {0} is ={1} which length of {2} is ={3}.'\
We'll add {4} to fill {5} value.".\
format(param_names[1], len(optional_param), param_names[0],
len(main_param), add_v[0], param_names[1])
warnings.warn(mes)
optional_param= optional_param.tolist()
optional_param.extend(add_v)
return np.array(optional_param)
[docs]@geoplot2d(reason='model', cmap='jet_r', plot_style ='pcolormesh')
def geoModel( **kwargs ):
"""
Get the type of geoData and plot the model . func will call the decorator
`pycsamt.wiewer.plot.geoplot2d`. Decorator can be filled by using matplotlib
properties. To plot geomodel `misfit `, please
change the `reason` argument to `misfit`.
:see also:: see documentation of decorator `_geoplot2d.__doc__`
:param kwargs: kwargs arguments of "Geodrill obj"
:type model_fn: dict
:param etaCoef: cofficent to divide the model blocks
:type etaCoef: int
:param input_resistivities: Truth values of resistivities
:type input_resistivities: array_like or list
:param plot_misfit: plot misfit of geodata
:type plot_misfit: bool
============== ========= ================================================
Params Type Description
============== ========= ================================================
geodtype str Plot geodata type, can be `rr` for stratigraphy
log, `sd` for roughnness model
and `aver` for rho average model.
*default*plot is `rr`.
iter_fn str full path to occam iteration file
mesh_fn str full path to mesh_fn file
data_fn str full path to occam_data file
doi str depth of investigation might
be float or str like "1km" =1000
depth_scale str scale of imaging depth can be
"km" or "m". *Default* is"m"
step_descent float block size for roughning , if not
provided the step will be 20% of D0I
input_layers list true input_layers names : geological
informations of encountered layers
============== ========= ================================================
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import geoModel
>>> path=r'F:\ThesisImp\occam2D\invers+files\inver_res\K1'
>>> inversion_files = {'model_fn':'Occam2DModel',
... 'mesh_fn': 'Occam2DMesh',
... "iter_fn":'ITER17.iter',
... 'data_fn':'OccamDataFile.dat'
... }
>>> input_resistivity_values =[10, 66, 70, 180, 1000, 2000,
... 3000, 7000, 15000 ]
>>> input_layer_names =['river water', 'fracture zone', 'granite']
>>> inversion_files = {key:os.path.join(path, vv) for key,
... vv in inversion_files.items()}
>>> geoModel(**inversion_files,
... input_resistivities=input_resistivity_values,
... input_layers=input_layer_names, geodtype ='rr',
... plot_misfit=True
... )
"""
depth_scale =kwargs.pop('scale', 'm')
geodtype=kwargs.pop('kind', 'rr')
plot_misfit = kwargs.pop('plot_misfit', False)
#create geodrill obj
csamtpylog().get_csamtpy_logger().info(
'Plot Geodatatype = {}'.format(geodtype))
geodrill_obj = Geodrill(**kwargs)
geodrill_obj.geo_build_strata_logs()
if depth_scale is None : depth_scale= 'm'
matrix_rhoaver ,matrix_rhorr, \
matrix_rho_stepdescent, raw_model =[[]for i in range(4)]
for site in geodrill_obj.station_names :
if site ==geodrill_obj.station_names[0]:
# station id to read is the first index =0
index =0
if site ==geodrill_obj.station_names [-1] :
index =2 # station id to read is the 2index
else : index =1 # read th middle array
matrix_rhorr.append(geodrill_obj.geo_drr[site][:, index])
matrix_rhoaver.append(geodrill_obj.geo_daver[site][:, index])
matrix_rho_stepdescent.append(
geodrill_obj.geo_dstep_descent[site][:, index ])
raw_model.append(geodrill_obj.geo_d[site][:, index])
# build a matrix of all data collected
if geodtype =='sd':
geo_rho_data= func.concat_array_from_list(
matrix_rho_stepdescent, concat_axis=1)
elif geodtype=='aver':
geo_rho_data= func.concat_array_from_list(
matrix_rhoaver, concat_axis=1)
else :
geo_rho_data= func.concat_array_from_list(
matrix_rhorr, concat_axis=1)
# convert data into log10 values
# compute model_misfit
if plot_misfit is True :
print('You are ploting geoDataType = {} misfit ! '.format(geodtype))
csamtpylog().get_csamtpy_logger().info(
'Plot {0} misfit is running !'.format(geodtype))
raw_model = func.concat_array_from_list(
raw_model, concat_axis=1)
model_misfit = .01* np.sqrt (
(raw_model - geo_rho_data)**2 /raw_model**2 )
print('{0:-^77}'.format('GeoMisfit info'))
print('** {0:<37} {1} {2} {3}'.format('Misfit max ','=',
model_misfit.max()*100., '%' ))
print('** {0:<37} {1} {2} {3}'.format('Misfit min','=',
model_misfit.min()*100., '%' ))
print('-'*77)
geo_rho_data= model_misfit *100.
else :
geo_rho_data = np.log10(geo_rho_data)
return (geo_rho_data, geodrill_obj.station_names,
geodrill_obj.station_location,
geodrill_obj.geo_depth, geodrill_obj.doi, depth_scale,
geodrill_obj.model_rms, geodrill_obj.model_roughness , plot_misfit )
[docs]class GeoStratigraphy(Geodrill):
"""
Inherit the :class:`pycsamt.geodrill.geoCore.geodrill` to create new model
NM using the model get from occam 2D inversion results.
The challenge of this class is firstly to delineate with much
accuracy the existing layer boundary (top and bottom) and secondly,
to predict the stratigraphy log before the drilling operations at each
station. Moreover, it’s a better way to select the right drilling location
and also to estimate the thickness of existing layer such as water table
layer as well as to figure out the water reservoir rock in the case of
groundwater exploration.
Arguments
----------
**crm** : str,
full path to Occam model file.
**eta** : int,
Value to divide into the CRM blocks to improve
the computation times. default is`5`
**n_epochs** : int,
Number of iterations. default is `100`
**tres** : array_like,
Truth values of resistivities. Refer to
:class:`~.geodrill.Geodrill` for more details
**ptols**: float,
Existing tolerance error between the `tres` values given and
the calculated resistivity in `crm`
**input_layers** : list or array_like
True input_layers names : geological
informations of collected in the area.
Hold the attributes from :class:`~.geodrill.Geodrill`
============= =========== ===============================================
Attributes Type Explanation
============= =========== ===============================================
kind str Kind of model function to compute the best fit
model to replace the value in `crm` . Can be
'linear' or 'polynomial'. if `polynomial` is
set, specify the `degree. Default is 'linear'.
alpha float Learing rate for graident descent computing.
*Default* is ``1e+4`` for linear. If `kind` is
set to `polynomial` the default value should
be `1e-8`.
degree int Polynomail function degree to impelment
gradient descent algorithm. If `kind` is set to
`Polynomial` the default `degree` is 3.
and details sequences
nm ndarray The NM matrix with the same dimension with
`crm` model blocks.
============= =========== ===============================================
:Example:
>>> from pycsamt.geodrill.geoCore import Geostratigraphy
>>> path=r'F:\ThesisImp\occam2D\invers+files\inver_res\K4'
>>> inversion_files = {'model_fn':'Occam2DModel',
'mesh_fn': 'Occam2DMesh',
"iter_fn":'ITER27.iter',
'data_fn':'OccamDataFile.dat'
}
>>> input_resistivity_values =[10, 66, 70, 180, 1000, 2000,
3000, 50, 7
# 7000,
# 15000
]
>>> input_resistivity_values =[10, 66, 70, 180, 1000, 2000,
... 3000, 7000, 15000 ]
>>> input_layer_names =['river water', 'fracture zone', 'granite']
>>> inversion_files = {key:os.path.join(path, vv) for key,
vv in inversion_files.items()}
>>> geosObj = GeoStratigraphy(**inversion_files,
input_resistivities=input_resistivity_values)
>>> zmodel = geosObj._zmodel
>>> geosObj._createNM(ptol =0.1)
>>> geosObj.nm
"""
def __init__(self, crm=None, eta=5, ptol=0.1 , n_epochs=100, **kwargs):
Geodrill.__init__(self, **kwargs)
self.crm =crm
self._eta =eta
self._ptol =ptol
self._n_epochs = n_epochs
self._tres = kwargs.pop('tres', None)
self._alpha = kwargs.pop('alpha_', 1e-4)
self._kind =kwargs.pop('kind', 'linear')
self._degree = kwargs.pop('degree', 1)
self.s0 =None
self._zmodel =None
self.nm= None
self.z =None
self.nmSites=None
self.crmSites=None
if self.model_res is not None :
self.crm = self.model_res
self.s0= np.zeros_like(self.model_res)
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
if self.input_resistivities is not None:
self.tres = self.input_resistivities
if self.crm is not None:
self._makeBlock()
@property
def n_epochs(self):
""" Iteration numbers"""
return self._n_epochs
@n_epochs.setter
def n_epochs(self, n_iterations):
""" n_epochs must be in integers value and greater than 0"""
try :
self._n_epochs = int(n_iterations)
except:
TypeError('Iteration number must be `integer`')
else:
if self._n_epochs <=0:
self._logging.debug(
" Unaceptable iteration value! Must be a positive value not "
f"{'a negative value.' if self._n_epochs <0 else 'equal to 0.'}")
warnings.warn(" {self._n_epochs} is unaceptable value."
" Could be resset to the default value=100.")
self._n_epochs = 100
@property
def eta (self):
""" Block constructor param"""
return self._eta
@eta.setter
def eta(self, eta0 ):
""" Block constructor must be interger value."""
try :
self._eta = int(eta0)
except Exception:
raise TypeError
else:
if self._eta <=0 :
self._logging.debug(
f'{self._eta} is unaceptable. Could resset to 5.')
warnings.warn(
f'`{self._eta}` is unaceptable. Could resset to 5.')
self._eta= 5
@property
def ptol(self) :
""" Tolerance parameter """
return self._ptol
@ptol.setter
def ptol(self, ptol0):
""" Tolerance parameter must be different to zero and includes
between 0 and 1"""
try :
self._ptol =float(ptol0)
except Exception :
TypeError
else :
if 0 >= self._ptol >1:
self._logging.debug(f"Tolerance value `{self._ptol}` is "
"{'greater' if self._ptol >1 else 'is unacceptable value'}`."
"Could resset to 10%")
warnings.warn(
f'Tolerance value `{self._ptol}` is unacceptable value.'
'Could resset to 10%')
self._ptol = 0.1
@property
def tres(self):
""" Input true resistivity"""
return self._tres
@tres.setter
def tres(self, ttres):
""" Convert Tres to log 10 resistivity """
try :
self._tres =[np.log10(t) for t in ttres]
except :
raise ValueError('Unable to convert TRES values')
def _createNM(self, crm =None, eta =5 , ptol= 0.1, **kws):
""" Create NM through the differents steps of NM creatings.
- step 1 : sof minimal computing
- step2 : model function computing
:param crm: calculated resistivity model blocks
:param eta: number of block to build.
:param ptol: Error tolerance parameters
"""
def s__auto_rocks (listOfauto_rocks):
""" Automatick rocks collected during the step 3
:param listOfauto_rocks: List of automatic rocks from differents
subblocks.
:returns:rocks sanitized and resistivities.
"""
listOfauto_rocks= np.concatenate((listOfauto_rocks), axis =1)
rho_= listOfauto_rocks[1, :]
rho_=np.array([float(ss) for ss in rho_])
r_= list(set(listOfauto_rocks[0, :]))
hres= np.zeros((len(r_), 1))
h_= []
for ii, rock in enumerate(r_):
for jj, ro in enumerate(listOfauto_rocks[0, :]):
if rock == ro:
h_.append(rho_[jj])
m_= np.array(h_)
hres[ii]= m_.mean()
h_=[]
return r_, hres
iln =kws.pop('input_layers', None)
tres =kws.pop('tres', None)
subblocks =kws.pop('subblocks', None)
disp= kws.pop('display_infos', True)
hinfos =kws.pop('headerinfos', ' Layers [auto=automatic]')
if subblocks is not None: self.subblocks = self.subblocks
if iln is not None: self.input_layers = iln
if tres is not None: self.tres = tres
if crm is not None: self.crm = crm
if eta is not None: self.eta = eta
if ptol is not None: self.ptol = ptol
self.s0 , errors=[], []
#step1 : SOFMINERROR
for ii in range(len(self.subblocks)):
s1, error = self._softminError(subblocks= self.subblocks[ii])
self.s0.append(s1)
errors.append(error)
#step2 : MODELFUNCTION USING DESCENT GRADIENT
for ii in range(len(self.s0)):
if 0 in self.s0[ii][:, :]:
s2, error = self._modelFunc(subblocks =self.subblocks[ii],
s0= self.s0[ii])
self.s0[ii]=s2
errors[ii]= error
arp_=[]
#Step 3: USING DATABASE
for ii in range(len(self.s0)):
if 0 in self.s0[ii][:, :]:
s3, autorock_properties= self._createAutoLayer(
subblocks=self.subblocks[ii], s0=self.s0[ii] )
arp_.append(autorock_properties)
self.s0[ii]=s3
# Assembly the blocks
self.nm = np.concatenate((self.s0))
self.z=self.nm[:, 0]
self.nm = self.nm[:, 1:]
# make site blocks
self.nmSites= makeBlockSites(x_nodes=self.model_x_nodes,
station_location= self.station_location,
block_model=self.nm )
self.crmSites = makeBlockSites(x_nodes=self.model_x_nodes,
station_location= self.station_location,
block_model=self.model_res)
#Update TRES and LN
gammaL, gammarho = s__auto_rocks(arp_)
if self.input_layers is not None:
print_layers = self.input_layers + [ ' {0} (auto)'.format(l)
for l in gammaL ]
self.input_layers = self.input_layers + gammaL
self.tres = self._tres + list (np.power(10,
np.array([float(rv) for rv in gammarho])))
# display infos
if disp:
display_infos(infos=print_layers,
header= hinfos)
#STEP 4: Train ANN: see pycsamt.geodrill.geoDB.ann.py to predict your
#layer: No need to plot the NM
return self.nm
def _softminError(self, subblocks=None, **kws ):
"""
Replace the calculated resistivity by the true resistivity
using the soft minimal error (ξ)
:param crm: Is the calculated resistivity model from Occam2D
inversion results
"""
buffer =self.ptol +1 #bufferr error
s0 = np.zeros_like(subblocks.T)
s0[:, 0] = subblocks.T[:, 0]
_z = subblocks[:, 0]
subblocks = subblocks[:, 1:]
# Hold the columns of depth values
s0 = np.zeros_like(subblocks.T)
error =[]
for ii in range(subblocks.shape[1]): # hnodes N
for jj in range(subblocks.shape[0]): # znodes V
for k in range(len(self.tres)) :
sfme_k = (subblocks.T[ii, jj]-self.tres[k])**2\
/subblocks.T[ii, jj] **2
error.append(sfme_k )
if sfme_k <= self.ptol :
if sfme_k < buffer : # keep the best minimum
buffer = sfme_k
s0[ii, jj] = self.tres[k]
buffer = self.ptol +1 # initilize buffer
s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1)
return s0, error
def _modelFunc(self, tres=None, subblocks=None, s0=None, ptol = None,
kind='linear', **kwargs ):
"""The second step introduces the model function F=W∙Z where W
contains the weights of parameters number and Z is V×2 matrix
that contains a “bias” column. If the parameter number P equal to two,
the model function f(z)=∑_(p=1)^P▒〖w_(p-1) z^(p-1) 〗 becomes a
linear function with 〖f_1〗^((1) ) (z)= wz+r_0 with w_1=w and w_0=r_0
he gradient descent algorithm is used to find the best parameters w
and r_0 that minimizes the MSE loss function J .
:param subblocks: `crm` block
:param s0: blocks from the first step :meth:`~._sofminError`
:param kind: Type of model function to apply. Can also be
a `polynomial` by specifying the `degree`
into argument `degree`.
:Example:
>>> geosObj = GeoStratigraphy(**inversion_files,
input_resistivities=input_resistivity_values)
>>> ss0, error = geosObj._modelFunc(subblocks=geosObj.subblocks[0],
s0=geosObj.s0[0])
"""
if tres is not None: self.tres = tres
if ptol is not None: self.ptol = ptol
alpha = kwargs.pop('alpha', None)
if alpha is not None: self._alpha = alpha
n_epochs =kwargs.pop('n_epochs', None)
if n_epochs is not None: self.n_epochs = n_epochs
kind = kwargs.pop('kind', None)
if kind is not None: self._kind = kind
degree = kwargs.pop('degree', None)
if degree is not None: self._degree = degree
buffer =self.ptol +1 #bufferr error
_z= s0[:, 0]
s0 = s0[:, 1:].T
subblocks=subblocks[:, 1:].T
error =[]
for ii in range(s0.shape[0]): # hnodes N
F, *_= self.gradient_descent(z=_z,s=subblocks[ii,:],
alpha_= self._alpha,
n_epochs= self.n_epochs,
kind= self._kind)
for jj in range(s0.shape[1]): # znodes V
if s0[ii, jj] ==0. :
rp =F[jj]
for k in range(len(self.tres)) :
sfme_k = (rp -self.tres[k])**2\
/rp**2
_ermin = abs(rp-subblocks[ii, jj])
error.append(sfme_k)
if sfme_k <= self.ptol and _ermin<= self.ptol:
if sfme_k < buffer : # keep the best minimum
buffer = sfme_k
s0[ii, jj]= self.tres[k]
buffer = self.ptol +1 # initilize buffer
s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1)
return s0, error
[docs] @staticmethod
def gradient_descent(z, s, alpha_, n_epochs, **kws):
""" Gradient descent algorithm to fit the best model parameter.
:param z: vertical nodes containing the values of depth V
:param s: vertical vector containin the resistivity values
:param alpha_: step descent parameter or learning rate.
*Default* is ``0.01`
:param n_epochs: number of iterations. *Default* is ``100``
Can be changed to other values
:returns:
- `F`: New model values with the best `W` parameters found.
- `W`: vector containing the parameters fits
- `cost_history`: Containing the error at each Itiretaions.
:Example:
>>> z= np.array([0, 6, 13, 20, 29 ,39, 49, 59, 69, 89, 109, 129,
149, 179])
>>> res= np.array( [1.59268,1.59268,2.64917,3.30592,3.76168,
4.09031,4.33606, 4.53951,4.71819,4.90838,
5.01096,5.0536,5.0655,5.06767])
>>> fz, weights, cost_history = gradient_descent(z=z, s=res,
n_epochs=10,
alpha_=1e-8,
degree=2)
>>> import matplotlib.pyplot as plt
>>> plt.scatter (z, res)
>>> plt.plot(z, fz)
"""
kind_=kws.pop('kind', 'linear')
kind_degree = kws.pop('degree', 1)
if kind_degree >1 : kind_='poly'
if kind_.lower() =='linear':
kind_degree = 1
elif kind_.lower().find('poly')>=0 :
if kind_degree <=1 :
_logger.debug(
'The model function is set to `Polynomial`. '
'The degree must be greater than 1. Degree wil reset to 2.')
warnings.warn('Polynomial degree must be greater than 1.'
'Value is ressetting to `2`.')
kind_degree = 2
try :
kind_degree= int(kind_degree)
except Exception :
raise ValueError('Could not convert to integer.')
def kindOfModel(degree, x, y) :
""" Generate kind of model. If degree is``1`` The linear subset
function will use. If `degree` is greater than 2, Matrix will
generate using the polynomail function.
:param x: X values must be the vertical nodes values
:param y: S values must be the resistivity of subblocks at node x
"""
c= []
deg = degree
w = np.zeros((degree+1, 1)) # initialize weights
def init_weights (x, y):
""" Init weights by calculating the scope of the function along
the vertical nodes axis for each columns. """
for j in range(x.shape[1]-1):
a= (y.max()-y.min())/(x[:, j].max()-x[:, j].min())
w[j]=a
w[-1] = y.mean()
return w # return weights
for i in range(degree):
c.append(x ** deg)
deg= deg -1
if len(c)> 1:
x= func.concat_array_from_list(c, concat_axis=1)
x= np.concatenate((x, np.ones((x.shape[0], 1))), axis =1)
else: x= np.vstack((x, np.ones(x.shape))).T # initialize z to V*2
w= init_weights(x=x, y=y)
return x, w # Return the matrix x and the weights vector w
def model(Z, W):
""" Model function F= Z.W where `Z` id composed of vertical nodes
values and `bias` columns and `W` is weights numbers."""
return Z.dot(W)
# generate function with degree
Z, W = kindOfModel(degree=kind_degree, x=z, y=s)
# Compute the gradient descent
cost_history = np.zeros(n_epochs)
s=s.reshape((s.shape[0], 1))
for ii in range(n_epochs):
W= W - (Z.T.dot(Z.dot(W)-s)/ Z.shape[0]) * alpha_
cost_history[ii]= (1/ 2* Z.shape[0]) * np.sum((Z.dot(W) -s)**2)
F= model(Z=Z, W=W) # generate the new model with the best weights
return F,W, cost_history
def _makeBlock (self):
""" Construct the differnt block base on `eta` param. Separate blocks
from number of vertical nodes generated by the first `eta` value applied
to the `crm`."""
self.zmodel = np.concatenate((self.geo_depth.reshape(
self.geo_depth.shape[0], 1), self.model_res), axis =1)
vv = self.zmodel[-1, 0] / self.eta
for ii, nodev in enumerate(self.zmodel[:, 0]):
if nodev >= vv:
npts = ii # collect number of points got.
break
self._subblocks =[]
bp, jj =npts, 0
if len(self.zmodel[:, 0]) <= npts:
self._subblocks.append(self.zmodel)
else:
for ii , row in enumerate(self.zmodel) :
if ii == bp:
_tp = self.zmodel[jj:ii, :]
self._subblocks.append(_tp )
bp +=npts
jj=ii
if len(self.zmodel[jj:, 0])<= npts:
self._subblocks.append(self.zmodel[jj:, :])
break
return self._subblocks
@property
def subblocks(self):
""" Model subblocks divised by `eta`"""
return self._subblocks
@subblocks.setter
def subblocks(self, subblks):
""" keep subblocks as :class:`~GeoStratigraphy` property"""
self._subblocks = subblks
def _createAutoLayer (self, subblocks=None, s0=None,
ptol = None,**kws):
"""
The third step of replacement using the geological database.
The third step consists to find the rock γ_L in the Γ with the
ceiled mean value γ_ρ in E_props column is close to the calculated
resistivity r_11. Once the rock γ_L is found,the calculated
resistivity r_11 is replaced by γ_ρ. Therefore, the rock γ_L is
considered as an automatic layer. At the same time,the TRES and LN
is updated by adding GeoStratigraphy_ρ and γ_L respectively to
the existing given data.
"""
db_properties = kws.pop('properties',['electrical_props',
'__description'] )
tres = kws.pop('tres', None)
disp = kws.pop('display_infos', False)
hinfos = kws.pop('header', 'Automatic layers')
if tres is not None :self.tres = tres
def _findGeostructures(_res):
""" Find the layer from database and keep the ceiled value of
`_res` calculated resistivities"""
structures = self.get_structure(_res)
if len(structures) !=0 or structures is not None:
if structures[0].find('/')>=0 :
ln = structures[0].split('/')[0].lower()
else: ln = structures[0].lower()
return ln, _res
else:
valEpropsNames = self._getProperties(db_properties)
indeprops = db_properties.index('electrical_props')
for ii, elecp_value in enumerate(valEpropsNames[indeprops]):
if elecp_value ==0.: continue
elif elecp_value !=0 :
try :
iter(elecp_value)
except : pass
else :
if min(elecp_value)<= _res<= max(elecp_value):
ln= valEpropsNames[indeprops][ii]
return ln, _res
def _normalizeAutoresvalues(listOfstructures,listOfvalues):
""" Find the different structures that exist and
harmonize value. and return an array of originated values and
the harmonize values and the number of automatics layer found as
well as their harmonized resistivity values.
"""
autolayers = list(set(listOfstructures))
hvalues= np.zeros((len(autolayers,)))
temp=[]
for ii , autol in enumerate(autolayers):
for jj, _alay in enumerate(listOfstructures):
if _alay ==autol:
temp.append(listOfvalues[jj])
hvalues[ii]= np.array(list(set(temp))).mean()
temp=[]
# build values array containes the res and the harmonize values
h= np.zeros((len(listOfvalues),))
for ii, (name, values) in enumerate(zip (listOfstructures,
listOfvalues)):
for jj, hnames in enumerate(autolayers) :
if name == hnames:
h[ii]= hvalues[jj]
finalres= np.vstack((np.array(listOfvalues),h) )
finalln = np.vstack((np.array(autolayers), hvalues))
return finalln, finalres
if tres is not None: self.tres = tres
if ptol is not None: self.ptol = ptol
_z= s0[:, 0]
s0 = s0[:, 1:].T
_temptres , _templn =[], []
subblocks=subblocks[:, 1:].T
for ii in range(s0.shape[0]): # hnodes N
for jj in range(s0.shape[1]): # znodes V
if s0[ii, jj] ==0. :
lnames, lcres =_findGeostructures(
np.power(10, subblocks[ii, jj]))
_temptres.append(np.log10(lcres))
_templn.append(lnames)
auto_rocks_names_res, automatics_resistivities =\
_normalizeAutoresvalues(_templn,_temptres )
for ii in range(s0.shape[0]): # hnodes N
for jj in range(s0.shape[1]): # znodes V
if s0[ii, jj] ==0. :
for k in range(automatics_resistivities.shape[1]):
subblocks[ii, jj] == automatics_resistivities[0,:][k]
s0[ii, jj]= automatics_resistivities[1,:][k]
break
s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1)
# display infos
if disp:
display_infos(infos=self.input_layers,
header= hinfos)
return s0, auto_rocks_names_res
@staticmethod
def _getProperties(properties =['electrical_props', '__description']):
""" Connect database and retrieve the 'Eprops'columns and 'LayerNames'
:param properties: DataBase columns.
:returns:
- `_gammaVal`: the `properties` values put on list.
The order of the retrieved values is function of
the `properties` disposal.
"""
def _fs (v):
""" Sanitize value and put on list
:param v: value
:Example:
>>> sanitize('(416.9, 100000.0)'))
...[416.9, 100000.0]
"""
try :
v = float(v)
except :
v = tuple([float (ss) for ss in
v.replace('(', '').replace(')', '').split(',')])
return v
# connect to geodataBase
try :
_dbObj = GeoDataBase()
except:
_logger.debug('Connection to database failed!')
else:
_gammaVal = _dbObj._retreive_databasecolumns(properties)
if 'electrical_props' in properties:
indexEprops = properties.index('electrical_props')
_gammaVal [indexEprops] = list(map(lambda x:_fs(x),
_gammaVal[indexEprops]))
return _gammaVal
[docs] @deprecated(reason= 'Expensive method, should deprecated soon to hard-code'
' and generate a bug when dimensions need to be resized.!')
@geoplot2d(reason='model',cmap='jet_r', plot_style ='pcolormesh',
show_grid=False )
def strataModel(self, kind ='nmStrata', **kwargs):
"""
Visualize the `strataModel` after `nm` creating using decorator from
:class:'~.geoplot2d'.
:param kind: can be :
- `nms` mean new model plots after inputs the `tres`
- `crm` means calculated resistivity from occam model blocks
*default* is `nm`.
:param plot_misft: Set to ``True`` if you want to visualise the error
between the `nm` and `crm`.
:param scale: Can be ``m`` or ``km`` for plot scale
:param in_percent`: Set to `True` to see your plot map scaled in %.
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import Geostratigraphy
>>> geosObj = GeoStratigraphy(**inversion_files,
input_resistivities=input_resistivity_values,
input_layers=input_layer_names)
>>> geosObj.strataModel(kind='nm', plot_misfit =False)
"""
def compute_misfit(rawb, newb, percent=True):
""" Compute misfit with calculated block and new model block """
m_misfit = .01* np.sqrt (
(rawb - newb)**2 /rawb**2 )
if percent is True:
m_misfit= m_misfit *100.
return m_misfit
depth_scale = kwargs.pop('scale', 'm')
plot_misfit =kwargs.pop('misfit_G', False)
misfit_percentage = kwargs.pop('in_percent', True)
if kind.lower().find('nm')>=0 or kind.lower().find('strata')>=0:
if self.nmSites is None:
self._createNM()
data = self.nmSites
elif kind.lower().find('crm'):
data = self.crmSites
# compute model_misfit
if plot_misfit is True :
self._logging.info('Plot strata misfit')
if kind.lower().find('strata')>=0:
data = compute_misfit(rawb=self.crmSites ,
newb= self.nmSites,
percent = misfit_percentage)
print('{0:-^77}'.format('StrataMisfit info'))
print('** {0:<37} {1} {2} {3}'.format(
'Misfit max ','=',data.max()*100., '%' ))
print('** {0:<37} {1} {2} {3}'.format(
'Misfit min','=',data.min()*100., '%' ))
print('-'*77)
data = np.resize (data, (len(self.geo_depth),
len(self.station_location)))
self.doi = self.geo_depth.max()
return (data, self.station_names, self.station_location,
self.geo_depth, self.doi, depth_scale,self.model_rms,
self.model_roughness, plot_misfit )
[docs] def stratigraphyModel (self, kind ='nm', misfit_G= False, **kwargs):
""" Make stratigraphy model
:param kind:
- `nms` mean new model plots after inputs the `tres`
- `crm` means calculated resistivity from occam model blocks
:param misfit_G: bool,
compute error between CRM and NM if set to `True`
"""
# get attribute from Geostratigraph object `
for file in ['model_fn', 'iter_fn', 'data_fn', 'mesh_fn',
'input_resistivities', 'input_layers']:
if hasattr(self, file):
kwargs[file]= getattr(self, file)
geoModel( kind =kind,
plot_misfit=misfit_G,
**kwargs)
[docs]def makeBlockSites(station_location, x_nodes, block_model):
""" Build block that contains only the station locations values
:param station_location: array of stations locations. Must be
self contains on the horizontal nodes (x_nodes)
:param x_nodes: Number of nodes in horizontal
:param block_model: Resistivity blocks model
:return:
- `stationblocks`: Block that contains only the
station location values.
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import makeBlockSite
>>> mainblocks= get_location_value(
station_location=geosObj.makeBlockSite,
x_nodes=geosObj.model_x_nodes, block_model=geosObj.model_res )
"""
index_array =np.zeros ((len(station_location), ), dtype =np.int32)
for ii, distance in enumerate(station_location):
for jj , nodes in enumerate(x_nodes):
if nodes == distance :
index_array [ii]= jj
break
elif nodes> distance:
min_= np.abs(distance-x_nodes[jj-1])
max_= np.abs(distance - x_nodes[jj+1])
if min_<max_:
index_array [ii]= jj-1
else: index_array [ii]=jj
break
_tema=[]
for ii in range(len(index_array )):
a_= block_model[:, int(index_array [ii])]
_tema.append(a_.reshape((a_.shape[0], 1)))
stationblock = np.concatenate((_tema), axis=1)
return stationblock
[docs]def display_infos(infos, **kws):
""" Display unique element on list of array infos
:param infos: Iterable object to display.
:param header: Change the `header` to other names.
:Example:
>>> from pycsamt.geodrill.geoCore.geodrill import display_infos
>>> ipts= ['river water', 'fracture zone', 'granite', 'gravel',
'sedimentary rocks', 'massive sulphide', 'igneous rocks',
'gravel', 'sedimentary rocks']
>>> display_infos('infos= ipts,header='TestAutoRocks',
size =77, inline='~')
"""
inline =kws.pop('inline', '-')
size =kws.pop('size', 70)
header =kws.pop('header', 'Automatic rocks')
if isinstance(infos, str ):
infos =[infos]
infos = list(set(infos))
print(inline * size )
mes= '{0}({1:02})'.format(header.capitalize(),
len(infos))
mes = '{0:^70}'.format(mes)
print(mes)
print(inline * size )
am=''
for ii in range(len(infos)):
if (ii+1) %2 ==0:
am = am + '{0:>4}.{1:<30}'.format(ii+1, infos[ii].capitalize())
print(am)
am=''
else:
am ='{0:>4}.{1:<30}'.format(ii+1, infos[ii].capitalize())
if ii ==len(infos)-1:
print(am)
print(inline * size )
if __name__=="__main__" :
path=r'F:\ThesisImp\occam2D\invers+files\inver_res\K1'
inversion_files = {'model_fn':'Occam2DModel',
'mesh_fn': 'Occam2DMesh',
"iter_fn":'ITER17.iter',
'data_fn':'OccamDataFile.dat'
}
input_resistivity_values =[10, 66, 70, 100, 1000, 2000,
3000, 7000, 15000 ]
input_layer_names =['river water', 'fracture zone', 'granite']
inversion_files = {key:os.path.join(path, vv) for key,
vv in inversion_files.items()}
# with np.errstate(divide='ignore'):
# ss= np.array(inpt2) /np.array(input_resistivity_values )
# # print(ss )
geosObj = GeoStratigraphy(**inversion_files,
input_resistivities=input_resistivity_values,
input_layers=input_layer_names)
geosObj.stratigraphyModel(kind ='nm',misfit_G=False)