# Source code for pycif.plugins.obsoperators.standard.serial

from logging import info
import os
from ....utils.check.errclass import PluginError
from .flushrun import flushrun
from .transforms.utils.dask.init_dask import init_dask


def obsoper_serial(
    self,
    controlvect,
    obsvect,
    rundir,
    mode,
    workdir,
    check_transforms,
    ignore_exceptions,
):
    """Run the observation operator sequentially over all transforms.

    Handles the bookkeeping common to every serial execution:

    * ``'fwd'`` / ``'tl'`` -- zeros ``obsvect.ysim`` and ``obsvect.dy``, then
      dumps the control vector to ``rundir/controlvect.pickle`` (skipped when
      ``self.autorestart`` is set and the pickle already exists).
    * ``'adj'`` -- copies ``self.adj_refdir`` onto the model if the model has
      no ``adj_refdir`` yet, initialises ``controlvect.dx`` to
      ``0 * controlvect.x`` and sets ``model.fwd_chain = True`` when the model
      exposes more than two ``subsimu_dates``.

    When ``self.use_dask`` is set, the call is dispatched to
    :func:`~.transforms.utils.dask.init_dask.init_dask` and the function
    returns immediately afterwards: no flush is performed and
    ``self.ref_fwd_dir`` is not updated on that path.

    Otherwise the transform pipeline is run through ``self.do_transforms``.
    After that run, :func:`~.flushrun.flushrun` is called when
    ``self.autoflush`` is set and the operator has no ``parallel`` attribute;
    if the ``force-full-flush`` option is additionally set, a second, full
    flush is requested in ``'adj'`` mode. Finally, after a forward run,
    *rundir* is stored as ``self.ref_fwd_dir``.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        controlvect (ControlVect): control-vector object.
        obsvect (ObsVect): observation-vector object.
        rundir (str): the run sub-directory for this operator call.
        mode (str): execution mode -- one of ``'fwd'``, ``'tl'`` or ``'adj'``.
        workdir (str): parent working directory; forwarded to the transform
            loop and to :func:`~.flushrun.flushrun`.
        check_transforms (bool): forwarded unchanged to the transform loop
            (or to ``init_dask``).
        ignore_exceptions (bool): forwarded unchanged to the transform loop
            (or to ``init_dask``).

    Returns:
        None
    """
    # Initializing modules and variables from the setup
    model = self.model

    if mode in ("fwd", "tl"):
        obsvect.ysim[:] = 0.0
        obsvect.dy[:] = 0.0

        # Dumps control vector in forward and tl modes; an existing dump is
        # kept as-is when restarting automatically
        controlvect_file = f"{rundir}/controlvect.pickle"
        if not (self.autorestart and os.path.isfile(controlvect_file)):
            controlvect.dump(
                controlvect_file,
                to_netcdf=controlvect.save_out_netcdf,
                dir_netcdf=f"{rundir}/controlvect/",
            )

    elif mode == "adj":
        if not hasattr(model, "adj_refdir"):
            model.adj_refdir = self.adj_refdir

        controlvect.dx = 0 * controlvect.x

        # Only the number of sub-simulation dates matters here; a model
        # without the attribute is treated as having none
        if len(getattr(model, "subsimu_dates", [])) > 2:
            model.fwd_chain = True

    # If DASK to proceed with transforms if any
    if self.use_dask:
        if mode == "adj":
            pipe_links = self.pipe_links_adj_dask
        else:
            pipe_links = self.pipe_links_fwd_dask

        init_dask(
            self,
            pipe_links,
            mode=mode,
            onlyinit=self.onlyinit,
            save_debug=self.save_debug,
            ref_fwd_dir=self.ref_fwd_dir,
            ignore_exceptions=ignore_exceptions,
            check_transforms=check_transforms,
        )
        # The Dask path ends here: neither the flush nor the update of
        # ``self.ref_fwd_dir`` below is executed
        return

    # Reset transformation datastore
    transform_pipe = self.transform_pipe
    if hasattr(transform_pipe, "datastore"):
        del transform_pipe.datastore

    # Loop through transformations; ``do_transforms`` is looked up on the
    # plugin instance and explicitly receives ``self`` as first argument
    mapper = transform_pipe.mapper
    self.do_transforms(
        self,
        transform_pipe,
        mapper,
        controlvect,
        obsvect,
        mode,
        rundir,
        workdir,
        onlyinit=self.onlyinit,
        check_transforms=check_transforms,
        save_debug=self.save_debug,
        ignore_exceptions=ignore_exceptions,
        ref_fwd_dir=self.ref_fwd_dir,
    )

    # Cleaning unnecessary files
    if self.autoflush and not hasattr(self, "parallel"):
        flushrun(self, workdir, rundir, mode, transform_pipe)

        # The option name is not a valid identifier, hence getattr; default
        # to False so that an operator without the option does not raise.
        # NOTE(review): the previous code passed ``full_flush=False`` here,
        # which contradicts the option's name -- confirm against flushrun.
        if getattr(self, "force-full-flush", False):
            flushrun(
                self, workdir, rundir, "adj", transform_pipe, full_flush=True
            )

    # Keep in memory the path to the forward
    if mode == "fwd":
        self.ref_fwd_dir = rundir