Source code for pycif.plugins.obsoperators.standard.transforms.period_pipe
from logging import info
from .utils.default_subsimus import default_subsimus
from .utils.fwd_pipe import fwd_adj_pipe
[docs]
def period_pipe(self, all_transforms, mapper):
"""Arrange all transforms into ordered forward and adjoint execution pipes.
Determines the chronologically correct execution order for every
``(transform, sub-simulation date)`` pair by:
1. Propagating sub-simulation periods from each transform to its
precursors and successors via
:func:`~.utils.default_subsimus.default_subsimus`.
2. Building a dependency graph and walking it in forward order with
:func:`~.utils.fwd_pipe.fwd_adj_pipe` (``mode='forward'``).
3. Walking the same graph in reverse order (``mode='adjoint'``).
Each returned pipe is a list of ``(date, transform_id, direction)``
tuples, where ``direction`` is either ``'forward'`` or ``'adjoint'``
and controls whether a transform runs in its normal or dry-run mode.
Args:
self (ObsOperator): the obs-operator plugin instance.
all_transforms: the :class:`~pycif.utils.classes.transforms.Transform`
object holding all initialized transforms.
mapper (dict): the pipeline mapper dictionary mapping transform IDs
to their sub-simulation, input/output and
precursor/successor metadata.
Returns:
tuple[list, list]: ``(pipe_fwd, pipe_adj)`` where each element is a
list of ``(datetime.datetime, str, str)`` tuples giving the execution
order for forward and adjoint runs respectively.
"""
info("Computing the optimal order of transformation. This can take a while")
# First update subsimulations from precursors and successors
default_subsimus(all_transforms, mapper)
# Get final pipe_list including forward transformations
info("Doing forward order")
all_pipe_transforms_fwd = fwd_adj_pipe(
self, all_transforms, mapper, mode="forward")
info("Doing adjoint order")
all_pipe_transforms_bkwd = fwd_adj_pipe(
self, all_transforms, mapper, mode="adjoint")
return all_pipe_transforms_fwd, all_pipe_transforms_bkwd