# Source code for pycif.plugins.transforms.basic.time_interpolation.forward

import numpy as np
import pandas as pd
import xarray as xr
import copy
from .....utils.parallel import thread
from .....utils.datastores.empty import init_empty

try:
    import cPickle as pickle
except ImportError:
    import pickle

from .utils.sparse.forward import forward as sparse_forward
from .utils.array.forward import forward as array_forward


def forward(
    transf,
    inout_datastore,
    controlvect,
    obsvect,
    mapper,
    di,
    df,
    mode,
    runsubdir,
    workdir,
    onlyinit=False,
    save_debug=True,
    **kwargs
):
    """Run the forward time interpolation through the sparse or array backend.

    The choice of backend is made once, from the ``"sparse_data"`` and
    ``"sampled"`` flags of the *reference* trid, i.e. the first key of
    ``mapper["inputs"]``, looked up in ``mapper["outputs"]``. If either flag
    is truthy, ``sparse_forward`` is called; otherwise ``array_forward`` is
    called. Both receive ``nthreads=transf.nthreads``.

    Args:
        transf: transform object; only its ``nthreads`` attribute is read
            here, and the object itself is forwarded to ``sparse_forward``.
        inout_datastore: mapping with at least an ``"inputs"`` entry; passed
            as a whole to the backend together with that entry.
        controlvect: unused in this function (kept for the common transform
            signature -- presumably; confirm against callers).
        obsvect: unused in this function.
        mapper: mapping with ``"inputs"`` and ``"outputs"`` entries keyed by
            trid; ``mapper["outputs"][trid_ref]`` must provide
            ``"sparse_data"`` and ``"sampled"``.
        di: first bound of the processed period.
        df: second bound of the processed period. The smaller of ``di`` and
            ``df`` is the date handed to the backend.
        mode: unused in this function.
        runsubdir: unused in this function.
        workdir: unused in this function.
        onlyinit: forwarded to ``sparse_forward`` only.
        save_debug: forwarded to ``sparse_forward`` only.
        **kwargs: accepted and ignored.

    Returns:
        None. Any result is produced by the backend it calls
        (NOTE(review): presumably by updating ``inout_datastore`` in place --
        confirm in the backend implementations).

    Raises:
        IndexError: if ``mapper["inputs"]`` is empty.
        KeyError: if an expected key is missing from ``inout_datastore`` or
            ``mapper``.
    """
    # Use the smaller of the two bounds as the reference date
    ddi = min(di, df)
    inputs = inout_datastore["inputs"]

    # Fetch information from the reference trid
    # Valid for both ensemble and single simulations
    trid_ref = list(mapper["inputs"].keys())[0]
    outputs_ref = mapper["outputs"][trid_ref]
    sparse_out = outputs_ref["sparse_data"]
    sampled_out = outputs_ref["sampled"]

    # Deal differently with sparse and array data
    # Array data is parallelized along trid externally,
    # while sparse data share indexing
    if sampled_out or sparse_out:
        sparse_forward(
            transf,
            ddi,
            inputs,
            inout_datastore,
            onlyinit,
            mapper,
            sampled_out,
            sparse_out,
            save_debug,
            nthreads=transf.nthreads
        )
    else:
        array_forward(
            ddi,
            mapper,
            inout_datastore,
            inputs,
            nthreads=transf.nthreads
        )
# # Threading the application of the scaling factor for ensembles
# nthreads = transf.nthreads
# thread_intervals = np.linspace(
#     0, len(mapper["inputs"]), nthreads + 1
# ).astype(int)
# list_trids = copy.deepcopy(list(mapper["inputs"].keys()))
# @thread
# def thread_function(ithread):
#     for itrid in range(thread_intervals[ithread], thread_intervals[ithread + 1]):
#         trid = list_trids[itrid]
#         # # Fetch outputs depending on date
#         # outputs = None
#         # for data_id in inout_datastore["outputs"]:
#         #     if data_id == trid:
#         #         outputs = inout_datastore["outputs"][data_id][ddi]
#         # if outputs is None:
#         #     continue
#         # Interpolate for sparse data
#         sparse_in = mapper["inputs"][trid]["sparse_data"]
#         sparse_out = mapper["outputs"][trid]["sparse_data"]
#         sampled_in = mapper["inputs"][trid]["sampled"]
#         sampled_out = mapper["outputs"][trid]["sampled"]
#         if sampled_out or sparse_out:
#             sparse_forward(
#                 transf,
#                 trid, ddi,
#                 inputs,
#                 inout_datastore,
#                 onlyinit, mapper, sampled_out, sparse_out,
#                 save_debug
#             )
#         else:
#             array_forward(
#                 ddi, mapper, trid, inout_datastore, inputs,
#             )
# thread_function(range(nthreads))