Source code for pycif.plugins.modes.response_functions.execute
import copy
import os
from itertools import chain
from logging import info
import numpy as np
from ....utils.iterators import iter_attributes
from .analytical_inversion import analytical_inversion
from .base_function import dump_yaml_config
from .h_matrix import build_h, dump_obsvect_decomp, fill_obsvect, read_h_matrix
from .init_base_functions import init_base_functions
from .jobs import run_jobs_in_batches
from .ref_forward import run_ref_forward
[docs]
def execute(self, **kwargs):
"""Run the response-functions mode: compute H-matrix columns and optionally invert.
Orchestrates the full response-function workflow:
1. Optionally runs a reference forward simulation to populate the
obs-vector ``'sim'`` column.
2. Initialises and submits one pyCIF forward run per control-vector
dimension (the base functions / response functions).
3. Assembles the resulting H matrix.
4. If ``analytical_inversion`` is set, performs a direct Bayesian
inversion ``xa = xb + K(y − H·xb)`` and dumps the posterior.
Args:
self (Plugin): mode plugin carrying all configuration attributes
(``workdir``, ``datei``, ``datef``, ``run_mode``, ``dryrun``,
``analytical_inversion``, ``reload_h_matrix``, etc.).
**kwargs: forwarded verbatim to the observation operator.
Returns:
ObsVect: the observation vector populated with simulated values
(prior and/or posterior, depending on ``analytical_inversion``).
"""
# Checking the observation vector first
# if not np.all(np.isfinite(self.obsvect.yobs)):
# raise ValueError("there are NaNs in the observation vector")
# if not np.all(np.isfinite(self.obsvect.yobs_err)):
# raise ValueError("there are NaNs in the observation error vector")
# Saving the control vector for analytical inversion
xb_ref = copy.deepcopy(self.controlvect.xb)
# Saving initial conditions tracers dates
inicond_dates_ref = {}
for tracer_name, tracer in iter_attributes(self.inicond.parameters):
inicond_dates_ref[tracer_name] = tracer.dates.copy()
# Setting prior as first 'x' vector
if hasattr(self.controlvect, 'xb'):
self.controlvect.x = copy.deepcopy(self.controlvect.xb)
if self.run_reference_forward:
self.ref_fwd_dir = run_ref_forward(self)
else:
self.obsvect.dump(self.dir_obsvect)
# Initializing the base functions
base_function_list = init_base_functions(self)
if hasattr(self, 'reload_h_matrix') and not self.dryrun:
# Loading the H matrix from the provided paths
h_matrix = read_h_matrix(self, self.reload_h_matrix)
else:
# Running base function in job batches
run_jobs_in_batches(self, base_function_list)
# Stop here for dryrun
if self.dryrun:
return self.obsvect
for base_func in base_function_list:
# Extracting the base functions outputs
if not base_func.is_ignored():
base_func.fetch_obsvect()
# Flushing base functions 'controlvect.pickle' file and 'chain' dir
if self.autoflush:
base_func.flush()
# Stop here with "first_period_only" argument
if self.first_period_only:
return self.obsvect
if self.use_batch_sampling:
# From base functions batches to individual base functions
base_function_list = chain.from_iterable(
base_func.iter_all() for base_func in base_function_list) # type: ignore
h_matrix = build_h(self, base_function_list)
# Restoring the datavect initial conditions component that could have been
# altered by the response functions
for tracer_name, tracer in iter_attributes(self.inicond.parameters):
tracer.dates = inicond_dates_ref[tracer_name]
# Dumping observation vector with summed response function contributions
fill_obsvect(self, h_matrix, xb_ref)
obsvectdir = os.path.join(self.dir_obsvect)
os.makedirs(obsvectdir, exist_ok=True)
self.obsvect.dump(obsvectdir)
# Dumping individual response function contributions
decompdir = os.path.join(self.basedir, "decomposition")
if self.dump_obsvect_decompostion:
dump_obsvect_decomp(self, h_matrix, decompdir)
# Dumping the H matrix
analytical_inversion(self, xb_ref, h_matrix,
self.controlvect, self.obsvect)
return self.obsvect