# Source code for pycif.plugins.transforms.basic.unit_conversion.forward
import xarray as xr
from logging import debug
import copy
import numpy as np
from .....utils.parallel import thread
# [docs]
def forward(
    transf,
    inout_datastore,
    controlvect,
    obsvect,
    mapper,
    di,
    df,
    mode,
    runsubdir,
    workdir,
    onlyinit=False,
    **kwargs
):
    """Scale every input of the transform by its unit-conversion factor.

    For each identifier in ``mapper["inputs"]``, the data found in
    ``inout_datastore["inputs"][trid][ddi]`` (with ``ddi = min(di, df)``) is
    multiplied by ``transf.scale`` and stored in
    ``inout_datastore["outputs"][trid][ddi]``. When ``transf.grid_to_surface``
    is truthy, the factor is additionally divided by the ``areas`` attribute
    of the input's domain (computed on demand through ``calc_areas()``).

    Two data layouts are handled:

    * sparse data (``mapper["inputs"][trid]["sparse_data"]`` is truthy): the
      input is deep-copied and its ``("maindata", "spec")`` column is
      overwritten with the scaled values; nothing is done when ``onlyinit``
      is set;
    * regular data: a new ``xarray.Dataset`` holding the scaled ``"spec"``
      variable is created; inputs without a ``"spec"`` entry are skipped.

    In both layouts the ``"incr"`` field is scaled as well when ``mode`` is
    ``"tl"``.

    The identifiers are split into ``transf.nthreads`` contiguous chunks,
    each one processed by a call of the ``@thread``-decorated worker.
    NOTE(review): the dispatch semantics of ``thread`` live in
    ``utils.parallel`` and are not visible from this module.

    Args:
        transf: transform plugin; ``scale``, ``grid_to_surface`` and
            ``nthreads`` are read from it.
        inout_datastore: mapping with ``"inputs"`` and ``"outputs"`` entries,
            each indexed by input identifier and then by date.
        controlvect: unused here.
        obsvect: unused here.
        mapper: mapping whose ``"inputs"`` entry describes every input
            (``"domain"``, optional ``"sparse_data"``).
        di: first bound of the period; only ``min(di, df)`` is used.
        df: second bound of the period; only ``min(di, df)`` is used.
        mode: running mode; ``"tl"`` additionally scales the increments.
        runsubdir: unused here.
        workdir: unused here.
        onlyinit: when true, sparse inputs are left untouched.
        **kwargs: ignored.

    Returns:
        None. Results are written into ``inout_datastore["outputs"]``.
    """
    ddi = min(di, df)

    # Threading the application of the scaling factor for ensembles:
    # thread_intervals[i]:thread_intervals[i + 1] is the slice of inputs
    # handled by worker number i.
    nthreads = transf.nthreads
    thread_intervals = np.linspace(
        0, len(mapper["inputs"]), nthreads + 1
    ).astype(int)
    list_trids = copy.deepcopy(list(mapper["inputs"].keys()))

    @thread
    def thread_function(ithread):
        first = thread_intervals[ithread]
        last = thread_intervals[ithread + 1]
        for trid in list_trids[first:last]:
            debug("Computing unit conversion for {} with scaling of {}"
                  .format(trid, transf.scale))
            input_info = mapper["inputs"][trid]

            # Compute scaling with/without areas
            scaling = transf.scale
            if transf.grid_to_surface:
                domain = input_info["domain"]
                if not hasattr(domain, "areas"):
                    domain.calc_areas()

                # Out-of-place division on purpose: ``scaling /= areas``
                # would modify ``transf.scale`` itself whenever it is an
                # array, compounding the division from one input to the
                # next and across worker threads.
                scaling = scaling / domain.areas

            # Apply scaling for sparse data
            if input_info.get("sparse_data", False):
                if onlyinit:
                    continue

                xmod_in = inout_datastore["inputs"][trid][ddi]
                inout_datastore["outputs"][trid][ddi] = copy.deepcopy(xmod_in)
                xmod_out = inout_datastore["outputs"][trid][ddi]
                xmod_out.loc[:, ("maindata", "spec")] = \
                    scaling * xmod_in["maindata"]["spec"].values
                if mode == "tl":
                    xmod_out.loc[:, ("maindata", "incr")] = \
                        scaling * xmod_in["maindata"]["incr"].values

                continue

            # Other format for regular data
            data_in = inout_datastore["inputs"][trid][ddi]
            if "spec" not in data_in:
                continue

            inout_datastore["outputs"][trid][ddi] = \
                xr.Dataset({"spec": scaling * data_in["spec"]})
            if mode == "tl":
                inout_datastore["outputs"][trid][ddi]["incr"] = \
                    scaling * data_in["incr"]

    thread_function(range(nthreads))