Source code for pycif.plugins.modes.analytic.basefunctions

import copy
import os
import glob
import re
import time
from ....utils import path
from logging import info
from ....utils.yml import ordered_dump


def check_base_functions(base_dir, statedim):
    """Return the list of base-function indices that have not yet been computed.

    Args:
        base_dir (str): directory where base function obs-vectors are stored
            (``obsvect_NNNN`` entries).
        statedim (int): total number of control-vector dimensions.

    Returns:
        list[int]: indices ``i`` in ``range(statedim)``, in increasing order,
        for which no ``obsvect_<i>`` entry is found in *base_dir*.
    """
    # The index is parsed from the entry name alone (not from the full path),
    # so a trailing slash or regex metacharacters in base_dir cannot prevent
    # the match. At least one digit is required; entries that merely start
    # with "obsvect_" but carry no index are ignored instead of crashing.
    regex = re.compile(r"obsvect_([0-9]+)")

    # glob.escape keeps "[", "*" or "?" in base_dir from being interpreted
    # as wildcards.
    pattern = os.path.join(glob.escape(base_dir), "obsvect_*")

    computed_ids = set()
    for monitor in glob.glob(pattern):
        match = regex.match(os.path.basename(monitor))
        if match is not None:
            computed_ids.add(int(match.group(1)))

    return [i for i in range(statedim) if i not in computed_ids]
def compute_base_functions(self, controlvect, list_base_functions, dryrun, sequential):
    """Submit one forward pyCIF run per missing base-function dimension.

    For each index in *list_base_functions*, sets ``controlvect.xb`` to a
    Dirac vector (all zeros except dimension *i* = 1), dumps it together
    with a tailored YAML configuration into
    ``<workdir>/base_functions/base_NNNN/``, and submits the run via the
    platform plugin. Once ``platform.check_jobs`` reports the submitted jobs
    as finished, the resulting ``obsvect`` directories are moved to
    ``<workdir>/base_functions/H_matrix/obsvect_NNNN``.

    Args:
        self (Plugin): mode plugin providing ``platform``, ``from_yaml``,
            ``reference_instances`` and ``dump_nc_base_control``.
        controlvect: control vector plugin, modified in-place. A deep copy
            of the incoming ``xb`` is stored in ``controlvect.xb_ref``;
            ``xb`` itself is NOT restored here and is left as the last
            Dirac vector written.
        list_base_functions (list[int]): indices of the control-vector
            dimensions for which base functions must be (re-)computed.
        dryrun (bool): if true, only dimension ``0`` is submitted (when it
            is part of *list_base_functions*) to estimate the cost of a
            forward run. Configuration files are still written for every
            dimension and the final move step is still attempted.
        sequential (bool): if true, wait for each job to finish before
            submitting the next one.
    """
    # Function-scope imports: only needed by the final move step.
    import subprocess
    from logging import warning

    workdir = controlvect.workdir
    platform = self.platform

    # Save Xb for later
    controlvect.xb_ref = copy.deepcopy(controlvect.xb)

    # Loop over state vector dimensions
    list_jobs = []
    n_base_functions = len(list_base_functions)
    for idim in list_base_functions:
        controlvect.xb[:] = 0.
        controlvect.xb[idim] = 1.

        # Dumps Dirac controlvect
        base_dir = "{}/base_functions/base_{:04d}/".format(workdir, idim)
        path.init_dir(base_dir)
        controlvect.dump(
            "{}/controlvect.pickle".format(base_dir),
        )

        # Updating configuration dictionary.
        # The reference setup is re-read on every iteration because the
        # resulting dictionary is modified below.
        yml_dict = \
            self.from_yaml(self.reference_instances["reference_setup"].def_file)
        yml_dict.update(
            {"workdir": base_dir,
             "mode": {"plugin": {"name": "forward", "version": "std"}}
             }
        )
        new_controlvect = {
            "plugin": {"name": "standard", "version": "std"},
            "reload_xb": True,
            "reload_file": "{}/controlvect.pickle".format(base_dir),
            "save_out_netcdf": self.dump_nc_base_control,
        }
        # Keys of new_controlvect override those of the reference setup
        yml_dict["controlvect"] = {
            **yml_dict.get("controlvect", {}),
            **new_controlvect
        }

        # Dumps new yml file
        yml_file = "{}/config_base_{:04d}.yml".format(base_dir, idim)
        with open(yml_file, "w") as outfile:
            ordered_dump(outfile, yml_dict)

        # Run the base function as an independent process
        job_file = os.path.join(base_dir, "job_pycif_base_{:04d}".format(idim))
        info("Submitting base function {} from {}"
             .format(idim, n_base_functions))

        # Submit the job only if not doing a dry run
        # In the case of a dry run, submits only the first dimension
        # to have an idea on how long last a forward
        if (not dryrun) or idim == 0:
            job_id = platform.submit_job(
                "{} -m pycif {}".format(platform.python, yml_file),
                job_file
            )
            list_jobs.append(job_id)

            if sequential:
                while not platform.check_jobs(list_jobs):
                    time.sleep(platform.sleep_time)

    # Check that jobs are over
    while not platform.check_jobs(list_jobs):
        time.sleep(platform.sleep_time)

    # Move monitors
    for idim in list_base_functions:
        base_dir = "{}/base_functions/base_{:04d}/".format(workdir, idim)
        source = "{}/obsoperator/fwd_0000/obsvect".format(base_dir)
        target = "{}/../H_matrix/obsvect_{:04d}".format(base_dir, idim)

        # Call ``mv`` with an argument list instead of a shell string, so
        # that paths containing spaces or shell metacharacters are passed
        # through verbatim. The move stays best-effort (e.g. dimensions not
        # submitted in a dry run have nothing to move), but a failure is
        # now reported instead of being silently discarded.
        status = subprocess.call(["mv", source, target])
        if status != 0:
            warning("Could not move {} to {} (mv exit status {})"
                    .format(source, target, status))