# GPLv3
#
# The Developers, 21st Century
import os.path as op
import numpy as num
from scipy import io as sio
from scipy.interpolate import griddata
from pyrocko.orthodrome import points_in_region, distance_accurate50m_numpy
from siria import util
class NotInRegion(Exception):
pass
def tsumaps_file():
name = 'tsumaps_all_params'
fn = util.data_file(op.join('%s.mat' % name))
if not op.exists(fn):
raise Exception('tsumaps file does not exist: %s' % fn)
return fn
[docs]def load(fn=None):
'''
Load tsumaps mat file into dictionary.
The tsumaps mat file stored in siria.data is loaded. The dictionary which
is returned has three keys:
============ ============================ ==============================
Name Size Description
============ ============================ ==============================
``'latlon'`` `(No points, 2)` coordinates of each grid point
in lat, lon.
``'fm'`` `(No focal mecs, 3)` | first nodal plane orientation
for used focal mechanisms
| in strike, dip, rake.
``'prob'`` `(No points, No focal mecs)` | probabilities for each given
focal mechanism nodal plane
| orientation at each point of
the grid.
============ ============================ ==============================
:rtype:
dict of :py:class:`~numpy.ndarray`
:returns:
Dictionary with ``latlon`` (geographical coordinates lat, lon in [deg]
of each point), ``fm`` (first nodal plane orientaion for used focal
mechanisms with columns being ``strike``, ``dip`` and ``rake``) and
``prob`` (Probabilities for each given focal mechanism nodal plane
orientation at each point of the grid).
'''
if fn is None:
fn = tsumaps_file()
return sio.loadmat(fn)
[docs]def get_probs(
lat,
lon,
interpolation='nearest_neighbor',
cumulative_probability_max=None,
probability_gradient_min=None,
nmodels_max=None,
*args,
**kwargs):
'''
Get tsumap focal mechanism probabilites at a given location.
:param lat:
Latitude of the chosen point in [deg].
:type lat:
float
:param lon:
Longitude of the chosen point in [deg].
:type lon:
float
:param interpolation:
Interpolation method to get probabilities at `lat, lon`. Choices are:
- nearest_neighbor
- multilinear.
:type interpolation:
optional, str
:param cumulative_probability_max:
Return sorted models needed to exceed the given cumulative probability.
:type cumulative_probability_max:
optional, float
:param probability_gradient_min:
Return sorted models needed to undercut the given proability gradient.
:type probability_gradient_min:
optional, float
:param nmodels_max:
Return only the given number of most probable models.
:type nmodels_max:
optional, int
:returns:
Focal mechanism nodal plane orientations (strike, dip, rake) and
the corresponding interpolated probabilities. The sum of all
probabilites is 1, if not giving any further constraint.
:rtype:
:py:class:`~numpy.ndarray`, size: ``(No focal mecs, 3)``,
:py:class:`~numpy.ndarray`, size: ``(No focal mecs, )``
'''
n_metrics = sum(1 for _ in filter(
None.__ne__, (
cumulative_probability_max,
probability_gradient_min,
nmodels_max)))
if n_metrics > 1:
raise ValueError(
'cumulative_probability_max, probability_gradient_min and '
'nmodels_max are mutually exclusive. Give only one.')
tsumap = load(*args, **kwargs)
latlon = tsumap['latlon']
fms = tsumap['fm']
probs = tsumap['prob']
if (lat > num.max(latlon[:, 0]) or lat < num.min(latlon[:, 0]) or
lon > num.max(latlon[:, 1]) or lon < num.min(latlon[:, 1])):
raise NotInRegion('Point not covered by tsumaps region')
choices = dict(
nearest_neighbor='nearest',
multilinear='linear')
if interpolation not in choices:
raise ValueError(
'interpolation must be either {}'.format(' or '.join(choices)))
probs_interp = num.zeros(fms.shape[0])
for i, fm in enumerate(fms):
probs_interp[i] = griddata(
points=latlon, values=probs[:, i], xi=num.array([lat, lon]),
method=choices[interpolation])
probs_interp = probs_interp / num.sum(probs_interp)
idx = num.argsort(probs_interp)[::-1]
fms = fms[idx]
probs_interp = probs_interp[idx]
if cumulative_probability_max is not None:
cum_prob = num.cumsum(probs_interp)
cum_prob /= cum_prob.max()
idx = num.where(cum_prob >= cumulative_probability_max)[0]
idx = idx[0] + 1 if idx.size else cum_prob.shape[0]
return fms[:idx], probs_interp[:idx]
elif probability_gradient_min is not None:
idx = num.where(probs_interp < probability_gradient_min)[0]
idx = idx[0] if idx.size else probs_interp.shape[0]
return fms[:idx], probs_interp[:idx]
elif nmodels_max is not None:
return fms[:nmodels_max], probs_interp[:nmodels_max]
else:
return fms, probs_interp
[docs]def get_probs_at_nodes(
lat=None,
lon=None,
wesn=None,
*args,
**kwargs):
'''
Get tsumap focal mechanism probabilites at grid node point(s).
:param lat:
Latitude of the chosen point in [deg].
:type lat:
optional, float
:param lon:
Longitude of the chosen point in [deg].
:type lon:
optional, float
:param wesn:
Rectangle bounding the region as (west, east, south, north) in [deg].
:type wesn:
optional, tuple(float, float, float, float)
:returns:
Iterator of:
- The grid node location as (lat, lon) in [deg]
- Focal mechanism nodal plane orientations (strike, dip, rake) and
- the corresponding interpolated probabilities.
:rtype:
:py:class:`~numpy.ndarray`, size: ``(2, )``,
:py:class:`~numpy.ndarray`, size: ``(No focal mecs, 3)``,
:py:class:`~numpy.ndarray`, size: ``(No focal mecs, )``
'''
tsumap = load(*args, **kwargs)
latlon = tsumap['latlon']
fms = tsumap['fm']
probs = tsumap['prob']
if lat is not None and lon is not None:
if (lat > num.max(latlon[:, 0]) or lat < num.min(latlon[:, 0]) or
lon > num.max(latlon[:, 1]) or lon < num.min(latlon[:, 1])):
raise NotInRegion('Point not covered by tsumaps region')
dist = distance_accurate50m_numpy(lat, lon, latlon[:, 0], latlon[:, 1])
mask = dist == dist.min()
elif wesn is not None:
mask = points_in_region(latlon, wesn)
else:
raise ValueError('Give either ``lat`` and ``lon`` or ``wesn``.')
latlon = latlon[mask, :]
probs = probs[mask, :]
if num.sum(mask) == 0.:
raise ValueError('Please enlarge the ``wesn`` region of interest.')
for i in range(latlon.shape[0]):
yield latlon[i, :], fms, probs[i, :]
[docs]def sample_probs_from_distribution(
points,
interpolation='nearest',
rstate=None):
'''
Sample probabilities from individual Kernel Density Estimates.
Individual KDE are determined for strike, dip and rake based on the closest
point(s) of the NEAMTHM18 TSUMAPS database. For the ``quadratic``
interpolation the four closest data nodes are taken into account with a
respective weighting of :math:`1/d^2`.
:param points:
Locations of points, where samples shall be drawn. Location are given
by latitude and longitude in [deg] and north and east shift in [m].
:type points:
:py:class:`~numpy.ndarray`, size ``(npoints, 4)``
:param interpolation:
If ``nearest``, samples are drawn from data of the closest TSUMAPS
node. If ``quadratic``, samples are drawn from a mixing distribution
consisting of the distance weighted data of the four closest nodes.
The weighting factor is :math:`1 / {distance^2}`.
:type interpolation:
optional, str
:param rstate:
Random state used for sampling.
:type rstate:
:py:class:`~numpy.random.RandomState`
:returns:
Samples from the distribution with strikes, dips and rakes.
:rtype:
:py:class:`~numpy.ndarray`, size ``(npoints, 3)``
'''
from pyrocko import orthodrome as pod
from sklearn.neighbors import KernelDensity
tsumap = load()
latlon = tsumap['latlon']
fms = tsumap['fm']
probs = tsumap['prob']
delta_strike = num.diff(num.sort(num.unique(fms[:, 0]))).max()
delta_dip = num.diff(num.sort(num.unique(fms[:, 1]))).max()
delta_rake = num.diff(num.sort(num.unique(fms[:, 2]))).max()
tnorth, teast = pod.latlon_to_ne_numpy(
latlon[0, 0], latlon[0, 1],
latlon[:, 0], latlon[:, 1])
pnorth, peast = pod.latlon_to_ne_numpy(
latlon[0, 0], latlon[0, 1],
points[:, 0], points[:, 1])
pnorth += points[:, 2]
peast += points[:, 3]
plat, plon = pod.ne_to_latlon(
points[:, 0], points[:, 1],
points[:, 2], points[:, 3])
mask_inside = pod.points_in_region(
num.stack((plat, plon)).T,
(latlon[:, 1].min(), latlon[:, 1].max(),
latlon[:, 0].min(), latlon[:, 0].max(),))
dists = num.zeros(shape=(latlon.shape[0], points.shape[0]))
for i in range(latlon.shape[0]):
dists[i, :] = num.linalg.norm(
[pnorth - tnorth[i], peast - teast[i]],
axis=0)
samples = num.zeros(shape=(points.shape[0], 3))
if interpolation == 'nearest':
idx = num.argmin(dists, axis=0)
inside = idx[mask_inside]
for i in num.unique(inside):
kwargs_kde = dict(
kernel='gaussian',
metric='euclidean',
algorithm='ball_tree')
nsamples = num.sum(idx == i)
for ii, bw in enumerate((delta_strike, delta_dip, delta_rake)):
kde = KernelDensity(
bandwidth=bw / 2.,
**kwargs_kde).fit(
fms[:, ii][:, num.newaxis], sample_weight=probs[i, :])
samples[idx == i, ii] = kde.sample(
n_samples=nsamples,
random_state=rstate).ravel()
nsamples = (~mask_inside).sum()
samples[~mask_inside, :] = rstate.uniform(-180., 180., (nsamples, 3))
samples[~mask_inside, 0] *= 360.
samples[~mask_inside, 1] *= 90.
samples[~mask_inside, 2] *= 360.
elif interpolation == 'quadratic':
# TODO mixing distribution of closest 4 points (weighted by 1/d**2)
raise AttributeError('Quadratic interpolation not implemented yet')
# Correct strike and dips for samples with dips > 90.
mask = samples[:, 1] > 90.
samples[mask, 0] = (samples[mask, 0] + 180)
samples[mask, 1] -= 2. * (samples[mask, 1] % 90.)
samples[samples[:, 1] < 0., 1] = 0.
samples[:, 0] %= 360.
samples[:, 1] %= 90.
samples[:, 2] %= 360.
return samples
__all__ = [
'load',
'get_probs',
'get_probs_at_nodes',
'sample_probs_from_distribution'
]