Source code for pycif.plugins.modes.response_functions.execute

import copy
import os
from itertools import chain
from logging import info

import numpy as np

from ....utils.iterators import iter_attributes
from .analytical_inversion import analytical_inversion
from .base_function import dump_yaml_config
from .h_matrix import build_h, dump_obsvect_decomp, fill_obsvect, read_h_matrix
from .init_base_functions import init_base_functions
from .jobs import run_jobs_in_batches
from .ref_forward import run_ref_forward


[docs] def execute(self, **kwargs): """Run the response-functions mode: compute H-matrix columns and optionally invert. Orchestrates the full response-function workflow: 1. Optionally runs a reference forward simulation to populate the obs-vector ``'sim'`` column. 2. Initialises and submits one pyCIF forward run per control-vector dimension (the base functions / response functions). 3. Assembles the resulting H matrix. 4. If ``analytical_inversion`` is set, performs a direct Bayesian inversion ``xa = xb + K(y − H·xb)`` and dumps the posterior. Args: self (Plugin): mode plugin carrying all configuration attributes (``workdir``, ``datei``, ``datef``, ``run_mode``, ``dryrun``, ``analytical_inversion``, ``reload_h_matrix``, etc.). **kwargs: forwarded verbatim to the observation operator. Returns: ObsVect: the observation vector populated with simulated values (prior and/or posterior, depending on ``analytical_inversion``). """ # Checking the observation vector first # if not np.all(np.isfinite(self.obsvect.yobs)): # raise ValueError("there are NaNs in the observation vector") # if not np.all(np.isfinite(self.obsvect.yobs_err)): # raise ValueError("there are NaNs in the observation error vector") # Saving the control vector for analytical inversion xb_ref = copy.deepcopy(self.controlvect.xb) # Saving initial conditions tracers dates inicond_dates_ref = {} for tracer_name, tracer in iter_attributes(self.inicond.parameters): inicond_dates_ref[tracer_name] = tracer.dates.copy() # Setting prior as first 'x' vector if hasattr(self.controlvect, 'xb'): self.controlvect.x = copy.deepcopy(self.controlvect.xb) if self.run_reference_forward: self.ref_fwd_dir = run_ref_forward(self) else: self.obsvect.dump(self.dir_obsvect) # Initializing the base functions base_function_list = init_base_functions(self) if hasattr(self, 'reload_h_matrix') and not self.dryrun: # Loading the H matrix from the provided paths h_matrix = read_h_matrix(self, self.reload_h_matrix) else: # Running base function in job batches run_jobs_in_batches(self, base_function_list) # Stop here for dryrun if self.dryrun: return self.obsvect for base_func in base_function_list: # Extracting the base functions outputs if not base_func.is_ignored(): base_func.fetch_obsvect() # Flushing base functions 'controlvect.pickle' file and 'chain' dir if self.autoflush: base_func.flush() # Stop here with "first_period_only" argument if self.first_period_only: return self.obsvect if self.use_batch_sampling: # From base functions batches to individual base functions base_function_list = chain.from_iterable( base_func.iter_all() for base_func in base_function_list) # type: ignore h_matrix = build_h(self, base_function_list) # Restoring the datavect initial conditions component that could have been # altered by the response functions for tracer_name, tracer in iter_attributes(self.inicond.parameters): tracer.dates = inicond_dates_ref[tracer_name] # Dumping observation vector with summed response function contributions fill_obsvect(self, h_matrix, xb_ref) obsvectdir = os.path.join(self.dir_obsvect) os.makedirs(obsvectdir, exist_ok=True) self.obsvect.dump(obsvectdir) # Dumping individual response function contributions decompdir = os.path.join(self.basedir, "decomposition") if self.dump_obsvect_decompostion: dump_obsvect_decomp(self, h_matrix, decompdir) # Dumping the H matrix analytical_inversion(self, xb_ref, h_matrix, self.controlvect, self.obsvect) return self.obsvect