# Source code for pycif.plugins.obsoperators.standard.obsoper

import datetime
import numbers
import os
from logging import info, debug

from ....utils import path
from .serial import obsoper_serial
from .parallel import obsoper_parallel


def _make_rundir(workdir, mode, run_id):
    """Build the path of the per-run sub-directory of the obs-operator.

    Args:
        workdir (str): parent working directory.
        mode (str): execution mode, used as the directory-name prefix.
        run_id (int | str): run identifier. Integral identifiers are
            zero-padded to at least four digits; strings are used verbatim.

    Returns:
        str: ``"<workdir>/obsoperator/<mode>_<run_id>/"``.

    Raises:
        TypeError: if *run_id* is neither an integral number nor a string.
            ``bool`` is rejected even though it subclasses ``int``.
    """
    # bool subclasses int but is not a meaningful run identifier, so it is
    # rejected explicitly; other integral types (int subclasses, numpy
    # integers) are normalised to a plain int for the zero-padded format.
    if isinstance(run_id, numbers.Integral) and not isinstance(run_id, bool):
        return "{}/obsoperator/{}_{:04d}/".format(workdir, mode, int(run_id))
    if isinstance(run_id, str):
        return "{}/obsoperator/{}_{}/".format(workdir, mode, run_id)
    raise TypeError(f"{type(run_id)} is not accepted for run_id")


def _reload_previous_results(self, controlvect, obsvect, mode, rundir):
    """Try to recover the outputs of a previous run stored in *rundir*.

    In ``'fwd'`` and ``'tl'`` modes this also records *rundir* as
    ``self.adj_refdir`` (and, in ``'fwd'`` mode, as ``self.ref_fwd_dir``)
    before attempting the read.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        controlvect (ControlVect): control vector, loaded in ``'adj'`` mode.
        obsvect (ObsVect): observation vector, read in ``'fwd'``/``'tl'``
            modes.
        mode (str): execution mode.
        rundir (str): run sub-directory possibly holding previous outputs.

    Returns:
        The recovered object (*obsvect* in ``'fwd'``/``'tl'`` modes,
        *controlvect* in ``'adj'`` mode), or ``None`` when nothing usable
        could be recovered and the full operator must be computed.
    """
    if mode in ("fwd", "tl"):
        # Saving the directory for possible later use by the adjoint
        self.adj_refdir = rundir

        # Save the reference forward directory
        if mode == "fwd":
            self.ref_fwd_dir = rundir

        obsvect_dir = "{}/obsvect/".format(rundir)
        try:
            # Trying reading the monitor if any
            nothing_to_read = obsvect.read(obsvect_dir)
        except IOError:
            info(
                "There is no monitor file to be recovered. "
                "Compute the full forward simulation"
            )
            return None

        # An empty observation vector is not a usable result when the user
        # asked for the full operator to be computed
        if self.force_full_operator and nothing_to_read:
            info(
                "The observation vector is empty, but I will "
                "carry on computing the observation operator since "
                "`force_full_operator` = True"
            )
            return None

        info(
            "Observation vector from previous simulation (%s) "
            "was successfully retrieved", obsvect_dir
        )
        return obsvect

    if mode == "adj":
        controlvect_file = "{}/controlvect.pickle".format(rundir)
        try:
            controlvect.load(controlvect_file)
        except IOError:
            info(
                "There is no controlvect file to be recovered. "
                "Compute the full adjoint simulation."
            )
            return None

        info(
            "Sensitivities to control vector from previous simulation "
            "(%s) was successfully retrieved", controlvect_file
        )
        return controlvect

    return None


def obsoper(
    self,
    controlvect,
    obsvect,
    mode,
    run_id=0,
    datei=datetime.datetime(1979, 1, 1),
    datef=datetime.datetime(2100, 1, 1),
    workdir="./",
    reload_results=False,
    check_transforms=False,
    ignore_exceptions=False,
    force_fetch_results=False,
    **kwargs
):
    """Run the standard observation operator in forward, tangent-linear or
    adjoint mode.

    Orchestrates the full observation-operator pipeline:

    * Creates a per-run sub-directory ``obsoperator/<mode>_<run_id>/`` under
      *workdir*, plus a ``chain/`` sub-directory inside it for chaining
      sub-simulations.
    * If ``reload_results`` is set (and ``check_transforms`` is not),
      attempts to recover cached outputs from a previous run before
      computing from scratch.
    * Dispatches to
      :func:`~pycif.plugins.obsoperators.standard.serial.obsoper_serial` or
      :func:`~pycif.plugins.obsoperators.standard.parallel.obsoper_parallel`
      depending on whether ``self.parallel`` is defined. Forward runs are
      always dispatched to the serial implementation.
    * Dumps the resulting observation or control vector to disk for later
      use.

    Side effects on *self*: ``self.adj_refdir`` (reference directory for
    the adjoint) is set if not already defined, and reloading a forward run
    also sets ``self.ref_fwd_dir``.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        controlvect (ControlVect): control-vector object. Must carry ``x``
            (and ``dx`` for ``'tl'`` mode); receives ``dx`` in ``'adj'``
            mode.
        obsvect (ObsVect): observation-vector object. Receives ``ysim``
            (and ``dy`` for ``'tl'`` mode); provides ``dy`` in ``'adj'``
            mode.
        mode (str): execution mode — one of ``'fwd'``, ``'tl'``, or
            ``'adj'``.
        run_id (int | str, optional): identifier for the current run; used
            to name the sub-directory. Integral values are zero-padded to
            four digits. Defaults to ``0``.
        datei (datetime.datetime, optional): start date of the simulation
            window. Not used by this function. Defaults to
            ``datetime.datetime(1979, 1, 1)``.
        datef (datetime.datetime, optional): end date of the simulation
            window. Not used by this function. Defaults to
            ``datetime.datetime(2100, 1, 1)``.
        workdir (str, optional): parent directory in which the run
            sub-directory is created. Defaults to ``"./"``.
        reload_results (bool, optional): if ``True``, attempt to recover
            pre-computed outputs from the run sub-directory before running
            the full pipeline. Defaults to ``False``.
        check_transforms (bool, optional): if ``True``, run each transform
            in both directions and verify the adjoint / TL identity;
            disables result reloading. Defaults to ``False``.
        ignore_exceptions (bool, optional): if ``True``, non-fatal transform
            errors are logged and swallowed rather than re-raised. Defaults
            to ``False``.
        force_fetch_results (bool, optional): only consulted when results
            are being reloaded; if ``True`` and cached outputs cannot be
            recovered, raise :exc:`IOError` instead of computing. Defaults
            to ``False``.
        **kwargs: extra keyword arguments (ignored).

    Returns:
        ObsVect: in ``'fwd'`` and ``'tl'`` modes — the updated *obsvect*
        with ``ysim`` (and ``dy``) populated.
        ControlVect: in ``'adj'`` mode — the updated *controlvect* with
        ``dx`` populated.
        ``None`` is returned for any other *mode*.

    Raises:
        TypeError: if *run_id* is neither an integral number (``bool``
            excluded) nor a :class:`str`.
        IOError: if ``force_fetch_results`` is ``True`` and cached outputs
            cannot be loaded.
    """
    # TODO: check that inputs are consistent with the mode to run
    # (e.g. check_inputs(inputs, mode))

    # Create the sub-working directory for the present run
    rundir = _make_rundir(workdir, mode, run_id)
    path.init_dir(rundir)
    info("Running the observation operator in %s mode in sub-folder %s",
         mode, rundir)

    # Create save directory for chaining sub-simulations
    path.init_dir("{}/chain/".format(rundir))

    # Return results from previous runs if they exist. Reloading is skipped
    # when checking transforms, since the transforms must actually run.
    if reload_results and not check_transforms:
        reloaded = _reload_previous_results(
            self, controlvect, obsvect, mode, rundir
        )
        if reloaded is not None:
            return reloaded

        if force_fetch_results:
            raise IOError(
                "Could not load outputs while forcing output reading"
            )

    # Define the reference directory for the adjoint if not already set
    if not hasattr(self, "adj_refdir"):
        if mode == "adj":
            # NOTE(review): within this module ``ref_fwd_dir`` is only set
            # when a forward run is reloaded; confirm it is always defined
            # elsewhere before an adjoint run, otherwise this line raises
            # AttributeError.
            if os.path.isdir(self.ref_fwd_dir):
                self.adj_refdir = self.ref_fwd_dir
        else:
            self.adj_refdir = rundir

    # Run serial or parallel observation operator; forward runs are always
    # executed serially
    if not hasattr(self, "parallel") or mode == "fwd":
        obsoper_serial(self, controlvect, obsvect, rundir, mode, workdir,
                       check_transforms, ignore_exceptions)
    else:
        obsoper_parallel(self, controlvect, obsvect, rundir, mode, workdir,
                         check_transforms, ignore_exceptions)

    # Dump the observation vector for later use in fwd and tl modes, or the
    # control vector in adj mode, then return the output object
    if mode in ("fwd", "tl"):
        obsvect.dump("{}/obsvect/".format(rundir))
        return obsvect

    if mode == "adj":
        controlvect.dump(
            "{}/controlvect.pickle".format(rundir),
            to_netcdf=controlvect.force_adj_netcdf,
            dir_netcdf="{}/controlvect/".format(rundir),
        )
        return controlvect