Source code for pycif.plugins.models.iconart.run

import os
import glob
import time
import shutil
import subprocess
import xarray as xr
from .io.outputs.read_sim import fetch_sim
from .flushrun import flush_rundir

from logging import error, info, warning

from ....utils import path
from .io.utils import OUTPUT_DIRNAME, OUTPUT_PATTERN

[docs] def run(self, runsubdir, mode, workdir, ddi, nbproc=1, do_simu=True, approx_transf=False, ref_fwd_dir="", overlap=False, **kwargs): """Run the model in forward, tangent-linear or adjoint mode. This includes: - executing the model external executable - updating ``adj_refdir`` - moving files needed for chained simulations to "{}/../".format(runsubdir) Note: For model for which the adjoint is not coded, make sure to return a clear error if the ``run`` function is called in ``adj`` mode and with ``do_simu = True`` Args: self: the model Plugin runsubdir (str): working directory for the current run mode (str): forward or backward workdir (str): pyCIF working directory do_simu (bool): re-run or not existing simulation """ # previous chain directory for moving the first output from the last runsubdir to this one last_runsubdir = getattr(self, "last_runsubdir", runsubdir) last_chaindir = os.path.join(last_runsubdir, '..', 'chain') if hasattr(self, "last_runsubdir"): delattr(self, "last_runsubdir") # chain directory for saving the last output chaindir = os.path.join(runsubdir, '..', 'chain') # Reset output data if hasattr(self, "icon_dataout"): delattr(self, "icon_dataout") # Skip if not a forward simulation if mode != 'fwd' and do_simu: error(f"Will raise error") raise ValueError( f"ICON-ART can only be run in 'fwd' mode. " f"Not '{mode}'." ) # Skip the run if asked if not do_simu or self.dont_run: warning(f"Skip this ICON-ART subsimulation because do_simu = False " f"or self.dont_run = True.") return # Check if output files already exist in the directory dont_run_subsim = False output_files = glob.glob(os.path.join(runsubdir, OUTPUT_DIRNAME, OUTPUT_PATTERN)) output_files = sorted(output_files) noutput_files = len(output_files) output_dates = self.tstep_dates[ddi] dont_run_subsim = noutput_files >= len(output_dates) - 1 # Skip the run if the model has already run if dont_run_subsim: info("Skip this ICON-ART subsimulation because all the outputs already exist.") # Run ICON else: # Find the executable path exe_path = os.path.join(runsubdir, 'icon.exe') if not os.path.exists(exe_path): raise FileNotFoundError(exe_path) # Find the wrapper path wrapper_path = os.path.join(runsubdir, 'wrapper_icon.sh') if not os.path.exists(wrapper_path): raise FileNotFoundError(wrapper_path) # Copy the last output from the previous sub-simulation as the first output # Need to do this because the ICON built-in restart option won't output the initial conditions first_output_filename = os.path.join( last_chaindir, ddi.strftime('OUTPUT__%Y%m%dT%H%M%SZ.nc') ) if os.path.exists(first_output_filename): shutil.copy( first_output_filename, os.path.join(runsubdir, OUTPUT_DIRNAME, os.path.basename(first_output_filename)) ) # Either run ICONART in a different job or use the same process if self.run_icon_with_another_job: # Run ICONART in a different job if not hasattr(self.platform, "plugin") or self.platform.plugin.name != 'EMPA': raise Exception("The option 'run_icon_with_another_job' have been set to True. " "Therefore, a Platform Plugin EMPA/Daint must be added " "to the Yaml configuration.") # Run the base function as an independent process job_file = os.path.join(runsubdir, "job_iconart") info("Submitting job for running ICON-ART...") job_id = self.platform.submit_job( "srun {} {}".format(wrapper_path, exe_path), job_file, jobname='ICON' ) # Check that jobs are over while not self.platform.check_jobs([job_id]): time.sleep(self.platform.sleep_time) else: with open("{}/log.std".format(runsubdir), "w") as log: info(f"Starting icon executable at {exe_path}.") process = subprocess.Popen( ['srun', wrapper_path, exe_path], cwd=runsubdir, stdout=log, stderr=subprocess.PIPE ) _, stderr = process.communicate() # Check errors error_message = stderr.decode('utf-8') if error_message: err_filepath = "{}/log.stderr".format(runsubdir) error( f"Errors while running {exe_path}.\n" f"They are logged in {err_filepath} ." ) with open(err_filepath, "w") as err_file: err_file.write(error_message) # Remove heavy icon core files not needed path.remove(os.path.join(runsubdir, 'core.*')) # Save the last output from this sub-simulation to be used as the first output of the next one # Need to do this because the ICON built-in restart option won't output the initial conditions last_output_filename = os.path.join( runsubdir, OUTPUT_DIRNAME, output_dates[-1].strftime('OUTPUT__%Y%m%dT%H%M%SZ.nc') ) if os.path.exists(last_output_filename): shutil.move( last_output_filename, os.path.join(chaindir, os.path.basename(last_output_filename)) ) # Add an attribute to the restart file just created by ICON to locate the runsubdir where it was created info("Add an attribute to the restart file to locate the directory where it was created.") restart_out_path = ddi.strftime("{}/../chain/restart_%Y%m%d%H.nc".format(runsubdir)) if os.path.exists(restart_out_path): ds_restart = xr.open_dataset(restart_out_path) ds_restart.attrs["cif_runsubdir"] = runsubdir os.remove(restart_out_path) ds_restart.to_netcdf(restart_out_path) else: warning("The restart file does not exist.") # Check if the outputs have already been processed dataout_filepath = os.path.join(runsubdir, 'outputs/dataout/dataout.nc') if not os.path.exists(dataout_filepath): fetch_sim(self, runsubdir, mode, ddi) else: info("Skip outputs post-processing because they have already been processed.") # Clean the directory if self.force_clean_run: flush_rundir(runsubdir, mode)