import copy
import os
from pathlib import Path
import numpy as np
from .....utils.datastores.dump import dump_datastore
from .....utils.path import init_dir
[docs]
def forward(
transform,
inout_datastore,
controlvect,
obsvect,
mapper,
di,
df,
mode,
runsubdir,
workdir,
onlyinit=False,
**kwargs
):
"""Convert isotopologue concentrations to per-mil isotopic ratios.
For each δ-value output declared in the mapper, computes:
.. math::
\\delta = \\left(\\frac{\\sum_{iso}[iso]}{\\sum_{ref}[ref]}
\\cdot \\frac{1}{R_{std}} - 1\\right) \\times 1000
The total concentration output is the sum of all input isotopologues.
In TL mode the same linear formula is applied to increments.
The input concentrations at the current date are saved to
``chain/conc2ratio/`` so the adjoint can reload them.
Supports ensemble (batch sampling) runs: ``__sample#N`` tracers are
grouped and processed together to avoid memory bloat.
Args:
transform (Plugin): conc2ratio plugin instance (carries
``parameters_out`` with ``standard``, ``refs``, and
``isotopologues`` attributes per signature, and ``metadata``
for parameter-name tracking).
inout_datastore (dict): mutable datastore.
controlvect: unused.
obsvect: unused.
mapper (dict): transform mapper.
di (datetime): sub-simulation start date.
df (datetime): sub-simulation end date.
mode (str): ``'fwd'`` or ``'tl'``.
runsubdir (str): sub-simulation run directory (used to locate the
``chain/`` directory for saving forward data).
workdir (str): root working directory.
onlyinit (bool): if ``True``, return immediately.
**kwargs: forwarded to downstream operations.
"""
if onlyinit:
return
ddi = min(di, df)
trids_in = list(mapper["inputs"].keys())
trids_out = list(mapper["outputs"].keys())
inputs_all = [trid[1] for trid in trids_in]
outputs_all = [trid[1] for trid in trids_out]
in_types_all = [trid[0] for trid in trids_in]
out_types_all = [trid[0] for trid in trids_out]
datastore_in = inout_datastore["inputs"]
datastore_out = inout_datastore["outputs"]
# Save data for later use by adjoint
dir_fwd = ddi.strftime(f"{runsubdir}/../chain/isotopes")
if not os.path.isdir(dir_fwd):
init_dir(dir_fwd)
for comp, spec in zip(in_types_all, inputs_all):
# Reload parameter metadata from dry run metadata
datastore_in[(comp, spec)][di][("metadata", "parameter")] = \
transform.metadata[ddi][(comp, spec)]["parameter"]
datastore_in[(comp, spec)][di][("metadata", "parameter_ref")] = \
transform.metadata[ddi][(comp, spec)]["parameter_ref"]
# Save data for later use by adjoint
file_fwd_data = Path(runsubdir).parent.joinpath(
"chain",
"conc2ratio",
ddi.strftime(
f"{transform.orig_name}_fwd_obsvect_{comp}_{spec}_%Y%m%d%H%M.nc"
),
)
dump_datastore(
datastore_in[(comp, spec)][di],
file_monit=str(file_fwd_data),
dump_default=False,
col2dump=[("metadata", "parameter"), ("metadata", "parameter_ref"),
("maindata", "spec"), ("maindata", "incr")],
mode="w",
)
if any("__sample#" in inp for inp in inputs_all):
trids_in_pert = [trid for trid in trids_in if "__sample#" in trid[1]]
trids_out_pert = [trid for trid in trids_out if "__sample#" in trid[1]]
trids_in_nopert = [
trid for trid in trids_in if not "__sample#" in trid[1]]
trids_out_nopert = [
trid for trid in trids_out if not "__sample#" in trid[1]]
nsamples = max([int(trid[1].split("__sample#")[1])
for trid in trids_in_pert]) + 1
list_idxs_input = []
list_idxs_output = []
for isample in range(nsamples):
sample_id_str = f"__sample#{isample:03d}"
list_idxs_input.append([idx for idx, trid
in enumerate(trids_in_pert)
if sample_id_str in trid[1]])
list_idxs_output.append([idx for idx, trid
in enumerate(trids_out_pert)
if sample_id_str in trid[1]])
inputs_sorted = [[trids_in_pert[idx][1] for idx in idxs] +
[trid[1] for trid in trids_in_nopert]
for idxs in list_idxs_input]
outputs_sorted = [[trids_out_pert[idx][1] for idx in idxs] +
[trid[1] for trid in trids_out_nopert]
for idxs in list_idxs_output]
in_types_sorted = [[trids_in_pert[idx][0] for idx in idxs] +
[trid[0] for trid in trids_in_nopert]
for idxs in list_idxs_input]
out_types_sorted = [[trids_out_pert[idx][0] for idx in idxs] +
[trid[0] for trid in trids_out_nopert]
for idxs in list_idxs_output]
else:
inputs_sorted = [inputs_all]
outputs_sorted = [outputs_all]
in_types_sorted = [in_types_all]
out_types_sorted = [out_types_all]
# Run the main code for all group of samples
for inputs, outputs, in_types, out_types in \
zip(inputs_sorted, outputs_sorted,
in_types_sorted, out_types_sorted):
# Starts reshaping the datastore
df_all_isos = {trid[1]: datastore_in[trid][di]
for trid in zip(in_types, inputs)}
dfspec = df_all_isos[inputs[0]]
# Compute ratios from concentrations
sub_outputs = [(sign, outputs[-1]) for sign in outputs[:-1]]
for sub_output in sub_outputs:
sign_id = sub_output[0]
spec_id = sub_output[1]
sign_ref = sign_id.split("__sample#")[0] \
if "__sample#" in sign_id else sign_id
sign_attr = getattr(transform.parameters_out, sign_ref)
r_std = sign_attr.standard
refs = sign_attr.refs
isotopologues = sign_attr.isotopologues
# Add sampling suffix if samples
if "__sample#" in sign_id:
sample_id = int(sign_id.split("__sample#")[1])
sample_id_str = f"__sample#{sample_id:03d}"
refs = [r + sample_id_str
for r in refs]
isotopologues = [iso + sample_id_str
for iso in isotopologues]
ref_sims = sum([df_all_isos[ref]["maindata"][["spec", "incr"]]
for ref in refs])
isotopologue_sims = sum([df_all_isos[iso]["maindata"][["spec", "incr"]]
for iso in isotopologues])
# Skip if no signatures to simulate
mask_sign = dfspec["metadata"]["parameter_ref"] == sign_id.lower()
if np.sum(mask_sign) == 0:
continue
# Copy output datastores
ref_datastore_sign = copy.deepcopy(dfspec.loc[mask_sign])
comp_out = out_types[outputs.index(sign_id)]
datastore_out[(comp_out, sign_id)][di] = ref_datastore_sign
# Fill spec/incr values
ref_sim = ref_sims.loc[mask_sign, "spec"]
isotopologue_sim = isotopologue_sims.loc[mask_sign, "spec"]
sign_sim = isotopologue_sim / ref_sim
sign_sim /= r_std
sign_sim = (sign_sim - 1) * 1000
dfout_iso = datastore_out[(comp_out, sign_id)][di]
dfout_iso.loc[:, ("maindata", "spec")] = sign_sim
# Applying tangent-linear
if mode == "tl":
ref_sim_tl = ref_sims.loc[mask_sign, "incr"]
isotopologue_sim_tl = isotopologue_sims.loc[mask_sign, "incr"]
sign_sim_tl = (isotopologue_sim_tl / ref_sim -
isotopologue_sim / ref_sim ** 2 * ref_sim_tl)
sign_sim_tl /= r_std / 1000
dfout_iso.loc[:, ("maindata", "incr")] = sign_sim_tl
# Compute sum of isotopologues
spec_sims = sum([df_all_isos[s]["maindata"][["spec", "incr"]]
for s in inputs])
spec_id = outputs[-1]
comp = out_types[-1]
# Skip if no full concentrations
mask_spec = dfspec["metadata"]["parameter_ref"] == spec_id.lower()
if np.sum(mask_spec) == 0:
continue
ref_datastore_spec = copy.deepcopy(dfspec.loc[mask_spec])
datastore_out[(comp, spec_id)][di] = ref_datastore_spec
dfout_ref = datastore_out[(comp, spec_id)][di]
dfout_ref.loc[:, ("maindata", "spec")] = \
spec_sims.loc[mask_spec, "spec"].values
if mode == "tl":
dfout_ref.loc[:, ("maindata", "incr")] = \
spec_sims.loc[mask_spec, "incr"].values