Source code for pycif.plugins.obsvects.standard.rinvprod
from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
[docs]
def rinvprod(
obsvect,
dy: NDArray[np.floating],
inverse: bool = True,
mask: NDArray[np.bool_] | None = None,
) -> NDArray[np.floating]:
r"""Apply the observation error covariance (or its inverse) to a vector.
Assumes a **diagonal** observation error covariance matrix
:math:`\mathbf{R} = \mathrm{diag}(\sigma_{\varepsilon,1}^2, \ldots,
\sigma_{\varepsilon,m}^2)`.
Two modes of operation:
* ``inverse=True`` (default) — computes :math:`\mathbf{R}^{-1}\,\delta\mathbf{y}`:
.. math::
(\mathbf{R}^{-1}\,\delta\mathbf{y})_i
= \frac{\delta y_i}{\sigma_{\varepsilon,i}^2}
Used in the cost function gradient:
:math:`\nabla J_o = \mathbf{H}^\top \mathbf{R}^{-1}
(\mathcal{H}(\mathbf{x}) - \mathbf{y}^o)`.
* ``inverse=False`` — computes a noise-perturbed observation sample
:math:`\mathbf{y}^o + \mathbf{R}^{1/2}\,\delta\mathbf{y}`:
.. math::
(\mathbf{R}^{1/2}\,\delta\mathbf{y} + \mathbf{y}^o)_i
= \sigma_{\varepsilon,i}\,\delta y_i + y^o_i
Used for Monte Carlo sampling of perturbed observations.
An optional *mask* restricts the operation to the active observation
subset (``obsvect.obsvect_mask``); masked-out positions are set to zero
in the output.
Args:
obsvect (Plugin): obsvect plugin instance (provides ``yobs_err``
and ``yobs``).
dy (np.ndarray): input vector, shape ``(dim,)``.
inverse (bool): if ``True`` apply :math:`\mathbf{R}^{-1}`; if
``False`` apply :math:`\mathbf{R}^{1/2}` and add ``yobs``.
mask (np.ndarray of bool, optional): boolean mask of shape ``(dim,)``
selecting a subset of observations. Positions where the mask is
``False`` are set to zero in the output.
Returns:
np.ndarray: result vector, shape ``(dim,)``.
Raises:
ValueError: if *mask* has a different shape than *dy*, or if any
entry of ``yobs_err`` is zero or NaN (which would make
:math:`\mathbf{R}^{-1}` undefined or infinite).
"""
# At the moment, this is valid for diagonal observation matrices only
# TODO: extend to full non-diagonal R eventually
dy_out = dy.copy()
yobs = obsvect.yobs
yobs_err = obsvect.yobs_err
# Raise exception if some errors are 0 or NaN
if np.any((yobs_err == 0) | np.isnan(yobs_err)):
raise ValueError(
"Your observation errors have some zero or NaN values. "
"If will make the departures infinite or NaNs in the cost "
"function. Please check your observation errors in your monitor files."
)
# Apply mask if any
if mask is not None:
if mask.shape != dy.shape:
raise ValueError(
f"Mask shape {mask.shape} does not match dy shape {dy.shape} in rinvprod"
)
dy_out = dy_out[mask]
yobs_err = yobs_err[mask]
yobs = yobs[mask]
if inverse:
dy_out = dy_out / yobs_err**2
else:
dy_out = dy_out * yobs_err + yobs
# Filling masked values with zeros
if mask is not None:
dy_tmp = np.zeros_like(dy)
dy_tmp[mask] = dy_out
dy_out = dy_tmp
return dy_out