import copy
import os
from pathlib import Path
import numpy as np
from .....utils.datastores.dump import dump_datastore
from .....utils.path import init_dir
[docs]
def forward(
transform,
inout_datastore,
controlvect,
obsvect,
mapper,
di,
df,
mode,
runsubdir,
workdir,
onlyinit=False,
**kwargs
):
if onlyinit:
return
ddi = min(di, df)
trids_in = list(mapper["inputs"].keys())
trids_out = list(mapper["outputs"].keys())
inputs_all = [trid[1] for trid in trids_in]
outputs_all = [trid[1] for trid in trids_out]
in_types_all = [trid[0] for trid in trids_in]
out_types_all = [trid[0] for trid in trids_out]
datastore_in = inout_datastore["inputs"]
datastore_out = inout_datastore["outputs"]
# Save data for later use by adjoint
dir_fwd = ddi.strftime("{}/../chain/isotopes".format(runsubdir))
if not os.path.isdir(dir_fwd):
init_dir(dir_fwd)
for comp, spec in zip(in_types_all, inputs_all):
# Reload parameter metadata from dry run metadata
datastore_in[(comp, spec)][di][("metadata", "parameter")] = \
transform.metadata[ddi][(comp, spec)]["parameter"]
datastore_in[(comp, spec)][di][("metadata", "parameter_ref")] = \
transform.metadata[ddi][(comp, spec)]["parameter_ref"]
# Save data for later use by adjoint
file_fwd_data = Path(runsubdir).parent.joinpath(
"chain",
"conc2ratio",
ddi.strftime(
f"{transform.orig_name}_fwd_obsvect_{comp}_{spec}_%Y%m%d%H%M.nc"
),
)
dump_datastore(
datastore_in[(comp, spec)][di],
file_monit=str(file_fwd_data),
dump_default=False,
col2dump=[("metadata", "parameter"), ("metadata", "parameter_ref"),
("maindata", "spec"), ("maindata", "incr")],
mode="w",
)
if any("__sample#" in inp for inp in inputs_all):
trids_in_pert = [trid for trid in trids_in if "__sample#" in trid[1]]
trids_out_pert = [trid for trid in trids_out if "__sample#" in trid[1]]
trids_in_nopert = [
trid for trid in trids_in if not "__sample#" in trid[1]]
trids_out_nopert = [
trid for trid in trids_out if not "__sample#" in trid[1]]
nsamples = max([int(trid[1].split("__sample#")[1])
for trid in trids_in_pert]) + 1
list_idxs_input = []
list_idxs_output = []
for isample in range(nsamples):
sample_id_str = "__sample#{:03d}".format(isample)
list_idxs_input.append([idx for idx, trid
in enumerate(trids_in_pert)
if sample_id_str in trid[1]])
list_idxs_output.append([idx for idx, trid
in enumerate(trids_out_pert)
if sample_id_str in trid[1]])
inputs_sorted = [[trids_in_pert[idx][1] for idx in idxs] +
[trid[1] for trid in trids_in_nopert]
for idxs in list_idxs_input]
outputs_sorted = [[trids_out_pert[idx][1] for idx in idxs] +
[trid[1] for trid in trids_out_nopert]
for idxs in list_idxs_output]
in_types_sorted = [[trids_in_pert[idx][0] for idx in idxs] +
[trid[0] for trid in trids_in_nopert]
for idxs in list_idxs_input]
out_types_sorted = [[trids_out_pert[idx][0] for idx in idxs] +
[trid[0] for trid in trids_out_nopert]
for idxs in list_idxs_output]
else:
inputs_sorted = [inputs_all]
outputs_sorted = [outputs_all]
in_types_sorted = [in_types_all]
out_types_sorted = [out_types_all]
# Run the main code for all group of samples
for inputs, outputs, in_types, out_types in \
zip(inputs_sorted, outputs_sorted,
in_types_sorted, out_types_sorted):
# Starts reshaping the datastore
df_all_isos = {trid[1]: datastore_in[trid][di]
for trid in zip(in_types, inputs)}
dfspec = df_all_isos[inputs[0]]
# Compute ratios from concentrations
sub_outputs = [(sign, outputs[-1]) for sign in outputs[:-1]]
for sub_output in sub_outputs:
sign_id = sub_output[0]
spec_id = sub_output[1]
sign_ref = sign_id.split("__sample#")[0] \
if "__sample#" in sign_id else sign_id
sign_attr = getattr(transform.parameters_out, sign_ref)
r_std = sign_attr.standard
refs = sign_attr.refs
isotopologues = sign_attr.isotopologues
# Add sampling suffix if samples
if "__sample#" in sign_id:
sample_id = int(sign_id.split("__sample#")[1])
sample_id_str = "__sample#{:03d}".format(sample_id)
refs = [r + sample_id_str
for r in refs]
isotopologues = [iso + sample_id_str
for iso in isotopologues]
ref_sims = sum([df_all_isos[ref]["maindata"][["spec", "incr"]]
for ref in refs])
isotopologue_sims = sum([df_all_isos[iso]["maindata"][["spec", "incr"]]
for iso in isotopologues])
# Skip if no signatures to simulate
mask_sign = dfspec["metadata"]["parameter_ref"] == sign_id.lower()
if np.sum(mask_sign) == 0:
continue
# Copy output datastores
ref_datastore_sign = copy.deepcopy(dfspec.loc[mask_sign])
comp_out = out_types[outputs.index(sign_id)]
datastore_out[(comp_out, sign_id)][di] = ref_datastore_sign
# Fill spec/incr values
ref_sim = ref_sims.loc[mask_sign, "spec"]
isotopologue_sim = isotopologue_sims.loc[mask_sign, "spec"]
sign_sim = isotopologue_sim / ref_sim
sign_sim /= r_std
sign_sim = (sign_sim - 1) * 1000
dfout_iso = datastore_out[(comp_out, sign_id)][di]
dfout_iso.loc[:, ("maindata", "spec")] = sign_sim
# Applying tangent-linear
if mode == "tl":
ref_sim_tl = ref_sims.loc[mask_sign, "incr"]
isotopologue_sim_tl = isotopologue_sims.loc[mask_sign, "incr"]
sign_sim_tl = (isotopologue_sim_tl / ref_sim -
isotopologue_sim / ref_sim ** 2 * ref_sim_tl)
sign_sim_tl /= r_std / 1000
dfout_iso.loc[:, ("maindata", "incr")] = sign_sim_tl
# Compute sum of isotopologues
spec_sims = sum([df_all_isos[s]["maindata"][["spec", "incr"]]
for s in inputs])
spec_id = outputs[-1]
comp = out_types[-1]
# Skip if no full concentrations
mask_spec = dfspec["metadata"]["parameter_ref"] == spec_id.lower()
if np.sum(mask_spec) == 0:
continue
ref_datastore_spec = copy.deepcopy(dfspec.loc[mask_spec])
datastore_out[(comp, spec_id)][di] = ref_datastore_spec
dfout_ref = datastore_out[(comp, spec_id)][di]
dfout_ref.loc[:, ("maindata", "spec")] = \
spec_sims.loc[mask_spec, "spec"].values
if mode == "tl":
dfout_ref.loc[:, ("maindata", "incr")] = \
spec_sims.loc[mask_spec, "incr"].values