Source code for pycif.plugins.simulators.gausscost_flexpart.svd
from past.utils import old_div
import numpy as np
import pandas as pd
[docs]
def svd_init(datastore):
"""Pre-compute the SVD of daily-averaged reference observations per parameter.
Equivalent to :func:`gausscost.svd.svd_init` but uses the ``past``
compatibility shim (``old_div``) for Python-2/3 portability.
Args:
datastore (pd.DataFrame): observation datastore with columns
``parameter``, ``station``, and ``obs``, indexed by datetime.
Returns:
dict: mapping ``{parameter: {"U": U, "Vh": Vh, "s": s}}``.
"""
parameters = datastore['parameter'].unique()
SVD = {}
for param in parameters:
ds = datastore.loc[datastore['parameter'] == param]
stations = ds['station'].unique()
ds_obs_ref = pd.DataFrame(
{s: ds.loc[ds['station'] == s, 'obs']
.resample('1D').mean()
for s in stations})
# Filling NaNs assuming existing patterns can explain missing values
valid = ~np.isnan(ds_obs_ref)
mu_hat = np.nanmean(ds_obs_ref, axis=0, keepdims=True)
ds_obs = np.where(valid, ds_obs_ref, mu_hat)
for _ in range(20):
U, s, Vh = np.linalg.svd(ds_obs, full_matrices=False)
ds_obs[~valid] = \
(U.dot(np.diag(s).dot(Vh)))[~valid]
SVD[param] = {'U': U, 'Vh': Vh, 's': s}
return SVD
[docs]
def svd_cost(self, datastore, Jref):
"""Compute the SVD-based observation cost term (FLEXPART variant).
Equivalent to :func:`gausscost.svd.svd_cost` but uses
:math:`s_k^{-1/2}` instead of :math:`s_k^{-1/4}` for the error
weighting.
Args:
self (Plugin): FLEXPART simulator plugin instance with
``svd_vectors`` and optionally ``crop_svd``.
datastore (pd.DataFrame): observation datastore; ``obs_incr``
column is updated in-place with the adjoint increment.
Jref (float): reference observation cost (unused).
Returns:
float: SVD-based observation cost :math:`J_o`.
"""
j_o = 0
parameters = datastore['parameter'].unique()
for param in parameters:
mask_param = (datastore['parameter'] == param)
# Projecting at daily scale
ds = datastore.loc[mask_param]
dds = ds.loc[:, 'sim'] - ds.loc[:, 'obs']
stations = ds['station'].unique()
resamples = {s: dds.loc[ds['station'] == s].resample('1D') for s in stations}
ds_delta = pd.DataFrame({s: resamples[s].sum() for s in stations})
ds_counts = pd.DataFrame({s: resamples[s].size() for s in stations})
ds_delta /= ds_counts
mu_hat = np.nanmean(ds_delta, axis=0, keepdims=True)
valid = ~np.isnan(ds_delta)
ds_delta = np.where(valid, ds_delta, mu_hat)
# Fetching SVD vectors
svd_vect = self.svd_vectors[param]
U = svd_vect['U']
s_obs = svd_vect['s']
# SVD and cost function
Vh_sim = np.dot(U.T, ds_delta)
# Errors are chosen as inversely proportional to the singular values
# It is possible to relax constraints on singular vectors from a
# given index with the argument 'crop_svd' in the Yaml
errors = 1. / np.sqrt(s_obs[:, np.newaxis])
if hasattr(self, 'crop_svd'):
errors[self.crop_svd:] = np.inf
j_o += np.nansum(old_div(Vh_sim ** 2, errors ** 2))
# Now get the adjoint for the gradient
departures = U.dot(old_div(Vh_sim, errors ** 2))
ds.loc[:, 'obs_incr'] = np.nan
for _, stat in enumerate(stations):
col_stat = ds_counts.columns.get_loc(stat)
ds_depart = pd.Series(departures[:, col_stat], index=ds_counts.index)
mask_stat = ds['station'] == stat
index = ds["date"][mask_stat].floor('D')
# Propagating mu_hat to adjoint
nvalid = valid.loc[:, stat].sum()
ds_depart.loc[valid.loc[:, stat]] += \
old_div(ds_depart.loc[~valid.loc[:, stat]].sum(), nvalid)
# De-aggregating daily scale
ds.loc[mask_stat, 'obs_incr'] = \
old_div(ds_depart.loc[index].values, ds_counts.loc[index, stat].values)
datastore.loc[mask_param, 'obs_incr'] = ds.loc[:, 'obs_incr']
return j_o