Source code for pycif.plugins.modes.analytic.basefunctions
import copy
import glob
import os
import re
import shutil
import time
from logging import info, warning

from ....utils import path
from ....utils.yml import ordered_dump
[docs]
def check_base_functions(base_dir, statedim):
    """Return the list of base-function indices that have not yet been computed.

    The directory is scanned for entries whose name starts with
    ``obsvect_`` followed by at least one digit; the digits give the index
    of an already available base function. Entries that start with
    ``obsvect_`` but carry no numeric index are ignored.

    Args:
        base_dir (str): directory where base function obs-vectors are stored
            (``obsvect_NNNN`` sub-directories). A trailing path separator
            and regex/glob special characters in the path are tolerated.
        statedim (int): total number of control-vector dimensions.

    Returns:
        list[int]: indices ``i`` in ``range(statedim)`` for which
        ``obsvect_NNNN`` is absent from *base_dir*, in increasing order.
    """
    # Only the entry name is parsed: building the pattern from ``base_dir``
    # breaks as soon as the path holds regex metacharacters or is spelled
    # differently from what ``glob`` returns (e.g. a trailing slash).
    name_pattern = re.compile(r"obsvect_([0-9]+)")

    computed_ids = set()
    for entry in glob.glob("{}/obsvect_*".format(glob.escape(base_dir))):
        found = name_pattern.match(os.path.basename(entry))
        if found is not None:
            computed_ids.add(int(found.group(1)))

    return [i for i in range(statedim) if i not in computed_ids]
[docs]
def compute_base_functions(self, controlvect, list_base_functions, dryrun, sequential):
    """Submit one forward pyCIF run per missing base-function dimension.

    For each index in *list_base_functions*, sets the prior control vector to
    a Dirac vector (all zeros except dimension *i* = 1), dumps it to
    ``<workdir>/base_functions/base_NNNN/controlvect.pickle``, writes a
    tailored YAML configuration next to it and submits
    ``<platform.python> -m pycif <yml_file>`` through the platform plugin.
    Once all submitted jobs are reported finished, the resulting
    ``obsoperator/fwd_0000/obsvect`` directories are moved to
    ``<workdir>/base_functions/H_matrix/obsvect_NNNN``.

    Args:
        self (Plugin): mode plugin providing the ``platform``,
            ``from_yaml``, ``reference_instances`` and
            ``dump_nc_base_control`` attributes.
        controlvect: control vector plugin providing ``workdir``, ``xb`` and
            ``dump``. It is modified in-place: a deep copy of the original
            ``xb`` is stored in ``controlvect.xb_ref`` and ``xb`` itself is
            left as the last Dirac vector written; this function does not
            restore it.
        list_base_functions (list[int]): indices of the control-vector
            dimensions for which base functions must be (re-)computed.
        dryrun (bool): if ``True``, configurations are still written for
            every dimension but only dimension index ``0`` is submitted
            (nothing is submitted when ``0`` is not in
            *list_base_functions*), to estimate the forward run cost.
        sequential (bool): if ``True``, wait for each job to finish before
            submitting the next one.

    Returns:
        None
    """
    workdir = controlvect.workdir
    platform = self.platform

    # Save Xb for later: xb is overwritten by Dirac vectors in the loop below
    controlvect.xb_ref = copy.deepcopy(controlvect.xb)

    # Loop over state vector dimensions
    list_jobs = []
    for idim in list_base_functions:
        controlvect.xb[:] = 0.
        controlvect.xb[idim] = 1.

        # Dumps Dirac controlvect
        base_dir = "{}/base_functions/base_{:04d}/".format(workdir, idim)
        path.init_dir(base_dir)
        controlvect.dump(
            "{}/controlvect.pickle".format(base_dir),
        )

        # Updating configuration dictionary: same setup as the reference
        # run, but executed as a forward run inside its own directory
        yml_dict = \
            self.from_yaml(self.reference_instances["reference_setup"].def_file)
        yml_dict.update(
            {"workdir": base_dir,
             "mode": {"plugin": {"name": "forward", "version": "std"}}
             }
        )

        # Keep the user's controlvect options, only overriding what is
        # needed to reload the Dirac vector dumped above
        new_controlvect = {
            "plugin": {"name": "standard", "version": "std"},
            "reload_xb": True,
            "reload_file": "{}/controlvect.pickle".format(base_dir),
            "save_out_netcdf": self.dump_nc_base_control,
        }
        yml_dict["controlvect"] = {
            **yml_dict.get("controlvect", {}),
            **new_controlvect
        }

        # Dumps new yml file
        yml_file = "{}/config_base_{:04d}.yml".format(base_dir, idim)
        with open(yml_file, "w") as outfile:
            ordered_dump(outfile, yml_dict)

        # Run the base function as an independent process
        job_file = os.path.join(base_dir, "job_pycif_base_{:04d}".format(idim))
        info("Submitting base function {} from {}"
             .format(idim, len(list_base_functions)))

        # Submit the job only if not doing a dry run
        # In the case of a dry run, submits only dimension 0
        # to have an idea of how long a forward run lasts
        if not dryrun or idim == 0:
            job_id = platform.submit_job(
                "{} -m pycif {}".format(platform.python, yml_file),
                job_file
            )
            list_jobs.append(job_id)

            if sequential:
                while not platform.check_jobs(list_jobs):
                    time.sleep(platform.sleep_time)

    # Check that jobs are over
    while not platform.check_jobs(list_jobs):
        time.sleep(platform.sleep_time)

    # Move monitors
    for idim in list_base_functions:
        base_dir = "{}/base_functions/base_{:04d}/".format(workdir, idim)
        source = os.path.join(base_dir, "obsoperator", "fwd_0000", "obsvect")
        target = os.path.join(
            base_dir, "..", "H_matrix", "obsvect_{:04d}".format(idim)
        )

        # Moving through the standard library rather than a shell command
        # keeps paths with spaces or shell metacharacters working. A failed
        # move (e.g. no output for dimensions skipped by a dry run) is
        # reported but does not interrupt the remaining moves.
        try:
            shutil.move(source, target)
        except OSError as err:
            warning("Could not move {} to {}: {}".format(source, target, err))