Source code for pycif.plugins.obsoperators.standard.transforms.utils.init_default_transformations

import copy
from logging import debug

from . import init_regrid, init_reindex, init_sparse


def init_default_transformations(
    self,
    all_transforms,
    backup_comps,
    mapper,
    transform,
    do_pipe_entry=False,
    trid_to_check=None,
):
    """Insert default transforms between ``transform`` and its successors.

    For every output identifier (``trid``) of ``transform`` and every
    successor consuming it, the output description of ``transform`` is
    compared with the input description of the successor. Depending on
    the keys found in both descriptions, one of the helper initialisers
    is called to insert intermediate transforms:

    * ``init_sparse`` when either side sets a truthy ``"sparse_data"``
      (and neither side sets it explicitly to ``None``);
    * ``init_regrid`` when both sides define a ``"domain"`` and the two
      domains differ;
    * ``init_reindex`` when both sides define ``"input_dates"``; it runs
      before regridding when the tracer's ``time_interpolation.first``
      attribute is truthy, after it otherwise.

    Args:
        self: Object forwarded unchanged to the helper initialisers.
        all_transforms: Collection of transforms, forwarded to the
            helper initialisers.
        backup_comps: Forwarded to the helper initialisers.
        mapper: Mapping from transform name to a dictionary holding at
            least ``"precursors"``, ``"successors"`` and ``"outputs"``
            for ``transform``, and ``"inputs"`` for each successor; the
            inner dictionaries are keyed by ``trid``.
        transform: Name of the transform whose successors are examined.
        do_pipe_entry: Forwarded to the helper initialisers.
        trid_to_check: Optional iterable of ``trid`` restricting the
            identifiers examined. ``None`` (default) examines every key
            of the successors mapping; an empty iterable examines none.

    Returns:
        None. The work is done by the helper initialisers.
    """
    debug(f"Initialize default transformations for {transform}")

    # Add default transformations:
    # reprojections, sampling of full data to sparse data, etc.
    transf_mapper = mapper[transform]
    precursors = transf_mapper["precursors"]
    successors = transf_mapper["successors"]
    ref_outputs = transf_mapper["outputs"]

    # Loop only on certain trid if specified
    if trid_to_check is not None:
        list_trid_successors = trid_to_check
    else:
        # Snapshot of the keys: helpers may alter the mapper while looping
        list_trid_successors = copy.deepcopy(list(successors.keys()))

    # Loop on successors and add intermediate transforms for reprojections
    for trid in list_trid_successors:
        # NOTE(review): trid_dict is only reset here, not per successor; after
        # a regridding that "did nothing" it keeps pointing to the outputs of
        # the updated precursor for the next successor - confirm intended.
        trid_dict = ref_outputs[trid]

        # Iterate over a copy, the live list can change during initialization
        ref_successors = copy.deepcopy(successors[trid])
        for tr in ref_successors:
            # Skip if not in successors anymore
            # Means that it has been replaced during initialization
            # of other transforms
            if tr not in successors[trid]:
                continue

            tmp_dict = mapper[tr]["inputs"][trid]
            param = ref_outputs[trid].get("tracer", None)

            # Converts full data to sparse data (obs typically)
            # or conversely
            # Includes horizontal, vertical and temporal sampling
            # An explicit ``None`` on either side disables the conversion
            do_sparse = (
                trid_dict.get("sparse_data", False) is not None
                and tmp_dict.get("sparse_data", False) is not None
            )
            if (
                trid_dict.get("sparse_data", False)
                or tmp_dict.get("sparse_data", False)
            ) and do_sparse:
                init_sparse.init_sparse(
                    self, trid, trid_dict, tmp_dict,
                    transform, tr, param,
                    all_transforms, mapper, backup_comps, precursors,
                    do_pipe_entry=do_pipe_entry
                )
                continue

            # Otherwise, do reprojections if necessary
            do_reproj = (
                trid_dict.get("domain", None) is not None
                and tmp_dict.get("domain", None) is not None
            )
            do_reindex = (
                trid_dict.get("input_dates", None) is not None
                and tmp_dict.get("input_dates", None) is not None
            )

            domain_ref = trid_dict.get("domain", None)
            domain_precurs = tmp_dict.get("domain", None)
            same_domain = (
                domain_ref == domain_precurs
                if domain_ref is not None
                else False
            )

            # Horizontal and vertical reprojection if needed
            successor_id = tr
            precursor_id = transform
            trid_dict = mapper[precursor_id]["outputs"][trid]
            tmp_dict = mapper[successor_id]["inputs"][trid]

            # Temporal re-indexing goes first when the tracer asks for it
            temp_first = getattr(
                getattr(param, "time_interpolation", None), "first", False
            )
            if temp_first and do_reindex:
                init_reindex.init_reindex(
                    self, trid, trid_dict, tmp_dict,
                    precursor_id, successor_id, param,
                    all_transforms, mapper, backup_comps, precursors,
                    do_pipe_entry=do_pipe_entry
                )
                continue

            if not same_domain and do_reproj:
                regrid_pipe, did_nothing = init_regrid.init_regrid(
                    self, trid, trid_dict, tmp_dict,
                    precursor_id, successor_id, param,
                    all_transforms, mapper, backup_comps, precursors,
                    do_pipe_entry=do_pipe_entry
                )
                precursor_id = regrid_pipe[-2]

                # Stop here as recursive init_default
                # will take over for intermediate transforms
                # Carry on if did_nothing in regrid, in case,
                # there is a need for time_interpolation
                if not did_nothing:
                    continue

                # The precursor may have changed: refresh both descriptions
                trid_dict = mapper[precursor_id]["outputs"][trid]
                tmp_dict = mapper[successor_id]["inputs"][trid]

            # Do temporal re-indexing
            if not temp_first and do_reindex:
                init_reindex.init_reindex(
                    self, trid, trid_dict, tmp_dict,
                    precursor_id, successor_id, param,
                    all_transforms, mapper, backup_comps, precursors,
                    do_pipe_entry=do_pipe_entry
                )