import pandas as pd
from logging import debug
import numpy as np
from .....utils.datastores.crop_monitor import crop_monitor
from .....utils.dates import date_range
[docs]
def forward(
    transform,
    inout_datastore,
    controlvect,
    obsvect,
    mapper,
    di,
    df,
    mode,
    runsubdir,
    workdir,
    onlyinit=False,
    **kwargs
):
    """Store simulated values from the input datastore into the observation vector.

    For every tracer listed in ``mapper["inputs"]`` that is present in
    ``inout_datastore["inputs"]`` and flagged ``isobs`` in
    ``obsvect.datavect.components.<mod_input>.parameters.<trcr>``:

    1. Reads the ``'spec'`` column (and ``'incr'`` when that column is
       present) from ``datastore[tracer_id][ddi]["maindata"]``, where
       ``ddi = min(di, df)``.
    2. Adds them to the tracer's slice of ``obsvect.ysim`` (and
       ``obsvect.dy``), located through ``param.ypointer`` and
       ``param.dim``.
    3. Sets ``obsvect.dy`` to zero wherever ``obsvect.obsvect_mask`` is
       false.
    4. Copies the ``('maindata', 'spec')``, ``('maindata', 'incr')`` and
       ``('maindata', 'adj_out')`` columns that exist in the datastore
       entry into ``param.datastore``, creating missing columns first.

    When ``transform`` has a ``split_freq`` attribute, only the rows and
    weights returned by ``crop_monitor`` for the sub-window starting at
    ``ddi`` are updated; otherwise every row is updated with weight 1.

    Args:
        transform: transform instance; ``split_freq``, ``datei`` and
            ``datef`` are read when ``split_freq`` is defined.
        inout_datastore (dict): its ``'inputs'`` entry maps tracer ids to
            per-date data.
        controlvect: unused.
        obsvect: observation vector; ``ysim``, ``dy`` and the tracers'
            reference datastores are updated in place.
        mapper (dict): its ``'inputs'`` entry lists the tracer ids, each
            indexable as ``(mod_input, trcr)``.
        di: first bound of the sub-simulation period.
        df: second bound of the sub-simulation period.
        mode: unused here; the ``'incr'`` column is accumulated whenever
            it is available, whatever the mode.
        runsubdir: unused.
        workdir: unused.
        onlyinit (bool): if ``True``, return immediately.
        **kwargs: unused.

    Returns:
        None

    Raises:
        Exception: any error raised while writing back into the reference
            datastore is propagated to the caller.
    """
    if onlyinit:
        return

    # Datastore entries are looked up with the earlier of the two dates
    ddi = min(di, df)

    datastore = inout_datastore["inputs"]
    datavect = obsvect.datavect

    for tracer_id in mapper["inputs"]:
        if tracer_id not in datastore:
            continue

        mod_input = tracer_id[0]
        trcr = tracer_id[1]

        # Walk datavect.components.<mod_input>.parameters.<trcr>,
        # tolerating any missing level
        component = getattr(
            getattr(datavect, "components", None), mod_input, None
        )
        parameters = getattr(component, "parameters", None)
        param = getattr(parameters, trcr, None)

        if param is None or not getattr(param, "isobs", False):
            debug(f"{trcr}/{mod_input} was not is the observation vector. Passing")
            continue

        if datastore.get(tracer_id, None) is None:
            debug(f"{trcr}/{mod_input} was not provided any data to put back in the observation vector")
            continue

        sim_entry = datastore[tracer_id][ddi]
        data_sim = sim_entry["maindata"]
        if "spec" not in data_sim:
            continue

        # Keep only sub-period if `split_freq` is specified
        ds_meta = param.datastore
        crop_index = np.arange(len(ds_meta), dtype=int)
        weight = 1
        if hasattr(transform, "split_freq"):
            obsvect_dates = date_range(
                transform.datei, transform.datef, period=transform.split_freq
            )
            i0 = np.argwhere(obsvect_dates == ddi)[0][0]
            split_ddf = obsvect_dates[i0 + 1]
            crop_index, weight = crop_monitor(
                ds_meta,
                ddi,
                split_ddf,
                return_index=True,
                return_weight=True,
                keep_partial=True,
            )

        # Slice of the global vectors belonging to this tracer
        yslice = slice(param.ypointer, param.ypointer + param.dim)

        obsvect.ysim[yslice][crop_index] += (
            np.asarray(data_sim.loc[:, "spec"]).astype(float).flatten() * weight
        )

        # Increments are only accumulated when the datastore carries them;
        # a datastore without an 'incr' column must not abort the transfer
        # of simulated values.
        if "incr" in data_sim:
            obsvect.dy[yslice][crop_index] += (
                np.asarray(data_sim.loc[:, "incr"]).astype(float).flatten()
                * weight
            )

        # Set increment to zero if not in observation vector
        obsvect.dy[~obsvect.obsvect_mask] = 0

        # Update the reference datastore
        columns2update = [
            ("maindata", "spec"),
            ("maindata", "incr"),
            ("maindata", "adj_out"),
        ]
        for column in columns2update:
            if column not in sim_entry:
                continue

            if column not in param.datastore:
                param.datastore[column] = pd.NA

            ind_column = param.datastore.columns.get_loc(column)

            # No try/except here: a failing assignment is a real error and
            # must surface with its traceback instead of opening an
            # interactive console.
            param.datastore.iloc[crop_index, ind_column] = \
                sim_entry.loc[:, column].values