Source code for pycif.plugins.transforms.basic.time_interpolation.forward
import numpy as np
import pandas as pd
import xarray as xr
import copy
from .....utils.parallel import thread
from .....utils.datastores.empty import init_empty
try:
import cPickle as pickle
except ImportError:
import pickle
from .utils.sparse.forward import forward as sparse_forward
from .utils.array.forward import forward as array_forward
[docs]
def forward(
    transf,
    inout_datastore,
    controlvect,
    obsvect,
    mapper,
    di,
    df,
    mode,
    runsubdir,
    workdir,
    onlyinit=False,
    save_debug=True,
    **kwargs
):
    """Dispatch the forward time interpolation to the sparse or array backend.

    The choice of backend is made from the ``"sparse_data"`` and
    ``"sampled"`` flags of the *outputs* of the first tracer ID found in
    ``mapper["inputs"]``; that single reference tracer decides for the
    whole call.

    Args:
        transf: Transform object; only its ``nthreads`` attribute is read
            here, the object itself is forwarded to the sparse backend.
        inout_datastore: Mapping whose ``"inputs"`` entry is extracted and
            passed to the backend together with the full datastore.
        controlvect: Not used by this function.
        obsvect: Not used by this function.
        mapper: Mapping with ``"inputs"`` and ``"outputs"`` entries, each
            keyed by tracer ID. ``mapper["inputs"]`` must not be empty.
        di: One bound of the period being processed.
        df: The other bound of the period; the smaller of ``di`` and ``df``
            is the date handed to the backend.
        mode: Not used by this function.
        runsubdir: Not used by this function.
        workdir: Not used by this function.
        onlyinit: Forwarded unchanged to the sparse backend.
        save_debug: Forwarded unchanged to the sparse backend.
        **kwargs: Accepted for interface compatibility and ignored.

    Raises:
        IndexError: If ``mapper["inputs"]`` contains no tracer ID.
        KeyError: If the reference tracer is missing from
            ``mapper["outputs"]`` or lacks the ``"sparse_data"`` or
            ``"sampled"`` flag.
    """
    # Use the earliest of the two bounds as the reference date, so the call
    # works whichever way (di, df) is ordered.
    ddi = min(di, df)
    inputs = inout_datastore["inputs"]

    # Fetch information from the reference trid.
    # Valid for both ensemble and single simulations.
    trid_ref = list(mapper["inputs"].keys())[0]
    outputs_ref = mapper["outputs"][trid_ref]
    sparse_out = outputs_ref["sparse_data"]
    sampled_out = outputs_ref["sampled"]

    # Handle sparse and array data differently.
    # Array data is parallelized along trid externally, while sparse data
    # share indexing.
    if sampled_out or sparse_out:
        sparse_forward(
            transf,
            ddi,
            inputs,
            inout_datastore,
            onlyinit,
            mapper,
            sampled_out,
            sparse_out,
            save_debug,
            nthreads=transf.nthreads,
        )
    else:
        array_forward(
            ddi,
            mapper,
            inout_datastore,
            inputs,
            nthreads=transf.nthreads,
        )
# # Threading the application of the scaling factor for ensembles
# nthreads = transf.nthreads
# thread_intervals = np.linspace(
# 0, len(mapper["inputs"]), nthreads + 1
# ).astype(int)
# list_trids = copy.deepcopy(list(mapper["inputs"].keys()))
# @thread
# def thread_function(ithread):
# for itrid in range(thread_intervals[ithread], thread_intervals[ithread + 1]):
# trid = list_trids[itrid]
# # # Fetch outputs depending on date
# # outputs = None
# # for data_id in inout_datastore["outputs"]:
# # if data_id == trid:
# # outputs = inout_datastore["outputs"][data_id][ddi]
# # if outputs is None:
# # continue
# # Interpolate for sparse data
# sparse_in = mapper["inputs"][trid]["sparse_data"]
# sparse_out = mapper["outputs"][trid]["sparse_data"]
# sampled_in = mapper["inputs"][trid]["sampled"]
# sampled_out = mapper["outputs"][trid]["sampled"]
# if sampled_out or sparse_out:
# sparse_forward(
# transf,
# trid, ddi,
# inputs,
# inout_datastore,
# onlyinit, mapper, sampled_out, sparse_out,
# save_debug
# )
# else:
# array_forward(
# ddi, mapper, trid, inout_datastore, inputs,
# )
# thread_function(range(nthreads))