Source code for pycif.plugins.simulators.gausscost.svd
import numpy as np
import pandas as pd
[docs]
def svd_init(datastore):
"""Pre-compute the SVD of daily-averaged reference observations per parameter.
For each unique parameter in *datastore*, resamples observations to daily
means, imputes missing values iteratively using the leading SVD components,
and stores the resulting ``U``, ``Vh``, and singular values ``s``.
Args:
datastore (pd.DataFrame): observation datastore with at least the
columns ``parameter``, ``station``, and ``obs``, indexed by
datetime.
Returns:
dict: mapping ``{parameter: {"U": U, "Vh": Vh, "s": s}}`` where
``U`` and ``Vh`` are the left/right singular vectors and ``s`` the
singular values from :func:`numpy.linalg.svd`.
"""
parameters = datastore["parameter"].unique()
SVD = {}
for param in parameters:
ds = datastore.loc[datastore["parameter"] == param]
stations = ds["station"].unique()
ds_obs_ref = pd.DataFrame(
{
s: ds.loc[ds["station"] == s, "obs"].resample("1D").mean()
for s in stations
}
)
# Filling NaNs assuming existing patterns can explain missing values
valid = ~np.isnan(ds_obs_ref)
mu_hat = np.nanmean(ds_obs_ref, axis=0, keepdims=True)
ds_obs = np.where(valid, ds_obs_ref, mu_hat)
for k in range(20):
U, s, Vh = np.linalg.svd(ds_obs, full_matrices=False)
ds_obs[~valid] = (U.dot(np.diag(s).dot(Vh)))[~valid]
SVD[param] = {"U": U, "Vh": Vh, "s": s}
return SVD
[docs]
def svd_cost(self, datastore, Jref):
"""Compute the SVD-based observation cost term and its adjoint increment.
Projects daily-averaged model-observation departures onto the pre-computed
SVD basis from :func:`svd_init`. Errors are set proportional to
:math:`s_k^{-1/4}` (from ``gausscost``) so that poorly constrained
patterns are down-weighted. Writes the adjoint sensitivity back to
``datastore["obs_incr"]`` for use by the adjoint obs-operator.
Args:
self (Plugin): gausscost simulator plugin instance; must have
``self.svd_vectors`` populated by :func:`svd_init` and
optionally ``self.crop_svd`` (int) to discard trailing SVD modes.
datastore (pd.DataFrame): observation datastore; the ``obs_incr``
column is updated in-place with the adjoint increment.
Jref (float): reference observation cost (unused; kept for interface
consistency with the non-SVD code path).
Returns:
float: SVD-based observation cost :math:`J_o`.
"""
j_o = 0
parameters = datastore["parameter"].unique()
for param in parameters:
mask_param = datastore["parameter"] == param
# Projecting at daily scale
ds = datastore.loc[mask_param]
dds = ds.loc[:, "sim"] - ds.loc[:, "obs"]
stations = ds["station"].unique()
resamples = {
s: dds.loc[ds["station"] == s].resample("1D") for s in stations
}
ds_delta = pd.DataFrame({s: resamples[s].sum() for s in stations})
ds_counts = pd.DataFrame({s: resamples[s].size() for s in stations})
ds_delta /= ds_counts
mu_hat = np.nanmean(ds_delta, axis=0, keepdims=True)
valid = ~np.isnan(ds_delta)
ds_delta = np.where(valid, ds_delta, mu_hat)
# Fetching SVD vectors
svd_vect = self.svd_vectors[param]
U = svd_vect["U"]
s_obs = svd_vect["s"]
# SVD and cost function
Vh_sim = np.dot(U.T, ds_delta)
# Errors are chosen as inversely proportional to the singular values
# It is possible to relax constraints on singular vectors from a
# given index with the argument 'crop_svd' in the Yaml
errors = 1.0 / s_obs[:, np.newaxis] ** 0.25
if hasattr(self, "crop_svd"):
errors[self.crop_svd:] = np.inf
j_o += np.nansum(Vh_sim ** 2 / errors ** 2)
# Now get the adjoint for the gradient
departures = U.dot(Vh_sim / errors ** 2)
ds.loc[:, "obs_incr"] = np.nan
for k, stat in enumerate(stations):
col_stat = ds_counts.columns.get_loc(stat)
ds_depart = pd.Series(
departures[:, col_stat], index=ds_counts.index
)
mask_stat = ds["station"] == stat
index = ds["date"][mask_stat].floor("D")
# Propagating mu_hat to adjoint
nvalid = valid.loc[:, stat].sum()
ds_depart.loc[valid.loc[:, stat]] += (
ds_depart.loc[~valid.loc[:, stat]].sum() / nvalid
)
# De-aggregating daily scale
ds.loc[mask_stat, "obs_incr"] = (
ds_depart.loc[index].values / ds_counts.loc[index, stat].values
)
datastore.loc[mask_param, "obs_incr"] = ds.loc[:, "obs_incr"]
return j_o