Source code for pycif.plugins.obsoperators.standard.transforms.utils.init_default_transformations
import copy
from logging import debug
from . import init_regrid, init_reindex, init_sparse
[docs]
def init_default_transformations(
self, all_transforms,
backup_comps, mapper,
transform,
do_pipe_entry=False,
trid_to_check=None
):
"""Initialize default transformations based on compatibility of input/output
formats of successive transforms.
"""
debug(f"Initialize default transformations for {transform}")
# Add default transformations:
# reprojections, sampling of full data to sparse data, etc.
transf_plg = getattr(all_transforms, transform)
transf_mapper = mapper[transform]
precursors = transf_mapper["precursors"]
successors = transf_mapper["successors"]
ref_outputs = transf_mapper["outputs"]
# Loop only on certaint trid if specified
list_trid_successors = copy.deepcopy(list(successors.keys()))
if trid_to_check != None:
list_trid_successors = trid_to_check
# Loop on successors and add intermediate transforms for reprojections
for trid in list_trid_successors:
trid_dict = ref_outputs[trid]
ref_successors = copy.deepcopy(successors[trid])
for tr in ref_successors:
# Skip if not in successors anymore
# Means that it has been replaced during initialization of other transforms
if tr not in successors[trid]:
continue
tr_plg = getattr(all_transforms, tr)
tr_mapper = mapper[tr]
tmp_dict = tr_mapper["inputs"][trid]
# Fetch info from yml
prm = trid[1]
cmp = trid[0]
components = self.datavect.components
comps = components.attributes
# Use backup_components if defined
# Skip if no backup is defined
cmp_in = cmp if cmp in comps else backup_comps.get(cmp, "")
cmp_plg = getattr(components, cmp_in, None)
param = ref_outputs[trid].get("tracer", None)
# Converts full data to sparse data (obs typically)
# or conversely
# Includes horizontal, vertical and temporal sampling
do_sparse = (trid_dict.get("sparse_data", False) is not None
and tmp_dict.get("sparse_data", False) is not None)
if (trid_dict.get("sparse_data", False)
or tmp_dict.get("sparse_data", False)) \
and do_sparse:
successor_id = init_sparse.init_sparse(
self,
trid,
trid_dict, tmp_dict,
transform, tr,
param,
all_transforms,
mapper,
backup_comps,
precursors,
do_pipe_entry=do_pipe_entry
)
continue
# Otherwise, do reprojections if necessary
do_reproj = (
not trid_dict.get("domain", None) is None
and not tmp_dict.get("domain", None) is None
)
do_reindex = (
not trid_dict.get("input_dates", None) is None
and not tmp_dict.get("input_dates", None) is None
)
domain_ref = trid_dict.get("domain", None)
domain_precurs = tmp_dict.get("domain", None)
same_domain = domain_ref == domain_precurs if domain_ref is not None else False
# Horizontal and vertical reprojection if needed
successor_id = tr
precursor_id = transform
trid_dict = mapper[precursor_id]["outputs"][trid]
tmp_dict = mapper[successor_id]["inputs"][trid]
temp_first = getattr(getattr(
param, "time_interpolation", None
), "first", False)
if temp_first and do_reindex:
precursor_id = init_reindex.init_reindex(
self,
trid,
trid_dict, tmp_dict,
precursor_id, successor_id,
param,
all_transforms,
mapper,
backup_comps,
precursors,
do_pipe_entry=do_pipe_entry
)
continue
trid_dict = mapper[precursor_id]["outputs"][trid]
tmp_dict = mapper[successor_id]["inputs"][trid]
if not same_domain and do_reproj:
regrid_pipe, did_nothing = init_regrid.init_regrid(
self,
trid,
trid_dict, tmp_dict,
precursor_id, successor_id,
param,
all_transforms,
mapper,
backup_comps,
precursors,
do_pipe_entry=do_pipe_entry
)
precursor_id = regrid_pipe[-2]
# Stop here as recursive init_default
# will take over for intermediate transforms
# Carry on if did_nothing in regrid, in case,
# there is a need for time_interpolation
if not did_nothing:
continue
trid_dict = mapper[precursor_id]["outputs"][trid]
tmp_dict = mapper[successor_id]["inputs"][trid]
# Do temporal re-indexing
if not temp_first and do_reindex:
successor_id = init_reindex.init_reindex(
self,
trid,
trid_dict, tmp_dict,
precursor_id, successor_id,
param,
all_transforms,
mapper,
backup_comps,
precursors,
do_pipe_entry=do_pipe_entry
)