Source code for pycif.plugins.simulators.gausscost_flexpart.svd


from past.utils import old_div
import numpy as np
import pandas as pd


[docs] def svd_init(datastore): """Pre-compute the SVD of daily-averaged reference observations per parameter. Equivalent to :func:`gausscost.svd.svd_init` but uses the ``past`` compatibility shim (``old_div``) for Python-2/3 portability. Args: datastore (pd.DataFrame): observation datastore with columns ``parameter``, ``station``, and ``obs``, indexed by datetime. Returns: dict: mapping ``{parameter: {"U": U, "Vh": Vh, "s": s}}``. """ parameters = datastore['parameter'].unique() SVD = {} for param in parameters: ds = datastore.loc[datastore['parameter'] == param] stations = ds['station'].unique() ds_obs_ref = pd.DataFrame( {s: ds.loc[ds['station'] == s, 'obs'] .resample('1D').mean() for s in stations}) # Filling NaNs assuming existing patterns can explain missing values valid = ~np.isnan(ds_obs_ref) mu_hat = np.nanmean(ds_obs_ref, axis=0, keepdims=True) ds_obs = np.where(valid, ds_obs_ref, mu_hat) for _ in range(20): U, s, Vh = np.linalg.svd(ds_obs, full_matrices=False) ds_obs[~valid] = \ (U.dot(np.diag(s).dot(Vh)))[~valid] SVD[param] = {'U': U, 'Vh': Vh, 's': s} return SVD
[docs] def svd_cost(self, datastore, Jref): """Compute the SVD-based observation cost term (FLEXPART variant). Equivalent to :func:`gausscost.svd.svd_cost` but uses :math:`s_k^{-1/2}` instead of :math:`s_k^{-1/4}` for the error weighting. Args: self (Plugin): FLEXPART simulator plugin instance with ``svd_vectors`` and optionally ``crop_svd``. datastore (pd.DataFrame): observation datastore; ``obs_incr`` column is updated in-place with the adjoint increment. Jref (float): reference observation cost (unused). Returns: float: SVD-based observation cost :math:`J_o`. """ j_o = 0 parameters = datastore['parameter'].unique() for param in parameters: mask_param = (datastore['parameter'] == param) # Projecting at daily scale ds = datastore.loc[mask_param] dds = ds.loc[:, 'sim'] - ds.loc[:, 'obs'] stations = ds['station'].unique() resamples = {s: dds.loc[ds['station'] == s].resample('1D') for s in stations} ds_delta = pd.DataFrame({s: resamples[s].sum() for s in stations}) ds_counts = pd.DataFrame({s: resamples[s].size() for s in stations}) ds_delta /= ds_counts mu_hat = np.nanmean(ds_delta, axis=0, keepdims=True) valid = ~np.isnan(ds_delta) ds_delta = np.where(valid, ds_delta, mu_hat) # Fetching SVD vectors svd_vect = self.svd_vectors[param] U = svd_vect['U'] s_obs = svd_vect['s'] # SVD and cost function Vh_sim = np.dot(U.T, ds_delta) # Errors are chosen as inversely proportional to the singular values # It is possible to relax constraints on singular vectors from a # given index with the argument 'crop_svd' in the Yaml errors = 1. / np.sqrt(s_obs[:, np.newaxis]) if hasattr(self, 'crop_svd'): errors[self.crop_svd:] = np.inf j_o += np.nansum(old_div(Vh_sim ** 2, errors ** 2)) # Now get the adjoint for the gradient departures = U.dot(old_div(Vh_sim, errors ** 2)) ds.loc[:, 'obs_incr'] = np.nan for _, stat in enumerate(stations): col_stat = ds_counts.columns.get_loc(stat) ds_depart = pd.Series(departures[:, col_stat], index=ds_counts.index) mask_stat = ds['station'] == stat index = ds["date"][mask_stat].floor('D') # Propagating mu_hat to adjoint nvalid = valid.loc[:, stat].sum() ds_depart.loc[valid.loc[:, stat]] += \ old_div(ds_depart.loc[~valid.loc[:, stat]].sum(), nvalid) # De-aggregating daily scale ds.loc[mask_stat, 'obs_incr'] = \ old_div(ds_depart.loc[index].values, ds_counts.loc[index, stat].values) datastore.loc[mask_param, 'obs_incr'] = ds.loc[:, 'obs_incr'] return j_o