import copy
import os
import numpy as np
import pandas as pd
from logging import warning
from .utils import init_param
from ....utils.datastores.compress import compress_datastore
[docs]
def init_y0(obsvect, **kwargs):
"""Initialise the flat observation-vector arrays from the datavect configuration.
Iterates over every ``component / tracer`` pair declared in
``obsvect.datavect.components`` and, for each one that is flagged as an
observation (``tracer.isobs``):
1. Loads the datastore from a pre-computed ``monitor.nc`` file
(``dir_obsvect`` is set) or reads it from the raw monitor files via
:func:`~.utils.init_param`.
2. Assigns a contiguous slice in the global ``yobs`` / ``yobs_err`` /
``ysim`` / ``dy`` arrays using ``tracer.ypointer``.
3. Appends the ``obsvect_mask`` boolean array that marks which
observations enter the cost function (``is_obsvect == True``).
4. Optionally applies per-tracer error scaling
(``obserror_scale`` and ``obserror_value``).
5. Compresses the tracer datastore to save memory.
6. Dumps the final observation vector when ``obsvect.dump_obs`` is set.
Args:
obsvect (Plugin): obsvect plugin instance. On entry its flat arrays
(``yobs``, ``ysim``, etc.) are empty; on exit they are filled.
**kwargs: forwarded to :func:`~.utils.init_param` and downstream
datastream ``read`` / ``fetch`` methods.
Returns:
Plugin: the updated *obsvect* instance (also modified in-place).
"""
datei = getattr(obsvect, "datei")
datef = getattr(obsvect, "datef")
obsvect.dim = 0
obsvect.yobs = np.ones(0)
obsvect.yobs_err = np.ones(0)
obsvect.ysim = np.ones(0)
obsvect.dy = np.ones(0)
obsvect.obsvect_mask = np.ones(0, dtype=bool)
datavect = obsvect.datavect
# If no definition is specified for the control vector in the Yaml,
# return empty control vector
if not hasattr(datavect, "components"):
return obsvect
# Else, carry on initializing
components = datavect.components
for comp in components.attributes:
component = getattr(components, comp)
# Skip if component does not have parameters
if not hasattr(component, "parameters"):
continue
component.isobs = comp in obsvect.default_obstypes
for trcr in component.parameters.attributes:
tracer = getattr(component.parameters, trcr)
# Skip tracers that are not control variables
tracer.isobs = (
comp in obsvect.default_obstypes
or tracer.isobs
)
if not tracer.isobs:
continue
# Keeping a pointer to the correct location in the whole control
tracer.ypointer = obsvect.dim
# Initializing data for tracer if needed
if obsvect.dir_obsvect == "":
obsvect.dir_obsvect = os.path.join(obsvect.workdir, "obsvect")
file_ref = f"{obsvect.dir_obsvect}/{comp}/{trcr}/monitor.nc"
param = getattr(tracer, "varname", "")
param = trcr if param == "" else param
init_param(obsvect, tracer, param, comp, file_ref)
# Replacing tracer name
tracer.datastore[("metadata", "parameter")] = \
len(tracer.datastore) * [trcr]
tracer.datastore[("metadata", "parameter")] = \
tracer.datastore[("metadata", "parameter")].astype("category")
# Compute end date for interpolation
tracer.datastore[("metadata", "enddate")] = \
tracer.datastore["metadata"]["date"] \
+ pd.to_timedelta(
tracer.datastore["metadata"]["duration"].astype(
float) * 3600,
unit="s")
# Deal with obsvect flag
if tracer.datastore["metadata"]["is_obsvect"].dtype == "O":
tracer.datastore[("metadata", "is_obsvect")] = \
~copy.deepcopy(
tracer.datastore.loc[:, ("metadata", "is_obsvect")]
== "False").values
tracer.datastore[("metadata", "is_obsvect")] = \
tracer.datastore[("metadata", "is_obsvect")].astype(bool)
mask_obsvect = \
tracer.datastore["metadata"].loc[
:, "is_obsvect"
].values.astype(bool)
if not getattr(tracer, "is_obsvect", True):
mask_obsvect[:] = False
setattr(tracer, "dont_propagate_obsvect", True)
# Crop data if obsvect_only
if obsvect.obsvect_only:
tracer.datastore = tracer.datastore.loc[mask_obsvect]
mask_obsvect = mask_obsvect[mask_obsvect]
tracer.datastore = tracer.datastore.reset_index(drop=True)
# If datastore is empty, do not treat as observation
if len(tracer.datastore) == 0:
tracer.isobs = False
warning(
f"No valid observation will be used for {comp} / {trcr}.\n"
"Please check your Yaml and input data if this is not the expected "
"behaviour. \n"
"This can happen for instance if your observation data are "
"all outside the simulation window."
)
continue
# Fill attributes depending on values
obsvect.obsvect_mask = np.append(
obsvect.obsvect_mask, mask_obsvect)
tracer.dim = len(tracer.datastore)
obsvect.dim += tracer.dim
obsvect.yobs = np.append(
obsvect.yobs, tracer.datastore["maindata"].loc[:, "obs"])
# Load simulations and increments from previous run if any
ysim = np.zeros(len(tracer.datastore))
dy = np.zeros(len(tracer.datastore))
if "sim" in tracer.datastore["maindata"]:
ysim = tracer.datastore["maindata"].loc[:, "sim"]
if "sim_tl" in tracer.datastore["maindata"]:
dy = tracer.datastore["maindata"].loc[:, "sim_tl"]
obsvect.ysim = np.append(obsvect.ysim, ysim)
obsvect.dy = np.append(obsvect.dy, dy)
# Appending errors
to_append = tracer.datastore["maindata"].loc[:, "obserror"] \
if "obserror" in tracer.datastore["maindata"] \
else np.zeros(len(tracer.datastore))
to_append = np.sqrt(
(to_append * getattr(tracer, "obserror_scale", 1)) ** 2
+ getattr(tracer, "obserror_value", 0) ** 2
)
obsvect.yobs_err = np.append(obsvect.yobs_err, to_append)
# Compressing the data
tracer.datastore = compress_datastore(tracer.datastore)
if obsvect.dump_obs:
obsvect.dump(f"{obsvect.workdir}/obsvect/")
return obsvect