Source code for pycif.plugins.obsoperators.standard.transforms.utils.connect_pipes
import copy
import itertools
from logging import debug
from . import add_default
[docs]
def connect_pipes(all_transforms, mapper, transform):
    """Connect transforms based on their inputs and outputs"""
    debug(f"Connecting pipeline around {transform}")
    # Check whether transform is already in transform_pipe,
    # otherwise raise exception
    if transform not in all_transforms.attributes:
        raise AttributeError(
            f"Cannot connect {transform} to the rest of the pipeline. "
            "Include it to the pipeline before calling connect_pipe"
        )
    itransf = all_transforms.attributes.index(transform)
    transf_mapper = mapper[transform]
    # Update precursors and successors with all possible transformation
    # using the corresponding input/output
    # For each transformation, looking for all trid in its own inputs that
    # are found in outputs from another transform before it
    transf_mapper["precursors"] = transf_mapper.get("precursors", {})
    precursors = transf_mapper["precursors"]
    ref_precursors = copy.deepcopy(precursors)
    for inpt in transf_mapper["inputs"]:
        if inpt not in precursors:
            precursors[inpt] = []
        # If already some precursors, skip
        if ref_precursors[inpt] != []:
            continue
        # Otherwise, fetch all possible precursors
        for tr in all_transforms.attributes[:itransf]:
            if inpt in mapper[tr]["outputs"] \
                    
and tr not in precursors[inpt]:
                precursors[inpt].append(tr)
    transf_mapper["successors"] = transf_mapper.get("successors", {})
    successors = transf_mapper["successors"]
    ref_successors = copy.deepcopy(successors)
    for outpt in transf_mapper["outputs"]:
        if outpt not in successors:
            successors[outpt] = []
        # Skip if successors before initialization
        if ref_successors[outpt] != []:
            continue
        # Otherwise, fetch all possible successors
        for tr in all_transforms.attributes[itransf + 1:]:
            if outpt in mapper[tr]["inputs"] \
                    
and tr not in successors[outpt]:
                successors[outpt].append(tr)
    # Clean successors and precursors by removing direct pipes
    # This means that some successive transforms may have the same
    # inputs/outputs and only carry on computations on them only, i.e.,
    # not changing the shape of the datastore.
    successors = transf_mapper["successors"]
    for trid in successors:
        redundancy = set(itertools.chain(
            *[mapper[tr_ref]["successors"].get(trid, [])
                for tr_ref in successors[trid]]))
        for trtmp in redundancy:
            # Skip if trtmp was already in pre-defined successors
            if trtmp in ref_successors[trid]:
                continue
            # Otherwise exclude it
            if trtmp != transform and trtmp in successors[trid]:
                successors[trid].remove(trtmp)
    precursors = transf_mapper["precursors"]
    for trid in precursors:
        redundancy = set(itertools.chain(
            *[mapper[tr_ref]["precursors"].get(trid, [])
                for tr_ref in precursors[trid]]))
        for trtmp in redundancy:
            # Skip if trtmp was already in pre-defined precursors
            if trtmp in ref_precursors[trid]:
                continue
            # Otherwise exclude it
            if trtmp != transform and trtmp in precursors[trid]:
                precursors[trid].remove(trtmp)
    # Now update precursors (resp. successors)
    # of transforms after (resp. before) this one
    for trid in successors:
        for trtmp in successors[trid]:
            tmp_mapper = mapper[trtmp]
            tmp_mapper["precursors"][trid] = [
                t for t in tmp_mapper["precursors"][trid]
                if t not in precursors.get(trid, [])
            ]
            if transform not in tmp_mapper["precursors"][trid]:
                tmp_mapper["precursors"][trid].append(transform)
            # If several precursors for a given trid, take last
            if len(tmp_mapper["precursors"][trid]) > 1:
                list_indexes = [
                    all_transforms.attributes.index(tr)
                    for tr in tmp_mapper["precursors"][trid]
                ]
                if ref_precursors.get(trid, []) != []:
                    tmp_mapper["precursors"][trid] = \
                        
ref_precursors[trid]
                    continue
                tmp_mapper["precursors"][trid] = \
                    
[all_transforms.attributes[max(list_indexes)]]
                for tr_index in list_indexes:
                    if tr_index != max(list_indexes):
                        mapper[
                            all_transforms.attributes[tr_index]
                        ]["successors"][trid].remove(trtmp)
    for trid in precursors:
        for trtmp in precursors[trid]:
            tmp_mapper = mapper[trtmp]
            tmp_mapper["successors"][trid] = [
                t for t in tmp_mapper["successors"][trid]
                if t not in successors.get(trid, [])
            ]
            if transform not in tmp_mapper["successors"][trid]:
                tmp_mapper["successors"][trid].append(transform) 
    # Now prune dead branches
    # prune_dead_branches(all_transforms, mapper, transform)
def prune_dead_branches(all_transforms, mapper, skip_transform):
    ref_transforms = copy.deepcopy(all_transforms.attributes)
    for transform in ref_transforms:
        # Skipping toobsvect
        if "toobsvect" in transform:
            continue
        # Skip transform being initialize
        if transform == skip_transform:
            continue
        successors = mapper[transform].get("successors", {})
        # Remove transform if no successors
        if sum([len(successors[trid]) for trid in successors]) == 0:
            debug(f"Removing {transform}")
            # Discard it from precursors
            for trid in mapper[transform].get("precursors", {}):
                for tr in mapper[transform]["precursors"][trid]:
                    mapper[tr]["successors"][trid].remove(transform)
            # Now deleting the transform itself
            mapper[transform] = {}
            del mapper[transform]
            all_transforms.attributes.remove(transform)
            setattr(all_transforms, transform, None)