# GPLv3
#
# The Developers, 21st Century
import logging
import os
import os.path as op
import glob
import subprocess
import numpy as num
from pyrocko.guts import (
Bool, Float, Int, List, Object, String, StringChoice, dump_all)
from pyrocko.guts_array import Array
from pyrocko import gf, model, moment_tensor as pmt, orthodrome as pod
from grond import core, InjectionSamplerPhase, report, plot as grondplot
from grond.config import Config
from grond.dataset import DatasetError
from grond.environment import Environment
from grond.meta import expand_template, Path
from siria.scaling import length_blaser, width_blaser
from siria.dataset import faults, tsumaps
logger = logging.getLogger('siria.si.invert')
d2r = num.pi / 180.
km = 1e3
[docs]class TimeReference(StringChoice):
choices = ['origin', 'centroid']
[docs]class NEAMTHM18Sampler(Object):
'''
Defines no. of samples drawn from NEAMTHM18
'''
interpolation = StringChoice.T(
choices=(
'nearest',
'quadratic'),
default='nearest',
help='Defines, if the samples are drawn from the closest point or '
'extraced from an interpolated mixing distribution.')
n_samples = Int.T(
default=2000,
help='Number of samples drawn from the TSUMAPS distribution')
[docs]class SourceInjectionConfig(Object):
'''
Configuration for prior based source injection
'''
time_reference = TimeReference.T(
default='origin',
help='Choice, if the given event time is associate with the "origin" '
'or the "centroid" time. That may lead to time corrections '
'depending on the used source model.')
nucleation_x = Array.T(
default=num.array([-0.66, 0., 0.66]),
serialize_as='list',
dtype=num.float64,
help='Relative origin x-coordinates to be tested.',
optional=True)
nucleation_y = Array.T(
default=num.array([0., 0., 0.]),
serialize_as='list',
dtype=num.float64,
help='Relative origin y-coordinates to be tested.',
optional=True)
tractions = Array.T(
default=num.array([0.5, 1., 2.5, 5.]) * 1e6,
serialize_as='list',
dtype=num.float64,
help='Set of stress drops to be tested.',
optional=True)
asperity = List.T(
default=['total'],
help='Which possible locations shall be tested. Choose and combine '
'out of "total", "left", "center" and "right".',
optional=True)
neamthm18_sampler = NEAMTHM18Sampler.T(
default=NEAMTHM18Sampler(
interpolation='nearest',
n_samples=2000),
help='Settings, how samples are drawn from the NEAHMTHM18 database.')
fault_database = StringChoice.T(
choices=('DISS', 'GEM', 'EDSF', 'NOA'),
default='EDSF',
optional=True)
fault_radius_max = Float.T(
help='If set, only orientation data from faults within radius in [m].'
' Else the radius is derived from scaling relation.',
optional=True)
length_factor = Float.T(
default=1.,
help='Factor to be applied on fault lengths from scaling relation.',
optional=True)
width_factor = Float.T(
default=1.,
help='Factor to be applied on fault widths from scaling relation.',
optional=True)
include_dimension_uncertainty = Bool.T(
default=False,
help='If True, uncertainties within the scaling relations are applied '
' to lengths and widths.',
optional=True)
fmax = Float.T(
help='Maximum wave frequency of interest to be used for source model '
'patch size scaling.',
optional=True)
vmin = Float.T(
default=1000.,
help='Approximative minimum seismic wave speed [m/s] to be used for '
'source model patch size scaling.',
optional=True)
correct_event_duration = Bool.T(
default=True,
help='If True and event duration is set, the model origin time is '
'defined as event time - (event duration * 0.5).',
optional=True)
@property
def nucleation(self):
'''
Nucleation points as 2D array.
:returns:
Nucleation points to be injected.
:rtype:
:py:class:`numpy.ndarray`, shape: ``(N, 2)``
'''
if self.nucleation_x.shape != self.nucleation_y.shape:
raise IndexError(
'nucleation_x and nucleation_y must be of same shape')
nucl = num.zeros((self.nucleation_x.shape[0], 2))
nucl[:, 0] = self.nucleation_x
nucl[:, 1] = self.nucleation_y
return nucl
[docs]class SourceInjector(Object):
'''
Extract sources to be injected in Grond.
'''
config = SourceInjectionConfig.T(
default=SourceInjectionConfig())
rseed = Int.T(
default=1230,
help='Set rseed for custom random state')
def __init__(self, *args, **kwargs):
Object.__init__(self, *args, **kwargs)
self._state = num.random.RandomState(self.rseed)
def _extract_from_tsumaps(self, lat, lon, grond_config):
'''
Get orientation from tsumaps (NEAMTHM18) database for given location.
'''
gronf = grond_config
pc = gronf.problem_config
sampler = self.config.neamthm18_sampler
nsamples = sampler.n_samples
points = num.zeros((nsamples, 4))
points[:, 0] = lat
points[:, 1] = lon
points[:, 2] = self._state.uniform(
size=(nsamples,),
low=pc.ranges['north_shift'].start,
high=pc.ranges['north_shift'].stop)
points[:, 3] = self._state.uniform(
size=(nsamples,),
low=pc.ranges['east_shift'].start,
high=pc.ranges['east_shift'].stop)
samples = tsumaps.sample_probs_from_distribution(
points,
interpolation=sampler.interpolation,
rstate=self._state)
logger.info('Extracted %d models from NEAMTHM18' % samples.shape[0])
return (
points,
list(samples[:, 0]),
list(samples[:, 1]),
list(samples[:, 2]))
def _extract_from_faults(
self, lat, lon, mt=None, magnitude=None, grond_config=None):
'''
Get faults from chosen fault database for given location.
'''
conf = self.config
if conf.fault_radius_max is not None:
radius = conf.fault_radius_max
elif mt is not None:
radius = 0.5 * num.mean([
length_blaser(mt.magnitude, rake, use_errors=False)
for rake in (mt.rake1, mt.rake2)])
logger.info(
'Calculated fault search radius of %f m using the MT', radius)
elif magnitude is not None:
radius = 0.5 * num.mean([
length_blaser(magnitude, rake, use_errors=False)
for rake in (0., 90., 270.)])
logger.info('Caluclated fault search radius of %f m using '
'magnitude', radius)
rg = grond_config.problem_config.ranges
radius_grond = num.sqrt(
num.max([rg['north_shift'].start, rg['north_shift'].stop])**2. +
num.max([rg['east_shift'].start, rg['east_shift'].stop])**2.)
radius += radius_grond
logger.info('Lateral model space extent of %f m added to fault '
'search radius', radius_grond)
fault_list = faults.load_faults_from_database(
database=conf.fault_database,
lat=lat,
lon=lon,
radius=radius,
restrict_to_radius=True)
lats, lons, strikes, dips, rakes = [], [], [], [], []
for f in fault_list:
if any([p is None for p in (
f.lat, f.lon,
f.strike_min, f.strike_max,
f.dip_min, f.dip_max,
f.rake_min, f.rake_max)]):
logger.warning('Ignored fault %s due to missing orientation',
f.__str__())
continue
lats += list(num.repeat(f.lat, 3))
lons += list(num.repeat(f.lon, 3))
npoints = len(f.lat)
strikes += list(num.tile([
f.strike_min,
f.strike_max,
num.mean([f.strike_min, f.strike_max])], npoints))
dips += list(num.tile([
f.dip_min,
f.dip_max,
num.mean([f.dip_min, f.dip_max])], npoints))
rakes += list(num.tile([
f.rake_min,
f.rake_max,
num.mean([f.rake_min, f.rake_max])], npoints))
norths, easts = pod.latlon_to_ne_numpy(
lat, lon,
num.array(lats), num.array(lons))
return norths, easts, strikes, dips, rakes
def _assemble_dataset(self, event, grond_config, store):
conf = self.config
gronf = grond_config
lat, lon = event.effective_lat, event.effective_lon
mt = event.moment_tensor
norths = []
easts = []
strikes = []
dips = []
rakes = []
lengths = []
widths = []
if mt is not None:
norths += [0., 0.]
easts += [0., 0.]
strikes += [mt.strike1, mt.strike2]
dips += [mt.dip1, mt.dip2]
rakes += [mt.rake1, mt.rake2]
tpoints, tstrikes, tdips, trakes = self._extract_from_tsumaps(
lat, lon, gronf)
norths += list(tpoints[:, 2])
easts += list(tpoints[:, 3])
strikes += list(tstrikes)
dips += list(tdips)
rakes += list(trakes)
fnorths, feasts, fstrikes, fdips, frakes = self._extract_from_faults(
lat,
lon,
mt=event.moment_tensor,
magnitude=event.magnitude,
grond_config=gronf)
norths += list(fnorths)
easts += list(feasts)
strikes += fstrikes
dips += fdips
rakes += frakes
nsamples = len(norths)
mags = mt.magnitude if mt is not None else event.magnitude
mags = num.ones(nsamples) * mags
durations = 0. if event.duration is None else num.ones(event.duration)
durations = num.ones(nsamples) * durations
if gronf is not None:
if 'magnitude' in gronf.problem_config.ranges:
mg_range = gronf.problem_config.ranges['magnitude']
mags = self._state.uniform(
size=(nsamples,),
low=mg_range.start,
high=mg_range.stop)
if 'duration' in gronf.problem_config.ranges:
dur_range = gronf.problem_config.ranges['duration']
durations = self._state.uniform(
size=(nsamples,),
low=dur_range.start,
high=dur_range.stop)
depth_min = store.config.source_depth_min
depth_max = store.config.source_depth_max
if gronf is not None and 'depth' in gronf.problem_config.ranges:
depth_range = gronf.problem_config.ranges['depth']
depth_min = depth_range.start
depth_max = depth_range.stop
depths = num.random.uniform(
size=(nsamples,),
low=depth_min,
high=depth_max)
if gronf is not None and 'time' in gronf.problem_config.ranges:
time_range = gronf.problem_config.ranges['time']
time_min = time_range.start
time_max = time_range.stop
times = num.random.uniform(
size=(nsamples,),
low=time_min,
high=time_max)
lengths = [
conf.length_factor * length_blaser(
mag,
rake,
use_errors=conf.include_dimension_uncertainty)
for mag, rake in zip(mags, rakes)]
widths = [
conf.width_factor * width_blaser(
mag,
rake,
use_errors=conf.include_dimension_uncertainty)
for mag, rake in zip(mags, rakes)]
strikes = num.array(strikes)
strikes %= 360.
strikes[strikes > 180.] -= 360.
rakes = num.array(rakes)
rakes %= 360.
rakes[rakes > 180.] -= 360.
data = [
times,
norths,
easts,
depths,
mags,
strikes,
dips,
rakes,
lengths,
widths,
durations]
dataset = num.zeros(shape=(nsamples, len(data)))
for i, d in enumerate(data):
dataset[:, i] = d[:]
return dataset
def _generate_dynamic_sources(
self,
event,
store,
dataset,
nparallel):
'''
Generate dynamic sources based on prior data and injection config.
'''
from pyrocko import parimap
conf = self.config
mt = event.moment_tensor
base_source = gf.PseudoDynamicRupture.from_pyrocko_event(
event,
anchor='top',
smooth_rupture=True,
pure_shear=True,
decimation_factor=1.,
gamma=0.8,
stf=None,
rake=None,
slip=None)
if event.duration is not None and conf.time_reference == 'centroid':
base_source.time -= event.duration / 2.
# Subfault spacing
delta_patch = num.min(store.config.deltas) / 2.
wavelength_min = conf.vmin / conf.fmax
delta_patch = delta_patch \
if wavelength_min / 2. < delta_patch else wavelength_min / 2.
def calculate_inject_sources(
north,
east,
strike,
dip,
rake,
length,
width):
src = base_source.clone(
north_shift=north,
east_shift=east,
length=length,
width=width,
strike=strike,
dip=dip,
tractions=gf.tractions.DirectedTractions(
rake=rake,
traction=1.),
nx=int(num.ceil(length / delta_patch)),
ny=int(num.ceil(width / delta_patch)))
store_dmin = store.config.source_depth_min
store_dmax = store.config.source_depth_max
# Reset depth, N, E to account for top edge
# Check with store delta depth, source.depth and source.width
if dip != 0.:
delta_depth = num.sin(src.dip * d2r) * 0.5 * src.width
if delta_depth > (src.depth - store_dmin):
delta_depth = src.depth - store_dmin
x = -delta_depth / num.tan(src.dip * d2r)
else:
delta_depth = 0.
x = -src.width * 0.5
n = -num.sin(src.strike * d2r) * x
e = num.cos(src.strike * d2r) * x
src.north_shift += n
src.east_shift += e
src.depth -= delta_depth
delta_depth_store = store_dmax - store_dmin
delta_depth = num.sin(src.dip * d2r) * src.width
if delta_depth > delta_depth_store:
src.update(width=delta_depth_store / num.sin(src.dip * d2r))
logger.info('Source width reduced to %g m due to limited '
'store source depth range.', src.width)
if delta_depth + src.depth > store_dmax:
src.update(
width=(store_dmax - src.depth) / num.sin(src.dip * d2r))
logger.info('Source width reduced to %g m due to limited '
'store source depth range.', src.width)
# Sample different tractions
unique_sources = {}
for t in conf.tractions:
src_t = src.clone(patch_mask=None)
src_t.tractions.traction = t
mrate, mtimes = src_t.get_moment_rate(store)
moment = num.nancumsum(mrate * store.config.deltat, axis=0)
try:
t_max = mtimes[
num.where(moment - mt.moment >= 0.)[0][0]]
except IndexError:
t_max = None
# patch masking
slip = num.linalg.norm(
src_t.get_slip(time=t_max),
axis=1).ravel()
src_t.patch_mask = slip != 0.
patches = [p for p, msk in zip(src_t.patches, src_t.patch_mask)
if msk]
ul = num.array([(p.ix + p.al1, p.iy + p.aw1) for p in patches])
lr = num.array([(p.ix + p.al2, p.iy + p.aw2) for p in patches])
src_t.update(
length=lr[:, 0].max() - ul[:, 0].min(),
width=lr[:, 1].max() - ul[:, 1].min())
# check for unique lengths, widths -> remove redundant models
if (src_t.length, src_t.width) in unique_sources:
continue
src_t._interpolators = {}
src_t.update(
nx=int(num.ceil(src_t.length / delta_patch)),
ny=int(num.ceil(src_t.width / delta_patch)),
patches=None,
coef_mat=None,
patch_mask=None)
for asperity in conf.asperity:
if asperity != 'total':
src_a = src_t.clone(length=src_t.length / 2.)
else:
src_a = src_t.clone()
sin_strike = num.sin(src_a.strike * d2r)
cos_strike = num.cos(src_a.strike * d2r)
if asperity == 'left':
src_a.north_shift -= cos_strike * src_a.length * 0.5
src_a.east_shift -= sin_strike * src_a.length * 0.5
elif asperity == 'right':
src_a.north_shift += cos_strike * src_a.length * 0.5
src_a.east_shift += sin_strike * src_a.length * 0.5
# update source and get maximum slip (important for Grond)
src_a.rescale_slip(moment=mt.moment, store=store)
src_a.update(
patches=None,
coef_mat=None,
patch_mask=None,
tractions=None,
rake=rake)
id_tuple = (asperity, src_a.length, src_a.width)
unique_sources[id_tuple] = src_a
return list(unique_sources.values())
sources = []
payload = [(
dataset[i, 1],
dataset[i, 2],
dataset[i, 5],
dataset[i, 6],
dataset[i, 7],
dataset[i, 8],
dataset[i, 9])
for i in range(dataset.shape[0])]
sources = []
for sources_out in parimap.parimap(
calculate_inject_sources,
*zip(*payload),
nprocs=nparallel):
sources += sources_out
# iterate through origins and append to sources
return [src.clone(nucleation_x=nx, nucleation_y=ny)
for src in sources for nx, ny in conf.nucleation]
def _generate_rectangular_sources(
self,
event,
store,
dataset,
nparallel,
grond_config):
'''
Generate rectangular sources based on prior data and injection config.
'''
from pyrocko import parimap
from siria.gm.util import calc_rise_time
gronf = grond_config
pc = gronf.problem_config
conf = self.config
base_source = gf.RectangularSource.from_pyrocko_event(
event,
magnitude=None,
slip=1.,
anchor='top',
decimation_factor=1.,
velocity=store.config.get_vs(
event.lat, event.lon,
num.array([[
0., 0., event.depth]]),
interpolation='nearest_neighbor')[0])
if event.duration is not None and conf.time_reference == 'centroid':
base_source.time -= event.duration / 2.
magnitude = event.magnitude or event.moment_tensor.magnitude
if base_source.stf is None:
base_source.stf = gf.HalfSinusoidSTF()
base_source.stf.duration = calc_rise_time(mag=magnitude)
# base_source.magnitude = magnitude
def calculate_inject_sources(
north,
east,
mag,
strike,
dip,
rake,
length,
width,
velocity):
src = base_source.clone(
north_shift=north,
east_shift=east,
length=length,
width=width,
strike=strike,
dip=dip,
rake=rake,
velocity=velocity)
store_dmin = store.config.source_depth_min
store_dmax = store.config.source_depth_max
# Reset depth, N, E to account for top edge
# Check with store delta depth, source.depth and source.width
if dip != 0.:
delta_depth = num.sin(src.dip * d2r) * 0.5 * src.width
if delta_depth > (src.depth - store_dmin):
delta_depth = src.depth - store_dmin
x = -delta_depth / num.tan(src.dip * d2r)
else:
delta_depth = 0.
x = -src.width * 0.5
n = -num.sin(src.strike * d2r) * x
e = num.cos(src.strike * d2r) * x
src.north_shift += n
src.east_shift += e
src.depth -= delta_depth
delta_depth_store = store_dmax - store_dmin
delta_depth = num.sin(src.dip * d2r) * src.width
if delta_depth > delta_depth_store:
src.update(width=delta_depth_store / num.sin(src.dip * d2r))
logger.info('Source width reduced to %g m due to limited '
'store source depth range.', src.width)
if delta_depth + src.depth > store_dmax:
src.update(
width=(store_dmax - src.depth) / num.sin(src.dip * d2r))
logger.info('Source width reduced to %g m due to limited '
'store source depth range.', src.width)
target = gf.Target(
interpolation='nearest_neighbor')
unique_sources = {}
for asperity in conf.asperity:
if asperity != 'total':
src_a = src.clone(length=src.length / 2.)
else:
src_a = src.clone()
sin_strike = num.sin(src_a.strike * d2r)
cos_strike = num.cos(src_a.strike * d2r)
if asperity == 'left':
src_a.north_shift -= cos_strike * src_a.length * 0.5
src_a.east_shift -= sin_strike * src_a.length * 0.5
elif asperity == 'right':
src_a.north_shift += cos_strike * src_a.length * 0.5
src_a.east_shift += sin_strike * src_a.length * 0.5
mag_old = src_a.get_magnitude(store, target=target)
src_a.slip *= pmt.magnitude_to_moment(
mag) / pmt.magnitude_to_moment(mag_old)
id_tuple = (asperity, src_a.length, src_a.width)
unique_sources[id_tuple] = src_a
return list(unique_sources.values())
sources = []
if 'velocity' in pc.ranges:
vmin = pc.ranges['velocity'].start
vmax = pc.ranges['velocity'].stop
else:
vs = store.config.get_vs(
event.lat, event.lon,
num.array([[
0., 0., event.depth]]),
interpolation='nearest_neighbor')
vmin, vmax = vs.min(), vs.max()
velocity = self._state.uniform(
size=(dataset.shape[0],),
low=vmin,
high=vmax)
payload = [(
dataset[i, 1],
dataset[i, 2],
dataset[i, 4],
dataset[i, 5],
dataset[i, 6],
dataset[i, 7],
dataset[i, 8],
dataset[i, 9],
velocity[i])
for i in range(dataset.shape[0])]
sources = []
for sources_out in parimap.parimap(
calculate_inject_sources,
*zip(*payload),
nprocs=nparallel):
sources += sources_out
# iterate through origins and append to sources
return [src.clone(nucleation_x=nx, nucleation_y=ny)
for src in sources for nx, ny in conf.nucleation]
def _generate_point_sources(self, event, store, dataset):
'''
Generate point sources based on prior data and injection config.
'''
from pyrocko import moment_tensor as pmt
conf = self.config
base_source = gf.MTSource.from_pyrocko_event(event)
if event.duration is not None and conf.time_reference == 'origin':
base_source.time += event.duration / 2.
payload = [(
dataset[i, 0],
dataset[i, 1],
dataset[i, 2],
dataset[i, 3],
dataset[i, 4],
dataset[i, 5],
dataset[i, 6],
dataset[i, 7],
dataset[i, 10])
for i in range(dataset.shape[0])]
payload = list(set(payload))
sources = []
for p in payload:
mt_src = pmt.MomentTensor(
strike=p[5],
dip=p[6],
rake=p[7],
magnitude=p[4])
source = base_source.clone(
time=base_source.time + p[0],
north_shift=p[1],
east_shift=p[2],
depth=p[3],
m6=mt_src.m6())
source.stf = gf.HalfSinusoidSTF(duration=p[8])
sources.append(source)
return sources
[docs] def get_sources(
self,
event,
store,
grond_config=None,
source_type='dynamic',
nparallel=1):
'''
Extract source models to be tested for the event location.
:param event:
Event to extract likely source models for. Both location, magnitude
and moment tensor information (if available) are used.
:type event:
:py:class:`pyrocko.model.event.Event`
:param store:
Green's function store needed for source generation.
:type store:
:py:class:`pyrocko.gf.store.Store`
:param grond_config:
Grond configuration file used to define lateral ranges and others.
:type grond_config:
:py:class:`grond.config.Config`
:param source_type:
If ``dynamic``, pseudo dynamic rupture models are extracted. If
``point``, moment tensor point sources are returned instead.
``rectangular`` yields rectangular extended rupture models.
:type source_type:
optional, str
:param nparallel:
Number of processes which run in parallel.
:type nparallel:
optional, int
:returns:
Source models to be injected.
:rtype:
list of
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture` or
:py:class:`~pyrocko.gf.seismosizer.RectangularSource` or
:py:class:`~pyrocko.gf.seismosizer.MTSource`
'''
logger.info('Extract sources based on injection configuration ...')
dataset = self._assemble_dataset(event, grond_config, store)
logger.info('Generate %s sources for %d inputs ...' %
(source_type, dataset.shape[0]))
if source_type == 'dynamic':
return self._generate_dynamic_sources(
event,
store,
dataset,
nparallel=nparallel)
elif source_type == 'rectangular':
return self._generate_rectangular_sources(
event,
store,
dataset,
nparallel=nparallel,
grond_config=grond_config)
elif source_type == 'point':
return self._generate_point_sources(
event,
store,
dataset)
def expand(base_path, path):
return op.abspath(op.join(base_path, path))
[docs]class Inverter(Object):
'''
Inverter base class.
'''
def run(self):
pass
def dump_results(self):
pass
[docs]class GrondInverter(Inverter):
grond_config = Config.T(
optional=True,
help='Grond config')
injection_config = SourceInjectionConfig.T(
optional=True,
help='Source injector config')
problem_name = String.T(
optional=True)
event_id = String.T(
optional=True,
help='Event id')
base_path = Path.T(
default='.')
report_path = Path.T(
optional=True)
store_id = String.T(
optional=True,
help='Store id to be used to generate the injected sources. If not '
'given, the store of the first target group is used.')
threads = Int.T(
optional=True,
default=1,
help='Number of maximum threads used.')
parallel = Int.T(
optional=True,
default=1,
help='Number of maximum processes running in parallel (for source '
'generation)')
def __init__(self, *args, **kwargs):
Inverter.__init__(self, *args, **kwargs)
self._store = None
self._event = None
self._sources = []
self._source_type = None
self._cwd = os.popen('pwd').readline().strip()
@property
def store(self):
'''
Pyrocko Green's Function store.
:returns:
Green's function store.
:rtype:
:py:class:`pyrocko.gf.store.Store`
'''
if self._store is not None:
return self._store
if self.store_id is not None:
store_id = self.store_id
else:
store_id = self.grond_config.target_groups[0].store_id
engine = self.grond_config.engine_config.get_engine()
self._store = engine.get_store(store_id=store_id)
return self._store
@property
def event(self):
'''
Pyrocko event.
:returns:
Event.
:rtype:
:py:class:`pyrocko.model.event.Event`
'''
if self._event is not None:
return self._event
dc = self.grond_config.dataset_config
def extra(path):
return expand_template(path, dict(event_name=self.event_id))
def fp(path):
p = dc.expand_path(path, extra=extra)
if p is None:
return None
if isinstance(p, list):
for path in p:
if not op.exists(path):
logger.warning('Path %s does not exist.' % path)
else:
if not op.exists(p):
logger.warning('Path %s does not exist.' % p)
return p
def check_event(event, fn):
if not event.name:
logger.warning('Event in %s has no name!', fn)
return
if not event.lat or not event.lon:
logger.warning('Event %s has inconsistent coordinates!',
event.name)
if not event.depth:
logger.warning('Event %s has no depth!', event.name)
if not event.time:
logger.warning('Event %s has no time!', event.name)
events_path = fp(dc.events_path)
fns = glob.glob(events_path)
if not fns:
raise DatasetError('No event files matching "%s".' % events_path)
for fn in fns:
logger.debug('Loading events from file %s' % fn)
events = model.load_events(filename=fn)
for ev in events:
if ev.name == self.event_id:
check_event(ev, fn)
self._event = ev
return self._event
@property
def reportconf_path(self):
'''
Path to the Grond report configuration file.
:returns:
Path to Grond report configuration file.
:rtype:
str
'''
if self.report_path is None:
raise ValueError('report_path needed')
return '_'.join([
expand(self.base_path, self.report_path),
'config.conf'])
@property
def sources_inject(self):
'''
Injected models from :py:class:`siria.si.invert.SourceInjector`.
:returns:
Sources to be injected within the run.
:rtype:
list of :py:class:`pyrocko.gf.seismosizer.Source`
'''
from grond.problems.cmt.problem import CMTProblemConfig
from grond.problems.rectangular.problem import RectangularProblemConfig
from grond.problems.dynamic_rupture.problem import \
DynamicRuptureProblemConfig
if self._sources:
return self._sources
if self.injection_config.fmax is None:
self.injection_config.fmax = num.max([
g.misfit_config.fmax for g in self.grond_config.target_groups])
injector = SourceInjector(config=self.injection_config)
source_type = ''
if isinstance(
self.grond_config.problem_config, CMTProblemConfig):
source_type = 'point'
elif isinstance(
self.grond_config.problem_config, RectangularProblemConfig):
source_type = 'rectangular'
elif isinstance(
self.grond_config.problem_config, DynamicRuptureProblemConfig):
source_type = 'dynamic'
else:
raise ValueError('Can not handle a problem of type {}'.format(
self.grond_config.problem_config.T.class_name))
self._source_type = source_type
self._sources = injector.get_sources(
self.event,
self.store,
grond_config=self.grond_config,
source_type=self._source_type,
nparallel=self.parallel)
return self._sources
@property
def fn_sources_inject(self):
'''
Path to the Grond source injection file.
:returns:
Path to Grond source injection file.
:rtype:
str
'''
if self.report_path is None:
raise ValueError('report_path needed')
return '_'.join([
expand(self.base_path, self.report_path),
self.problem_name,
'sources_inject.yaml'])
[docs] def sources_to_injector_config(self):
'''
Update Grond InjectionConfig for injected sources.
'''
if self.grond_config is not None:
conf = self.grond_config.optimiser_config
sp = conf.sampler_phases
conf.sampler_phases = [InjectionSamplerPhase(
niterations=len(self.sources_inject),
sources_path=self.fn_sources_inject)] + sp
self.grond_config.optimiser_config = conf
logger.info('Updated config to include injected sources')
else:
logger.warning('Could not update config - No config given')
[docs] def save_sources_inject(self):
'''
Save inject sources in file.
'''
if self.fn_sources_inject is not None:
dump_all(self.sources_inject, filename=self.fn_sources_inject)
logger.info('Saved sources to %s', self.fn_sources_inject)
else:
logger.warning('Could not save sources - No filename given')
[docs] def generate_reportconf(self):
'''
Generate and save Grond report config file.
'''
if self.report_path is not None:
report_conf = report.ReportConfig(
plot_config_collection=grondplot.get_plot_config_collection(),
report_base_path=self.report_path)
report_conf.set_basepath(self.base_path)
report_conf.dump(filename=self.reportconf_path)
logger.info(
'Dumped specific grond report config %s', self.reportconf_path)
[docs] def run(self, force=False):
'''
Run Grond optimization.
:param force:
If ``True``, possible existing older runs are overwritten
:type force:
bool
'''
if self.grond_config is None:
raise ValueError('Config file needed to run Grond Inversion')
conf = self.grond_config.clone()
conf.problem_config.name_template = self.problem_name
conf.set_basepath(self.base_path)
if self.event_id is not None:
conf.event_names = [self.event_id]
os.chdir(self.base_path)
env = Environment(config=conf, event_names=conf.event_names)
core.go(
environment=env,
force=force,
nparallel=1,
nthreads=self.threads,
status='state')
os.chdir(self._cwd)
def _grond_run_dir(self):
return expand_template(
self.grond_config.rundir_template,
dict(problem_name=self.problem_name))
[docs] def report(self):
'''
Generate Grond report for run for easier result validation.
'''
os.chdir(self.base_path)
fn_report_conf = op.relpath(
self.reportconf_path,
start=op.abspath(os.getcwd()))
subprocess.call([(
'grond report {} '
'--config={} '
'--no-archive '
'--threads={} '
'--parallel=1').format(
self._grond_run_dir(),
fn_report_conf,
self.threads)],
shell=True)
os.chdir(self._cwd)
[docs] def dump_siria_sources(self, result_dir):
'''
Read and converts results from grond run directory to SiriaSources.
'''
import time
from pyrocko.util import tts
from siria.io import grond as grondio
from siria.sources import dump_sources
os.chdir(self.base_path)
sources = grondio.grond_to_sources(rundir=self._grond_run_dir())
dump_sources(
sources,
dump_header=True,
filename=op.join(
result_dir,
'{}_{}.yaml'.format(
self._source_type,
tts(time.time(), format='%Y%m%d_%H%M%S'))))
os.chdir(self._cwd)
[docs] def dump_best_event(self, result_dir, fn):
'''
Read Grond results and stores best event in event file for later use.
'''
from siria.io import grond as grondio
os.chdir(self.base_path)
source, _ = grondio.get_best_source_and_misfit(
rundir=self._grond_run_dir())
event = source.pyrocko_event()
model.dump_events(
[event],
filename=op.join(result_dir, fn))
os.chdir(self._cwd)
[docs] def run_full(
self,
create_report=True,
result_dir='',
best_mt_fn='',
dump_result_as_sources=False,
*args,
**kwargs):
'''
Run full optimization loop.
Different steps as source injection, report config generation, the
Grond run and reporting (if wished) are performed.
Arguments and keyword arguments for
:py:meth:`siria.si.invert.GrondInverter.run` can be given.
:param create_report:
If ``True``, a Grond report is created after completing the run.
:type create_report:
optional, bool
:param result_dir:
Problem result directory.
:type result_dir:
optional, str
:param: dump_result_as_sources:
If ``True``, all tested sources are dumped as yaml file within the
result directory.
:type: dump_result_as_sources:
optional, bool
'''
self.sources_to_injector_config()
self.save_sources_inject()
self.run(*args, **kwargs)
if best_mt_fn:
self.dump_best_event(result_dir, best_mt_fn)
if dump_result_as_sources:
self.dump_siria_sources(result_dir=result_dir)
if create_report:
self.generate_reportconf()
self.report()