Source code for siria.si.invert

# GPLv3
#
# The Developers, 21st Century
import logging
import os
import os.path as op
import glob

import subprocess

import numpy as num

from pyrocko.guts import (
    Bool, Float, Int, List, Object, String, StringChoice, dump_all)
from pyrocko.guts_array import Array
from pyrocko import gf, model, moment_tensor as pmt, orthodrome as pod

from grond import core, InjectionSamplerPhase, report, plot as grondplot
from grond.config import Config
from grond.dataset import DatasetError
from grond.environment import Environment
from grond.meta import expand_template, Path

from siria.scaling import length_blaser, width_blaser
from siria.dataset import faults, tsumaps


logger = logging.getLogger('siria.si.invert')


d2r = num.pi / 180.
km = 1e3


[docs]class TimeReference(StringChoice): choices = ['origin', 'centroid']
[docs]class NEAMTHM18Sampler(Object): ''' Defines no. of samples drawn from NEAMTHM18 ''' interpolation = StringChoice.T( choices=( 'nearest', 'quadratic'), default='nearest', help='Defines, if the samples are drawn from the closest point or ' 'extraced from an interpolated mixing distribution.') n_samples = Int.T( default=2000, help='Number of samples drawn from the TSUMAPS distribution')
[docs]class SourceInjectionConfig(Object): ''' Configuration for prior based source injection ''' time_reference = TimeReference.T( default='origin', help='Choice, if the given event time is associate with the "origin" ' 'or the "centroid" time. That may lead to time corrections ' 'depending on the used source model.') nucleation_x = Array.T( default=num.array([-0.66, 0., 0.66]), serialize_as='list', dtype=num.float64, help='Relative origin x-coordinates to be tested.', optional=True) nucleation_y = Array.T( default=num.array([0., 0., 0.]), serialize_as='list', dtype=num.float64, help='Relative origin y-coordinates to be tested.', optional=True) tractions = Array.T( default=num.array([0.5, 1., 2.5, 5.]) * 1e6, serialize_as='list', dtype=num.float64, help='Set of stress drops to be tested.', optional=True) asperity = List.T( default=['total'], help='Which possible locations shall be tested. Choose and combine ' 'out of "total", "left", "center" and "right".', optional=True) neamthm18_sampler = NEAMTHM18Sampler.T( default=NEAMTHM18Sampler( interpolation='nearest', n_samples=2000), help='Settings, how samples are drawn from the NEAHMTHM18 database.') fault_database = StringChoice.T( choices=('DISS', 'GEM', 'EDSF', 'NOA'), default='EDSF', optional=True) fault_radius_max = Float.T( help='If set, only orientation data from faults within radius in [m].' ' Else the radius is derived from scaling relation.', optional=True) length_factor = Float.T( default=1., help='Factor to be applied on fault lengths from scaling relation.', optional=True) width_factor = Float.T( default=1., help='Factor to be applied on fault widths from scaling relation.', optional=True) include_dimension_uncertainty = Bool.T( default=False, help='If True, uncertainties within the scaling relations are applied ' ' to lengths and widths.', optional=True) fmax = Float.T( help='Maximum wave frequency of interest to be used for source model ' 'patch size scaling.', optional=True) vmin = Float.T( default=1000., help='Approximative minimum seismic wave speed [m/s] to be used for ' 'source model patch size scaling.', optional=True) correct_event_duration = Bool.T( default=True, help='If True and event duration is set, the model origin time is ' 'defined as event time - (event duration * 0.5).', optional=True) @property def nucleation(self): ''' Nucleation points as 2D array. :returns: Nucleation points to be injected. :rtype: :py:class:`numpy.ndarray`, shape: ``(N, 2)`` ''' if self.nucleation_x.shape != self.nucleation_y.shape: raise IndexError( 'nucleation_x and nucleation_y must be of same shape') nucl = num.zeros((self.nucleation_x.shape[0], 2)) nucl[:, 0] = self.nucleation_x nucl[:, 1] = self.nucleation_y return nucl
[docs]class SourceInjector(Object): ''' Extract sources to be injected in Grond. ''' config = SourceInjectionConfig.T( default=SourceInjectionConfig()) rseed = Int.T( default=1230, help='Set rseed for custom random state') def __init__(self, *args, **kwargs): Object.__init__(self, *args, **kwargs) self._state = num.random.RandomState(self.rseed) def _extract_from_tsumaps(self, lat, lon, grond_config): ''' Get orientation from tsumaps (NEAMTHM18) database for given location. ''' gronf = grond_config pc = gronf.problem_config sampler = self.config.neamthm18_sampler nsamples = sampler.n_samples points = num.zeros((nsamples, 4)) points[:, 0] = lat points[:, 1] = lon points[:, 2] = self._state.uniform( size=(nsamples,), low=pc.ranges['north_shift'].start, high=pc.ranges['north_shift'].stop) points[:, 3] = self._state.uniform( size=(nsamples,), low=pc.ranges['east_shift'].start, high=pc.ranges['east_shift'].stop) samples = tsumaps.sample_probs_from_distribution( points, interpolation=sampler.interpolation, rstate=self._state) logger.info('Extracted %d models from NEAMTHM18' % samples.shape[0]) return ( points, list(samples[:, 0]), list(samples[:, 1]), list(samples[:, 2])) def _extract_from_faults( self, lat, lon, mt=None, magnitude=None, grond_config=None): ''' Get faults from chosen fault database for given location. ''' conf = self.config if conf.fault_radius_max is not None: radius = conf.fault_radius_max elif mt is not None: radius = 0.5 * num.mean([ length_blaser(mt.magnitude, rake, use_errors=False) for rake in (mt.rake1, mt.rake2)]) logger.info( 'Calculated fault search radius of %f m using the MT', radius) elif magnitude is not None: radius = 0.5 * num.mean([ length_blaser(magnitude, rake, use_errors=False) for rake in (0., 90., 270.)]) logger.info('Caluclated fault search radius of %f m using ' 'magnitude', radius) rg = grond_config.problem_config.ranges radius_grond = num.sqrt( num.max([rg['north_shift'].start, rg['north_shift'].stop])**2. + num.max([rg['east_shift'].start, rg['east_shift'].stop])**2.) radius += radius_grond logger.info('Lateral model space extent of %f m added to fault ' 'search radius', radius_grond) fault_list = faults.load_faults_from_database( database=conf.fault_database, lat=lat, lon=lon, radius=radius, restrict_to_radius=True) lats, lons, strikes, dips, rakes = [], [], [], [], [] for f in fault_list: if any([p is None for p in ( f.lat, f.lon, f.strike_min, f.strike_max, f.dip_min, f.dip_max, f.rake_min, f.rake_max)]): logger.warning('Ignored fault %s due to missing orientation', f.__str__()) continue lats += list(num.repeat(f.lat, 3)) lons += list(num.repeat(f.lon, 3)) npoints = len(f.lat) strikes += list(num.tile([ f.strike_min, f.strike_max, num.mean([f.strike_min, f.strike_max])], npoints)) dips += list(num.tile([ f.dip_min, f.dip_max, num.mean([f.dip_min, f.dip_max])], npoints)) rakes += list(num.tile([ f.rake_min, f.rake_max, num.mean([f.rake_min, f.rake_max])], npoints)) norths, easts = pod.latlon_to_ne_numpy( lat, lon, num.array(lats), num.array(lons)) return norths, easts, strikes, dips, rakes def _assemble_dataset(self, event, grond_config, store): conf = self.config gronf = grond_config lat, lon = event.effective_lat, event.effective_lon mt = event.moment_tensor norths = [] easts = [] strikes = [] dips = [] rakes = [] lengths = [] widths = [] if mt is not None: norths += [0., 0.] easts += [0., 0.] strikes += [mt.strike1, mt.strike2] dips += [mt.dip1, mt.dip2] rakes += [mt.rake1, mt.rake2] tpoints, tstrikes, tdips, trakes = self._extract_from_tsumaps( lat, lon, gronf) norths += list(tpoints[:, 2]) easts += list(tpoints[:, 3]) strikes += list(tstrikes) dips += list(tdips) rakes += list(trakes) fnorths, feasts, fstrikes, fdips, frakes = self._extract_from_faults( lat, lon, mt=event.moment_tensor, magnitude=event.magnitude, grond_config=gronf) norths += list(fnorths) easts += list(feasts) strikes += fstrikes dips += fdips rakes += frakes nsamples = len(norths) mags = mt.magnitude if mt is not None else event.magnitude mags = num.ones(nsamples) * mags durations = 0. if event.duration is None else num.ones(event.duration) durations = num.ones(nsamples) * durations if gronf is not None: if 'magnitude' in gronf.problem_config.ranges: mg_range = gronf.problem_config.ranges['magnitude'] mags = self._state.uniform( size=(nsamples,), low=mg_range.start, high=mg_range.stop) if 'duration' in gronf.problem_config.ranges: dur_range = gronf.problem_config.ranges['duration'] durations = self._state.uniform( size=(nsamples,), low=dur_range.start, high=dur_range.stop) depth_min = store.config.source_depth_min depth_max = store.config.source_depth_max if gronf is not None and 'depth' in gronf.problem_config.ranges: depth_range = gronf.problem_config.ranges['depth'] depth_min = depth_range.start depth_max = depth_range.stop depths = num.random.uniform( size=(nsamples,), low=depth_min, high=depth_max) if gronf is not None and 'time' in gronf.problem_config.ranges: time_range = gronf.problem_config.ranges['time'] time_min = time_range.start time_max = time_range.stop times = num.random.uniform( size=(nsamples,), low=time_min, high=time_max) lengths = [ conf.length_factor * length_blaser( mag, rake, use_errors=conf.include_dimension_uncertainty) for mag, rake in zip(mags, rakes)] widths = [ conf.width_factor * width_blaser( mag, rake, use_errors=conf.include_dimension_uncertainty) for mag, rake in zip(mags, rakes)] strikes = num.array(strikes) strikes %= 360. strikes[strikes > 180.] -= 360. rakes = num.array(rakes) rakes %= 360. rakes[rakes > 180.] -= 360. data = [ times, norths, easts, depths, mags, strikes, dips, rakes, lengths, widths, durations] dataset = num.zeros(shape=(nsamples, len(data))) for i, d in enumerate(data): dataset[:, i] = d[:] return dataset def _generate_dynamic_sources( self, event, store, dataset, nparallel): ''' Generate dynamic sources based on prior data and injection config. ''' from pyrocko import parimap conf = self.config mt = event.moment_tensor base_source = gf.PseudoDynamicRupture.from_pyrocko_event( event, anchor='top', smooth_rupture=True, pure_shear=True, decimation_factor=1., gamma=0.8, stf=None, rake=None, slip=None) if event.duration is not None and conf.time_reference == 'centroid': base_source.time -= event.duration / 2. # Subfault spacing delta_patch = num.min(store.config.deltas) / 2. wavelength_min = conf.vmin / conf.fmax delta_patch = delta_patch \ if wavelength_min / 2. < delta_patch else wavelength_min / 2. def calculate_inject_sources( north, east, strike, dip, rake, length, width): src = base_source.clone( north_shift=north, east_shift=east, length=length, width=width, strike=strike, dip=dip, tractions=gf.tractions.DirectedTractions( rake=rake, traction=1.), nx=int(num.ceil(length / delta_patch)), ny=int(num.ceil(width / delta_patch))) store_dmin = store.config.source_depth_min store_dmax = store.config.source_depth_max # Reset depth, N, E to account for top edge # Check with store delta depth, source.depth and source.width if dip != 0.: delta_depth = num.sin(src.dip * d2r) * 0.5 * src.width if delta_depth > (src.depth - store_dmin): delta_depth = src.depth - store_dmin x = -delta_depth / num.tan(src.dip * d2r) else: delta_depth = 0. x = -src.width * 0.5 n = -num.sin(src.strike * d2r) * x e = num.cos(src.strike * d2r) * x src.north_shift += n src.east_shift += e src.depth -= delta_depth delta_depth_store = store_dmax - store_dmin delta_depth = num.sin(src.dip * d2r) * src.width if delta_depth > delta_depth_store: src.update(width=delta_depth_store / num.sin(src.dip * d2r)) logger.info('Source width reduced to %g m due to limited ' 'store source depth range.', src.width) if delta_depth + src.depth > store_dmax: src.update( width=(store_dmax - src.depth) / num.sin(src.dip * d2r)) logger.info('Source width reduced to %g m due to limited ' 'store source depth range.', src.width) # Sample different tractions unique_sources = {} for t in conf.tractions: src_t = src.clone(patch_mask=None) src_t.tractions.traction = t mrate, mtimes = src_t.get_moment_rate(store) moment = num.nancumsum(mrate * store.config.deltat, axis=0) try: t_max = mtimes[ num.where(moment - mt.moment >= 0.)[0][0]] except IndexError: t_max = None # patch masking slip = num.linalg.norm( src_t.get_slip(time=t_max), axis=1).ravel() src_t.patch_mask = slip != 0. patches = [p for p, msk in zip(src_t.patches, src_t.patch_mask) if msk] ul = num.array([(p.ix + p.al1, p.iy + p.aw1) for p in patches]) lr = num.array([(p.ix + p.al2, p.iy + p.aw2) for p in patches]) src_t.update( length=lr[:, 0].max() - ul[:, 0].min(), width=lr[:, 1].max() - ul[:, 1].min()) # check for unique lengths, widths -> remove redundant models if (src_t.length, src_t.width) in unique_sources: continue src_t._interpolators = {} src_t.update( nx=int(num.ceil(src_t.length / delta_patch)), ny=int(num.ceil(src_t.width / delta_patch)), patches=None, coef_mat=None, patch_mask=None) for asperity in conf.asperity: if asperity != 'total': src_a = src_t.clone(length=src_t.length / 2.) else: src_a = src_t.clone() sin_strike = num.sin(src_a.strike * d2r) cos_strike = num.cos(src_a.strike * d2r) if asperity == 'left': src_a.north_shift -= cos_strike * src_a.length * 0.5 src_a.east_shift -= sin_strike * src_a.length * 0.5 elif asperity == 'right': src_a.north_shift += cos_strike * src_a.length * 0.5 src_a.east_shift += sin_strike * src_a.length * 0.5 # update source and get maximum slip (important for Grond) src_a.rescale_slip(moment=mt.moment, store=store) src_a.update( patches=None, coef_mat=None, patch_mask=None, tractions=None, rake=rake) id_tuple = (asperity, src_a.length, src_a.width) unique_sources[id_tuple] = src_a return list(unique_sources.values()) sources = [] payload = [( dataset[i, 1], dataset[i, 2], dataset[i, 5], dataset[i, 6], dataset[i, 7], dataset[i, 8], dataset[i, 9]) for i in range(dataset.shape[0])] sources = [] for sources_out in parimap.parimap( calculate_inject_sources, *zip(*payload), nprocs=nparallel): sources += sources_out # iterate through origins and append to sources return [src.clone(nucleation_x=nx, nucleation_y=ny) for src in sources for nx, ny in conf.nucleation] def _generate_rectangular_sources( self, event, store, dataset, nparallel, grond_config): ''' Generate rectangular sources based on prior data and injection config. ''' from pyrocko import parimap from siria.gm.util import calc_rise_time gronf = grond_config pc = gronf.problem_config conf = self.config base_source = gf.RectangularSource.from_pyrocko_event( event, magnitude=None, slip=1., anchor='top', decimation_factor=1., velocity=store.config.get_vs( event.lat, event.lon, num.array([[ 0., 0., event.depth]]), interpolation='nearest_neighbor')[0]) if event.duration is not None and conf.time_reference == 'centroid': base_source.time -= event.duration / 2. magnitude = event.magnitude or event.moment_tensor.magnitude if base_source.stf is None: base_source.stf = gf.HalfSinusoidSTF() base_source.stf.duration = calc_rise_time(mag=magnitude) # base_source.magnitude = magnitude def calculate_inject_sources( north, east, mag, strike, dip, rake, length, width, velocity): src = base_source.clone( north_shift=north, east_shift=east, length=length, width=width, strike=strike, dip=dip, rake=rake, velocity=velocity) store_dmin = store.config.source_depth_min store_dmax = store.config.source_depth_max # Reset depth, N, E to account for top edge # Check with store delta depth, source.depth and source.width if dip != 0.: delta_depth = num.sin(src.dip * d2r) * 0.5 * src.width if delta_depth > (src.depth - store_dmin): delta_depth = src.depth - store_dmin x = -delta_depth / num.tan(src.dip * d2r) else: delta_depth = 0. x = -src.width * 0.5 n = -num.sin(src.strike * d2r) * x e = num.cos(src.strike * d2r) * x src.north_shift += n src.east_shift += e src.depth -= delta_depth delta_depth_store = store_dmax - store_dmin delta_depth = num.sin(src.dip * d2r) * src.width if delta_depth > delta_depth_store: src.update(width=delta_depth_store / num.sin(src.dip * d2r)) logger.info('Source width reduced to %g m due to limited ' 'store source depth range.', src.width) if delta_depth + src.depth > store_dmax: src.update( width=(store_dmax - src.depth) / num.sin(src.dip * d2r)) logger.info('Source width reduced to %g m due to limited ' 'store source depth range.', src.width) target = gf.Target( interpolation='nearest_neighbor') unique_sources = {} for asperity in conf.asperity: if asperity != 'total': src_a = src.clone(length=src.length / 2.) else: src_a = src.clone() sin_strike = num.sin(src_a.strike * d2r) cos_strike = num.cos(src_a.strike * d2r) if asperity == 'left': src_a.north_shift -= cos_strike * src_a.length * 0.5 src_a.east_shift -= sin_strike * src_a.length * 0.5 elif asperity == 'right': src_a.north_shift += cos_strike * src_a.length * 0.5 src_a.east_shift += sin_strike * src_a.length * 0.5 mag_old = src_a.get_magnitude(store, target=target) src_a.slip *= pmt.magnitude_to_moment( mag) / pmt.magnitude_to_moment(mag_old) id_tuple = (asperity, src_a.length, src_a.width) unique_sources[id_tuple] = src_a return list(unique_sources.values()) sources = [] if 'velocity' in pc.ranges: vmin = pc.ranges['velocity'].start vmax = pc.ranges['velocity'].stop else: vs = store.config.get_vs( event.lat, event.lon, num.array([[ 0., 0., event.depth]]), interpolation='nearest_neighbor') vmin, vmax = vs.min(), vs.max() velocity = self._state.uniform( size=(dataset.shape[0],), low=vmin, high=vmax) payload = [( dataset[i, 1], dataset[i, 2], dataset[i, 4], dataset[i, 5], dataset[i, 6], dataset[i, 7], dataset[i, 8], dataset[i, 9], velocity[i]) for i in range(dataset.shape[0])] sources = [] for sources_out in parimap.parimap( calculate_inject_sources, *zip(*payload), nprocs=nparallel): sources += sources_out # iterate through origins and append to sources return [src.clone(nucleation_x=nx, nucleation_y=ny) for src in sources for nx, ny in conf.nucleation] def _generate_point_sources(self, event, store, dataset): ''' Generate point sources based on prior data and injection config. ''' from pyrocko import moment_tensor as pmt conf = self.config base_source = gf.MTSource.from_pyrocko_event(event) if event.duration is not None and conf.time_reference == 'origin': base_source.time += event.duration / 2. payload = [( dataset[i, 0], dataset[i, 1], dataset[i, 2], dataset[i, 3], dataset[i, 4], dataset[i, 5], dataset[i, 6], dataset[i, 7], dataset[i, 10]) for i in range(dataset.shape[0])] payload = list(set(payload)) sources = [] for p in payload: mt_src = pmt.MomentTensor( strike=p[5], dip=p[6], rake=p[7], magnitude=p[4]) source = base_source.clone( time=base_source.time + p[0], north_shift=p[1], east_shift=p[2], depth=p[3], m6=mt_src.m6()) source.stf = gf.HalfSinusoidSTF(duration=p[8]) sources.append(source) return sources
[docs] def get_sources( self, event, store, grond_config=None, source_type='dynamic', nparallel=1): ''' Extract source models to be tested for the event location. :param event: Event to extract likely source models for. Both location, magnitude and moment tensor information (if available) are used. :type event: :py:class:`pyrocko.model.event.Event` :param store: Green's function store needed for source generation. :type store: :py:class:`pyrocko.gf.store.Store` :param grond_config: Grond configuration file used to define lateral ranges and others. :type grond_config: :py:class:`grond.config.Config` :param source_type: If ``dynamic``, pseudo dynamic rupture models are extracted. If ``point``, moment tensor point sources are returned instead. ``rectangular`` yields rectangular extended rupture models. :type source_type: optional, str :param nparallel: Number of processes which run in parallel. :type nparallel: optional, int :returns: Source models to be injected. :rtype: list of :py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture` or :py:class:`~pyrocko.gf.seismosizer.RectangularSource` or :py:class:`~pyrocko.gf.seismosizer.MTSource` ''' logger.info('Extract sources based on injection configuration ...') dataset = self._assemble_dataset(event, grond_config, store) logger.info('Generate %s sources for %d inputs ...' % (source_type, dataset.shape[0])) if source_type == 'dynamic': return self._generate_dynamic_sources( event, store, dataset, nparallel=nparallel) elif source_type == 'rectangular': return self._generate_rectangular_sources( event, store, dataset, nparallel=nparallel, grond_config=grond_config) elif source_type == 'point': return self._generate_point_sources( event, store, dataset)
def expand(base_path, path): return op.abspath(op.join(base_path, path))
[docs]class Inverter(Object): ''' Inverter base class. ''' def run(self): pass def dump_results(self): pass
[docs]class GrondInverter(Inverter): grond_config = Config.T( optional=True, help='Grond config') injection_config = SourceInjectionConfig.T( optional=True, help='Source injector config') problem_name = String.T( optional=True) event_id = String.T( optional=True, help='Event id') base_path = Path.T( default='.') report_path = Path.T( optional=True) store_id = String.T( optional=True, help='Store id to be used to generate the injected sources. If not ' 'given, the store of the first target group is used.') threads = Int.T( optional=True, default=1, help='Number of maximum threads used.') parallel = Int.T( optional=True, default=1, help='Number of maximum processes running in parallel (for source ' 'generation)') def __init__(self, *args, **kwargs): Inverter.__init__(self, *args, **kwargs) self._store = None self._event = None self._sources = [] self._source_type = None self._cwd = os.popen('pwd').readline().strip() @property def store(self): ''' Pyrocko Green's Function store. :returns: Green's function store. :rtype: :py:class:`pyrocko.gf.store.Store` ''' if self._store is not None: return self._store if self.store_id is not None: store_id = self.store_id else: store_id = self.grond_config.target_groups[0].store_id engine = self.grond_config.engine_config.get_engine() self._store = engine.get_store(store_id=store_id) return self._store @property def event(self): ''' Pyrocko event. :returns: Event. :rtype: :py:class:`pyrocko.model.event.Event` ''' if self._event is not None: return self._event dc = self.grond_config.dataset_config def extra(path): return expand_template(path, dict(event_name=self.event_id)) def fp(path): p = dc.expand_path(path, extra=extra) if p is None: return None if isinstance(p, list): for path in p: if not op.exists(path): logger.warning('Path %s does not exist.' % path) else: if not op.exists(p): logger.warning('Path %s does not exist.' % p) return p def check_event(event, fn): if not event.name: logger.warning('Event in %s has no name!', fn) return if not event.lat or not event.lon: logger.warning('Event %s has inconsistent coordinates!', event.name) if not event.depth: logger.warning('Event %s has no depth!', event.name) if not event.time: logger.warning('Event %s has no time!', event.name) events_path = fp(dc.events_path) fns = glob.glob(events_path) if not fns: raise DatasetError('No event files matching "%s".' % events_path) for fn in fns: logger.debug('Loading events from file %s' % fn) events = model.load_events(filename=fn) for ev in events: if ev.name == self.event_id: check_event(ev, fn) self._event = ev return self._event @property def reportconf_path(self): ''' Path to the Grond report configuration file. :returns: Path to Grond report configuration file. :rtype: str ''' if self.report_path is None: raise ValueError('report_path needed') return '_'.join([ expand(self.base_path, self.report_path), 'config.conf']) @property def sources_inject(self): ''' Injected models from :py:class:`siria.si.invert.SourceInjector`. :returns: Sources to be injected within the run. :rtype: list of :py:class:`pyrocko.gf.seismosizer.Source` ''' from grond.problems.cmt.problem import CMTProblemConfig from grond.problems.rectangular.problem import RectangularProblemConfig from grond.problems.dynamic_rupture.problem import \ DynamicRuptureProblemConfig if self._sources: return self._sources if self.injection_config.fmax is None: self.injection_config.fmax = num.max([ g.misfit_config.fmax for g in self.grond_config.target_groups]) injector = SourceInjector(config=self.injection_config) source_type = '' if isinstance( self.grond_config.problem_config, CMTProblemConfig): source_type = 'point' elif isinstance( self.grond_config.problem_config, RectangularProblemConfig): source_type = 'rectangular' elif isinstance( self.grond_config.problem_config, DynamicRuptureProblemConfig): source_type = 'dynamic' else: raise ValueError('Can not handle a problem of type {}'.format( self.grond_config.problem_config.T.class_name)) self._source_type = source_type self._sources = injector.get_sources( self.event, self.store, grond_config=self.grond_config, source_type=self._source_type, nparallel=self.parallel) return self._sources @property def fn_sources_inject(self): ''' Path to the Grond source injection file. :returns: Path to Grond source injection file. :rtype: str ''' if self.report_path is None: raise ValueError('report_path needed') return '_'.join([ expand(self.base_path, self.report_path), self.problem_name, 'sources_inject.yaml'])
[docs] def sources_to_injector_config(self): ''' Update Grond InjectionConfig for injected sources. ''' if self.grond_config is not None: conf = self.grond_config.optimiser_config sp = conf.sampler_phases conf.sampler_phases = [InjectionSamplerPhase( niterations=len(self.sources_inject), sources_path=self.fn_sources_inject)] + sp self.grond_config.optimiser_config = conf logger.info('Updated config to include injected sources') else: logger.warning('Could not update config - No config given')
[docs] def save_sources_inject(self): ''' Save inject sources in file. ''' if self.fn_sources_inject is not None: dump_all(self.sources_inject, filename=self.fn_sources_inject) logger.info('Saved sources to %s', self.fn_sources_inject) else: logger.warning('Could not save sources - No filename given')
[docs] def generate_reportconf(self): ''' Generate and save Grond report config file. ''' if self.report_path is not None: report_conf = report.ReportConfig( plot_config_collection=grondplot.get_plot_config_collection(), report_base_path=self.report_path) report_conf.set_basepath(self.base_path) report_conf.dump(filename=self.reportconf_path) logger.info( 'Dumped specific grond report config %s', self.reportconf_path)
[docs] def run(self, force=False): ''' Run Grond optimization. :param force: If ``True``, possible existing older runs are overwritten :type force: bool ''' if self.grond_config is None: raise ValueError('Config file needed to run Grond Inversion') conf = self.grond_config.clone() conf.problem_config.name_template = self.problem_name conf.set_basepath(self.base_path) if self.event_id is not None: conf.event_names = [self.event_id] os.chdir(self.base_path) env = Environment(config=conf, event_names=conf.event_names) core.go( environment=env, force=force, nparallel=1, nthreads=self.threads, status='state') os.chdir(self._cwd)
def _grond_run_dir(self): return expand_template( self.grond_config.rundir_template, dict(problem_name=self.problem_name))
[docs] def report(self): ''' Generate Grond report for run for easier result validation. ''' os.chdir(self.base_path) fn_report_conf = op.relpath( self.reportconf_path, start=op.abspath(os.getcwd())) subprocess.call([( 'grond report {} ' '--config={} ' '--no-archive ' '--threads={} ' '--parallel=1').format( self._grond_run_dir(), fn_report_conf, self.threads)], shell=True) os.chdir(self._cwd)
[docs] def dump_siria_sources(self, result_dir): ''' Read and converts results from grond run directory to SiriaSources. ''' import time from pyrocko.util import tts from siria.io import grond as grondio from siria.sources import dump_sources os.chdir(self.base_path) sources = grondio.grond_to_sources(rundir=self._grond_run_dir()) dump_sources( sources, dump_header=True, filename=op.join( result_dir, '{}_{}.yaml'.format( self._source_type, tts(time.time(), format='%Y%m%d_%H%M%S')))) os.chdir(self._cwd)
[docs] def dump_best_event(self, result_dir, fn): ''' Read Grond results and stores best event in event file for later use. ''' from siria.io import grond as grondio os.chdir(self.base_path) source, _ = grondio.get_best_source_and_misfit( rundir=self._grond_run_dir()) event = source.pyrocko_event() model.dump_events( [event], filename=op.join(result_dir, fn)) os.chdir(self._cwd)
[docs] def run_full( self, create_report=True, result_dir='', best_mt_fn='', dump_result_as_sources=False, *args, **kwargs): ''' Run full optimization loop. Different steps as source injection, report config generation, the Grond run and reporting (if wished) are performed. Arguments and keyword arguments for :py:meth:`siria.si.invert.GrondInverter.run` can be given. :param create_report: If ``True``, a Grond report is created after completing the run. :type create_report: optional, bool :param result_dir: Problem result directory. :type result_dir: optional, str :param: dump_result_as_sources: If ``True``, all tested sources are dumped as yaml file within the result directory. :type: dump_result_as_sources: optional, bool ''' self.sources_to_injector_config() self.save_sources_inject() self.run(*args, **kwargs) if best_mt_fn: self.dump_best_event(result_dir, best_mt_fn) if dump_result_as_sources: self.dump_siria_sources(result_dir=result_dir) if create_report: self.generate_reportconf() self.report()