import numpy as np
import pandas as pd
import xarray as xr
import copy
from .....utils.parallel import thread
from .....utils.datastores.empty import init_empty
try:
import cPickle as pickle
except ImportError:
import pickle
from .utils.sparse.forward import forward as sparse_forward
from .utils.array.forward import forward as array_forward
[docs]
def forward(
    transf,
    inout_datastore,
    controlvect,
    obsvect,
    mapper,
    di,
    df,
    mode,
    runsubdir,
    workdir,
    onlyinit=False,
    save_debug=True,
    **kwargs
):
    """Temporally interpolate or re-index data to the output time grid.

    Dispatches to either the sparse (observation-indexed DataFrame) or the
    array (gridded xarray) implementation, based on the ``sampled`` and
    ``sparse_data`` flags that ``mapper`` declares for the outputs:

    * **Sparse/sampled output** — :func:`~.utils.sparse.forward.forward`:
      resamples or re-indexes observation DataFrames to the output dates.
    * **Array output** — :func:`~.utils.array.forward.forward`:
      applies linear temporal interpolation to gridded xarray fields.

    The flags are read from a single reference tracer (the first key of
    ``mapper["inputs"]``). They are assumed to be identical for every
    tracer, which holds for both ensemble and single simulations.

    Args:
        transf (Plugin): time_interpolation instance. Its ``nthreads``
            attribute is passed to both implementations; the whole object
            is also handed to the sparse implementation.
        inout_datastore (dict): datastore holding an ``"inputs"`` entry.
            It is passed to the selected implementation, which presumably
            writes the interpolated outputs into it.
        controlvect: unused.
        obsvect: unused.
        mapper (dict): transform mapper with ``"inputs"`` and ``"outputs"``
            sections keyed by tracer id. Each tracer entry in
            ``"outputs"`` must carry ``"sparse_data"`` and ``"sampled"``
            flags; ``mapper`` is also passed to the implementation.
        di (datetime): sub-simulation start date.
        df (datetime): sub-simulation end date.
        mode (str): unused here (``'fwd'`` or ``'tl'``).
        runsubdir (str): unused.
        workdir (str): unused.
        onlyinit (bool): passed to the sparse implementation only.
        save_debug (bool): passed to the sparse implementation only; if
            ``True``, intermediate results are saved for debugging.
        **kwargs: accepted for interface compatibility with other
            transforms; not forwarded.

    Returns:
        None. Any results are produced by the selected implementation,
        presumably by updating ``inout_datastore`` in place.

    Raises:
        KeyError: if ``mapper`` or ``inout_datastore`` lacks an expected key.
        IndexError: if ``mapper["inputs"]`` is empty.
    """
    # The implementations work relative to the earlier of the two dates;
    # presumably di > df can occur when a period is traversed backwards
    # (e.g. adjoint runs) — TODO confirm.
    ddi = min(di, df)
    inputs = inout_datastore["inputs"]

    # Fetch the dispatch flags from a reference tracer; valid for both
    # ensemble and single simulations. Only the output-side flags decide
    # which implementation is used, so the input-side flags are not read.
    trid_ref = list(mapper["inputs"])[0]
    sparse_out = mapper["outputs"][trid_ref]["sparse_data"]
    sampled_out = mapper["outputs"][trid_ref]["sampled"]

    # Sparse and array data are handled differently. Sparse data share
    # their indexing across tracers, so all tracers go through one call.
    # Array data are parallelized along tracers outside this function,
    # with the thread count handed down via ``nthreads``.
    if sampled_out or sparse_out:
        sparse_forward(
            transf,
            ddi,
            inputs,
            inout_datastore,
            onlyinit,
            mapper,
            sampled_out,
            sparse_out,
            save_debug,
            nthreads=transf.nthreads,
        )
    else:
        array_forward(
            ddi, mapper, inout_datastore, inputs, nthreads=transf.nthreads
        )