Source code for mtuq.io.clients.CPS_SAC


import obspy
import os
import numpy as np

from glob import glob
from os.path import basename, exists, isdir, join
from os import listdir
from mtuq.greens_tensor.CPS import GreensTensor
from mtuq.io.clients.base import Client as ClientBase
from mtuq.util.signal import resample
from obspy.core import Stream
from obspy.geodetics import gps2dist_azimuth


# CPS workflow (solver + binary-to-SAC conversion) produces the following
# individual time series
CHANNELS = [
    'ZEX', 'ZSS', 'ZDS', 'ZDD',
    'REX', 'RSS', 'RDS', 'RDD',
    'TSS', 'TDS',
]


# disply prominent warning
print('CPS client still under testing')


[docs]class Client(ClientBase): """ CPS database client .. rubric:: Usage To instantiate a database client, supply a path or url: .. code:: from mtuq.io.clients.CPS_SAC import Client db = Client(path_or_url) Then the database client can be used to generate GreensTensors: .. code:: greens_tensors = db.get_greens_tensors(stations, origin) .. note:: `GreensTensor`s are obtained by reading precomputed time series from an CPS directory tree with naming convention `ZZZz/RRRRrZZZz.EXT`, where ZZZz gives the depth of the source, RRRRr gives the horizontal offset between source and receiver, and EXT is the file extension related to so-called fundamental sources. .. note:: The above directory tree convention permits us to represent offsets from 0 to 9999.9 km in 0.1 km increments and origin depths from 0 to 999.9 km in 0.1 km increments """ def __init__(self, path_or_url=None, model=None, include_mt=True, include_force=False): if not path_or_url: raise Exception if not exists(path_or_url): raise Exception if include_force: raise NotImplementedError if not model: model = basename(path_or_url) # path to CPS directory tree self.path = path_or_url # model from which CPS Green's functions were computed self.model = model self.include_mt = include_mt self.include_force = include_force
[docs] def get_greens_tensors(self, stations=[], origins=[], verbose=False): """ Extracts Green's tensors from database Returns a ``GreensTensorList`` in which each element corresponds to a (station, origin) pair from the given lists .. rubric :: Input arguments ``stations`` (`list` of `mtuq.Station` objects) ``origins`` (`list` of `mtuq.Origin` objects) ``verbose`` (`bool`) """ return super(Client, self).get_greens_tensors(stations, origins, verbose)
[docs] def _get_greens_tensor(self, station=None, origin=None): if station is None: raise Exception("Missing station input argument") if origin is None: raise Exception("Missing station input argument") traces = [] offset_in_m, _, _ = gps2dist_azimuth( origin.latitude, origin.longitude, station.latitude, station.longitude) # what are the start and end times of the data? t1_new = float(station.starttime) t2_new = float(station.endtime) dt_new = float(station.delta) if self.include_mt: # closest depth for which Green's functions are available depth_km = _closest_depth(self.path, origin.depth_in_m/1000.) # closest horizontal offset for which Green's functions are available offset_km = _closest_offset(self.path, depth_km, offset_in_m/1000.) # the file naming convention used by CPS for offset is RRRRr, which # allows us to represent offsets up to 9999.9 km # in increments of 0.1 km offset_str = '%05d' % (10.*offset_km) # the file naming convention used by CPS for depth is ZZZz, which # allows us to represent depths up to 999.9 km # in increments of 0.1 km depth_str = '%04d' % (10.*depth_km) for _i, ext in enumerate(CHANNELS): trace = obspy.read('%s/%s/%s%s.%s' % (self.path, depth_str, offset_str, depth_str, ext), format='sac')[0] trace.stats.channel = CHANNELS[_i] trace.stats._component = CHANNELS[_i][0] # ad hoc workaround for difference between # CPS and FK conventions if CHANNELS[_i].endswith('EX'): trace.stats.channel = trace.stats._component+'EP' # what are the start and end times of the Green's function? t1_old = float(origin.time)+float(trace.stats.starttime) t2_old = float(origin.time)+float(trace.stats.endtime) dt_old = float(trace.stats.delta) data_old = trace.data # resample Green's function data_new = resample(data_old, t1_old, t2_old, dt_old, t1_new, t2_new, dt_new) trace.data = data_new # convert from 10^-20 dyne to N^-1 trace.data *= 1.e-15 trace.stats.starttime = t1_new trace.stats.delta = dt_new traces += [trace] if self.include_force: raise NotImplementedError tags = [ 'model:%s' % self.model, 'solver:%s' % 'CPS', ] return GreensTensor(traces=[trace for trace in traces], station=station, origin=origin, tags=tags, include_mt=self.include_mt, include_force=self.include_force)
# # utility functions # def _closest_depth(path, depth_km, thresh_km=1.): """ Searches CPS directory tree to find closest depth for which Green's functions are available """ if not _listdir(path): raise Exception('No subdirectories found: %s' % path) depths = [] for subdir in _listdir(path): # exclude improperly-formatted subdirectories if len(subdir) != 4: continue if not subdir.isdigit(): continue # convert to km depths += [float(subdir)/10.] closest_depth = min(depths, key=lambda z: abs(z - depth_km)) if (closest_depth - depth_km) > thresh_km: print('Warning: Closest available Greens functions differ from given source ' 'by %.f km vertically' % (closest_depth - depth_km)) print('Warning: Depth displayed in figure header may be inaccurate') return closest_depth def _closest_offset(path, depth_km, offset_km, thresh_km=1.): """ Searches CPS directory tree to find closest horizontal offset for which Green's functions are available """ # the directory naming convention used by CPS is ZZZz wildcard = '%s/%04d' % (path, 10*depth_km) # the file naming convention used by CPS is RRRRrZZZz.EXT wildcard = '%s/?????%04d.ZEX' % (wildcard, 10*depth_km) if not glob(wildcard): raise Exception('No Greens functions found: %s' % wildcard) offsets = [] for fullname in glob(wildcard): filename = os.path.basename(fullname) # exclude improperly-formatted filenames if not filename[0:9].isdigit(): continue # convert to km offsets += [float(filename[0:5])/10.] closest_offset = min(offsets, key=lambda r: abs(r - offset_km)) if (closest_offset - offset_km) > thresh_km: print('Warning: Closest available Greens functions differ from given source ' 'by %.f km horizontally' % (closest_offset - offset_km)) return closest_offset def _listdir(path): # lists all subdirectories for name in listdir(path): if isdir(os.path.abspath(os.path.join(path, name))): yield name