import copy
import multiprocessing
import os
import subprocess
import time
from logging import debug, info
import pandas as pd
from ....utils import path
from ....utils.dates import date_range
from ....utils.yml import ordered_dump
from .serial import obsoper_serial
from .flushrun import flushrun
[docs]
def run_pycif_in_subprocess(python_path, yaml_path):
    """Run a pyCIF configuration file in a blocking subprocess.

    Launches ``python_path -m pycif yaml_path`` with the directory of
    *yaml_path* as working directory, redirecting stdout to
    ``subprocess_stdout.log`` and stderr to ``subprocess_stderr.log`` in that
    same directory. Both log files are overwritten on every call.

    Args:
        python_path (str): path to the Python interpreter (e.g.
            ``self.platform.python``).
        yaml_path (str): path to the pyCIF YAML configuration file to
            execute. A relative path is resolved against the current working
            directory before the subprocess is started.

    Raises:
        RuntimeError: if the subprocess exits with a non-zero return code
            (this includes termination by a signal, which ``subprocess``
            reports as a negative return code on POSIX).
    """
    # Resolve the path first: the child runs with ``cwd=base_dir``, so a
    # relative *yaml_path* would otherwise be resolved against base_dir a
    # second time and point to a non-existent file.
    yaml_path = os.path.abspath(yaml_path)
    base_dir = os.path.dirname(yaml_path)
    out_filepath = os.path.join(base_dir, "subprocess_stdout.log")
    err_filepath = os.path.join(base_dir, "subprocess_stderr.log")

    with open(out_filepath, "w") as out_file, open(err_filepath, "w") as err_file:
        info(f"Running pycif in '{base_dir}'")
        # subprocess.run blocks until the child exits, like Popen.wait(), but
        # in CPython it also kills the child if the wait is interrupted
        # (e.g. KeyboardInterrupt) instead of leaving it running.
        completed = subprocess.run(
            [python_path, "-m", "pycif", yaml_path],
            stdout=out_file,
            stderr=err_file,
            cwd=base_dir,
            check=False,
        )

    returncode = completed.returncode
    debug(f"pycif exited with a return code {returncode} in '{base_dir}'")
    if returncode:
        raise RuntimeError(
            f"pycif subprocess did not run properly in '{base_dir}' "
            f"(return code {returncode}, see '{err_filepath}')")
[docs]
def obsoper_parallel(self, controlvect, obsvect, rundir, mode, workdir,
                     check_transforms, ignore_exceptions):
    """Run the observation operator in parallel over independent time segments.

    Splits the simulation window ``[self.datei, self.datef]`` into segments
    of length ``self.parallel.segments`` with optional boundary overlap
    ``self.parallel.overlap``, then runs each segment independently — either
    as subprocesses (``self.parallel.subprocess = True``) or as HPC jobs via
    the ``platform`` plugin.

    Each segment is configured via a freshly dumped YAML file that restricts
    the ``approx_operator`` window to its date range, then executed with
    :func:`run_pycif_in_subprocess` or ``self.platform.submit_job``.

    After all segments finish, their outputs are reassembled:

    * ``'tl'`` mode — ``obsvect.ysim`` and ``obsvect.dy`` are set to the
      element-wise sums over all segment observation vectors.
    * ``'adj'`` mode — ``controlvect.dx`` is set to the element-wise sum
      over all segment adjoint sensitivities.

    In both modes ``controlvect.x`` and ``controlvect.xb`` are finally reset
    to the values they had before the segments were run.

    Args:
        self (ObsOperator): the obs-operator plugin instance. Must have
            ``self.parallel`` (with ``segments``, ``overlap``, ``subprocess``
            attributes), ``self.datei``, ``self.datef``, ``self.ref_fwd_dir``,
            and ``self.platform`` set.
        controlvect (ControlVect): control-vector object.
        obsvect (ObsVect): observation-vector object.
        rundir (str): the run sub-directory for this operator call.
        mode (str): execution mode — ``'tl'`` or ``'adj'``.
            (Forward mode is always dispatched to serial execution.)
        workdir (str): parent working directory.
        check_transforms (bool): if ``True``, validate each segment's
            transform adjoint / TL identity.
        ignore_exceptions (bool): if ``True``, non-fatal transform errors
            inside segments are swallowed.

    Raises:
        ValueError: if *mode* is neither ``'tl'`` nor ``'adj'``. This is
            checked before any computation or file output.
        RuntimeError: if a subprocess-based segment exits with a non-zero
            return code (propagated from :func:`run_pycif_in_subprocess`).
    """
    # Fail fast -- before the costly reference forward run and before any
    # file is written -- on modes that have no segment configuration
    _segment_run_mode(mode)

    # Run reference forward run if not already done
    if not os.path.isdir(self.ref_fwd_dir):
        info(f"directory '{self.ref_fwd_dir}' not found, "
             "running the reference forward run")
        fwd_dir = os.path.join(rundir, "ref_forward")
        path.init_dir(os.path.join(fwd_dir, "chain"))
        obsoper_serial(self, controlvect, obsvect, fwd_dir, "fwd", workdir,
                       check_transforms, ignore_exceptions)
        self.ref_fwd_dir = fwd_dir

    # Keep copies of xb and x so that they can be restored once the segment
    # outputs have been re-loaded into controlvect
    controlvect.xb_ref = copy.deepcopy(controlvect.xb)
    controlvect.x_ref = copy.deepcopy(controlvect.x)

    # Start/end dates and working directory of every segment
    list_segments_datei, list_segments_datef = _segment_bounds(
        self.datei, self.datef, self.parallel.segments, self.parallel.overlap)
    list_segments_basedir = [
        os.path.join(rundir, ddi.strftime("parallel/segment_%Y%m%d-%H%M"))
        for ddi in list_segments_datei
    ]

    # Dump the control vector once; every segment configuration points its
    # ``reload_file`` to this dump
    controlvect_path = os.path.join(rundir, "parallel/controlvect.pickle")
    path.init_dir(os.path.dirname(controlvect_path))
    controlvect.dump(controlvect_path)

    list_yaml_files = []
    for ddi, ddf, base_dir in zip(
            list_segments_datei, list_segments_datef, list_segments_basedir):
        list_yaml_files.append(_write_segment_config(
            self, obsvect, mode, controlvect_path, ddi, ddf, base_dir))

    _run_segments(self, list_segments_datei, list_segments_basedir,
                  list_yaml_files)
    info("All segments finished")

    if mode == "tl":
        # Re-constituting simulated observation vector in TL mode
        _gather_tl_outputs(obsvect, list_segments_basedir)
    elif mode == "adj":
        # Reconstruct sensitivities from segments
        _gather_adj_outputs(controlvect, list_segments_basedir)

    # Reset x and xb values; re-loading segment outputs may have changed them
    controlvect.x = copy.deepcopy(controlvect.x_ref)
    controlvect.xb = copy.deepcopy(controlvect.xb_ref)

    # Cleaning unnecessary files
    if self.autoflush:
        info(f"Removing temporary controlvect dump in {controlvect_path}")
        path.remove(controlvect_path)
        info(f"Flushing unnecessary files in {self.ref_fwd_dir}")
        transform_pipe = self.transform_pipe
        flushrun(self, workdir, rundir, mode, transform_pipe)


def _segment_run_mode(mode):
    """Return a fresh ``mode`` section for a segment's YAML configuration.

    Args:
        mode (str): parallel execution mode, ``'tl'`` or ``'adj'``.

    Returns:
        dict: the ``mode`` configuration dictionary for that execution mode.

    Raises:
        ValueError: if *mode* is neither ``'tl'`` nor ``'adj'``.
    """
    if mode == "tl":
        return {"plugin": {"name": "forward", "version": "std"},
                "run_mode": "tl",
                "use_xb": False}
    if mode == "adj":
        return {"plugin": {"name": "footprint", "version": "std"}}
    raise ValueError(
        "parallel observation operator only supports 'tl' and 'adj' modes, "
        f"got {mode!r}")


def _segment_bounds(datei, datef, segments, overlap):
    """Return the lists of start and end dates of the parallel segments.

    Every segment except the first starts ``overlap`` before its nominal
    start date, so consecutive segments overlap. The first segment starts
    at the first date returned by ``date_range``.

    Returns:
        tuple: ``(list_segments_datei, list_segments_datef)``.
    """
    list_segments = date_range(datei, datef, segments)
    overlap_offset = pd.tseries.frequencies.to_offset(overlap)
    list_segments_datei = [
        (ddi - overlap_offset).to_pydatetime()
        for ddi in list_segments[:-1]
    ]
    list_segments_datei[0] = list_segments[0]
    list_segments_datef = list_segments[1:]
    return list_segments_datei, list_segments_datef


def _write_segment_config(self, obsvect, mode, controlvect_path,
                          ddi, ddf, base_dir):
    """Prepare *base_dir* for one segment and dump its YAML configuration.

    Args:
        self (ObsOperator): the obs-operator plugin instance.
        obsvect (ObsVect): observation vector dumped into the segment.
        mode (str): ``'tl'`` or ``'adj'``.
        controlvect_path (str): control-vector dump the segment reloads.
        ddi, ddf (datetime): segment start/end dates (``ddi`` already
            includes the overlap).
        base_dir (str): segment working directory.

    Returns:
        str: path to the dumped YAML configuration file.
    """
    parallel = self.parallel

    # Creates base directory
    path.init_dir(base_dir)

    # Dump the observation vector into the segment directory; the segment
    # configuration reads it back through ``dir_obsvect``
    dir_obsvect = os.path.join(base_dir, "obsvect")
    obsvect.dump(dir_obsvect)

    # Start from the reference setup and override what differs per segment
    yaml_dict = self.from_yaml(
        self.reference_instances["reference_setup"].def_file)
    yaml_dict.update({
        "workdir": base_dir,
        "mode": _segment_run_mode(mode)
    })
    yaml_dict["controlvect"].update({
        "plugin": {"name": "standard", "version": "std"},
        "reload_xb": True,
        "reload_file": controlvect_path
    })
    yaml_dict["obsvect"].update({
        "plugin": {"name": "standard", "version": "std"},
        "dir_obsvect": dir_obsvect
    })
    # A segment must not itself be split again
    yaml_dict["obsoperator"].pop("parallel")
    yaml_dict["obsoperator"].update({
        "approx_operator": {
            "datei": ddi,
            "datef": ddf,
            "overlap": parallel.overlap
        },
        "ref_fwd_dir": self.ref_fwd_dir
    })
    if parallel.subprocess and hasattr(parallel, 'nproc'):
        # TODO: adapt to models other than LMDz
        yaml_dict['model'].update({
            'nproc': parallel.nproc
        })

    # Dumps new yaml file
    yaml_file = os.path.join(
        base_dir, ddi.strftime("config_segment_%Y%m%d-%H%M.yaml"))
    with open(yaml_file, "w") as f:
        ordered_dump(f, yaml_dict)
    return yaml_file


def _run_segments(self, list_segments_datei, list_segments_basedir,
                  list_yaml_files):
    """Execute every segment configuration and block until all have finished."""
    parallel = self.parallel
    if parallel.subprocess:
        # Run the segments in parallel subprocesses
        with multiprocessing.Pool() as pool:
            pool.starmap(
                run_pycif_in_subprocess,
                ((self.platform.python, f) for f in list_yaml_files)
            )
        return

    # Run the segments in new jobs with the 'platform' plugin
    nsegments = len(list_segments_datei)
    list_jobs = []
    for ijob, (ddi, base_dir, yaml_file) in enumerate(
            zip(list_segments_datei, list_segments_basedir, list_yaml_files),
            start=1):
        job_file = os.path.join(base_dir, ddi.strftime(
            "job_pycif_segment_%Y%m%d-%H%M"))
        info(f"Submitting segment {ijob} of {nsegments} in '{base_dir}'")
        job_id = self.platform.submit_job(
            f"{self.platform.python} -m pycif {yaml_file}",
            job_file
        )
        list_jobs.append(job_id)
    while not self.platform.check_jobs(list_jobs):
        time.sleep(self.platform.sleep_time)


def _gather_tl_outputs(obsvect, list_segments_basedir):
    """Set ``obsvect.ysim``/``obsvect.dy`` to the sums over all TL segments."""
    # Zero in place first so the accumulators below start from exact zeros
    obsvect.ysim[:] = 0.0
    obsvect.dy[:] = 0.0
    ysim_out = 0. * obsvect.ysim
    dy_out = 0. * obsvect.dy
    for base_dir in list_segments_basedir:
        # Re-load output obsvect
        obsvect.read(os.path.join(base_dir, "obsoperator/tl_0000/obsvect"))
        ysim_out += obsvect.ysim
        dy_out += obsvect.dy
    obsvect.ysim[:] = copy.deepcopy(ysim_out[:])
    obsvect.dy[:] = copy.deepcopy(dy_out[:])


def _gather_adj_outputs(controlvect, list_segments_basedir):
    """Set ``controlvect.dx`` to the sum over all adjoint segments."""
    controlvect.dx = 0. * controlvect.x
    dx_out = 0. * controlvect.dx
    for base_dir in list_segments_basedir:
        # Re-load output controlvect
        controlvect.load(os.path.join(
            base_dir, "obsoperator/adj_0000/controlvect.pickle"))
        dx_out += controlvect.dx
    controlvect.dx[:] = copy.deepcopy(dx_out[:])