# GPLv3
#
# The Developers, 21st Century
import logging
from copy import deepcopy
import os
import os.path as op
import numpy as num
from pyrocko import squirrel, weeding
from pyrocko.gf import Timing, Range, LocalEngine
from pyrocko.guts import (
Object, Float, StringChoice, Tuple, Dict, List, String, Int, Bool)
from pyrocko.io import stationxml
from pyrocko.model import Event
from pyrocko import obspy_compat
import grond # noqa
from grond import CMTProblemConfig
from grond.meta import Path
from grond.problems.base import ProblemDataNotAvailable
from siria.util import \
remove_blocklisted_stations, keep_allowlisted_stations, snr_filter
from siria.ids import IDSConfigFull, IDSRunner
from siria.io import sources_to_json
from siria.io.grond import pyrocko_to_sources
from siria.si.invert import SourceInjectionConfig, SourceInjector
logger = logging.getLogger('siria.si.demonstrator')
obspy_compat.plant()
d2r = num.pi / 180.
quantity2unit = dict([
('displacement', 'M'),
('velocity', 'M/S'),
('acceleration', 'M/S**2')])
source_types_injection_fn = 'sources_inject_{source_type}.yaml'
[docs]class MagnitudeGroup(Object):
'''
Definition of inversion parameters specific to a certain magnitude range.
The core of a magnitude group is the magnitude range
(:attr:`~siria.si.demonstrator.MagnitudeGroup.magmin`,
:attr:`~siria.si.demonstrator.MagnitudeGroup.magmax`). It limits the
application of the magnitue group to earthquakes with the initial
magnitude in the given range. Detailed inversion settings can be set:
* the waveform time window,
* which waveform channels are used for inversion.
* the bandpass filter frequencies and fading time,
* distance ranges for stations to be considered,
* inversion parameter search space ranges, and
* for which source models the magnitude group is applied for.
.. code-block :: yaml
- !siria.si.demonstrator.MagnitudeGroup
# Range of the initally given magnitude to use this Magnitude Group.
magmin: 6.0
magmax: 9.0
# Waveform time window settings.
tmin: '{stored:begin}'
tmax: '{vel_surface:2.0}'
tfade:
# Waveform bandpass filter settings.
fmin: 0.01
fmax: 0.03
# Specfic upper frequency limit for finite fault inversions (optional).
fmax_extended: 0.05
# Initial epicentral distance range for strong motion or HR-GNSS
# stations to be considered.
distance_nearfield:
- 0.0
- 100000
# Initial epicentral distance range for broad band stations to be
# considered.
distance_farfield:
- 50000
- 100000
# Specific Grond parameter ranges to be sampled within the estimate.
ranges:
depth: !pf.Range
start: 1000.0
stop: 35000.0
time: !pf.Range
start: -20.0
stop: 20.0
relative: 'add'
duration: !pf.Range
start: 5.
stop: 40.
step: 1.
lateral: !pf.Range
start: -25e3
stop: 25e3
width: !pf.Range
start: 5000.
stop: 15000.
# Give source estimation method, the magnitude group is applied for.
# More options are the 'pdr' - pseudo dynamic rupture model, or the
# 'ids' - IDS source model.
apply_for:
- 'mt' # Moment Tensor point source
- 'rs' # Rectangular source model
# Seismic waveform channels, the magnitude group shall be applied for.
channels:
- 'Z' # Vertical
- 'R' # Radial
'''
magmin = Float.T(
default=6.0,
help='Minimum moment magnitude Mw. Event Mw must be >= ``magmin``.')
magmax = Float.T(
default=7.0,
help='Maxmimum moment magnitude Mw. Event Mw must be < ``magmax``.')
tmin = Timing.T(
default='{stored:P}',
help='Minimum time to be considered.')
tmax = Timing.T(
default='{vel_surface:2.0}',
help='Maximum time to be considered.')
tfade = Float.T(
optional=True,
help='Filter padding time. If not given, it is determined from the '
'``fmin``.')
fmin = Float.T(
default=0.01,
help='Minimum frequency in [Hz].')
fmax = Float.T(
default=0.03,
help='Maximum frequency in [Hz]. If ``fmax_extended`` is given, '
'``fmax`` is applied to the point source inversion only.')
fmax_extended = Float.T(
default=0.05,
optional=True,
help='Maximum frequency in [Hz] for extended rupture estimation. If '
'``fmax_extended`` is not given, ``fmax`` is applied .')
distance_nearfield = Tuple.T(
2,
Float.T(),
default=(0., 100e3),
help='Distance range in [m] for nearfield accelerometric and gnss '
'stations to be considered in the extended rupture inversions.')
distance_farfield = Tuple.T(
2,
Float.T(),
default=(100e3, 1000e3),
help='Distance range in [m] for farfield broadband '
'stations to be considered in the inversions (both point and '
'extended).')
use_nearfield_stations_for_mt = Bool.T(
default=True,
help='Use strong motion and HRGNSS stations for MT inversions, if set '
'to ``True``.')
ranges = Dict.T(
String.T(),
Range.T(),
default=dict(
lateral=Range(start=-20e3, stop=20e3),
depth=Range(start=0., stop=50e3),
duration=Range(start=1., stop=50.),
time=Range(start=-50, stop=50, relative='add')),
help='Search space for the uniform and directed sampling. Depth and '
'lateral ranges are defined in [m], duration and time range in '
'[s].')
apply_for = List.T(
StringChoice.T(choices=['mt', 'rs', 'pdr', 'ids']),
default=['mt', 'rs', 'pdr', 'ids'],
help='Give source estimation methods, the magnitude group is applied '
'for (``mt`` - point source, ``rs`` - rectangular source, '
'``pdr`` - pseudo dynamic rupture, ``ids`` - IDS source.')
channels = List.T(
StringChoice.T(choices='Z R T N E'.split()),
default=['Z', 'R'],
help='Channels, inversion shall be done on. Choose between ``Z`` - '
'vertical, ``R`` - radial, ``T`` - transversal, ``N`` - north '
'or ``E`` - east.')
def valid_for_event(self, event):
if self.magmin <= event.magnitude and self.magmax > event.magnitude:
return True
return False
def get_valid_sampling_groups(self, sampling_groups):
if not isinstance(sampling_groups, list):
sampling_groups = list(sampling_groups)
sg_out = []
for sg in sampling_groups:
if not any(map(lambda v: v in sg.apply_for, self.apply_for)):
continue
sg_out.append(sg)
return sg_out
def _extract_stations_from_sx(
self,
responses_xml,
event,
ranges=None,
nearfield=False,
farfield=False,
quantity=''):
if not nearfield and not farfield:
raise AttributeError('Set ``nearfield`` or ``farfield``.')
if nearfield:
distance = self.distance_nearfield
if farfield:
distance = self.distance_farfield
if not quantity:
quantity = list(quantity2unit.keys())
if not isinstance(quantity, list):
quantity = [quantity]
units = [quantity2unit[q] for q in quantity]
stations = responses_xml.get_pyrocko_stations()
stations_out = []
for s in stations:
d = event.distance_to(s)
if not (d >= distance[0] and d <= distance[1]):
continue
for _, _, c in responses_xml.iter_network_station_channels(
net=s.network,
sta=s.station,
loc=s.location):
unit_in = c.response.instrument_sensitivity.input_units.name
if unit_in.upper() in units:
stations_out.append(s)
return stations_out
def update_grond_config(self, config, responses_xml, event):
config = deepcopy(config)
pc = config.problem_config
ranges = deepcopy(self.ranges)
if 'lateral' in ranges:
ranges['north_shift'] = ranges['lateral']
ranges['east_shift'] = ranges['lateral']
del ranges['lateral']
for k, r in pc.ranges.items():
if k in ranges:
continue
ranges[k] = r
margin = 0.
dist_nf = self.distance_nearfield
dist_ff = self.distance_farfield
if ranges is not None:
ns = ranges['north_shift']
es = ranges['east_shift']
margin = num.linalg.norm([
num.abs([ns.start, ns.stop]).max(),
num.abs([es.start, es.stop]).max()
])
dist_nf = (max(dist_nf[0] - margin, 0.), dist_nf[1] + margin)
dist_ff = (max(dist_ff[0] - margin, 0.), dist_ff[1] + margin)
tg = config.target_groups[0]
tg.weight = 1.0
tg.channels = self.channels
tg.misfit_config.fmin = self.fmin
tg.misfit_config.tmin = self.tmin
tg.misfit_config.tmax = self.tmax
tg.misfit_config.tfade = None
if self.tfade:
tg.misfit_config.tfade = self.tfade
tg.exclude = []
tg.misfit_config.autoshift_penalty_max = 0.1
nf_for_mt = self.use_nearfield_stations_for_mt
tg_out = []
if isinstance(pc, CMTProblemConfig) and nf_for_mt is False:
ranges['magnitude'] = Range(start=self.magmin, stop=self.magmax)
tg.misfit_config.fmax = self.fmax
tg.misfit_config.tautoshift_max = 0.125 / tg.misfit_config.fmax
tg.path = '.'.join([tg.normalisation_family, 'ffvel'])
tg.misfit_config.quantity = 'velocity'
tg.distance_min, tg.distance_max = dist_ff
stations = self._extract_stations_from_sx(
responses_xml,
event,
ranges=ranges,
farfield=True,
quantity='velocity')
include = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations]))
include.sort()
tg.include = include
tg_out.append(tg)
else:
tg.misfit_config.fmax = self.fmax_extended or self.fmax
tg.misfit_config.tautoshift_max = 0.125 / tg.misfit_config.fmax
tg_disp = deepcopy(tg)
tg_acc = deepcopy(tg)
tg_far = deepcopy(tg)
tg_disp.path = '.'.join([tg_disp.normalisation_family, 'nfdisp'])
tg_acc.path = '.'.join([tg_acc.normalisation_family, 'nfacc'])
tg_far.path = '.'.join([tg_far.normalisation_family, 'ffvel'])
tg_disp.misfit_config.quantity = 'displacement'
tg_acc.misfit_config.quantity = 'acceleration'
tg_far.misfit_config.quantity = 'displacement'
tg_disp.distance_min, tg_disp.distance_max = dist_nf
tg_acc.distance_min, tg_acc.distance_max = dist_nf
tg_far.distance_min, tg_far.distance_max = dist_ff
stations_disp = self._extract_stations_from_sx(
responses_xml,
event,
ranges=ranges,
nearfield=True,
quantity='displacement')
stations_acc = self._extract_stations_from_sx(
responses_xml,
event,
ranges=ranges,
nearfield=True,
quantity='acceleration')
stations_far = self._extract_stations_from_sx(
responses_xml,
event,
ranges=ranges,
farfield=True,
quantity='velocity')
disp_include = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_disp]))
acc_include = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_acc]))
far_include = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_far]))
if disp_include:
disp_include.sort()
tg_disp.include = disp_include
if 'Z' in tg_disp.channels:
ind = tg_disp.channels.index('Z')
tg_disp.channels[ind] = 'U'
tg_out.append(tg_disp)
if acc_include:
acc_include.sort()
tg_acc.include = acc_include
tg_out.append(tg_acc)
if far_include:
far_include.sort()
tg_far.include = far_include
tg_out.append(tg_far)
config.target_groups = tg_out
pc.ranges = ranges
config.problem_config = pc
return config
def update_ids_config(self, config, responses_xml, event):
from siria.ids.config import FilterSetting
from siria.ids.targets.waveform import ComponentWeight
config = deepcopy(config)
tg = config.waveform_config[0]
cw = ComponentWeight(north=0., east=0., up=0.)
if 'Z' in self.channels:
cw.up = 1.0
if any(c in self.channels for c in 'R T N E'.split()):
cw.north = 1.0
cw.east = 1.0
tg.component_weights = cw
tg.restitution_config.t_min = self.tmin
tg.restitution_config.t_max = self.tmax
if self.tfade:
tg.restitution_config.t_fade = self.tfade
ffactor = 1.5
fmax = self.fmax if self.fmax_extended is None else self.fmax_extended
tg.restitution_config.frequency_limits = (
self.fmin / ffactor,
self.fmin,
fmax,
fmax * ffactor)
config.engine_config.ids_synthetic_bp_filter = FilterSetting(
f_min=self.fmin,
f_max=fmax,
order=3)
tg.blocklist = []
# TODO
# tg.snr_min = 5.
tg_out = []
tg_disp = deepcopy(tg)
tg_acc = deepcopy(tg)
tg_far = deepcopy(tg)
tg_disp.restitution_config.target_quantity = 'displacement'
tg_acc.restitution_config.target_quantity = 'acceleration'
tg_far.restitution_config.target_quantity = 'velocity'
tg_disp.distance_min, tg_disp.distance_max = self.distance_nearfield
tg_acc.distance_min, tg_acc.distance_max = self.distance_nearfield
tg_far.distance_min, tg_far.distance_max = self.distance_farfield
stations_disp = self._extract_stations_from_sx(
responses_xml,
event,
nearfield=True,
quantity='displacement')
stations_acc = self._extract_stations_from_sx(
responses_xml,
event,
nearfield=True,
quantity='acceleration')
stations_far = self._extract_stations_from_sx(
responses_xml,
event,
farfield=True,
quantity='velocity')
tg_disp.allowlist = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_disp]))
tg_acc.allowlist = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_acc]))
tg_far.allowlist = list(set([
'.'.join([c for c in s.nsl() if c]) for s in stations_far]))
if tg_disp.allowlist:
tg_out.append(tg_disp)
if tg_acc.allowlist:
tg_out.append(tg_acc)
if tg_far.allowlist:
tg_out.append(tg_far)
config.waveform_config = tg_out
return config
[docs]class SamplingPhase(Object):
'''
Sampling phases base class.
'''
iterations = Int.T(
optional=True)
def get_phase(self, *args, **kwargs):
raise AttributeError('Not implemented')
def get_depth_range(self, *args, **kwargs):
raise AttributeError('Not implemented')
[docs]class PDFSamplingPhase(SamplingPhase):
'''
Phase sampling the PDF of an earlier Grond run.
It wraps :class:`grond.optimisers.highscore.optimiser.PDFSamplerPhase`.
This phase is currently only implemented in the EWRICA specific version of
Grond provided at https://git.gfz-potsdam.de/ewrica/grond.
'''
run_dir = Path.T(
help='Path to Grond run, PDFs are extracted from for each parameter.',
optional=True)
subset = String.T(
default='harvest',
optional=True,
help='Grond subset of models to be used. `None` loads all models.')
bins = Int.T(
default=360,
optional=True,
help='Number of bins within parameter statistics. More bins lead to '
'a better resolution.')
def get_phase(self, *args, **kwargs):
from grond import PDFSamplerPhase
return PDFSamplerPhase(
niterations=self.iterations,
run_dir=self.run_dir,
subset=self.subset)
[docs]class DirectedSamplingPhase(SamplingPhase):
'''
Phase with directed sampling of the model space.
It wraps
:class:`grond.optimisers.highscore.optimiser.DirectedSamplerPhase`.
'''
scatter_scale_begin = Float.T(
default=2.0,
help='Scaling factor at beginning of the phase.')
scatter_scale_end = Float.T(
default=0.45,
help='Scaling factor at the end of the directed phase.')
def get_phase(self, *args, **kwargs):
from grond import DirectedSamplerPhase
return DirectedSamplerPhase(
niterations=self.iterations,
scatter_scale_begin=self.scatter_scale_begin,
scatter_scale_end=self.scatter_scale_end)
[docs]class InjectionSamplingPhase(SamplingPhase):
'''
It generates models based on TSUMAPS or fault database priors.
'''
config = SourceInjectionConfig.T(
default=SourceInjectionConfig())
def __init__(self, *args, **kwargs):
SamplingPhase.__init__(self, *args, **kwargs)
self._sources = []
def get_phase(
self,
event,
store,
apply_for,
nparallel,
base_path,
fmax,
grond_config,
*args,
**kwargs):
from pyrocko.guts import dump_all
from grond import InjectionSamplerPhase
source_types = dict(
mt='point',
rs='rectangular',
pdr='dynamic')
if self.config.fmax is None:
self.config.fmax = fmax
injector = SourceInjector(config=self.config)
source_type = source_types[apply_for]
sources = injector.get_sources(
event,
store,
grond_config=grond_config,
source_type=source_type,
nparallel=nparallel)
injection_fn = op.join(
base_path,
source_types_injection_fn.format(source_type=apply_for))
dump_all(sources, filename=injection_fn)
self._sources = sources
return InjectionSamplerPhase(
niterations=len(sources),
sources_path=injection_fn)
def get_depth_range(self, apply_for, store, grond_config, *args, **kwargs):
crange = grond_config.problem_config.ranges['depth']
if apply_for == 'mt':
drange = Range(
start=store.config.source_depth_min,
stop=store.config.source_depth_max)
else:
delta_depth = num.max([
num.sin(d2r * src.dip) * src.width
for src in self._sources])
drange = Range(
start=store.config.source_depth_min,
stop=store.config.source_depth_max - delta_depth)
return Range(
start=max(drange.start, crange.start),
stop=min(drange.stop, crange.stop))
[docs]class PriorSamplingPhase(SamplingPhase):
'''
It allows for changing sampling phases in consecuitive runs.
In fast near-real-time earthquake parameter estimations inversion settings
may change with first reliable inversion results available. This behaviour
is accomplished with the PriorSamplingPhase. In the first, initial stage,
e.g., model samples are drawn from prior information as provided with the
:class:`~siria.si.demonstrator.InjectionSamplingPhase`. When finished, the
secondary stage can use results from the initial stage, e.g., in the
:class:`~siria.si.demonstrator.PDFSamplingPhase`.
'''
initial_stage = SamplingPhase.T(
default=InjectionSamplingPhase(
config=SourceInjectionConfig()),
help='Settings for the initial run (all runs, until any previous run'
'is finished.')
secondary_stage = SamplingPhase.T(
default=PDFSamplingPhase(
iterations=2000,
run_dir='grond_rundir'),
help='Settings for the all runs started when a previous run is '
'finished.',
optional=True)
def _get_current_phase(self, old_runs):
from siria.io.grond import get_environment
old_runs = [run_dir for run_dir in old_runs if op.exists(run_dir)]
mod_time = [op.getmtime(run_dir) for run_dir in old_runs]
idx = num.argsort(mod_time)[::-1]
env = None
for i in idx:
env_cur = get_environment(old_runs[i])
if env_cur.is_running():
continue
try:
env_cur.get_history(subset=None)
except ProblemDataNotAvailable:
continue
env = env_cur
logger.info(f'Previous run {old_runs[i]} used for sampling')
break
if env is None or self.secondary_stage is None:
phase = self.initial_stage
else:
phase = self.secondary_stage
if hasattr(phase, 'run_dir'):
phase.run_dir = env.get_rundir_path()
return phase
def get_phase(self, old_runs=[], *args, **kwargs):
phase = self._get_current_phase(old_runs)
return phase.get_phase(*args, **kwargs)
def get_depth_range(self, old_runs=[], *args, **kwargs):
phase = self._get_current_phase(old_runs)
return phase.get_depth_range(*args, **kwargs)
[docs]class SamplingGroupLabel(StringChoice):
choices = ['fast', 'regular']
[docs]class SamplingGroup(Object):
'''
Grouping for sampler phases
'''
sampling_phases = List.T(
SamplingPhase.T(),
default=[
InjectionSamplingPhase(config=SourceInjectionConfig()),
UniformSamplingPhase(iterations=2000),
DirectedSamplingPhase(iterations=20000)])
apply_for = List.T(
StringChoice.T(choices=['mt', 'rs', 'pdr']),
default=['mt', 'rs', 'pdr'],
help='Give source estimation methods, the sampling group is valid '
'for (``mt`` - point source, ``rs`` - rectangular source, '
'``pdr`` - pseudo dynamic rupture')
label = SamplingGroupLabel.T(
default='regular')
def update_grond_config(self, config, old_runs, *args, **kwargs):
fmax = num.max([tg.misfit_config.fmax for tg in config.target_groups])
phases = []
phases = [
sp.get_phase(
fmax=fmax,
grond_config=config,
old_runs=old_runs,
*args,
**kwargs)
for sp in self.sampling_phases]
for sp in self.sampling_phases:
try:
drange = sp.get_depth_range(
old_runs=old_runs,
apply_for=kwargs.get('apply_for'),
store=kwargs.get('store'),
grond_config=config)
config.problem_config.ranges['depth'] = drange
except AttributeError:
continue
config.optimiser_config.sampler_phases = phases
return config
[docs]class EngineConfig(Object):
'''
Controlls for the used ground models, and processing settings.
'''
store_dirs = List.T(
String.T(),
default=['.'])
store_id_grond = String.T(
default='ak135_2000km_1Hz')
store_id_ids_waveform = String.T(
default='waveform_store_ids')
store_id_ids_geodetic = String.T(
default='geodetic_store_ids')
nparallel = Int.T(
default=1,
help='Number of parallel processes.')
nthreads = Int.T(
default=1,
help='Number of threads.')
pphase = String.T(
default='{stored:any_P}',
help='Pyrocko P-wave arrival time definition for IDS.')
def update_grond_config(self, config):
ec = config.engine_config
ec.gf_store_superdirs = self.store_dirs
for tg in config.target_groups:
tg.store_id = self.store_id_grond
return config
def update_ids_config(self, config):
ec = config.engine_config
ec.ids_store_superdirs = self.store_dirs
ec.pyrocko_store_superdirs = self.store_dirs
ec.ids_waveform_store_id = self.store_id_ids_waveform
ec.ids_geodetic_store_id = self.store_id_ids_geodetic
for tg in config.waveform_config:
tg.store_id = self.store_id_grond
tg.pphase = self.pphase
return config
[docs]class DemonstratorConfig(Object):
'''
Configuration of the source inversion demonstrator.
'''
nstations_min = Int.T(
default=0,
optional=True,
help='Minimum number of available stations to issue a run.')
nstations_max = Int.T(
default=30,
optional=True,
help='Maximum number of available stations included within one run.')
blocklist = List.T(
default=[],
help='Blocked stations given as STA, NET.STA or NET.STA.LOC code.')
magnitude_groups = List.T(
MagnitudeGroup.T(),
default=[MagnitudeGroup()],
help='Set of magnitude specific settings.')
sampling_groups = List.T(
SamplingGroup.T(),
default=[SamplingGroup()])
engine_config = EngineConfig.T(
default=EngineConfig())
create_report = Bool.T(
default=False,
help='If ``True``, IDS and Grond reports are generated.')
hrgnss_snr_min = Float.T(
default=2.,
optional=True,
help='Minimium absolute Signal-To-Noise amplitude ratio for HRGNSS '
'station records.')
[docs]class SourceInversionDemonstrator(Object):
'''
Core inversion wrapper executing Grond and IDS inversion codes.
'''
config = DemonstratorConfig.T(
default=DemonstratorConfig())
event = Event.T(
default=Event())
origin = Event.T(
optional=True)
def __init__(
self,
squirrel=None,
responses_xml=None,
*args,
**kwargs):
Object.__init__(self, *args, **kwargs)
self._squirrel = squirrel
self._sx = responses_xml
self._engine = None
self._store = None
@property
def engine(self):
'''
:returns:
The Pyrocko engine.
:rtype:
:py:class:`pyrocko.gf.LocalEngine`
'''
if self._engine is None:
ec = self.config.engine_config
self._engine = LocalEngine(store_superdirs=ec.store_dirs)
return self._engine
@property
def store(self):
'''
:returns:
The Pyrocko Green's Function store.
:rtype:
:py:class:`pyrocko.gf.store.Store`
'''
if self._store is None:
ec = self.config.engine_config
self._store = self.engine.get_store(ec.store_id_grond)
return self._store
[docs] def get_grond_config(
self,
magnitude_group,
sampling_group,
apply_for,
base_path,
old_runs):
'''
Generate a Grond configuration file from the configurations.
'''
from siria.util import example_file
from pyrocko.guts import load
mg = magnitude_group
sg = sampling_group
base_config_fns = dict(
mt='grond_mt_source.gronf',
rs='grond_rectangular_source.gronf',
pdr='grond_dynamic_source.gronf',)
config = load(filename=example_file(base_config_fns[apply_for]))
config.dataset_config.waveform_paths = []
config.dataset_config.events_path = ''
config.dataset_config.stations_stationxml_paths = []
config.dataset_config.responses_stationxml_paths = []
config.dataset_config.blacklist_paths = []
config.dataset_config.blacklist = []
config.dataset_config.whitelist_paths = []
config.dataset_config.whitelist = []
config.dataset_config.extend_incomplete = True # TODO check
config.problem_config.distance_min = 0.
config = mg.update_grond_config(config, self._sx, self.event)
config = sg.update_grond_config(
config,
event=self.event,
store=self.store,
apply_for=apply_for,
nparallel=self.config.engine_config.nparallel,
base_path=base_path,
old_runs=old_runs)
config = self.config.engine_config.update_grond_config(config)
return config
[docs] def weed_stations(self, codes):
'''
Select stations with a good azimuthal and distance coverage.
'''
event = self.event
nstations_max = self.config.nstations_max
for ic, c in enumerate(codes):
if len(c) == 2:
codes[ic] = c + ('*',)
stations = self._squirrel.get_stations(codes=codes, model='squirrel')
stations_pyr = [sta.get_pyrocko_station() for sta in stations]
for i, s in enumerate(stations_pyr):
s.set_event_relative_data(event)
if s.nsl()[2] == '*':
s.location = ''
stations_pyr[i] = s
stations_selected = weeding.weed_stations(
stations_pyr, nstations_max)[0]
logger.info('Reduced number of stations from %d to %d',
len(stations), len(stations_selected))
return [sta.nsl()[:2] for sta in stations_selected]
[docs] def validate_pipeline_and_data(self, config):
'''
Quality controll for data before inversions.
The function removes doubled, or blocked records.
'''
nstations_min = self.config.nstations_min
nstations_max = self.config.nstations_max
sq = self._squirrel
store = self._store
store_deltat = 1. / store.config.sample_rate
quantity2codes = dict(
displacement=['LB?'],
velocity=[f'{c}H?' for c in ('B', 'H')],
acceleration=['?N?'])
# remove doubled station records as e.g HH* and BH*
available_traces_nsl = [
'.'.join(c.nsl) if c.nsl[2] else '.'.join(c.nsl[:2])
for c in sq.get_codes(kind='waveform')]
payload = []
for tg in config.target_groups:
quantity = tg.misfit_config.quantity
if 'vel' in tg.path:
quantity = 'velocity'
# get stations according to target group
stations_codes = tg.include
# get sensors from squirrel according to quantity
codes_template = quantity2codes[quantity]
stations_codes = [tuple(sc.split('.')) for sc in stations_codes]
stations_codes = [
sc + ('',) if len(sc) == 2 else sc for sc in stations_codes]
stations_codes = remove_blocklisted_stations(
nsl=stations_codes,
stations_blocklist=self.config.blocklist)
stations_codes = keep_allowlisted_stations(
nsl=stations_codes,
stations_allowlist=available_traces_nsl)
payload += [
(sc,
codes_template,
quantity,
tg.misfit_config.fmin,
tg.misfit_config.fmax) for sc in stations_codes]
tpad = 1. / min([tg.misfit_config.fmin for tg in config.target_groups])
def validate_sensors(
station_code, code_template, quantity, fmin, fmax, tpad):
nsl = station_code
codes = [nsl + (c,) for c in code_template]
sensors = sq.get_sensors(
codes=codes,
time=self.event.time)
if len(sensors) == 0:
return []
# create ranking of sensors
sensor_deltats = {
s.channels[0].deltat: s
for s in sensors if s.channels[0].deltat is not None}
deltats_sorted = num.sort(
num.array(list(sensor_deltats.keys())))
deltats_ranking = num.concatenate((
deltats_sorted[deltats_sorted <= store_deltat][::-1],
deltats_sorted[deltats_sorted > store_deltat]))
for deltat in deltats_ranking:
sensor = sensor_deltats[deltat]
tmin = store.t(
tg.misfit_config.tmin, self.event, sensor) - tpad
tmax = store.t(
tg.misfit_config.tmax, self.event, sensor) + 600. \
+ tpad
tmin += self.event.time
tmax += self.event.time
traces_out = []
for cha in sensor.channels:
# cha_traces = traces_in.get(cha.codes.nslc, [])
cha_traces = sq.get_waveforms(
cha,
tmin=tmin,
tmax=tmax,
degap=True,
maxgap=0,
want_incomplete=True)
# logger.debug(cha.summary)
# logger.debug(len(cha_traces))
# Return to next deltat in case of missing data
if not cha_traces:
traces_out = []
break
traces_out += cha_traces
if not traces_out:
continue
else:
break
if quantity == 'displacement' and \
self.config.hrgnss_snr_min is not None:
accepted, _ = snr_filter(
traces_out,
cut_time=self.event.time,
snr_min=self.config.hrgnss_snr_min,
fmin=fmin,
fmax=fmax)
if not accepted:
logger.info(
f'Removed {sensor.codes.network}.'
f'{sensor.codes.station}.'
f'{sensor.codes.location} due to bad SNR.')
return []
return traces_out
if not payload:
logger.warning(
'Could not find any data matching the distance and time '
'constraints')
return False, None, None, None
traces_subset = []
for args in payload:
traces_out = validate_sensors(*args, tpad=tpad)
traces_subset += traces_out
sq_out = squirrel.Squirrel()
if traces_subset:
sq_out.add_volatile_waveforms(traces_subset)
grond_ns = []
for tg in config.target_groups:
grond_ns += tg.include
grond_ns = set([tuple(c.split('.')[:2]) for c in grond_ns])
if len(grond_ns) < nstations_min:
logger.warning(
f'Less stations ({len(grond_ns)}) available for grond '
f'than required by minimum stations ({nstations_min}).')
return False, None, None, None
# check available data for stations in squirrel
sq_ns = set([c.ns for c in sq_out.get_codes(kind='waveform')])
if len(sq_ns) < nstations_min:
logger.warning(
f'Less waveforms ({len(sq_ns)}) available for grond than '
f'required by minimum stations ({nstations_min}).')
return False, None, None, None
intersection = list(sq_ns & grond_ns)
if len(intersection) < nstations_min:
logger.warning(
f'Less waveforms ({len(intersection)}) available for required '
'stations than required by minimum stations '
f'({nstations_min}).')
return False, None, None, None
if len(intersection) > nstations_max:
logger.info('Found more stations than required. Weed stations ...')
intersection = self.weed_stations(intersection)
logger.info(
f'Found {len(grond_ns)} stations and {len(sq_ns)} station '
f'records. {len(intersection)} records are associated with '
f'available stations. Continue to grond ...')
# This should be the option for the future but is buggy
# sx_out = sq.get_stationxml(time=self.event.time)
sx_out = self._sx
return True, sq_out, sx_out, intersection
[docs] def run_grond_inversion(
self,
label,
apply_for,
config,
base_path,
*args,
**kwargs):
'''
Grond inversion wrapper
'''
from grond import Environment
from grond.core import go, expand_template
proceed, sq_out, sx_out, allowlist = self.validate_pipeline_and_data(
config)
if not proceed:
logger.error(
'Waveforms within the run are not compatible '
'with the current settings of grond. Check your '
'distance settings or the quality of your '
'waveforms (e.g. gaps).')
return [], {}
conf = config.clone()
conf.problem_config.name_template = '{}_{}_{}'.format(
self.event.name, apply_for, label)
conf.dataset_config.whitelist = ['.'.join(al) for al in allowlist]
conf.event_names = [self.event.name]
cwd = os.getcwd()
os.chdir(base_path)
conf.set_basepath(base_path, parent_path_prefix=base_path)
conf.path_prefix = base_path
env = Environment(
config=conf,
event_names=conf.event_names,
event=self.event,
squirrel=sq_out,
responses_xml=sx_out)
rundir_path = expand_template(
conf.rundir_template,
dict(problem_name=conf.problem_config.name_template))
env.set_rundir_path(rundir_path)
go(
environment=env,
nparallel=self.config.engine_config.nparallel,
nthreads=self.config.engine_config.nthreads,
status='state',
*args,
**kwargs)
env.set_rundir_path(rundir_path)
ds = env.get_config().get_dataset(
self.event.name,
event=self.event,
squirrel=self._squirrel,
responses_xml=self._sx)
env._dataset = ds
try:
history = env.get_history(subset=None)
except ProblemDataNotAvailable:
logger.error(
'Waveforms within the run are not compatible '
'with the current settings of grond. Check your '
'distance settings or the quality of your '
'waveforms (e.g. gaps).')
return [], {}
sources = [history.problem.get_source(mod)
for mod in history.get_sorted_models()]
misfits = history.get_sorted_misfits()
source_ensemble = pyrocko_to_sources(sources, misfits)
_, source_dict = sources_to_json(source_ensemble)
if self.config.create_report:
self.run_grond_report(env)
os.chdir(cwd)
return source_ensemble, source_dict
[docs] def run_grond_report(self, env):
'''
Generate Grond report.
'''
from grond.plot import get_plot_config_collection
from grond.report import report_index, report, ReportConfig
conf = ReportConfig(
plot_config_collection=get_plot_config_collection())
conf.set_basepath('.')
conf.make_archive = False
env.set_current_event_name(self.event.name)
report(
env, conf,
update_without_plotting=False,
make_archive=False,
nthreads=self.config.engine_config.nthreads)
report_index(conf)
[docs] def get_ids_config(self, magnitude_group, base_path):
'''
Prepare the configuration for IDS.
'''
from siria.util import example_file
mg = magnitude_group
base_config_fn = example_file('ids.conf')
config = IDSConfigFull.load(filename=base_config_fn)
config.path_prefix = base_path
config.run_config.outpath_template = op.join(
base_path, '{}_ids_regular'.format(self.event.name))
config.dataset_config.events_path = ''
config.dataset_config.event_name = self.event.name
config.dataset_config.nstations_wanted = 30
config.dataset_config.correct_event_duration = True
for tg in config.waveform_config:
# tg.static_weight = 0.5
tg.waveform_paths = []
tg.response_stationxml_paths = []
config = mg.update_ids_config(config, self._sx, self.event)
config = self.config.engine_config.update_ids_config(config)
return config
[docs] def run_ids_inversion(self, config, base_path, force):
'''
IDS inversion wrapper.
'''
from pyrocko.guts import clone
source_ensemble = []
for np in ('nodalplane1', 'nodalplane2'):
conf_new = deepcopy(config)
conf_new.dataset_config.fault_plane = np
conf_new.run_config.outpath_template += '_{}'.format(np)
conf_new.bind_parent()
runner = IDSRunner(config=conf_new)
runner.prepare(
responses_xml=self._sx,
squirrel=self._squirrel,
event=clone(self.event),
origin=clone(self.origin))
runner.run()
if runner.errmess:
logger.warning('Run was not completed. Aborting ...')
else:
logger.info('IDS calculation done. Harvest results ...')
source = runner.harvest(force=force)
source_ensemble.append(source)
if self.config.create_report:
runner.report(cwd=base_path)
_, source_dict = sources_to_json(source_ensemble)
return source_ensemble, source_dict
[docs] def get_valid_magnitude_group(self):
'''
Extract magnitude groups valid for the given event.
'''
mag_groups = self.config.magnitude_groups
return [mg for mg in mag_groups if mg.valid_for_event(self.event)]
[docs] def get_inversion_pipeline(self, base_path_grond, base_path_ids, old_runs):
'''
Prepare the pipeline of inversions required for the current event.
'''
mag_groups = self.get_valid_magnitude_group()
smp_groups = self.config.sampling_groups
pipeline_grond = []
pipeline_ids = []
logger.info('Found {} valid magnitude groups'.format(len(mag_groups)))
logger.info(
'Check validity of {} sampling groups'.format(len(smp_groups)))
for mg in mag_groups:
for sg in smp_groups:
commons = list(set(mg.apply_for) & set(sg.apply_for))
logger.info('Prepare configurations:' +
''.join(['\n - {}'.format(c) for c in commons]))
pipeline_grond += [
(apply_for, sg.label, self.get_grond_config(
mg, sg, apply_for, base_path_grond, old_runs))
for apply_for in commons if apply_for != 'ids']
if 'ids' in mg.apply_for:
conf = self.get_ids_config(mg, base_path_ids)
pipeline_ids.append((conf,))
def pipeline_sorting(pipe):
source_to_num = dict(
mt='1',
rs='2',
pdr='3')
label_to_num = dict(
fast='1',
regular='2')
return int(label_to_num[pipe[1]] + source_to_num[pipe[0]])
pipeline_grond.sort(key=pipeline_sorting)
return pipeline_grond, pipeline_ids
def _quakeml_stream_to_events(stream):
from pyrocko.io import quakeml
qml = quakeml.QuakeML.load_xml(stream=stream)
return qml.get_pyrocko_events()
def _mseed_stream_to_squirrel(stream, sq):
import tempfile
from pyrocko.io import mseed
try:
with tempfile.NamedTemporaryFile(prefix='mseed-stream') as f:
f.write(stream)
f.flush()
traces = mseed.iload(f.name)
for tr in traces:
sq.add_volatile_waveforms([tr])
except Exception as e:
logger.debug(e)
return sq
def _stationxml_stream_to_stationxml(stream):
if not isinstance(stream, list):
sx = stationxml.FDSNStationXML.load_xml(stream=stream)
else:
sxs = [stationxml.FDSNStationXML.load_xml(stream=st) for st in stream]
sx = stationxml.primitive_merge(sxs)
return sx
def _stationxml_to_squirrel_nuts(
sx,
content=['station', 'channel', 'response']):
import copy
import time
from pyrocko.io import stationxml
from pyrocko.squirrel import model
Y = 60*60*24*365
far_future = time.time() + 20*Y
value_or_none = stationxml.value_or_none
inut = 0
for network in sx.network_list:
for station in network.station_list:
net = network.code
sta = station.code
tmin = station.start_date
tmax = station.end_date
if tmax is not None and tmax > far_future:
tmax = None
station_nut = model.make_station_nut(
file_segment=0,
file_element=inut,
codes=model.CodesNSL(net, sta, '*'),
tmin=tmin,
tmax=tmax)
if 'station' in content:
station_nut.content = model.Station(
lat=station.latitude.value,
lon=station.longitude.value,
elevation=value_or_none(station.elevation),
**station_nut.station_kwargs)
station_copy = copy.copy(station)
station_copy.channel_list = []
station_nut.raw_content['stationxml'] = station_copy
yield station_nut
inut += 1
for channel in station.channel_list:
cha = channel.code
loc = channel.location_code.strip()
tmin = channel.start_date
tmax = channel.end_date
if tmax is not None and tmax > far_future:
tmax = None
deltat = None
if channel.sample_rate is not None \
and channel.sample_rate.value != 0.0:
deltat = 1.0 / channel.sample_rate.value
nut = model.make_channel_nut(
file_segment=0,
file_element=inut,
codes=model.CodesNSLCE(net, sta, loc, cha, ''),
tmin=tmin,
tmax=tmax,
deltat=deltat)
if 'channel' in content:
nut.content = model.Channel(
lat=channel.latitude.value,
lon=channel.longitude.value,
elevation=value_or_none(channel.elevation),
depth=value_or_none(channel.depth),
azimuth=value_or_none(channel.azimuth),
dip=value_or_none(channel.dip),
**nut.channel_kwargs)
channel_copy = copy.copy(channel)
channel_copy.response = None
nut.raw_content['stationxml'] = channel_copy
yield nut
inut += 1
context = '%s.%s.%s.%s' % (net, sta, loc, cha)
if channel.response and 'response' in content:
nut = model.make_response_nut(
file_segment=0,
file_element=inut,
codes=model.CodesNSLCE(net, sta, loc, cha, ''),
tmin=tmin,
tmax=tmax,
deltat=deltat)
try:
resp = channel.response.get_squirrel_response(
context, **nut.response_kwargs)
if 'response' in content:
nut.content = resp
nut.raw_content['stationxml'] = channel.response
yield nut
inut += 1
except stationxml.StationXMLError as e:
logger.debug('Bad instrument response: %s' % str(e))
def stationxml_stream_to_squirrel(stream, sq):
from pyrocko.squirrel.model import random_name
sx = _stationxml_stream_to_stationxml(stream)
name = random_name()
for nut in _stationxml_to_squirrel_nuts(
sx=sx, content=['station', 'channel', 'response']):
nut.file_path = 'virtual:volatile:%s%d' % (name, nut.file_element)
nut.file_format = 'virtual'
nut.file_mtime = 0
sq.add_volatile([nut])
def _collect_grond_runs(rundir):
import glob
if rundir.endswith('.grun'):
return [rundir]
cwd = os.getcwd()
os.chdir(rundir)
runs = []
for d in glob.iglob('**/*', recursive=True):
if d.endswith('.grun'):
runs.append(op.join(rundir, d))
os.chdir(cwd)
return runs
[docs]def run_source_inversion_stream(
config,
event_stream,
responses_xml_stream=None,
responses_xml=None,
sq=None,
waveforms_stream=None,
obspy_streams=[],
obspy_traces=[],
origin_stream=None,
*args,
**kwargs):
'''
Start inversion with streamed waveforms and meta data
Further inversion settings are derived from the configuration file.
:param config:
Demonstrator configuration.
:type config:
:py:class:`siria.si.demonstrator.DemonstratorConfig`
:param event_stream:
Event information as QuakeML.
:type event_stream:
bytes
:param responses_xml_stream:
Station meta data in StationXML format.
:type responses_xml_stream:
optional, bytes
:param sq:
Squirrel instance containing meta data and/or waveform data.
:type sq:
optional, :py:class:`pyrocko.squirrel.Squirrel`
:param waveforms_stream:
Waveforms as miniSEED.
:type waveforms_stream:
optional, bytes
:param obspy_streams:
Waveforms in obspy stream format.
:type obspy_streams:
optional, iterable of :py:class:`obspy.core.stream.Stream`
:param obspy_traces:
Waveforms in obspy format.
:type obspy_traces:
optional, iterable of :py:class:`obspy.core.trace.Trace`
:param origin_stream:
If given, event location, time and duration are updated with the
origin.
:type origin_stream:
optional, bytes
:returns:
Sourve inversion results as source list and dict
:rtype:
iterator of tuple(list(:py:class:`siria.sources.SiriaMTSource` or
:py:class:`siria.sources.SiriaPseudoDynamicRupture` or
:py:class:`siria.sources.SiriaRectangularSource` or
:py:class:`siria.sources.SiriaIDSSource`)),
dict
'''
logger.info('Started stream source inversion')
events = _quakeml_stream_to_events(event_stream)
# Simplified case (only one event in event file)
if len(events) != 1:
raise IndexError('Please provide only one event.')
event = events[0]
origin = None
if origin_stream is not None:
origins = _quakeml_stream_to_events(origin_stream)
# Simplified case (only one event in event file)
if len(origins) != 1:
raise IndexError('Please provide only one origin.')
origin = origins[0]
if sq is None:
sq = squirrel.Squirrel()
sx = None
if responses_xml_stream is not None:
sx = _stationxml_stream_to_stationxml(responses_xml_stream)
# TODO fix the following code lines
# stationxml_stream_to_squirrel(responses_xml_stream, sq)
# sx = sq.get_stationxml(level='response')
elif responses_xml is not None:
sx = responses_xml
if sx is None:
raise ValueError('No stationxml given')
if waveforms_stream is None and obspy_traces is None and \
obspy_streams is None:
raise AttributeError(
'Please give waveforms as stream, list of obspy_traces or both.')
if waveforms_stream is not None:
_mseed_stream_to_squirrel(waveforms_stream, sq)
if obspy_streams:
obspy_streams = list(obspy_streams)
for st in obspy_streams:
obspy_traces += st.traces
if obspy_traces:
sq.add_volatile_waveforms(
[tr.to_pyrocko_trace() for tr in obspy_traces])
if type(config) == str:
if not op.exists(config):
raise ValueError('Configuration file {} not found.'.format(config))
config = DemonstratorConfig.load(filename=config)
return run_source_inversion(
config, event, sq, sx, origin=origin, *args, **kwargs)
[docs]def run_source_inversion(
config,
event,
sq,
responses_xml,
origin=None,
arrivals={},
old_runs=[],
force=False,
nstations_min=0,
enable_profiling=False):
'''
Start source inversion with Pyrocko understandable data formats.
:param config:
Demonstrator configuration.
:type config:
:py:class:`siria.si.demonstrator.DemonstratorConfig`
:param event:
Event information.
:type event_stream:
:py:class:`pyrocko.model.event.Event`
:param sq:
Waveforms stored in Squirrel.
:type sq:
:py:class:`pyrocko.squirrel.Squirrel`
:param responses_xml:
Station meta data.
:type responses_xml_stream:
list of :py:class:`pyrocko.io.stationxml.FDSNStationXML`
:param origin:
If given, event location, time and duration are updated with the
origin.
:type origin:
optional, :py:class:`pyrocko.model.event.Event`
:param old_runs:
List of previous runs performed. PDFs can be extracted from them and
used for refined sampling, if runs are finished. If several runs are
available the last finished is used.
:type old_runs:
optional, list of str
:param arrivals:
Absolut P wave arrival times at the stations. So far not used.
:type origin:
optional, :py:class:`obspy.core.utcdatetime.UTCDateTime`
:param force:
If ``True``, old runs within the same runing directory are overwritten.
Default is ``False``.
:type force:
optional, bool
:param nstations_min:
If larger than ``0``, runs are only issued for ``nstations_min`` or
or more available stations.
:type nstations_min:
optional, int
:param enable_profiling:
If ``True``, cprofile is called. Results are stored in `cprofile.prof`
:type enable_profiling:
optional, bool
:returns:
Sourve inversion results as source list and dict
:rtype:
iterator of tuples of
list of :py:class:`siria.sources.SiriaMTSource` or
:py:class:`siria.sources.SiriaPseudoDynamicRupture` or
:py:class:`siria.sources.SiriaRectangularSource` or
:py:class:`siria.sources.SiriaIDSSource`,
dict
'''
logger.info('Started local source inversion')
from pyrocko.util import ensuredir
if enable_profiling:
import cProfile
import pstats
profiler = cProfile.Profile()
profiler.enable()
event.name = event.name.replace(
'/', '_').replace(
':', '_').replace(
'.', '_').replace(
'#', '_')
cwd = os.getcwd()
base_path_grond = op.join(cwd, 'grond')
base_path_ids = op.join(cwd, 'ids')
for p in (base_path_grond, base_path_ids):
ensuredir(p)
if isinstance(responses_xml, list):
responses_xml = stationxml.primitive_merge(responses_xml)
old_runs = [
run
for oldrun in old_runs
for run in _collect_grond_runs(oldrun)]
config.nstations_min = nstations_min
if config.nstations_max is None:
config.nstations_max = len(sq.get_codes(kind='station'))
demonstrator = SourceInversionDemonstrator(
config=config,
event=event,
origin=origin,
squirrel=sq,
responses_xml=responses_xml)
pipeline_grond, pipeline_ids = demonstrator.get_inversion_pipeline(
base_path_grond, base_path_ids, old_runs)
for pipe in pipeline_grond:
yield demonstrator.run_grond_inversion(
*pipe,
base_path=base_path_grond,
force=force)
for pipe in pipeline_ids:
yield demonstrator.run_ids_inversion(
*pipe,
base_path=base_path_ids,
force=force)
if enable_profiling:
profiler.disable()
stats = pstats.Stats(profiler)
stats.dump_stats('cprofile.prof')
# TODO enable multiprocessing (each inversion and sampling group)