from logging import debug, info, warning
from pathlib import Path
import numpy as np
from .dump import dump_cost, load_cost, write_cost, write_gradcost
from .svd import svd_cost
[docs]
def simul(
self,
chi,
grad=True,
run_id=-1,
split_obs_vs_control=False,
linear=False,
**kwargs,
):
r"""Evaluate the Bayesian Gaussian cost function and its gradient.
Computes
.. math::
J(\boldsymbol{\chi})
= \underbrace{\tfrac{1}{2}\boldsymbol{\chi}^\top\boldsymbol{\chi}}_{J_b}
+ \underbrace{\tfrac{1}{2}
(\mathcal{H}(\mathbf{x}) - \mathbf{y})^\top
\mathbf{R}^{-1}
(\mathcal{H}(\mathbf{x}) - \mathbf{y})}_{J_o}
and its gradient
.. math::
\nabla J = \boldsymbol{\chi}
+ \mathbf{L}^\top \mathbf{H}^\top
\mathbf{R}^{-1}(\mathcal{H}(\mathbf{x}) - \mathbf{y})
where :math:`\mathbf{x} = \mathbf{x}_b + \mathbf{L}\boldsymbol{\chi}`
and :math:`\mathbf{L} = \mathbf{B}^{1/2}`.
Both :math:`J` and :math:`\nabla J` are written to
``$workdir/simulator/`` for monitoring and restart purposes.
Args:
self (Plugin): gausscost simulator plugin instance.
chi (np.ndarray): current iterate :math:`\boldsymbol{\chi}`,
shape ``(n,)``.
grad (bool): if ``True`` (default), also compute and return the
gradient.
run_id (int): unique call identifier used for sub-directory names
and for reloading previously computed results.
split_obs_vs_control (bool): if ``True``, return
``(j_b, j_o, zgrad_b, zgrad_o)`` instead of ``(zcost, zgrad)``.
linear (bool): if ``True``, linearise around :math:`\mathbf{x}_b`
by calling the TL obs-operator instead of the full forward.
**kwargs: forwarded to the obs-operator.
Returns:
tuple or float:
* ``(zcost, zgrad)`` — default (``grad=True``, ``split_obs_vs_control=False``).
* ``zcost`` — when ``grad=False``.
* ``(j_b, j_o, zgrad_b, zgrad_o)`` — when ``split_obs_vs_control=True``.
Raises:
ValueError: if the obs-operator is not attached, if departures
contain NaNs (and ``replace_NaNs`` is ``False``), or if any
cost/gradient value is non-finite.
"""
# Various variables
datei = self.datei
datef = self.datef
reload_results = self.reload_from_previous
# Get the observation operator from extra arguments
if not hasattr(self, "obsoperator"):
raise ValueError(
"Observation operator is missing to compute the "
"simulator. Please check your setup files"
)
obsoper = self.obsoperator
controlvect = self.controlvect
obsvect = self.obsvect
mask_obsvect = obsvect.obsvect_mask
# Saving chi to the control vector for later
controlvect.chi = chi
# Try reloading previously computed cost function
j_b, j_o, zgrad_b, zgrad_o = load_cost(self, run_id)
# Control space contribution of the cost function
if j_b is None:
j_b = 0.5 * np.dot(chi, chi)
# Observation space contribution
if j_o is None:
# Get observation departures
departures = get_departures(
self,
chi,
controlvect,
run_id,
linear=linear,
reload_results=reload_results,
**kwargs,
)
# Check that departures are all non NaNs
if np.any(np.isnan(departures[mask_obsvect])):
# Force replacing NaNs by 0 if requested
if self.replace_NaNs:
warning(
"There are some NaNs in the departures!\n"
"I will erase them, but beware of the risks associated."
)
departures = np.where(np.isnan(departures), 0.0, departures)
else:
raise ValueError(
"There are some NaNs in the departures! "
"Please check the outputs of the forward model"
)
# Computes the observation term of the cost function
# use only observations for which is_obsvect is True or NaNs
masked_departures = departures.copy()
masked_departures[~mask_obsvect] = 0.0
rinv_prod = obsvect.rinvprod(departures, mask=mask_obsvect)
if not np.all(np.isfinite(masked_departures)):
raise ValueError("Non-finite values in departures")
if not np.all(np.isfinite(rinv_prod)):
raise ValueError("Non-finite values in (R^-1 * departures)")
j_o = 0.5 * np.dot(masked_departures, rinv_prod)
# Computes SVD-based cost function
if self.do_svd:
j_o = svd_cost(self, obsvect.datastore, j_o)
zcost = j_b + j_o
info(
f"In Simulator {run_id}:\n"
f" Jb = {j_b}\n"
f" Jo = {j_o}\n"
f" Total = {zcost}"
)
if not (np.isfinite(j_b) and np.isfinite(j_o) and np.isfinite(zcost)):
raise ValueError("Non-finite values in cost function")
# Saves cost function value
write_cost(self, run_id, j_b, j_o, zcost)
# Return only the cost function if grad = False
if not grad:
return zcost
# Control part of the gradient
if zgrad_b is None:
zgrad_b = chi
# Observation part of the gradien
if zgrad_o is None:
# Runs the adjoint to get the gradients
if not self.do_svd:
obsvect.dy = obsvect.rinvprod(departures, mask=mask_obsvect)
obsvect.dy[~mask_obsvect] = 0
# If not linear, compute adjoint in x, otherwise in xb
if linear:
x_copy = controlvect.x.copy()
controlvect.x = controlvect.xb.copy()
controlvect = obsoper.obsoper(
controlvect,
obsvect,
"adj",
datei=datei,
datef=datef,
workdir=self.workdir,
run_id=run_id,
reload_results=reload_results,
**kwargs,
)
if linear:
controlvect.x = x_copy
# Observation part of the gradient
zgrad_o = controlvect.sqrtbprod_ad(controlvect.dx, **kwargs)
# Now compute the total gradient and norms
zgrad = zgrad_b + zgrad_o
# Verbose the norms
znorm_grad_b = np.dot(zgrad_b, zgrad_b) ** 0.5
znorm_grad_o = np.dot(zgrad_o, zgrad_o) ** 0.5
znorm_grad = np.dot(zgrad, zgrad) ** 0.5
info(
f"In Simulator {run_id}:\n"
f" grad(Jb) = {znorm_grad_b}\n"
f" grad(Jo) = {znorm_grad_o}\n"
f" grad(J) = {znorm_grad}"
)
if not (
np.isfinite(znorm_grad_b)
and np.isfinite(znorm_grad_o)
and np.isfinite(znorm_grad)
):
raise ValueError("Non-finite values in cost function gradient")
# Saves cost function gradient norm
write_gradcost(self, run_id, znorm_grad_b, znorm_grad_o, znorm_grad)
# Saves cost function and its gradient for later reloading
dump_cost(self, run_id, j_b, j_o, zgrad_b, zgrad_o)
# Return expected values
if split_obs_vs_control:
return j_b, j_o, zgrad_b, zgrad_o
return zcost, zgrad
[docs]
def get_departures(
self,
chi,
controlvect,
run_id,
linear=False,
reload_results=False,
**kwargs,
):
"""Compute the observation-space departures :math:`\mathcal{H}(\mathbf{x}) - \mathbf{y}`.
Optionally reloads previously computed forward results from disk. When
``linear`` is ``True``, calls the TL obs-operator and returns
:math:`\mathbf{H}\,\delta\mathbf{x} + \mathbf{y}_\mathrm{sim,ref} - \mathbf{y}`.
Args:
self (Plugin): gausscost simulator plugin instance.
chi (np.ndarray): current iterate, shape ``(n,)``.
controlvect: control vector plugin instance.
run_id (int): unique call identifier forwarded to the obs-operator.
linear (bool): if ``True``, use the tangent-linear operator.
reload_results (bool): if ``True``, attempt to reload a cached
forward run before recomputing.
**kwargs: forwarded to the obs-operator.
Returns:
np.ndarray: departure vector
:math:`\mathcal{H}(\mathbf{x}) - \mathbf{y}`, shape ``(m,)``.
"""
obsoper_kw = dict(
datei=self.datei,
datef=self.datef,
workdir=self.workdir,
run_id=run_id,
reload_results=reload_results,
**kwargs,
)
obsvect = None
# Fetch observation vector if already computed
if reload_results:
try:
obsvect = self.obsoperator.obsoper(
controlvect, self.obsvect, "fwd", force_fetch_results=True, **obsoper_kw
)
if not linear:
departures = obsvect.ysim - obsvect.yobs
else:
departures = obsvect.ysim + obsvect.dy - obsvect.yobs
except IOError:
debug("Computing Hx from scratch.")
# Computes forward simulations
if obsvect is None:
controlvect.x = controlvect.sqrtbprod(chi, **kwargs)
if not linear:
obsvect = self.obsoperator.obsoper(
controlvect, self.obsvect, "fwd", **obsoper_kw
)
departures = obsvect.ysim - obsvect.yobs
else:
controlvect.dx = controlvect.x - controlvect.xb
controlvect.x = controlvect.xb.copy()
obsvect = self.obsoperator.obsoper(
controlvect, self.obsvect, "tl", **obsoper_kw
)
departures = obsvect.ysim + obsvect.dy - obsvect.yobs
# Reset control vector to original values
controlvect.x = controlvect.xb + controlvect.dx
return departures