Source code for mtuq.misfit.polarity


import numpy as np

import mtuq
from mtuq.util import AttribDict, Null, iterable, warn
from mtuq.misfit.waveform.level2 import _to_array, _type
from obspy.taup import TauPyModel
from mtuq.util.signal import m_to_deg


[docs]class PolarityMisfit(object): """ Polarity misfit function .. rubric:: Usage Evaluating polarity misfit is a two-step procedure. First, the user supplies parameters such as the method used to calculate predicted polarities (see below for detailed parameter descriptions): .. code:: polarity_misfit = PolarityMisfit(**parameters) Second, the user invokes the misfit function: .. code:: values = polarity_misfit(data, greens, sources) During misfit evaluation - observed polarities are collected from the `data` argument, which can be either a list of integers or a `Dataset` with observed polarity values attached (see convention below for more information) - predicted polarities are calculated from the `greens` argument (see parameter descriptions below for more information) - a NumPy array of length `len(sources)` is returned, giving the number of mismatches between observed and predicted .. rubric:: Parameters ``method`` (`str`) - ``'taup'`` Calculate polarities using Taup-P - ``'FK_metadata'`` Read polarities from FK database - ``'CPS_metadata'`` Read polarities from CPS database - ``'waveform'`` Determine polarity from full-waveform synthetics (not implemented yet) .. rubric:: Other input arguments that may be required, depending on the above ``taup_model`` (`str`): Name of built-in ObsPy TauP model or path to custom ObsPy TauP model, required for `method=taup` ``FK_database`` (`str`): Path to FK database, required for for `method=FK_metadata`. .. note:: *Convention* : Positive vertical first motions are encoded as +1 and negative vertical first motions are encoded as -1. Unpicked or indeterminate first motions can be encoded as 0. """ def __init__(self, method='taup', taup_model='ak135', FK_database=None, FK_model=None, CPS_database=None, CPS_model=None, **parameters): if not method: raise Exception('Undefined parameter: method') self.method = method self.taup_model = taup_model self.FK_database = FK_database self.FK_model = FK_model self.CPS_database = CPS_database self.CPS_model = CPS_model # # check parameters # if self.method == 'taup': assert self.taup_model is not None self._taup = TauPyModel(self.taup_model) elif self.method == 'FK_metadata': assert self.FK_database is not None assert exists(self.FK_database) if self.FK_model is None: self.FK_model = basename(self.FK_database) elif self.method == 'CPS_metadata': assert self.CPS_database is not None assert exists(self.CPS_database) if self.CPS_model is None: self.CPS_model = basename(self.CPS_database) else: raise TypeError('Bad parameter: method') def __call__(self, data, greens, sources, progress_handle=Null(), set_attributes=False): # check input arguments _check(greens, self.method) # # evaluate misfit # observed = self.get_observed(data) predicted = self.get_predicted(greens, sources) values = np.abs(predicted - observed)/2. # mask unpicked mask = (observed != 0).astype(int) values = np.dot(values, mask) # returns a NumPy array of shape (len(sources), 1) return values.reshape(len(values), 1)
[docs] def get_observed(self, data): """ Extracts observed polarities from data """ if isinstance(data, mtuq.Dataset): observed = np.array([_get_polarity(stream) for stream in data]) elif isinstance(data, list): observed = np.array(data) elif isinstance(data, np.ndarray): observed = data else: raise TypeError return observed
[docs] def get_predicted(self, greens, sources): """ Calculates predicted polarities """ if type(sources) == mtuq.MomentTensor: sources = sources.as_vector().reshape((1,6)) _calculate = _polarities_mt elif type(sources) == mtuq.Force: raise NotImplementedError elif _type(sources.dims) == 'MomentTensor': sources = _to_array(sources) _calculate = _polarities_mt elif _type(sources.dims) == 'Force': raise NotImplementedError else: raise TypeError if self.method=='taup': takeoff_angles = _takeoff_angles_taup(self._taup, greens) azimuths = _get_azimuths(greens) predicted = _calculate(sources, takeoff_angles, azimuths) elif self.method=='FK_metadata': takeoff_angles = _takeoff_angles_FK(self.FK_database, greens) azimuths = _get_azimuths(greens) predicted = _calculate(sources, takeoff_angles, azimuths) elif self.method == 'CPS_metadata': takeoff_angles = _takeoff_angles_CPS(self.CPS_database, greens) azimuths = _get_azimuths(greens) predicted = _calculate(sources, takeoff_angles, azimuths) elif self.method=='waveform': raise NotImplementedError return predicted
[docs] def collect_attributes(self, data, greens): """ Collect polarity-related attributes (used for beachball plots) """ if self.method=='taup': takeoff_angles = _takeoff_angles_taup( self._taup, greens) elif self.method=='FK_metadata': takeoff_angles = _takeoff_angles_FK( self.FK_database, greens) elif self.method=='CPS_metadata': takeoff_angles = _takeoff_angles_CPS( self.CPS_database, greens) observed = self.get_observed(data) attrs_list = [] for _i, greens_tensor in enumerate(greens): attrs = AttribDict() try: attrs.azimuth = greens_tensor.azimuth attrs.distance_in_m = greens_tensor.distance_in_m except: pass try: attrs.network = greens_tensor.station.network attrs.station = greens_tensor.station.station attrs.location = greens_tensor.station.location attrs.latitude = greens_tensor.station.latitude attrs.longitude = greens_tensor.station.longitude except: pass try: attrs.takeoff_angle = takeoff_angles[_i] except: pass try: attrs.polarity = observed[_i] except: pass attrs_list += [attrs] return attrs_list
def _takeoff_angles_taup(taup, greens): """ Calculates takeoff angles from Tau-P model """ takeoff_angles = np.zeros(len(greens)) for _i, greens_tensor in enumerate(greens): depth_in_km = greens_tensor.origin.depth_in_m/1000. distance_in_deg = m_to_deg(greens_tensor.distance_in_m) takeoff_angles[_i] = _takeoff_angle_taup( taup, depth_in_km, distance_in_deg) return takeoff_angles def _takeoff_angle_taup(taup, depth_in_km, distance_in_deg): arrivals = taup.get_travel_times( depth_in_km, distance_in_deg, phase_list=['p', 'P']) phases = [] for arrival in arrivals: phases += [arrival.phase.name] if 'p' in phases: return arrivals[phases.index('p')].takeoff_angle elif 'P' in phases: return arrivals[phases.index('P')].takeoff_angle else: raise Exception def _polarities_mt(mt_array, takeoff, azimuth): n1,n2 = mt_array.shape if n2!= 6: raise Exception('Inconsistent dimensions') n3,n4 = len(takeoff), len(azimuth) if n3!=n4: raise Exception('Inconsistent dimensions') # prepare arrays polarities = np.zeros((n1,n3)) drc = np.empty((n1, 3)) takeoff = np.deg2rad(takeoff) azimuth = np.deg2rad(azimuth) for _i, angles in enumerate(zip(takeoff, azimuth)): sth = np.sin(angles[0]) cth = np.cos(angles[0]) sphi = np.sin(angles[1]) cphi = np.cos(angles[1]) drc[:, 0] = sth*cphi drc[:, 1] = sth*sphi drc[:, 2] = cth # Aki & Richards 2ed, p. 108, eq. 4.88 cth = mt_array[:, 0]*drc[:, 2]*drc[:, 2] +\ mt_array[:, 1]*drc[:, 0]*drc[:, 0] +\ mt_array[:, 2]*drc[:, 1]*drc[:, 1] +\ 2*(mt_array[:, 3]*drc[:, 0]*drc[:, 2] - mt_array[:, 4]*drc[:, 1]*drc[:, 2] - mt_array[:, 5]*drc[:, 0]*drc[:, 1]) polarities[cth > 0, _i] = +1 polarities[cth < 0, _i] = -1 return polarities def _polarities_force(force_array, takeoff_array, azimuth_array): raise NotImplementedError def _get_azimuths(greens): return np.array([stream.azimuth for stream in greens]) # # input argument checking # def _model_type(greens): try: solver = greens[0].attrs.solver except: solver = 'Unknown' if solver in ('AxiSEM', 'FK', 'CPS', 'syngine'): model_type = '1D model' elif solver in ('SPECFEM3D', 'SPECFEM3D_GLOBE'): model_type = '3D model' else: model_type = 'Unknown model' return model_type def _check(greens, method): return #model = _model_type(greens) #method = _method_type(method) #if model != method: # print() # print(' Possible inconsistency?') # print(' Green''s functions are from: %s' % model) # print(' Polarities are from: %s' % method) # print()