Source code for pycif.plugins.obsoperators.standard.transforms.period_pipe

from logging import info

from .utils.default_subsimus import default_subsimus
from .utils.fwd_pipe import fwd_adj_pipe


def period_pipe(self, all_transforms, mapper):
    """Build the ordered forward and adjoint execution pipes.

    The ordering of every ``(transform, sub-simulation date)`` pair is
    obtained in three steps:

    1. Sub-simulation periods are propagated between each transform and
       its precursors and successors by
       :func:`~.utils.default_subsimus.default_subsimus`.
    2. The dependency graph is walked in forward order by
       :func:`~.utils.fwd_pipe.fwd_adj_pipe` with ``mode='forward'``.
    3. The same helper is called a second time with ``mode='adjoint'``
       to obtain the reverse ordering.

    Each pipe is a list of ``(date, transform_id, direction)`` tuples.
    ``direction`` is ``'forward'`` or ``'adjoint'`` and selects whether
    the transform runs in its normal or dry-run mode.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        all_transforms: the
            :class:`~pycif.utils.classes.transforms.Transform` object
            holding all initialized transforms.
        mapper (dict): the pipeline mapper dictionary mapping transform
            IDs to their sub-simulation, input/output and
            precursor/successor metadata.

    Returns:
        tuple[list, list]: ``(pipe_fwd, pipe_adj)``, each a list of
        ``(datetime.datetime, str, str)`` tuples giving the execution
        order for the forward and the adjoint run respectively.
    """
    info("Computing the optimal order of transformation. This can take a while")

    # Sub-simulations must be synchronised with precursors and successors
    # before any ordering is computed.
    default_subsimus(all_transforms, mapper)

    # Execution order used by forward runs.
    info("Doing forward order")
    forward_pipe = fwd_adj_pipe(self, all_transforms, mapper, mode="forward")

    # Execution order used by adjoint runs.
    info("Doing adjoint order")
    adjoint_pipe = fwd_adj_pipe(self, all_transforms, mapper, mode="adjoint")

    return forward_pipe, adjoint_pipe