Source code for pycif.plugins.simulators.gausscost.simul

from logging import debug, info, warning
from pathlib import Path

import numpy as np

from .dump import dump_cost, load_cost, write_cost, write_gradcost
from .svd import svd_cost


[docs] def simul( self, chi, grad=True, run_id=-1, split_obs_vs_control=False, linear=False, **kwargs, ): r"""Evaluate the Bayesian Gaussian cost function and its gradient. Computes .. math:: J(\boldsymbol{\chi}) = \underbrace{\tfrac{1}{2}\boldsymbol{\chi}^\top\boldsymbol{\chi}}_{J_b} + \underbrace{\tfrac{1}{2} (\mathcal{H}(\mathbf{x}) - \mathbf{y})^\top \mathbf{R}^{-1} (\mathcal{H}(\mathbf{x}) - \mathbf{y})}_{J_o} and its gradient .. math:: \nabla J = \boldsymbol{\chi} + \mathbf{L}^\top \mathbf{H}^\top \mathbf{R}^{-1}(\mathcal{H}(\mathbf{x}) - \mathbf{y}) where :math:`\mathbf{x} = \mathbf{x}_b + \mathbf{L}\boldsymbol{\chi}` and :math:`\mathbf{L} = \mathbf{B}^{1/2}`. Both :math:`J` and :math:`\nabla J` are written to ``$workdir/simulator/`` for monitoring and restart purposes. Args: self (Plugin): gausscost simulator plugin instance. chi (np.ndarray): current iterate :math:`\boldsymbol{\chi}`, shape ``(n,)``. grad (bool): if ``True`` (default), also compute and return the gradient. run_id (int): unique call identifier used for sub-directory names and for reloading previously computed results. split_obs_vs_control (bool): if ``True``, return ``(j_b, j_o, zgrad_b, zgrad_o)`` instead of ``(zcost, zgrad)``. linear (bool): if ``True``, linearise around :math:`\mathbf{x}_b` by calling the TL obs-operator instead of the full forward. **kwargs: forwarded to the obs-operator. Returns: tuple or float: * ``(zcost, zgrad)`` — default (``grad=True``, ``split_obs_vs_control=False``). * ``zcost`` — when ``grad=False``. * ``(j_b, j_o, zgrad_b, zgrad_o)`` — when ``split_obs_vs_control=True``. Raises: ValueError: if the obs-operator is not attached, if departures contain NaNs (and ``replace_NaNs`` is ``False``), or if any cost/gradient value is non-finite. """ # Various variables datei = self.datei datef = self.datef reload_results = self.reload_from_previous # Get the observation operator from extra arguments if not hasattr(self, "obsoperator"): raise ValueError( "Observation operator is missing to compute the " "simulator. Please check your setup files" ) obsoper = self.obsoperator controlvect = self.controlvect obsvect = self.obsvect mask_obsvect = obsvect.obsvect_mask # Saving chi to the control vector for later controlvect.chi = chi # Try reloading previously computed cost function j_b, j_o, zgrad_b, zgrad_o = load_cost(self, run_id) # Control space contribution of the cost function if j_b is None: j_b = 0.5 * np.dot(chi, chi) # Observation space contribution if j_o is None: # Get observation departures departures = get_departures( self, chi, controlvect, run_id, linear=linear, reload_results=reload_results, **kwargs, ) # Check that departures are all non NaNs if np.any(np.isnan(departures[mask_obsvect])): # Force replacing NaNs by 0 if requested if self.replace_NaNs: warning( "There are some NaNs in the departures!\n" "I will erase them, but beware of the risks associated." ) departures = np.where(np.isnan(departures), 0.0, departures) else: raise ValueError( "There are some NaNs in the departures! " "Please check the outputs of the forward model" ) # Computes the observation term of the cost function # use only observations for which is_obsvect is True or NaNs masked_departures = departures.copy() masked_departures[~mask_obsvect] = 0.0 rinv_prod = obsvect.rinvprod(departures, mask=mask_obsvect) if not np.all(np.isfinite(masked_departures)): raise ValueError("Non-finite values in departures") if not np.all(np.isfinite(rinv_prod)): raise ValueError("Non-finite values in (R^-1 * departures)") j_o = 0.5 * np.dot(masked_departures, rinv_prod) # Computes SVD-based cost function if self.do_svd: j_o = svd_cost(self, obsvect.datastore, j_o) zcost = j_b + j_o info( f"In Simulator {run_id}:\n" f" Jb = {j_b}\n" f" Jo = {j_o}\n" f" Total = {zcost}" ) if not (np.isfinite(j_b) and np.isfinite(j_o) and np.isfinite(zcost)): raise ValueError("Non-finite values in cost function") # Saves cost function value write_cost(self, run_id, j_b, j_o, zcost) # Return only the cost function if grad = False if not grad: return zcost # Control part of the gradient if zgrad_b is None: zgrad_b = chi # Observation part of the gradien if zgrad_o is None: # Runs the adjoint to get the gradients if not self.do_svd: obsvect.dy = obsvect.rinvprod(departures, mask=mask_obsvect) obsvect.dy[~mask_obsvect] = 0 # If not linear, compute adjoint in x, otherwise in xb if linear: x_copy = controlvect.x.copy() controlvect.x = controlvect.xb.copy() controlvect = obsoper.obsoper( controlvect, obsvect, "adj", datei=datei, datef=datef, workdir=self.workdir, run_id=run_id, reload_results=reload_results, **kwargs, ) if linear: controlvect.x = x_copy # Observation part of the gradient zgrad_o = controlvect.sqrtbprod_ad(controlvect.dx, **kwargs) # Now compute the total gradient and norms zgrad = zgrad_b + zgrad_o # Verbose the norms znorm_grad_b = np.dot(zgrad_b, zgrad_b) ** 0.5 znorm_grad_o = np.dot(zgrad_o, zgrad_o) ** 0.5 znorm_grad = np.dot(zgrad, zgrad) ** 0.5 info( f"In Simulator {run_id}:\n" f" grad(Jb) = {znorm_grad_b}\n" f" grad(Jo) = {znorm_grad_o}\n" f" grad(J) = {znorm_grad}" ) if not ( np.isfinite(znorm_grad_b) and np.isfinite(znorm_grad_o) and np.isfinite(znorm_grad) ): raise ValueError("Non-finite values in cost function gradient") # Saves cost function gradient norm write_gradcost(self, run_id, znorm_grad_b, znorm_grad_o, znorm_grad) # Saves cost function and its gradient for later reloading dump_cost(self, run_id, j_b, j_o, zgrad_b, zgrad_o) # Return expected values if split_obs_vs_control: return j_b, j_o, zgrad_b, zgrad_o return zcost, zgrad
[docs] def get_departures( self, chi, controlvect, run_id, linear=False, reload_results=False, **kwargs, ): """Compute the observation-space departures :math:`\mathcal{H}(\mathbf{x}) - \mathbf{y}`. Optionally reloads previously computed forward results from disk. When ``linear`` is ``True``, calls the TL obs-operator and returns :math:`\mathbf{H}\,\delta\mathbf{x} + \mathbf{y}_\mathrm{sim,ref} - \mathbf{y}`. Args: self (Plugin): gausscost simulator plugin instance. chi (np.ndarray): current iterate, shape ``(n,)``. controlvect: control vector plugin instance. run_id (int): unique call identifier forwarded to the obs-operator. linear (bool): if ``True``, use the tangent-linear operator. reload_results (bool): if ``True``, attempt to reload a cached forward run before recomputing. **kwargs: forwarded to the obs-operator. Returns: np.ndarray: departure vector :math:`\mathcal{H}(\mathbf{x}) - \mathbf{y}`, shape ``(m,)``. """ obsoper_kw = dict( datei=self.datei, datef=self.datef, workdir=self.workdir, run_id=run_id, reload_results=reload_results, **kwargs, ) obsvect = None # Fetch observation vector if already computed if reload_results: try: obsvect = self.obsoperator.obsoper( controlvect, self.obsvect, "fwd", force_fetch_results=True, **obsoper_kw ) if not linear: departures = obsvect.ysim - obsvect.yobs else: departures = obsvect.ysim + obsvect.dy - obsvect.yobs except IOError: debug("Computing Hx from scratch.") # Computes forward simulations if obsvect is None: controlvect.x = controlvect.sqrtbprod(chi, **kwargs) if not linear: obsvect = self.obsoperator.obsoper( controlvect, self.obsvect, "fwd", **obsoper_kw ) departures = obsvect.ysim - obsvect.yobs else: controlvect.dx = controlvect.x - controlvect.xb controlvect.x = controlvect.xb.copy() obsvect = self.obsoperator.obsoper( controlvect, self.obsvect, "tl", **obsoper_kw ) departures = obsvect.ysim + obsvect.dy - obsvect.yobs # Reset control vector to original values controlvect.x = controlvect.xb + controlvect.dx return departures