import datetime
import os
from ....utils import path
from logging import info, debug
from .serial import obsoper_serial
from .parallel import obsoper_parallel
[docs]
def obsoper(
    self,
    controlvect,
    obsvect,
    mode,
    run_id=0,
    datei=datetime.datetime(1979, 1, 1),
    datef=datetime.datetime(2100, 1, 1),
    workdir="./",
    reload_results=False,
    check_transforms=False,
    ignore_exceptions=False,
    force_fetch_results=False,
    **kwargs
):
    """Run the standard observation operator in forward, tangent-linear or adjoint mode.

    Orchestrates the full observation-operator pipeline:

    * Creates a per-run sub-directory ``obsoperator/<mode>_<run_id>/`` under
      *workdir*, plus a ``chain/`` sub-directory inside it for chaining
      sub-simulations.
    * If ``reload_results`` is set (and ``check_transforms`` is not),
      attempts to recover cached outputs from a previous run before
      computing from scratch.
    * Dispatches to
      :func:`~pycif.plugins.obsoperators.standard.serial.obsoper_serial` or
      :func:`~pycif.plugins.obsoperators.standard.parallel.obsoper_parallel`.
      The serial driver is used in ``'fwd'`` mode and whenever *self* has
      no ``parallel`` attribute; the parallel driver otherwise.
    * Dumps the resulting observation or control vector to disk for later
      use.

    Side effects on *self*: ``adj_refdir`` is set (kept for possible later
    use by the adjoint) if it is not defined yet, or unconditionally when
    reloading in ``'fwd'``/``'tl'`` mode; ``ref_fwd_dir`` is set when
    reloading in ``'fwd'`` mode.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        controlvect (ControlVect): control-vector object. Must carry ``x``
            (and ``dx`` for ``'tl'`` mode); receives ``dx`` in ``'adj'`` mode.
        obsvect (ObsVect): observation-vector object. Receives ``ysim`` (and
            ``dy`` for ``'tl'`` mode); provides ``dy`` in ``'adj'`` mode.
        mode (str): execution mode — one of ``'fwd'``, ``'tl'``, or
            ``'adj'``.
        run_id (int | str, optional): identifier for the current run; used to
            name the sub-directory (integers are zero-padded to 4 digits).
            ``bool`` is rejected. Defaults to ``0``.
        datei (datetime.datetime, optional): start date of the simulation
            window. Not used in this function's body.
            Defaults to ``datetime.datetime(1979, 1, 1)``.
        datef (datetime.datetime, optional): end date of the simulation
            window. Not used in this function's body.
            Defaults to ``datetime.datetime(2100, 1, 1)``.
        workdir (str, optional): parent directory in which the run
            sub-directory is created. Defaults to ``"./"``.
        reload_results (bool, optional): if ``True``, attempt to recover
            pre-computed outputs from the run sub-directory before running
            the full pipeline. Defaults to ``False``.
        check_transforms (bool, optional): if ``True``, run each transform in
            both directions and verify the adjoint / TL identity; disables
            result reloading. Defaults to ``False``.
        ignore_exceptions (bool, optional): if ``True``, non-fatal transform
            errors are logged and swallowed rather than re-raised.
            Defaults to ``False``.
        force_fetch_results (bool, optional): if ``True`` and a reload was
            attempted (``reload_results`` and not ``check_transforms``) but
            cached outputs could not be used, raise :exc:`IOError` instead
            of computing. Defaults to ``False``.
        **kwargs: extra keyword arguments (ignored).

    Returns:
        ObsVect: in ``'fwd'`` and ``'tl'`` modes — the updated *obsvect*
        with ``ysim`` (and ``dy``) populated.
        ControlVect: in ``'adj'`` mode — the updated *controlvect* with
        ``dx`` populated.
        None: for any other *mode* (nothing is dumped either).

    Raises:
        TypeError: if *run_id* is neither an :class:`int` (excluding
            :class:`bool`) nor a :class:`str`.
        IOError: if ``force_fetch_results`` is ``True`` and cached outputs
            cannot be loaded.
    """
    # Create the sub-working directory for the present run.
    # bool is a subclass of int, but was never accepted as a run_id.
    if isinstance(run_id, int) and not isinstance(run_id, bool):
        rundir = "{}/obsoperator/{}_{:04d}/".format(workdir, mode, run_id)
    elif isinstance(run_id, str):
        rundir = "{}/obsoperator/{}_{}/".format(workdir, mode, run_id)
    else:
        raise TypeError(f"{type(run_id)} is not accepted for run_id")

    path.init_dir(rundir)
    info("Running the observation operator in %s mode in sub-folder %s",
         mode, rundir)

    # Create save directory for chaining sub-simulations
    path.init_dir("{}/chain/".format(rundir))

    # Return results from previous runs if they exist
    if reload_results and not check_transforms:
        if mode in ["fwd", "tl"]:
            # Save the directory for possible later use by the adjoint
            self.adj_refdir = rundir
            # Save the reference forward directory
            if mode == "fwd":
                self.ref_fwd_dir = rundir

            obsvect_dir = "{}/obsvect/".format(rundir)
            try:
                # Try reading a previously dumped observation vector
                nothing_to_read = obsvect.read(obsvect_dir)
            except IOError:
                info(
                    "There is no monitor file to be recovered. "
                    "Compute the full forward simulation"
                )
            else:
                if getattr(self, "force_full_operator", False) \
                        and nothing_to_read:
                    # Previously this reason was raised and immediately
                    # swallowed; log it so the user sees why we recompute.
                    info(
                        "The observation vector is empty, but I will "
                        "carry on computing the observation operator since "
                        "`force_full_operator` = True"
                    )
                else:
                    info(
                        "Observation vector from previous simulation ({}) "
                        "was successfully retrieved".format(obsvect_dir)
                    )
                    return obsvect

        elif mode == "adj":
            controlvect_file = "{}/controlvect.pickle".format(rundir)
            try:
                controlvect.load(controlvect_file)
            except IOError:
                info(
                    "There is no controlvect file to be recovered. "
                    "Compute the full adjoint simulation."
                )
            else:
                info(
                    "Sensitivities to control vector from previous simulation "
                    "({}) was successfully retrieved".format(controlvect_file)
                )
                return controlvect

        # Reloading was attempted but nothing usable was recovered
        if force_fetch_results:
            raise IOError(
                "Could not load outputs while forcing output reading"
            )

    # Update adj_refdir if not defined
    if not hasattr(self, "adj_refdir"):
        if mode == "adj":
            # Fall back on the reference forward directory when it is known
            # and exists on disk; ref_fwd_dir may never have been set.
            ref_fwd_dir = getattr(self, "ref_fwd_dir", None)
            if ref_fwd_dir is not None and os.path.isdir(ref_fwd_dir):
                self.adj_refdir = ref_fwd_dir
        else:
            self.adj_refdir = rundir

    # Run serial or parallel observation operator; 'fwd' is always serial
    if not hasattr(self, "parallel") or mode == "fwd":
        obsoper_serial(self, controlvect, obsvect, rundir, mode, workdir,
                       check_transforms, ignore_exceptions)
    else:
        obsoper_parallel(self, controlvect, obsvect, rundir, mode, workdir,
                         check_transforms, ignore_exceptions)

    # Dump the observation vector in fwd/tl modes, the control vector in
    # adj mode, and return the corresponding output object
    if mode in ["fwd", "tl"]:
        obsvect.dump("{}/obsvect/".format(rundir))
        return obsvect

    if mode == "adj":
        controlvect.dump(
            "{}/controlvect.pickle".format(rundir),
            to_netcdf=controlvect.force_adj_netcdf,
            dir_netcdf="{}/controlvect/".format(rundir),
        )
        return controlvect

    # Any other mode: nothing to dump or return
    return None