Source code for pycif.plugins.transforms.complex.conc2ratio.forward

import copy
import os
from pathlib import Path

import numpy as np

from .....utils.datastores.dump import dump_datastore
from .....utils.path import init_dir


[docs] def forward( transform, inout_datastore, controlvect, obsvect, mapper, di, df, mode, runsubdir, workdir, onlyinit=False, **kwargs ): """Convert isotopologue concentrations to per-mil isotopic ratios. For each δ-value output declared in the mapper, computes: .. math:: \\delta = \\left(\\frac{\\sum_{iso}[iso]}{\\sum_{ref}[ref]} \\cdot \\frac{1}{R_{std}} - 1\\right) \\times 1000 The total concentration output is the sum of all input isotopologues. In TL mode the same linear formula is applied to increments. The input concentrations at the current date are saved to ``chain/conc2ratio/`` so the adjoint can reload them. Supports ensemble (batch sampling) runs: ``__sample#N`` tracers are grouped and processed together to avoid memory bloat. Args: transform (Plugin): conc2ratio plugin instance (carries ``parameters_out`` with ``standard``, ``refs``, and ``isotopologues`` attributes per signature, and ``metadata`` for parameter-name tracking). inout_datastore (dict): mutable datastore. controlvect: unused. obsvect: unused. mapper (dict): transform mapper. di (datetime): sub-simulation start date. df (datetime): sub-simulation end date. mode (str): ``'fwd'`` or ``'tl'``. runsubdir (str): sub-simulation run directory (used to locate the ``chain/`` directory for saving forward data). workdir (str): root working directory. onlyinit (bool): if ``True``, return immediately. **kwargs: forwarded to downstream operations. """ if onlyinit: return ddi = min(di, df) trids_in = list(mapper["inputs"].keys()) trids_out = list(mapper["outputs"].keys()) inputs_all = [trid[1] for trid in trids_in] outputs_all = [trid[1] for trid in trids_out] in_types_all = [trid[0] for trid in trids_in] out_types_all = [trid[0] for trid in trids_out] datastore_in = inout_datastore["inputs"] datastore_out = inout_datastore["outputs"] # Save data for later use by adjoint dir_fwd = ddi.strftime(f"{runsubdir}/../chain/isotopes") if not os.path.isdir(dir_fwd): init_dir(dir_fwd) for comp, spec in zip(in_types_all, inputs_all): # Reload parameter metadata from dry run metadata datastore_in[(comp, spec)][di][("metadata", "parameter")] = \ transform.metadata[ddi][(comp, spec)]["parameter"] datastore_in[(comp, spec)][di][("metadata", "parameter_ref")] = \ transform.metadata[ddi][(comp, spec)]["parameter_ref"] # Save data for later use by adjoint file_fwd_data = Path(runsubdir).parent.joinpath( "chain", "conc2ratio", ddi.strftime( f"{transform.orig_name}_fwd_obsvect_{comp}_{spec}_%Y%m%d%H%M.nc" ), ) dump_datastore( datastore_in[(comp, spec)][di], file_monit=str(file_fwd_data), dump_default=False, col2dump=[("metadata", "parameter"), ("metadata", "parameter_ref"), ("maindata", "spec"), ("maindata", "incr")], mode="w", ) if any("__sample#" in inp for inp in inputs_all): trids_in_pert = [trid for trid in trids_in if "__sample#" in trid[1]] trids_out_pert = [trid for trid in trids_out if "__sample#" in trid[1]] trids_in_nopert = [ trid for trid in trids_in if not "__sample#" in trid[1]] trids_out_nopert = [ trid for trid in trids_out if not "__sample#" in trid[1]] nsamples = max([int(trid[1].split("__sample#")[1]) for trid in trids_in_pert]) + 1 list_idxs_input = [] list_idxs_output = [] for isample in range(nsamples): sample_id_str = f"__sample#{isample:03d}" list_idxs_input.append([idx for idx, trid in enumerate(trids_in_pert) if sample_id_str in trid[1]]) list_idxs_output.append([idx for idx, trid in enumerate(trids_out_pert) if sample_id_str in trid[1]]) inputs_sorted = [[trids_in_pert[idx][1] for idx in idxs] + [trid[1] for trid in trids_in_nopert] for idxs in list_idxs_input] outputs_sorted = [[trids_out_pert[idx][1] for idx in idxs] + [trid[1] for trid in trids_out_nopert] for idxs in list_idxs_output] in_types_sorted = [[trids_in_pert[idx][0] for idx in idxs] + [trid[0] for trid in trids_in_nopert] for idxs in list_idxs_input] out_types_sorted = [[trids_out_pert[idx][0] for idx in idxs] + [trid[0] for trid in trids_out_nopert] for idxs in list_idxs_output] else: inputs_sorted = [inputs_all] outputs_sorted = [outputs_all] in_types_sorted = [in_types_all] out_types_sorted = [out_types_all] # Run the main code for all group of samples for inputs, outputs, in_types, out_types in \ zip(inputs_sorted, outputs_sorted, in_types_sorted, out_types_sorted): # Starts reshaping the datastore df_all_isos = {trid[1]: datastore_in[trid][di] for trid in zip(in_types, inputs)} dfspec = df_all_isos[inputs[0]] # Compute ratios from concentrations sub_outputs = [(sign, outputs[-1]) for sign in outputs[:-1]] for sub_output in sub_outputs: sign_id = sub_output[0] spec_id = sub_output[1] sign_ref = sign_id.split("__sample#")[0] \ if "__sample#" in sign_id else sign_id sign_attr = getattr(transform.parameters_out, sign_ref) r_std = sign_attr.standard refs = sign_attr.refs isotopologues = sign_attr.isotopologues # Add sampling suffix if samples if "__sample#" in sign_id: sample_id = int(sign_id.split("__sample#")[1]) sample_id_str = f"__sample#{sample_id:03d}" refs = [r + sample_id_str for r in refs] isotopologues = [iso + sample_id_str for iso in isotopologues] ref_sims = sum([df_all_isos[ref]["maindata"][["spec", "incr"]] for ref in refs]) isotopologue_sims = sum([df_all_isos[iso]["maindata"][["spec", "incr"]] for iso in isotopologues]) # Skip if no signatures to simulate mask_sign = dfspec["metadata"]["parameter_ref"] == sign_id.lower() if np.sum(mask_sign) == 0: continue # Copy output datastores ref_datastore_sign = copy.deepcopy(dfspec.loc[mask_sign]) comp_out = out_types[outputs.index(sign_id)] datastore_out[(comp_out, sign_id)][di] = ref_datastore_sign # Fill spec/incr values ref_sim = ref_sims.loc[mask_sign, "spec"] isotopologue_sim = isotopologue_sims.loc[mask_sign, "spec"] sign_sim = isotopologue_sim / ref_sim sign_sim /= r_std sign_sim = (sign_sim - 1) * 1000 dfout_iso = datastore_out[(comp_out, sign_id)][di] dfout_iso.loc[:, ("maindata", "spec")] = sign_sim # Applying tangent-linear if mode == "tl": ref_sim_tl = ref_sims.loc[mask_sign, "incr"] isotopologue_sim_tl = isotopologue_sims.loc[mask_sign, "incr"] sign_sim_tl = (isotopologue_sim_tl / ref_sim - isotopologue_sim / ref_sim ** 2 * ref_sim_tl) sign_sim_tl /= r_std / 1000 dfout_iso.loc[:, ("maindata", "incr")] = sign_sim_tl # Compute sum of isotopologues spec_sims = sum([df_all_isos[s]["maindata"][["spec", "incr"]] for s in inputs]) spec_id = outputs[-1] comp = out_types[-1] # Skip if no full concentrations mask_spec = dfspec["metadata"]["parameter_ref"] == spec_id.lower() if np.sum(mask_spec) == 0: continue ref_datastore_spec = copy.deepcopy(dfspec.loc[mask_spec]) datastore_out[(comp, spec_id)][di] = ref_datastore_spec dfout_ref = datastore_out[(comp, spec_id)][di] dfout_ref.loc[:, ("maindata", "spec")] = \ spec_sims.loc[mask_spec, "spec"].values if mode == "tl": dfout_ref.loc[:, ("maindata", "incr")] = \ spec_sims.loc[mask_spec, "incr"].values