# Source code for pycif.plugins.obsoperators.standard.transforms.period_pipe

import pandas as pd
import numpy as np
import copy
from logging import info
import itertools


from .utils.default_subsimus import default_subsimus


def period_pipe(self, all_transforms, mapper):
    """Arrange all transformations for all their sub-simulation periods
    into a pipe whose order respects the required precursors and successors
    for each transformation.

    First propagate sub-simulation periods to precursors/successors for
    transformations which don't have pre-defined sub-simulation periods.
    Second, define a graph from all the precursors of all transformations.
    Last, walk the graph to define the proper order of the transformations.

    Side effect: ``self.pipe_links`` is set to a deep copy of the precursor
    graph (a dictionary mapping ``(sub-simulation, transformation,
    "forward")`` to the list of its precursor identifiers), taken before
    the virtual ``"final_toobsvect"`` node is added.
    ``self.force_full_operator`` is read to decide whether transformations
    without successors are kept.

    :param all_transforms: the object gathering all transformations
    :param mapper: the dictionary containing all information about the
        input/output of each transformation
    :return: a tuple of two lists of ``(sub-simulation, transformation,
        direction)`` identifiers: first the reversed "backward" sequence
        followed by the "forward" sequence, then the "forward" sequence
        followed by the reversed "backward" sequence. These are the pipes
        to be computed in forward and backward mode, including for each
        direction a dry run in the other direction for initialization
    """

    info("Computing the optimal order of transformation. "
         "This can take a while")

    # First update subsimulations from precursors and successors
    default_subsimus(all_transforms, mapper)

    # Identifiers are kept both in an ordered list (position -> identifier)
    # and in a dictionary (identifier -> position) so that membership tests
    # and position look-ups do not scan the whole list every time
    pipe_links = {}
    transforms_ids = []
    transforms_pos = {}

    def _register(node_id):
        # Record an identifier the first time it is met, keeping the order
        # of first appearance
        if node_id not in transforms_pos:
            transforms_pos[node_id] = len(transforms_ids)
            transforms_ids.append(node_id)

    # Loop over transformations and periods and find precursors to be
    # computed just before
    for transf in all_transforms.attributes[::-1]:
        # For each transform loop over all its sub-simulations
        for simu in mapper[transf]["subsimus"]:
            transf_id = (simu, transf, "forward")
            _register(transf_id)

            plugin_name = getattr(all_transforms, transf).plugin.name

            # If the transform is not already in the pipe,
            # add it only if it has some successors, or if it is related to
            # the observation vector
            if transf_id not in pipe_links:
                successors = mapper[transf]["successors"]
                all_successors = [
                    success
                    for trid in successors
                    for success in successors[trid]
                ]
                if (
                    not all_successors
                    and plugin_name != "toobsvect"
                    and not self.force_full_operator
                ):
                    continue

                pipe_links[transf_id] = []

            # Loop on all precursors and check whether they produce outputs
            # necessary for the present sub-simulation
            transf_subsimus = mapper[transf]["subsimus"][simu]
            transf_precursors = mapper[transf]["precursors"]
            for trid in transf_precursors:
                if trid not in transf_subsimus["inputs"]:
                    continue

                # Loop on the input sub-simulation for that trid
                for prec_simu in transf_subsimus["inputs"][trid]:
                    for precursor in transf_precursors[trid]:
                        precurs_subsimus = mapper[precursor]["subsimus"]
                        is_in_precursor = [
                            tmp_simu
                            for tmp_simu in precurs_subsimus
                            if prec_simu
                            in precurs_subsimus[tmp_simu]["outputs"][trid]
                        ]
                        for tmp_simu in is_in_precursor:
                            precur_id = (tmp_simu, precursor, "forward")
                            _register(precur_id)
                            pipe_links[transf_id].append(precur_id)

    # Add a virtual transformation of which the precursors are the
    # transformations without any successor (toobsvect),
    # to recursively walk the path leading to them
    self.pipe_links = copy.deepcopy(pipe_links)
    pipe_links["final_toobsvect"] = [
        transf_id
        for transf_id in pipe_links
        if not list(
            itertools.chain.from_iterable(
                mapper[transf_id[1]]["successors"].values()
            )
        )
    ]
    _register("final_toobsvect")

    # Turn pipe_links to list of indexes to speed up
    pipe_links_inds = [
        [transforms_pos[precur_id]
         for precur_id in pipe_links.get(transf_id, [])]
        for transf_id in transforms_ids
    ]

    # Turns to pandas: one row per (transformation, precursor) link,
    # indexed by the position of the transformation
    ref_ds = pd.DataFrame(
        data={"precursors": 0}, index=range(len(pipe_links_inds))
    )
    ref_ds = ref_ds.iloc[
        ref_ds.index.repeat([len(p) for p in pipe_links_inds])
    ]
    ref_ds.loc[:, "precursors"] = list(itertools.chain(*pipe_links_inds))
    ref_ds["index"] = ref_ds.index

    # Start from the virtual final node (last position) and repeatedly
    # insert, before each row, the rows of its own precursors until
    # neither the length nor the index of the table changes any more
    ref_len = np.inf
    ds = pd.DataFrame(
        data={"precursors": pipe_links_inds[-1]},
        index=[len(pipe_links_inds) - 1] * len(pipe_links_inds[-1]),
    )
    ds["index"] = ds.index.values

    # NOTE: tmp_ds is only defined inside the loop; the first evaluation of
    # the condition never reaches it because ref_len is infinite and ``or``
    # short-circuits
    while ref_len != len(ds) \
            or np.any(tmp_ds.index.values != ds.index.values):
        ref_len = len(ds)
        tmp_ds = copy.deepcopy(ds)

        precursors = ref_ds.loc[
            ds["precursors"].loc[ds["precursors"].isin(ref_ds.index)]
        ].dropna()

        # Each row is repeated once per precursor, plus once for itself
        repeats = [
            len(pipe_links_inds[i]) + 1
            for i in ds["precursors"].astype(int)
        ]
        new_index = pd.RangeIndex(len(ds)).repeat(repeats)

        # Positions of the original rows: the last one of each repeated group
        orig_index = np.cumsum(repeats) - 1
        orig_positions = set(orig_index.tolist())

        ds = ds.iloc[new_index]
        ds.iloc[
            [i for i in range(len(ds)) if i not in orig_positions]
        ] = precursors.values
        ds = ds.drop_duplicates()
        ds.index = ds["index"].values

    info("Optimal order computed!")

    # Get final pipe_list
    pipe_indexes = ds["precursors"].drop_duplicates().values.astype(int)
    pipe_order = [transforms_ids[i] for i in pipe_indexes]
    pipe_order_bkwd = [(s[0], s[1], "backward") for s in pipe_order]

    return pipe_order_bkwd[::-1] + pipe_order, \
        pipe_order + pipe_order_bkwd[::-1]