# Source code for pycif.plugins.obsoperators.standard.parallel

import copy
import multiprocessing
import os
import subprocess
import time
from logging import debug, info

import pandas as pd

from ....utils import path
from ....utils.dates import date_range
from ....utils.yml import ordered_dump
from .serial import obsoper_serial
from .flushrun import flushrun


def run_pycif_in_subprocess(python_path, yaml_path):
    """Run a pyCIF configuration file in a blocking subprocess.

    Launches ``python_path -m pycif yaml_path`` with the directory of
    *yaml_path* as working directory, redirecting stdout to
    ``subprocess_stdout.log`` and stderr to ``subprocess_stderr.log`` in
    that same directory.

    Args:
        python_path (str): path to the Python interpreter
            (e.g. ``self.platform.python``).
        yaml_path (str): path to the pyCIF YAML configuration file to
            execute. A relative path is resolved against the caller's
            current working directory before the subprocess is started.

    Raises:
        RuntimeError: if the subprocess exits with a non-zero return code.
    """
    # Resolve the path first: the child runs with ``cwd=base_dir``, so a
    # relative path would be looked up from the wrong directory, and a bare
    # file name would give an empty (invalid) working directory.
    yaml_path = os.path.abspath(yaml_path)
    base_dir = os.path.dirname(yaml_path)
    out_filepath = os.path.join(base_dir, "subprocess_stdout.log")
    err_filepath = os.path.join(base_dir, "subprocess_stderr.log")

    with open(out_filepath, "w") as out_file, \
            open(err_filepath, "w") as err_file:
        info(f"Running pycif in '{base_dir}'")
        # ``subprocess.run`` blocks until the child exits and, unlike a bare
        # ``Popen`` + ``wait``, kills the child if the wait is interrupted.
        completed = subprocess.run(
            [python_path, "-m", "pycif", yaml_path],
            stdout=out_file,
            stderr=err_file,
            cwd=base_dir,
            check=False,
        )

    debug(
        f"pycif exited with a return code {completed.returncode} "
        f"in '{base_dir}'")

    if completed.returncode:
        raise RuntimeError(
            f"pycif subprocess did not run properly in '{base_dir}'")


def obsoper_parallel(self, controlvect, obsvect, rundir, mode, workdir,
                     check_transforms, ignore_exceptions):
    """Run the observation operator in parallel over time segments.

    The segment boundaries are those returned by
    ``date_range(self.datei, self.datef, self.parallel.segments)``. Every
    segment but the first starts ``self.parallel.overlap`` before its
    nominal start date. Each segment gets its own directory under
    ``<rundir>/parallel/``, a dump of *obsvect*, and a YAML configuration
    derived from the reference setup in which the ``parallel`` entry of the
    obs-operator is replaced by an ``approx_operator`` window.

    Segments are executed either through :func:`run_pycif_in_subprocess` in
    a ``multiprocessing.Pool`` (``self.parallel.subprocess`` true) or as
    jobs submitted with ``self.platform.submit_job`` and polled with
    ``self.platform.check_jobs``.

    After all segments finish, their outputs are reassembled:

    * ``'tl'`` mode -- ``obsvect.ysim`` and ``obsvect.dy`` are set to the
      sums of the values read back from every segment.
    * ``'adj'`` mode -- ``controlvect.dx`` is set to the sum of the ``dx``
      loaded from every segment; ``controlvect.x`` and ``controlvect.xb``
      are then restored from the copies saved before the segments ran.

    Side effects: ``controlvect.x_ref`` and ``controlvect.xb_ref`` are set,
    and ``self.ref_fwd_dir`` is updated when the reference forward run has
    to be computed here.

    Args:
        self (ObsOperator): the obs-operator plugin instance. The code reads
            ``self.parallel`` (``segments``, ``overlap``, ``subprocess`` and
            optionally ``nproc``), ``self.datei``, ``self.datef``,
            ``self.ref_fwd_dir``, ``self.reference_instances``,
            ``self.platform`` and ``self.autoflush``.
        controlvect (ControlVect): control-vector object.
        obsvect (ObsVect): observation-vector object.
        rundir (str): the run sub-directory for this operator call.
        mode (str): execution mode, ``'tl'`` or ``'adj'``.
        workdir (str): parent working directory; forwarded to
            ``obsoper_serial`` and ``flushrun``.
        check_transforms (bool): forwarded to ``obsoper_serial`` for the
            reference forward run only; not passed on to the segments.
        ignore_exceptions (bool): forwarded to ``obsoper_serial`` for the
            reference forward run only; not passed on to the segments.

    Raises:
        ValueError: if *mode* is neither ``'tl'`` nor ``'adj'``.
        RuntimeError: if a subprocess-based segment exits with a non-zero
            return code (propagated from :func:`run_pycif_in_subprocess`).
    """
    # "mode" section written into each segment configuration. Checked up
    # front so that an unsupported mode fails before any run or file dump.
    segment_run_modes = {
        "tl": {"plugin": {"name": "forward", "version": "std"},
               "run_mode": "tl",
               "use_xb": False},
        "adj": {"plugin": {"name": "footprint", "version": "std"}},
    }
    if mode not in segment_run_modes:
        raise ValueError(
            f"obsoper_parallel only supports modes 'tl' and 'adj', "
            f"got {mode!r}")

    # Run reference forward run if not already done
    if not os.path.isdir(self.ref_fwd_dir):
        info(f"directory '{self.ref_fwd_dir}' not found, "
             "running the reference forward run")
        fwd_dir = os.path.join(rundir, "ref_forward")
        path.init_dir(os.path.join(fwd_dir, "chain"))
        obsoper_serial(self, controlvect, obsvect, fwd_dir, "fwd",
                       workdir, check_transforms, ignore_exceptions)
        self.ref_fwd_dir = fwd_dir

    # Keep copies of xb and x so they can be restored after re-assembly
    controlvect.xb_ref = copy.deepcopy(controlvect.xb)
    controlvect.x_ref = copy.deepcopy(controlvect.x)

    # List of segments
    parallel = self.parallel
    segments = parallel.segments
    overlap = parallel.overlap
    list_segments = date_range(self.datei, self.datef, segments)
    overlap_offset = pd.tseries.frequencies.to_offset(overlap)
    list_segments_datei = [
        (ddi - overlap_offset).to_pydatetime()
        for ddi in list_segments[:-1]
    ]
    # The first segment starts exactly at the first boundary (no overlap)
    list_segments_datei[0] = list_segments[0]
    list_segments_datef = list_segments[1:]
    list_segments_basedir = [
        os.path.join(rundir, ddi.strftime("parallel/segment_%Y%m%d-%H%M"))
        for ddi in list_segments_datei
    ]

    # Dumps controlvect value (one dump shared by all segments)
    controlvect_path = os.path.join(rundir, "parallel/controlvect.pickle")
    path.init_dir(os.path.dirname(controlvect_path))
    controlvect.dump(controlvect_path)

    list_yaml_files = []
    for ddi, ddf, base_dir in zip(
            list_segments_datei, list_segments_datef,
            list_segments_basedir):
        # Creates base directory
        path.init_dir(base_dir)

        # Dumps obsvect value into the segment directory
        dir_obsvect = os.path.join(base_dir, "obsvect")
        obsvect.dump(dir_obsvect)

        # Updating configuration dictionary
        yaml_dict = self.from_yaml(
            self.reference_instances["reference_setup"].def_file)
        yaml_dict.update({
            "workdir": base_dir,
            # Fresh copy so segment configurations never share objects
            "mode": copy.deepcopy(segment_run_modes[mode]),
        })
        yaml_dict["controlvect"].update({
            "plugin": {"name": "standard", "version": "std"},
            "reload_xb": True,
            "reload_file": controlvect_path,
        })
        yaml_dict["obsvect"].update({
            "plugin": {"name": "standard", "version": "std"},
            "dir_obsvect": dir_obsvect,
        })
        # The segment configuration must not carry the "parallel" entry
        yaml_dict["obsoperator"].pop("parallel")
        yaml_dict["obsoperator"].update({
            "approx_operator": {
                "datei": ddi,
                "datef": ddf,
                "overlap": overlap,
            },
            "ref_fwd_dir": self.ref_fwd_dir,
        })
        if parallel.subprocess and hasattr(parallel, 'nproc'):
            # TODO: adapt to models other than LMDz
            yaml_dict['model'].update({
                'nproc': parallel.nproc
            })

        # Dumps new yaml file
        yaml_file = os.path.join(
            base_dir, ddi.strftime("config_segment_%Y%m%d-%H%M.yaml"))
        with open(yaml_file, "w") as f:
            ordered_dump(f, yaml_dict)

        list_yaml_files.append(yaml_file)

    if parallel.subprocess:
        # Run the segments in parallel subprocesses
        with multiprocessing.Pool() as pool:
            pool.starmap(
                run_pycif_in_subprocess,
                ((self.platform.python, f) for f in list_yaml_files)
            )

    else:
        # Run the segments in new jobs with the 'platform' plugin
        list_jobs = []
        for ijob, (ddi, base_dir, yaml_file) in enumerate(
                zip(list_segments_datei, list_segments_basedir,
                    list_yaml_files)):
            job_file = os.path.join(
                base_dir, ddi.strftime("job_pycif_segment_%Y%m%d-%H%M"))
            info(
                f"Submitting segment {ijob + 1} "
                f"of {len(list_segments_datei)} in '{base_dir}'")
            job_id = self.platform.submit_job(
                f"{self.platform.python} -m pycif {yaml_file}",
                job_file
            )
            list_jobs.append(job_id)

        # Poll until the platform reports every submitted job as done
        while not self.platform.check_jobs(list_jobs):
            time.sleep(self.platform.sleep_time)

    info("All segments finished")

    # Re-constituting simulated observation vector in TL mode
    if mode == "tl":
        # Zeroed in place before building the accumulators with ``0. *``.
        # NOTE(review): presumably so that NaN/inf left in ysim/dy cannot
        # leak into the sums -- confirm.
        obsvect.ysim[:] = 0.0
        obsvect.dy[:] = 0.0
        ysim_out = 0. * obsvect.ysim
        dy_out = 0. * obsvect.dy
        for base_dir in list_segments_basedir:
            # Re-load output obsvect
            obsvect.read(
                os.path.join(base_dir, "obsoperator/tl_0000/obsvect"))
            ysim_out += obsvect.ysim
            dy_out += obsvect.dy

        obsvect.ysim[:] = copy.deepcopy(ysim_out[:])
        obsvect.dy[:] = copy.deepcopy(dy_out[:])

    # Reconstruct sensitivities from segments
    elif mode == "adj":
        controlvect.dx = 0. * controlvect.x
        dx_out = 0. * controlvect.dx
        for base_dir in list_segments_basedir:
            # Re-load output controlvect
            controlvect.load(os.path.join(
                base_dir, "obsoperator/adj_0000/controlvect.pickle"))
            dx_out += controlvect.dx

        controlvect.dx[:] = copy.deepcopy(dx_out[:])

        # Reset x and xb values from the copies saved before the segments
        controlvect.x = copy.deepcopy(controlvect.x_ref)
        controlvect.xb = copy.deepcopy(controlvect.xb_ref)

    # Cleaning unnecessary files
    if self.autoflush:
        info(f"Removing temporary controlvect dump in {controlvect_path}")
        path.remove(controlvect_path)

        info(f"Flushing unnecessary files in {self.ref_fwd_dir}")
        transform_pipe = self.transform_pipe
        flushrun(self, workdir, rundir, mode, transform_pipe)