from logging import info
import os
from ....utils.check.errclass import PluginError
from .flushrun import flushrun
from .transforms.utils.dask.init_dask import init_dask
def obsoper_serial(self, controlvect, obsvect, rundir, mode, workdir,
                   check_transforms, ignore_exceptions):
    """Run the observation operator serially for one execution mode.

    Bookkeeping done before the transforms are run:

    * ``'fwd'`` / ``'tl'`` — zeros ``obsvect.ysim`` and ``obsvect.dy``, then
      dumps the control vector to ``rundir/controlvect.pickle`` (skipped when
      ``self.autorestart`` is set and that file already exists).
    * ``'adj'`` — copies ``self.adj_refdir`` onto the model if the model has
      no ``adj_refdir`` yet, initialises ``controlvect.dx`` to
      ``0 * controlvect.x`` and sets ``model.fwd_chain = True`` when the
      model declares more than two ``subsimu_dates``.

    When ``self.use_dask`` is set, the work is handed to
    :func:`~.transforms.utils.dask.init_dask.init_dask` and the function
    returns immediately afterwards. Otherwise any ``datastore`` left on
    ``self.transform_pipe`` is dropped and ``self.do_transforms`` is called.

    After the non-Dask run, :func:`~.flushrun.flushrun` is called when
    ``self.autoflush`` is set and the operator has no ``parallel`` attribute.
    If the optional ``force-full-flush`` attribute is set and truthy, a second
    flush is issued in ``'adj'`` mode. Finally, after a ``'fwd'`` run,
    *rundir* is stored as ``self.ref_fwd_dir``.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        controlvect (ControlVect): control-vector object.
        obsvect (ObsVect): observation-vector object.
        rundir (str): the run sub-directory for this operator call.
        mode (str): execution mode — one of ``'fwd'``, ``'tl'``, or
            ``'adj'``. Any other value skips the mode-specific bookkeeping.
        workdir (str): parent working directory; forwarded to
            ``self.do_transforms`` and :func:`~.flushrun.flushrun`.
        check_transforms (bool): forwarded unchanged to the transform runner.
        ignore_exceptions (bool): forwarded unchanged to the transform runner.

    Returns:
        None
    """
    model = self.model

    if mode in ("fwd", "tl"):
        # Start from a clean simulated observation vector and increment
        obsvect.ysim[:] = 0.0
        obsvect.dy[:] = 0.0

        # Dump the control vector, unless restarting on top of an existing dump
        controlvect_file = f"{rundir}/controlvect.pickle"
        if not (self.autorestart and os.path.isfile(controlvect_file)):
            controlvect.dump(
                controlvect_file,
                to_netcdf=controlvect.save_out_netcdf,
                dir_netcdf=f"{rundir}/controlvect/",
            )

    elif mode == "adj":
        # Only seed the model's reference directory if it has none yet
        if not hasattr(model, "adj_refdir"):
            model.adj_refdir = self.adj_refdir

        # Zero increment with the same shape/type as the control vector
        controlvect.dx = 0 * controlvect.x

        # Only the number of sub-simulation dates matters here
        if len(getattr(model, "subsimu_dates", [])) > 2:
            model.fwd_chain = True

    # Dask execution path: delegate everything and stop here
    if self.use_dask:
        if mode == "adj":
            pipe_links = self.pipe_links_adj_dask
        else:
            pipe_links = self.pipe_links_fwd_dask

        init_dask(
            self,
            pipe_links,
            mode=mode,
            onlyinit=self.onlyinit,
            save_debug=self.save_debug,
            ref_fwd_dir=self.ref_fwd_dir,
            ignore_exceptions=ignore_exceptions,
            check_transforms=check_transforms,
        )
        # NOTE(review): this early return skips both the flush and the
        # ``ref_fwd_dir`` update below — confirm the Dask path handles them.
        return

    # Reset the transformation datastore left over from a previous call
    transform_pipe = self.transform_pipe
    if hasattr(transform_pipe, "datastore"):
        del transform_pipe.datastore

    # Run all transformations; ``do_transforms`` is stored on the plugin and
    # takes the plugin explicitly as its first argument
    self.do_transforms(
        self,
        transform_pipe,
        transform_pipe.mapper,
        controlvect,
        obsvect,
        mode,
        rundir,
        workdir,
        onlyinit=self.onlyinit,
        check_transforms=check_transforms,
        save_debug=self.save_debug,
        ignore_exceptions=ignore_exceptions,
        ref_fwd_dir=self.ref_fwd_dir,
    )

    # Cleaning unnecessary files
    if self.autoflush and not hasattr(self, "parallel"):
        flushrun(self, workdir, rundir, mode, transform_pipe)

        # The option is not a valid identifier, so it is only present when
        # explicitly set; default to False instead of raising AttributeError.
        # NOTE(review): nesting under ``autoflush`` is assumed, and
        # ``full_flush=False`` is kept as found — confirm both are intended.
        if getattr(self, "force-full-flush", False):
            flushrun(self, workdir, rundir, "adj", transform_pipe,
                     full_flush=False)

    # Keep in memory the path to the forward run for a later adjoint
    if mode == "fwd":
        self.ref_fwd_dir = rundir