Source code for pycif.plugins.transforms.complex.conc2ratio.forward

import copy
import os
from pathlib import Path

import numpy as np

from .....utils.datastores.dump import dump_datastore
from .....utils.path import init_dir


[docs] def forward( transform, inout_datastore, controlvect, obsvect, mapper, di, df, mode, runsubdir, workdir, onlyinit=False, **kwargs ): if onlyinit: return ddi = min(di, df) trids_in = list(mapper["inputs"].keys()) trids_out = list(mapper["outputs"].keys()) inputs_all = [trid[1] for trid in trids_in] outputs_all = [trid[1] for trid in trids_out] in_types_all = [trid[0] for trid in trids_in] out_types_all = [trid[0] for trid in trids_out] datastore_in = inout_datastore["inputs"] datastore_out = inout_datastore["outputs"] # Save data for later use by adjoint dir_fwd = ddi.strftime("{}/../chain/isotopes".format(runsubdir)) if not os.path.isdir(dir_fwd): init_dir(dir_fwd) for comp, spec in zip(in_types_all, inputs_all): # Reload parameter metadata from dry run metadata datastore_in[(comp, spec)][di][("metadata", "parameter")] = \ transform.metadata[ddi][(comp, spec)]["parameter"] datastore_in[(comp, spec)][di][("metadata", "parameter_ref")] = \ transform.metadata[ddi][(comp, spec)]["parameter_ref"] # Save data for later use by adjoint file_fwd_data = Path(runsubdir).parent.joinpath( "chain", "conc2ratio", ddi.strftime( f"{transform.orig_name}_fwd_obsvect_{comp}_{spec}_%Y%m%d%H%M.nc" ), ) dump_datastore( datastore_in[(comp, spec)][di], file_monit=str(file_fwd_data), dump_default=False, col2dump=[("metadata", "parameter"), ("metadata", "parameter_ref"), ("maindata", "spec"), ("maindata", "incr")], mode="w", ) if any("__sample#" in inp for inp in inputs_all): trids_in_pert = [trid for trid in trids_in if "__sample#" in trid[1]] trids_out_pert = [trid for trid in trids_out if "__sample#" in trid[1]] trids_in_nopert = [ trid for trid in trids_in if not "__sample#" in trid[1]] trids_out_nopert = [ trid for trid in trids_out if not "__sample#" in trid[1]] nsamples = max([int(trid[1].split("__sample#")[1]) for trid in trids_in_pert]) + 1 list_idxs_input = [] list_idxs_output = [] for isample in range(nsamples): sample_id_str = "__sample#{:03d}".format(isample) list_idxs_input.append([idx for idx, trid in enumerate(trids_in_pert) if sample_id_str in trid[1]]) list_idxs_output.append([idx for idx, trid in enumerate(trids_out_pert) if sample_id_str in trid[1]]) inputs_sorted = [[trids_in_pert[idx][1] for idx in idxs] + [trid[1] for trid in trids_in_nopert] for idxs in list_idxs_input] outputs_sorted = [[trids_out_pert[idx][1] for idx in idxs] + [trid[1] for trid in trids_out_nopert] for idxs in list_idxs_output] in_types_sorted = [[trids_in_pert[idx][0] for idx in idxs] + [trid[0] for trid in trids_in_nopert] for idxs in list_idxs_input] out_types_sorted = [[trids_out_pert[idx][0] for idx in idxs] + [trid[0] for trid in trids_out_nopert] for idxs in list_idxs_output] else: inputs_sorted = [inputs_all] outputs_sorted = [outputs_all] in_types_sorted = [in_types_all] out_types_sorted = [out_types_all] # Run the main code for all group of samples for inputs, outputs, in_types, out_types in \ zip(inputs_sorted, outputs_sorted, in_types_sorted, out_types_sorted): # Starts reshaping the datastore df_all_isos = {trid[1]: datastore_in[trid][di] for trid in zip(in_types, inputs)} dfspec = df_all_isos[inputs[0]] # Compute ratios from concentrations sub_outputs = [(sign, outputs[-1]) for sign in outputs[:-1]] for sub_output in sub_outputs: sign_id = sub_output[0] spec_id = sub_output[1] sign_ref = sign_id.split("__sample#")[0] \ if "__sample#" in sign_id else sign_id sign_attr = getattr(transform.parameters_out, sign_ref) r_std = sign_attr.standard refs = sign_attr.refs isotopologues = sign_attr.isotopologues # Add sampling suffix if samples if "__sample#" in sign_id: sample_id = int(sign_id.split("__sample#")[1]) sample_id_str = "__sample#{:03d}".format(sample_id) refs = [r + sample_id_str for r in refs] isotopologues = [iso + sample_id_str for iso in isotopologues] ref_sims = sum([df_all_isos[ref]["maindata"][["spec", "incr"]] for ref in refs]) isotopologue_sims = sum([df_all_isos[iso]["maindata"][["spec", "incr"]] for iso in isotopologues]) # Skip if no signatures to simulate mask_sign = dfspec["metadata"]["parameter_ref"] == sign_id.lower() if np.sum(mask_sign) == 0: continue # Copy output datastores ref_datastore_sign = copy.deepcopy(dfspec.loc[mask_sign]) comp_out = out_types[outputs.index(sign_id)] datastore_out[(comp_out, sign_id)][di] = ref_datastore_sign # Fill spec/incr values ref_sim = ref_sims.loc[mask_sign, "spec"] isotopologue_sim = isotopologue_sims.loc[mask_sign, "spec"] sign_sim = isotopologue_sim / ref_sim sign_sim /= r_std sign_sim = (sign_sim - 1) * 1000 dfout_iso = datastore_out[(comp_out, sign_id)][di] dfout_iso.loc[:, ("maindata", "spec")] = sign_sim # Applying tangent-linear if mode == "tl": ref_sim_tl = ref_sims.loc[mask_sign, "incr"] isotopologue_sim_tl = isotopologue_sims.loc[mask_sign, "incr"] sign_sim_tl = (isotopologue_sim_tl / ref_sim - isotopologue_sim / ref_sim ** 2 * ref_sim_tl) sign_sim_tl /= r_std / 1000 dfout_iso.loc[:, ("maindata", "incr")] = sign_sim_tl # Compute sum of isotopologues spec_sims = sum([df_all_isos[s]["maindata"][["spec", "incr"]] for s in inputs]) spec_id = outputs[-1] comp = out_types[-1] # Skip if no full concentrations mask_spec = dfspec["metadata"]["parameter_ref"] == spec_id.lower() if np.sum(mask_spec) == 0: continue ref_datastore_spec = copy.deepcopy(dfspec.loc[mask_spec]) datastore_out[(comp, spec_id)][di] = ref_datastore_spec dfout_ref = datastore_out[(comp, spec_id)][di] dfout_ref.loc[:, ("maindata", "spec")] = \ spec_sims.loc[mask_spec, "spec"].values if mode == "tl": dfout_ref.loc[:, ("maindata", "incr")] = \ spec_sims.loc[mask_spec, "incr"].values