Source code for siria.si.demonstrator

# GPLv3
#
# The Developers, 21st Century
import logging

from copy import deepcopy

import os
import os.path as op

import numpy as num

from pyrocko import squirrel, weeding
from pyrocko.gf import Timing, Range, LocalEngine
from pyrocko.guts import (
    Object, Float, StringChoice, Tuple, Dict, List, String, Int, Bool)
from pyrocko.io import stationxml
from pyrocko.model import Event
from pyrocko import obspy_compat

import grond  # noqa
from grond import CMTProblemConfig
from grond.meta import Path
from grond.problems.base import ProblemDataNotAvailable

from siria.util import \
    remove_blocklisted_stations, keep_allowlisted_stations, snr_filter
from siria.ids import IDSConfigFull, IDSRunner
from siria.io import sources_to_json
from siria.io.grond import pyrocko_to_sources
from siria.si.invert import SourceInjectionConfig, SourceInjector


logger = logging.getLogger('siria.si.demonstrator')

obspy_compat.plant()
d2r = num.pi / 180.


quantity2unit = dict([
    ('displacement', 'M'),
    ('velocity', 'M/S'),
    ('acceleration', 'M/S**2')])


source_types_injection_fn = 'sources_inject_{source_type}.yaml'


[docs]class MagnitudeGroup(Object): ''' Definition of inversion parameters specific to a certain magnitude range. The core of a magnitude group is the magnitude range (:attr:`~siria.si.demonstrator.MagnitudeGroup.magmin`, :attr:`~siria.si.demonstrator.MagnitudeGroup.magmax`). It limits the application of the magnitue group to earthquakes with the initial magnitude in the given range. Detailed inversion settings can be set: * the waveform time window, * which waveform channels are used for inversion. * the bandpass filter frequencies and fading time, * distance ranges for stations to be considered, * inversion parameter search space ranges, and * for which source models the magnitude group is applied for. .. code-block :: yaml - !siria.si.demonstrator.MagnitudeGroup # Range of the initally given magnitude to use this Magnitude Group. magmin: 6.0 magmax: 9.0 # Waveform time window settings. tmin: '{stored:begin}' tmax: '{vel_surface:2.0}' tfade: # Waveform bandpass filter settings. fmin: 0.01 fmax: 0.03 # Specfic upper frequency limit for finite fault inversions (optional). fmax_extended: 0.05 # Initial epicentral distance range for strong motion or HR-GNSS # stations to be considered. distance_nearfield: - 0.0 - 100000 # Initial epicentral distance range for broad band stations to be # considered. distance_farfield: - 50000 - 100000 # Specific Grond parameter ranges to be sampled within the estimate. ranges: depth: !pf.Range start: 1000.0 stop: 35000.0 time: !pf.Range start: -20.0 stop: 20.0 relative: 'add' duration: !pf.Range start: 5. stop: 40. step: 1. lateral: !pf.Range start: -25e3 stop: 25e3 width: !pf.Range start: 5000. stop: 15000. # Give source estimation method, the magnitude group is applied for. # More options are the 'pdr' - pseudo dynamic rupture model, or the # 'ids' - IDS source model. apply_for: - 'mt' # Moment Tensor point source - 'rs' # Rectangular source model # Seismic waveform channels, the magnitude group shall be applied for. channels: - 'Z' # Vertical - 'R' # Radial ''' magmin = Float.T( default=6.0, help='Minimum moment magnitude Mw. Event Mw must be >= ``magmin``.') magmax = Float.T( default=7.0, help='Maxmimum moment magnitude Mw. Event Mw must be < ``magmax``.') tmin = Timing.T( default='{stored:P}', help='Minimum time to be considered.') tmax = Timing.T( default='{vel_surface:2.0}', help='Maximum time to be considered.') tfade = Float.T( optional=True, help='Filter padding time. If not given, it is determined from the ' '``fmin``.') fmin = Float.T( default=0.01, help='Minimum frequency in [Hz].') fmax = Float.T( default=0.03, help='Maximum frequency in [Hz]. If ``fmax_extended`` is given, ' '``fmax`` is applied to the point source inversion only.') fmax_extended = Float.T( default=0.05, optional=True, help='Maximum frequency in [Hz] for extended rupture estimation. If ' '``fmax_extended`` is not given, ``fmax`` is applied .') distance_nearfield = Tuple.T( 2, Float.T(), default=(0., 100e3), help='Distance range in [m] for nearfield accelerometric and gnss ' 'stations to be considered in the extended rupture inversions.') distance_farfield = Tuple.T( 2, Float.T(), default=(100e3, 1000e3), help='Distance range in [m] for farfield broadband ' 'stations to be considered in the inversions (both point and ' 'extended).') use_nearfield_stations_for_mt = Bool.T( default=True, help='Use strong motion and HRGNSS stations for MT inversions, if set ' 'to ``True``.') ranges = Dict.T( String.T(), Range.T(), default=dict( lateral=Range(start=-20e3, stop=20e3), depth=Range(start=0., stop=50e3), duration=Range(start=1., stop=50.), time=Range(start=-50, stop=50, relative='add')), help='Search space for the uniform and directed sampling. Depth and ' 'lateral ranges are defined in [m], duration and time range in ' '[s].') apply_for = List.T( StringChoice.T(choices=['mt', 'rs', 'pdr', 'ids']), default=['mt', 'rs', 'pdr', 'ids'], help='Give source estimation methods, the magnitude group is applied ' 'for (``mt`` - point source, ``rs`` - rectangular source, ' '``pdr`` - pseudo dynamic rupture, ``ids`` - IDS source.') channels = List.T( StringChoice.T(choices='Z R T N E'.split()), default=['Z', 'R'], help='Channels, inversion shall be done on. Choose between ``Z`` - ' 'vertical, ``R`` - radial, ``T`` - transversal, ``N`` - north ' 'or ``E`` - east.') def valid_for_event(self, event): if self.magmin <= event.magnitude and self.magmax > event.magnitude: return True return False def get_valid_sampling_groups(self, sampling_groups): if not isinstance(sampling_groups, list): sampling_groups = list(sampling_groups) sg_out = [] for sg in sampling_groups: if not any(map(lambda v: v in sg.apply_for, self.apply_for)): continue sg_out.append(sg) return sg_out def _extract_stations_from_sx( self, responses_xml, event, ranges=None, nearfield=False, farfield=False, quantity=''): if not nearfield and not farfield: raise AttributeError('Set ``nearfield`` or ``farfield``.') if nearfield: distance = self.distance_nearfield if farfield: distance = self.distance_farfield if not quantity: quantity = list(quantity2unit.keys()) if not isinstance(quantity, list): quantity = [quantity] units = [quantity2unit[q] for q in quantity] stations = responses_xml.get_pyrocko_stations() stations_out = [] for s in stations: d = event.distance_to(s) if not (d >= distance[0] and d <= distance[1]): continue for _, _, c in responses_xml.iter_network_station_channels( net=s.network, sta=s.station, loc=s.location): unit_in = c.response.instrument_sensitivity.input_units.name if unit_in.upper() in units: stations_out.append(s) return stations_out def update_grond_config(self, config, responses_xml, event): config = deepcopy(config) pc = config.problem_config ranges = deepcopy(self.ranges) if 'lateral' in ranges: ranges['north_shift'] = ranges['lateral'] ranges['east_shift'] = ranges['lateral'] del ranges['lateral'] for k, r in pc.ranges.items(): if k in ranges: continue ranges[k] = r margin = 0. dist_nf = self.distance_nearfield dist_ff = self.distance_farfield if ranges is not None: ns = ranges['north_shift'] es = ranges['east_shift'] margin = num.linalg.norm([ num.abs([ns.start, ns.stop]).max(), num.abs([es.start, es.stop]).max() ]) dist_nf = (max(dist_nf[0] - margin, 0.), dist_nf[1] + margin) dist_ff = (max(dist_ff[0] - margin, 0.), dist_ff[1] + margin) tg = config.target_groups[0] tg.weight = 1.0 tg.channels = self.channels tg.misfit_config.fmin = self.fmin tg.misfit_config.tmin = self.tmin tg.misfit_config.tmax = self.tmax tg.misfit_config.tfade = None if self.tfade: tg.misfit_config.tfade = self.tfade tg.exclude = [] tg.misfit_config.autoshift_penalty_max = 0.1 nf_for_mt = self.use_nearfield_stations_for_mt tg_out = [] if isinstance(pc, CMTProblemConfig) and nf_for_mt is False: ranges['magnitude'] = Range(start=self.magmin, stop=self.magmax) tg.misfit_config.fmax = self.fmax tg.misfit_config.tautoshift_max = 0.125 / tg.misfit_config.fmax tg.path = '.'.join([tg.normalisation_family, 'ffvel']) tg.misfit_config.quantity = 'velocity' tg.distance_min, tg.distance_max = dist_ff stations = self._extract_stations_from_sx( responses_xml, event, ranges=ranges, farfield=True, quantity='velocity') include = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations])) include.sort() tg.include = include tg_out.append(tg) else: tg.misfit_config.fmax = self.fmax_extended or self.fmax tg.misfit_config.tautoshift_max = 0.125 / tg.misfit_config.fmax tg_disp = deepcopy(tg) tg_acc = deepcopy(tg) tg_far = deepcopy(tg) tg_disp.path = '.'.join([tg_disp.normalisation_family, 'nfdisp']) tg_acc.path = '.'.join([tg_acc.normalisation_family, 'nfacc']) tg_far.path = '.'.join([tg_far.normalisation_family, 'ffvel']) tg_disp.misfit_config.quantity = 'displacement' tg_acc.misfit_config.quantity = 'acceleration' tg_far.misfit_config.quantity = 'displacement' tg_disp.distance_min, tg_disp.distance_max = dist_nf tg_acc.distance_min, tg_acc.distance_max = dist_nf tg_far.distance_min, tg_far.distance_max = dist_ff stations_disp = self._extract_stations_from_sx( responses_xml, event, ranges=ranges, nearfield=True, quantity='displacement') stations_acc = self._extract_stations_from_sx( responses_xml, event, ranges=ranges, nearfield=True, quantity='acceleration') stations_far = self._extract_stations_from_sx( responses_xml, event, ranges=ranges, farfield=True, quantity='velocity') disp_include = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_disp])) acc_include = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_acc])) far_include = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_far])) if disp_include: disp_include.sort() tg_disp.include = disp_include if 'Z' in tg_disp.channels: ind = tg_disp.channels.index('Z') tg_disp.channels[ind] = 'U' tg_out.append(tg_disp) if acc_include: acc_include.sort() tg_acc.include = acc_include tg_out.append(tg_acc) if far_include: far_include.sort() tg_far.include = far_include tg_out.append(tg_far) config.target_groups = tg_out pc.ranges = ranges config.problem_config = pc return config def update_ids_config(self, config, responses_xml, event): from siria.ids.config import FilterSetting from siria.ids.targets.waveform import ComponentWeight config = deepcopy(config) tg = config.waveform_config[0] cw = ComponentWeight(north=0., east=0., up=0.) if 'Z' in self.channels: cw.up = 1.0 if any(c in self.channels for c in 'R T N E'.split()): cw.north = 1.0 cw.east = 1.0 tg.component_weights = cw tg.restitution_config.t_min = self.tmin tg.restitution_config.t_max = self.tmax if self.tfade: tg.restitution_config.t_fade = self.tfade ffactor = 1.5 fmax = self.fmax if self.fmax_extended is None else self.fmax_extended tg.restitution_config.frequency_limits = ( self.fmin / ffactor, self.fmin, fmax, fmax * ffactor) config.engine_config.ids_synthetic_bp_filter = FilterSetting( f_min=self.fmin, f_max=fmax, order=3) tg.blocklist = [] # TODO # tg.snr_min = 5. tg_out = [] tg_disp = deepcopy(tg) tg_acc = deepcopy(tg) tg_far = deepcopy(tg) tg_disp.restitution_config.target_quantity = 'displacement' tg_acc.restitution_config.target_quantity = 'acceleration' tg_far.restitution_config.target_quantity = 'velocity' tg_disp.distance_min, tg_disp.distance_max = self.distance_nearfield tg_acc.distance_min, tg_acc.distance_max = self.distance_nearfield tg_far.distance_min, tg_far.distance_max = self.distance_farfield stations_disp = self._extract_stations_from_sx( responses_xml, event, nearfield=True, quantity='displacement') stations_acc = self._extract_stations_from_sx( responses_xml, event, nearfield=True, quantity='acceleration') stations_far = self._extract_stations_from_sx( responses_xml, event, farfield=True, quantity='velocity') tg_disp.allowlist = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_disp])) tg_acc.allowlist = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_acc])) tg_far.allowlist = list(set([ '.'.join([c for c in s.nsl() if c]) for s in stations_far])) if tg_disp.allowlist: tg_out.append(tg_disp) if tg_acc.allowlist: tg_out.append(tg_acc) if tg_far.allowlist: tg_out.append(tg_far) config.waveform_config = tg_out return config
[docs]class SamplingPhase(Object): ''' Sampling phases base class. ''' iterations = Int.T( optional=True) def get_phase(self, *args, **kwargs): raise AttributeError('Not implemented') def get_depth_range(self, *args, **kwargs): raise AttributeError('Not implemented')
[docs]class UniformSamplingPhase(SamplingPhase): ''' Phase with uniform sampling of the model space. It wraps :class:`grond.optimisers.highscore.optimiser.UniformSamplerPhase`. ''' def get_phase(self, *args, **kwargs): from grond import UniformSamplerPhase return UniformSamplerPhase(niterations=self.iterations)
[docs]class PDFSamplingPhase(SamplingPhase): ''' Phase sampling the PDF of an earlier Grond run. It wraps :class:`grond.optimisers.highscore.optimiser.PDFSamplerPhase`. This phase is currently only implemented in the EWRICA specific version of Grond provided at https://git.gfz-potsdam.de/ewrica/grond. ''' run_dir = Path.T( help='Path to Grond run, PDFs are extracted from for each parameter.', optional=True) subset = String.T( default='harvest', optional=True, help='Grond subset of models to be used. `None` loads all models.') bins = Int.T( default=360, optional=True, help='Number of bins within parameter statistics. More bins lead to ' 'a better resolution.') def get_phase(self, *args, **kwargs): from grond import PDFSamplerPhase return PDFSamplerPhase( niterations=self.iterations, run_dir=self.run_dir, subset=self.subset)
[docs]class DirectedSamplingPhase(SamplingPhase): ''' Phase with directed sampling of the model space. It wraps :class:`grond.optimisers.highscore.optimiser.DirectedSamplerPhase`. ''' scatter_scale_begin = Float.T( default=2.0, help='Scaling factor at beginning of the phase.') scatter_scale_end = Float.T( default=0.45, help='Scaling factor at the end of the directed phase.') def get_phase(self, *args, **kwargs): from grond import DirectedSamplerPhase return DirectedSamplerPhase( niterations=self.iterations, scatter_scale_begin=self.scatter_scale_begin, scatter_scale_end=self.scatter_scale_end)
[docs]class InjectionSamplingPhase(SamplingPhase): ''' It generates models based on TSUMAPS or fault database priors. ''' config = SourceInjectionConfig.T( default=SourceInjectionConfig()) def __init__(self, *args, **kwargs): SamplingPhase.__init__(self, *args, **kwargs) self._sources = [] def get_phase( self, event, store, apply_for, nparallel, base_path, fmax, grond_config, *args, **kwargs): from pyrocko.guts import dump_all from grond import InjectionSamplerPhase source_types = dict( mt='point', rs='rectangular', pdr='dynamic') if self.config.fmax is None: self.config.fmax = fmax injector = SourceInjector(config=self.config) source_type = source_types[apply_for] sources = injector.get_sources( event, store, grond_config=grond_config, source_type=source_type, nparallel=nparallel) injection_fn = op.join( base_path, source_types_injection_fn.format(source_type=apply_for)) dump_all(sources, filename=injection_fn) self._sources = sources return InjectionSamplerPhase( niterations=len(sources), sources_path=injection_fn) def get_depth_range(self, apply_for, store, grond_config, *args, **kwargs): crange = grond_config.problem_config.ranges['depth'] if apply_for == 'mt': drange = Range( start=store.config.source_depth_min, stop=store.config.source_depth_max) else: delta_depth = num.max([ num.sin(d2r * src.dip) * src.width for src in self._sources]) drange = Range( start=store.config.source_depth_min, stop=store.config.source_depth_max - delta_depth) return Range( start=max(drange.start, crange.start), stop=min(drange.stop, crange.stop))
[docs]class PriorSamplingPhase(SamplingPhase): ''' It allows for changing sampling phases in consecuitive runs. In fast near-real-time earthquake parameter estimations inversion settings may change with first reliable inversion results available. This behaviour is accomplished with the PriorSamplingPhase. In the first, initial stage, e.g., model samples are drawn from prior information as provided with the :class:`~siria.si.demonstrator.InjectionSamplingPhase`. When finished, the secondary stage can use results from the initial stage, e.g., in the :class:`~siria.si.demonstrator.PDFSamplingPhase`. ''' initial_stage = SamplingPhase.T( default=InjectionSamplingPhase( config=SourceInjectionConfig()), help='Settings for the initial run (all runs, until any previous run' 'is finished.') secondary_stage = SamplingPhase.T( default=PDFSamplingPhase( iterations=2000, run_dir='grond_rundir'), help='Settings for the all runs started when a previous run is ' 'finished.', optional=True) def _get_current_phase(self, old_runs): from siria.io.grond import get_environment old_runs = [run_dir for run_dir in old_runs if op.exists(run_dir)] mod_time = [op.getmtime(run_dir) for run_dir in old_runs] idx = num.argsort(mod_time)[::-1] env = None for i in idx: env_cur = get_environment(old_runs[i]) if env_cur.is_running(): continue try: env_cur.get_history(subset=None) except ProblemDataNotAvailable: continue env = env_cur logger.info(f'Previous run {old_runs[i]} used for sampling') break if env is None or self.secondary_stage is None: phase = self.initial_stage else: phase = self.secondary_stage if hasattr(phase, 'run_dir'): phase.run_dir = env.get_rundir_path() return phase def get_phase(self, old_runs=[], *args, **kwargs): phase = self._get_current_phase(old_runs) return phase.get_phase(*args, **kwargs) def get_depth_range(self, old_runs=[], *args, **kwargs): phase = self._get_current_phase(old_runs) return phase.get_depth_range(*args, **kwargs)
[docs]class SamplingGroupLabel(StringChoice): choices = ['fast', 'regular']
[docs]class SamplingGroup(Object): ''' Grouping for sampler phases ''' sampling_phases = List.T( SamplingPhase.T(), default=[ InjectionSamplingPhase(config=SourceInjectionConfig()), UniformSamplingPhase(iterations=2000), DirectedSamplingPhase(iterations=20000)]) apply_for = List.T( StringChoice.T(choices=['mt', 'rs', 'pdr']), default=['mt', 'rs', 'pdr'], help='Give source estimation methods, the sampling group is valid ' 'for (``mt`` - point source, ``rs`` - rectangular source, ' '``pdr`` - pseudo dynamic rupture') label = SamplingGroupLabel.T( default='regular') def update_grond_config(self, config, old_runs, *args, **kwargs): fmax = num.max([tg.misfit_config.fmax for tg in config.target_groups]) phases = [] phases = [ sp.get_phase( fmax=fmax, grond_config=config, old_runs=old_runs, *args, **kwargs) for sp in self.sampling_phases] for sp in self.sampling_phases: try: drange = sp.get_depth_range( old_runs=old_runs, apply_for=kwargs.get('apply_for'), store=kwargs.get('store'), grond_config=config) config.problem_config.ranges['depth'] = drange except AttributeError: continue config.optimiser_config.sampler_phases = phases return config
[docs]class EngineConfig(Object): ''' Controlls for the used ground models, and processing settings. ''' store_dirs = List.T( String.T(), default=['.']) store_id_grond = String.T( default='ak135_2000km_1Hz') store_id_ids_waveform = String.T( default='waveform_store_ids') store_id_ids_geodetic = String.T( default='geodetic_store_ids') nparallel = Int.T( default=1, help='Number of parallel processes.') nthreads = Int.T( default=1, help='Number of threads.') pphase = String.T( default='{stored:any_P}', help='Pyrocko P-wave arrival time definition for IDS.') def update_grond_config(self, config): ec = config.engine_config ec.gf_store_superdirs = self.store_dirs for tg in config.target_groups: tg.store_id = self.store_id_grond return config def update_ids_config(self, config): ec = config.engine_config ec.ids_store_superdirs = self.store_dirs ec.pyrocko_store_superdirs = self.store_dirs ec.ids_waveform_store_id = self.store_id_ids_waveform ec.ids_geodetic_store_id = self.store_id_ids_geodetic for tg in config.waveform_config: tg.store_id = self.store_id_grond tg.pphase = self.pphase return config
[docs]class DemonstratorConfig(Object): ''' Configuration of the source inversion demonstrator. ''' nstations_min = Int.T( default=0, optional=True, help='Minimum number of available stations to issue a run.') nstations_max = Int.T( default=30, optional=True, help='Maximum number of available stations included within one run.') blocklist = List.T( default=[], help='Blocked stations given as STA, NET.STA or NET.STA.LOC code.') magnitude_groups = List.T( MagnitudeGroup.T(), default=[MagnitudeGroup()], help='Set of magnitude specific settings.') sampling_groups = List.T( SamplingGroup.T(), default=[SamplingGroup()]) engine_config = EngineConfig.T( default=EngineConfig()) create_report = Bool.T( default=False, help='If ``True``, IDS and Grond reports are generated.') hrgnss_snr_min = Float.T( default=2., optional=True, help='Minimium absolute Signal-To-Noise amplitude ratio for HRGNSS ' 'station records.')
[docs]class SourceInversionDemonstrator(Object): ''' Core inversion wrapper executing Grond and IDS inversion codes. ''' config = DemonstratorConfig.T( default=DemonstratorConfig()) event = Event.T( default=Event()) origin = Event.T( optional=True) def __init__( self, squirrel=None, responses_xml=None, *args, **kwargs): Object.__init__(self, *args, **kwargs) self._squirrel = squirrel self._sx = responses_xml self._engine = None self._store = None @property def engine(self): ''' :returns: The Pyrocko engine. :rtype: :py:class:`pyrocko.gf.LocalEngine` ''' if self._engine is None: ec = self.config.engine_config self._engine = LocalEngine(store_superdirs=ec.store_dirs) return self._engine @property def store(self): ''' :returns: The Pyrocko Green's Function store. :rtype: :py:class:`pyrocko.gf.store.Store` ''' if self._store is None: ec = self.config.engine_config self._store = self.engine.get_store(ec.store_id_grond) return self._store
[docs] def get_grond_config( self, magnitude_group, sampling_group, apply_for, base_path, old_runs): ''' Generate a Grond configuration file from the configurations. ''' from siria.util import example_file from pyrocko.guts import load mg = magnitude_group sg = sampling_group base_config_fns = dict( mt='grond_mt_source.gronf', rs='grond_rectangular_source.gronf', pdr='grond_dynamic_source.gronf',) config = load(filename=example_file(base_config_fns[apply_for])) config.dataset_config.waveform_paths = [] config.dataset_config.events_path = '' config.dataset_config.stations_stationxml_paths = [] config.dataset_config.responses_stationxml_paths = [] config.dataset_config.blacklist_paths = [] config.dataset_config.blacklist = [] config.dataset_config.whitelist_paths = [] config.dataset_config.whitelist = [] config.dataset_config.extend_incomplete = True # TODO check config.problem_config.distance_min = 0. config = mg.update_grond_config(config, self._sx, self.event) config = sg.update_grond_config( config, event=self.event, store=self.store, apply_for=apply_for, nparallel=self.config.engine_config.nparallel, base_path=base_path, old_runs=old_runs) config = self.config.engine_config.update_grond_config(config) return config
[docs] def weed_stations(self, codes): ''' Select stations with a good azimuthal and distance coverage. ''' event = self.event nstations_max = self.config.nstations_max for ic, c in enumerate(codes): if len(c) == 2: codes[ic] = c + ('*',) stations = self._squirrel.get_stations(codes=codes, model='squirrel') stations_pyr = [sta.get_pyrocko_station() for sta in stations] for i, s in enumerate(stations_pyr): s.set_event_relative_data(event) if s.nsl()[2] == '*': s.location = '' stations_pyr[i] = s stations_selected = weeding.weed_stations( stations_pyr, nstations_max)[0] logger.info('Reduced number of stations from %d to %d', len(stations), len(stations_selected)) return [sta.nsl()[:2] for sta in stations_selected]
[docs] def validate_pipeline_and_data(self, config): ''' Quality controll for data before inversions. The function removes doubled, or blocked records. ''' nstations_min = self.config.nstations_min nstations_max = self.config.nstations_max sq = self._squirrel store = self._store store_deltat = 1. / store.config.sample_rate quantity2codes = dict( displacement=['LB?'], velocity=[f'{c}H?' for c in ('B', 'H')], acceleration=['?N?']) # remove doubled station records as e.g HH* and BH* available_traces_nsl = [ '.'.join(c.nsl) if c.nsl[2] else '.'.join(c.nsl[:2]) for c in sq.get_codes(kind='waveform')] payload = [] for tg in config.target_groups: quantity = tg.misfit_config.quantity if 'vel' in tg.path: quantity = 'velocity' # get stations according to target group stations_codes = tg.include # get sensors from squirrel according to quantity codes_template = quantity2codes[quantity] stations_codes = [tuple(sc.split('.')) for sc in stations_codes] stations_codes = [ sc + ('',) if len(sc) == 2 else sc for sc in stations_codes] stations_codes = remove_blocklisted_stations( nsl=stations_codes, stations_blocklist=self.config.blocklist) stations_codes = keep_allowlisted_stations( nsl=stations_codes, stations_allowlist=available_traces_nsl) payload += [ (sc, codes_template, quantity, tg.misfit_config.fmin, tg.misfit_config.fmax) for sc in stations_codes] tpad = 1. / min([tg.misfit_config.fmin for tg in config.target_groups]) def validate_sensors( station_code, code_template, quantity, fmin, fmax, tpad): nsl = station_code codes = [nsl + (c,) for c in code_template] sensors = sq.get_sensors( codes=codes, time=self.event.time) if len(sensors) == 0: return [] # create ranking of sensors sensor_deltats = { s.channels[0].deltat: s for s in sensors if s.channels[0].deltat is not None} deltats_sorted = num.sort( num.array(list(sensor_deltats.keys()))) deltats_ranking = num.concatenate(( deltats_sorted[deltats_sorted <= store_deltat][::-1], deltats_sorted[deltats_sorted > store_deltat])) for deltat in deltats_ranking: sensor = sensor_deltats[deltat] tmin = store.t( tg.misfit_config.tmin, self.event, sensor) - tpad tmax = store.t( tg.misfit_config.tmax, self.event, sensor) + 600. \ + tpad tmin += self.event.time tmax += self.event.time traces_out = [] for cha in sensor.channels: # cha_traces = traces_in.get(cha.codes.nslc, []) cha_traces = sq.get_waveforms( cha, tmin=tmin, tmax=tmax, degap=True, maxgap=0, want_incomplete=True) # logger.debug(cha.summary) # logger.debug(len(cha_traces)) # Return to next deltat in case of missing data if not cha_traces: traces_out = [] break traces_out += cha_traces if not traces_out: continue else: break if quantity == 'displacement' and \ self.config.hrgnss_snr_min is not None: accepted, _ = snr_filter( traces_out, cut_time=self.event.time, snr_min=self.config.hrgnss_snr_min, fmin=fmin, fmax=fmax) if not accepted: logger.info( f'Removed {sensor.codes.network}.' f'{sensor.codes.station}.' f'{sensor.codes.location} due to bad SNR.') return [] return traces_out if not payload: logger.warning( 'Could not find any data matching the distance and time ' 'constraints') return False, None, None, None traces_subset = [] for args in payload: traces_out = validate_sensors(*args, tpad=tpad) traces_subset += traces_out sq_out = squirrel.Squirrel() if traces_subset: sq_out.add_volatile_waveforms(traces_subset) grond_ns = [] for tg in config.target_groups: grond_ns += tg.include grond_ns = set([tuple(c.split('.')[:2]) for c in grond_ns]) if len(grond_ns) < nstations_min: logger.warning( f'Less stations ({len(grond_ns)}) available for grond ' f'than required by minimum stations ({nstations_min}).') return False, None, None, None # check available data for stations in squirrel sq_ns = set([c.ns for c in sq_out.get_codes(kind='waveform')]) if len(sq_ns) < nstations_min: logger.warning( f'Less waveforms ({len(sq_ns)}) available for grond than ' f'required by minimum stations ({nstations_min}).') return False, None, None, None intersection = list(sq_ns & grond_ns) if len(intersection) < nstations_min: logger.warning( f'Less waveforms ({len(intersection)}) available for required ' 'stations than required by minimum stations ' f'({nstations_min}).') return False, None, None, None if len(intersection) > nstations_max: logger.info('Found more stations than required. Weed stations ...') intersection = self.weed_stations(intersection) logger.info( f'Found {len(grond_ns)} stations and {len(sq_ns)} station ' f'records. {len(intersection)} records are associated with ' f'available stations. Continue to grond ...') # This should be the option for the future but is buggy # sx_out = sq.get_stationxml(time=self.event.time) sx_out = self._sx return True, sq_out, sx_out, intersection
[docs] def run_grond_inversion( self, label, apply_for, config, base_path, *args, **kwargs): ''' Grond inversion wrapper ''' from grond import Environment from grond.core import go, expand_template proceed, sq_out, sx_out, allowlist = self.validate_pipeline_and_data( config) if not proceed: logger.error( 'Waveforms within the run are not compatible ' 'with the current settings of grond. Check your ' 'distance settings or the quality of your ' 'waveforms (e.g. gaps).') return [], {} conf = config.clone() conf.problem_config.name_template = '{}_{}_{}'.format( self.event.name, apply_for, label) conf.dataset_config.whitelist = ['.'.join(al) for al in allowlist] conf.event_names = [self.event.name] cwd = os.getcwd() os.chdir(base_path) conf.set_basepath(base_path, parent_path_prefix=base_path) conf.path_prefix = base_path env = Environment( config=conf, event_names=conf.event_names, event=self.event, squirrel=sq_out, responses_xml=sx_out) rundir_path = expand_template( conf.rundir_template, dict(problem_name=conf.problem_config.name_template)) env.set_rundir_path(rundir_path) go( environment=env, nparallel=self.config.engine_config.nparallel, nthreads=self.config.engine_config.nthreads, status='state', *args, **kwargs) env.set_rundir_path(rundir_path) ds = env.get_config().get_dataset( self.event.name, event=self.event, squirrel=self._squirrel, responses_xml=self._sx) env._dataset = ds try: history = env.get_history(subset=None) except ProblemDataNotAvailable: logger.error( 'Waveforms within the run are not compatible ' 'with the current settings of grond. Check your ' 'distance settings or the quality of your ' 'waveforms (e.g. gaps).') return [], {} sources = [history.problem.get_source(mod) for mod in history.get_sorted_models()] misfits = history.get_sorted_misfits() source_ensemble = pyrocko_to_sources(sources, misfits) _, source_dict = sources_to_json(source_ensemble) if self.config.create_report: self.run_grond_report(env) os.chdir(cwd) return source_ensemble, source_dict
[docs] def run_grond_report(self, env): ''' Generate Grond report. ''' from grond.plot import get_plot_config_collection from grond.report import report_index, report, ReportConfig conf = ReportConfig( plot_config_collection=get_plot_config_collection()) conf.set_basepath('.') conf.make_archive = False env.set_current_event_name(self.event.name) report( env, conf, update_without_plotting=False, make_archive=False, nthreads=self.config.engine_config.nthreads) report_index(conf)
[docs] def get_ids_config(self, magnitude_group, base_path): ''' Prepare the configuration for IDS. ''' from siria.util import example_file mg = magnitude_group base_config_fn = example_file('ids.conf') config = IDSConfigFull.load(filename=base_config_fn) config.path_prefix = base_path config.run_config.outpath_template = op.join( base_path, '{}_ids_regular'.format(self.event.name)) config.dataset_config.events_path = '' config.dataset_config.event_name = self.event.name config.dataset_config.nstations_wanted = 30 config.dataset_config.correct_event_duration = True for tg in config.waveform_config: # tg.static_weight = 0.5 tg.waveform_paths = [] tg.response_stationxml_paths = [] config = mg.update_ids_config(config, self._sx, self.event) config = self.config.engine_config.update_ids_config(config) return config
[docs] def run_ids_inversion(self, config, base_path, force): ''' IDS inversion wrapper. ''' from pyrocko.guts import clone source_ensemble = [] for np in ('nodalplane1', 'nodalplane2'): conf_new = deepcopy(config) conf_new.dataset_config.fault_plane = np conf_new.run_config.outpath_template += '_{}'.format(np) conf_new.bind_parent() runner = IDSRunner(config=conf_new) runner.prepare( responses_xml=self._sx, squirrel=self._squirrel, event=clone(self.event), origin=clone(self.origin)) runner.run() if runner.errmess: logger.warning('Run was not completed. Aborting ...') else: logger.info('IDS calculation done. Harvest results ...') source = runner.harvest(force=force) source_ensemble.append(source) if self.config.create_report: runner.report(cwd=base_path) _, source_dict = sources_to_json(source_ensemble) return source_ensemble, source_dict
[docs] def get_valid_magnitude_group(self): ''' Extract magnitude groups valid for the given event. ''' mag_groups = self.config.magnitude_groups return [mg for mg in mag_groups if mg.valid_for_event(self.event)]
[docs] def get_inversion_pipeline(self, base_path_grond, base_path_ids, old_runs): ''' Prepare the pipeline of inversions required for the current event. ''' mag_groups = self.get_valid_magnitude_group() smp_groups = self.config.sampling_groups pipeline_grond = [] pipeline_ids = [] logger.info('Found {} valid magnitude groups'.format(len(mag_groups))) logger.info( 'Check validity of {} sampling groups'.format(len(smp_groups))) for mg in mag_groups: for sg in smp_groups: commons = list(set(mg.apply_for) & set(sg.apply_for)) logger.info('Prepare configurations:' + ''.join(['\n - {}'.format(c) for c in commons])) pipeline_grond += [ (apply_for, sg.label, self.get_grond_config( mg, sg, apply_for, base_path_grond, old_runs)) for apply_for in commons if apply_for != 'ids'] if 'ids' in mg.apply_for: conf = self.get_ids_config(mg, base_path_ids) pipeline_ids.append((conf,)) def pipeline_sorting(pipe): source_to_num = dict( mt='1', rs='2', pdr='3') label_to_num = dict( fast='1', regular='2') return int(label_to_num[pipe[1]] + source_to_num[pipe[0]]) pipeline_grond.sort(key=pipeline_sorting) return pipeline_grond, pipeline_ids
def _quakeml_stream_to_events(stream): from pyrocko.io import quakeml qml = quakeml.QuakeML.load_xml(stream=stream) return qml.get_pyrocko_events() def _mseed_stream_to_squirrel(stream, sq): import tempfile from pyrocko.io import mseed try: with tempfile.NamedTemporaryFile(prefix='mseed-stream') as f: f.write(stream) f.flush() traces = mseed.iload(f.name) for tr in traces: sq.add_volatile_waveforms([tr]) except Exception as e: logger.debug(e) return sq def _stationxml_stream_to_stationxml(stream): if not isinstance(stream, list): sx = stationxml.FDSNStationXML.load_xml(stream=stream) else: sxs = [stationxml.FDSNStationXML.load_xml(stream=st) for st in stream] sx = stationxml.primitive_merge(sxs) return sx def _stationxml_to_squirrel_nuts( sx, content=['station', 'channel', 'response']): import copy import time from pyrocko.io import stationxml from pyrocko.squirrel import model Y = 60*60*24*365 far_future = time.time() + 20*Y value_or_none = stationxml.value_or_none inut = 0 for network in sx.network_list: for station in network.station_list: net = network.code sta = station.code tmin = station.start_date tmax = station.end_date if tmax is not None and tmax > far_future: tmax = None station_nut = model.make_station_nut( file_segment=0, file_element=inut, codes=model.CodesNSL(net, sta, '*'), tmin=tmin, tmax=tmax) if 'station' in content: station_nut.content = model.Station( lat=station.latitude.value, lon=station.longitude.value, elevation=value_or_none(station.elevation), **station_nut.station_kwargs) station_copy = copy.copy(station) station_copy.channel_list = [] station_nut.raw_content['stationxml'] = station_copy yield station_nut inut += 1 for channel in station.channel_list: cha = channel.code loc = channel.location_code.strip() tmin = channel.start_date tmax = channel.end_date if tmax is not None and tmax > far_future: tmax = None deltat = None if channel.sample_rate is not None \ and channel.sample_rate.value != 0.0: deltat = 1.0 / channel.sample_rate.value nut = model.make_channel_nut( file_segment=0, file_element=inut, codes=model.CodesNSLCE(net, sta, loc, cha, ''), tmin=tmin, tmax=tmax, deltat=deltat) if 'channel' in content: nut.content = model.Channel( lat=channel.latitude.value, lon=channel.longitude.value, elevation=value_or_none(channel.elevation), depth=value_or_none(channel.depth), azimuth=value_or_none(channel.azimuth), dip=value_or_none(channel.dip), **nut.channel_kwargs) channel_copy = copy.copy(channel) channel_copy.response = None nut.raw_content['stationxml'] = channel_copy yield nut inut += 1 context = '%s.%s.%s.%s' % (net, sta, loc, cha) if channel.response and 'response' in content: nut = model.make_response_nut( file_segment=0, file_element=inut, codes=model.CodesNSLCE(net, sta, loc, cha, ''), tmin=tmin, tmax=tmax, deltat=deltat) try: resp = channel.response.get_squirrel_response( context, **nut.response_kwargs) if 'response' in content: nut.content = resp nut.raw_content['stationxml'] = channel.response yield nut inut += 1 except stationxml.StationXMLError as e: logger.debug('Bad instrument response: %s' % str(e)) def stationxml_stream_to_squirrel(stream, sq): from pyrocko.squirrel.model import random_name sx = _stationxml_stream_to_stationxml(stream) name = random_name() for nut in _stationxml_to_squirrel_nuts( sx=sx, content=['station', 'channel', 'response']): nut.file_path = 'virtual:volatile:%s%d' % (name, nut.file_element) nut.file_format = 'virtual' nut.file_mtime = 0 sq.add_volatile([nut]) def _collect_grond_runs(rundir): import glob if rundir.endswith('.grun'): return [rundir] cwd = os.getcwd() os.chdir(rundir) runs = [] for d in glob.iglob('**/*', recursive=True): if d.endswith('.grun'): runs.append(op.join(rundir, d)) os.chdir(cwd) return runs
[docs]def run_source_inversion_stream( config, event_stream, responses_xml_stream=None, responses_xml=None, sq=None, waveforms_stream=None, obspy_streams=[], obspy_traces=[], origin_stream=None, *args, **kwargs): ''' Start inversion with streamed waveforms and meta data Further inversion settings are derived from the configuration file. :param config: Demonstrator configuration. :type config: :py:class:`siria.si.demonstrator.DemonstratorConfig` :param event_stream: Event information as QuakeML. :type event_stream: bytes :param responses_xml_stream: Station meta data in StationXML format. :type responses_xml_stream: optional, bytes :param sq: Squirrel instance containing meta data and/or waveform data. :type sq: optional, :py:class:`pyrocko.squirrel.Squirrel` :param waveforms_stream: Waveforms as miniSEED. :type waveforms_stream: optional, bytes :param obspy_streams: Waveforms in obspy stream format. :type obspy_streams: optional, iterable of :py:class:`obspy.core.stream.Stream` :param obspy_traces: Waveforms in obspy format. :type obspy_traces: optional, iterable of :py:class:`obspy.core.trace.Trace` :param origin_stream: If given, event location, time and duration are updated with the origin. :type origin_stream: optional, bytes :returns: Sourve inversion results as source list and dict :rtype: iterator of tuple(list(:py:class:`siria.sources.SiriaMTSource` or :py:class:`siria.sources.SiriaPseudoDynamicRupture` or :py:class:`siria.sources.SiriaRectangularSource` or :py:class:`siria.sources.SiriaIDSSource`)), dict ''' logger.info('Started stream source inversion') events = _quakeml_stream_to_events(event_stream) # Simplified case (only one event in event file) if len(events) != 1: raise IndexError('Please provide only one event.') event = events[0] origin = None if origin_stream is not None: origins = _quakeml_stream_to_events(origin_stream) # Simplified case (only one event in event file) if len(origins) != 1: raise IndexError('Please provide only one origin.') origin = origins[0] if sq is None: sq = squirrel.Squirrel() sx = None if responses_xml_stream is not None: sx = _stationxml_stream_to_stationxml(responses_xml_stream) # TODO fix the following code lines # stationxml_stream_to_squirrel(responses_xml_stream, sq) # sx = sq.get_stationxml(level='response') elif responses_xml is not None: sx = responses_xml if sx is None: raise ValueError('No stationxml given') if waveforms_stream is None and obspy_traces is None and \ obspy_streams is None: raise AttributeError( 'Please give waveforms as stream, list of obspy_traces or both.') if waveforms_stream is not None: _mseed_stream_to_squirrel(waveforms_stream, sq) if obspy_streams: obspy_streams = list(obspy_streams) for st in obspy_streams: obspy_traces += st.traces if obspy_traces: sq.add_volatile_waveforms( [tr.to_pyrocko_trace() for tr in obspy_traces]) if type(config) == str: if not op.exists(config): raise ValueError('Configuration file {} not found.'.format(config)) config = DemonstratorConfig.load(filename=config) return run_source_inversion( config, event, sq, sx, origin=origin, *args, **kwargs)
[docs]def run_source_inversion( config, event, sq, responses_xml, origin=None, arrivals={}, old_runs=[], force=False, nstations_min=0, enable_profiling=False): ''' Start source inversion with Pyrocko understandable data formats. :param config: Demonstrator configuration. :type config: :py:class:`siria.si.demonstrator.DemonstratorConfig` :param event: Event information. :type event_stream: :py:class:`pyrocko.model.event.Event` :param sq: Waveforms stored in Squirrel. :type sq: :py:class:`pyrocko.squirrel.Squirrel` :param responses_xml: Station meta data. :type responses_xml_stream: list of :py:class:`pyrocko.io.stationxml.FDSNStationXML` :param origin: If given, event location, time and duration are updated with the origin. :type origin: optional, :py:class:`pyrocko.model.event.Event` :param old_runs: List of previous runs performed. PDFs can be extracted from them and used for refined sampling, if runs are finished. If several runs are available the last finished is used. :type old_runs: optional, list of str :param arrivals: Absolut P wave arrival times at the stations. So far not used. :type origin: optional, :py:class:`obspy.core.utcdatetime.UTCDateTime` :param force: If ``True``, old runs within the same runing directory are overwritten. Default is ``False``. :type force: optional, bool :param nstations_min: If larger than ``0``, runs are only issued for ``nstations_min`` or or more available stations. :type nstations_min: optional, int :param enable_profiling: If ``True``, cprofile is called. Results are stored in `cprofile.prof` :type enable_profiling: optional, bool :returns: Sourve inversion results as source list and dict :rtype: iterator of tuples of list of :py:class:`siria.sources.SiriaMTSource` or :py:class:`siria.sources.SiriaPseudoDynamicRupture` or :py:class:`siria.sources.SiriaRectangularSource` or :py:class:`siria.sources.SiriaIDSSource`, dict ''' logger.info('Started local source inversion') from pyrocko.util import ensuredir if enable_profiling: import cProfile import pstats profiler = cProfile.Profile() profiler.enable() event.name = event.name.replace( '/', '_').replace( ':', '_').replace( '.', '_').replace( '#', '_') cwd = os.getcwd() base_path_grond = op.join(cwd, 'grond') base_path_ids = op.join(cwd, 'ids') for p in (base_path_grond, base_path_ids): ensuredir(p) if isinstance(responses_xml, list): responses_xml = stationxml.primitive_merge(responses_xml) old_runs = [ run for oldrun in old_runs for run in _collect_grond_runs(oldrun)] config.nstations_min = nstations_min if config.nstations_max is None: config.nstations_max = len(sq.get_codes(kind='station')) demonstrator = SourceInversionDemonstrator( config=config, event=event, origin=origin, squirrel=sq, responses_xml=responses_xml) pipeline_grond, pipeline_ids = demonstrator.get_inversion_pipeline( base_path_grond, base_path_ids, old_runs) for pipe in pipeline_grond: yield demonstrator.run_grond_inversion( *pipe, base_path=base_path_grond, force=force) for pipe in pipeline_ids: yield demonstrator.run_ids_inversion( *pipe, base_path=base_path_ids, force=force) if enable_profiling: profiler.disable() stats = pstats.Stats(profiler) stats.dump_stats('cprofile.prof')
# TODO enable multiprocessing (each inversion and sampling group)