Source code for pycif.plugins.models.lmdz_acc.run

import datetime
import glob
import os
import shutil
import subprocess
from logging import debug, error, info, warning

import numpy as np
import xarray as xr

from ....utils import path
from ..lmdz_old.run import dump2nc
from .chemistry import DRY_MASS
from .io.outputs.fake_end import make_end
from .io.outputs.fake_sim import write_obs, write_sim
from .io.outputs.make_mod_out import make_mod_out
from .io.outputs.read_sim import read_obs


[docs] def run_dispersion(self, runsubdir): logfile_path = os.path.join(runsubdir, "dispersion.log") # Environment variables env = None if hasattr(self, 'nproc'): env = {**os.environ.copy(), 'ACC_NUM_CORES': str(self.nproc)} if hasattr(self, 'modules'): # Writting a script that load the modules and run LMDZ run_script = os.path.join(runsubdir, "run.sh") lines = [ "#!/usr/bin/bash", "", "module purge", f"module load {' '.join(self.modules)}", "module list", "", self.run_command, "", ] with open(run_script, 'w') as f: f.write('\n'.join(lines)) args = ["bash", run_script] else: # Directly running LMDZ with the same environment as PyCIF args = self.run_command.split() # Running the model info(f"Running sub-simulation in '{runsubdir}'") debug(f"using command: '{self.run_command}'") with open(logfile_path, "w") as logfile: process = subprocess.Popen( args, cwd=runsubdir, env=env, stdout=logfile, stderr=subprocess.PIPE ) _, stderr = process.communicate() stderr = stderr.decode('utf-8') logfile.write("\n" + stderr) if process.returncode != 0: error("Exception while running LMDZ") error(f"LMDZ stderr:\n{stderr}") # Printing LMDZ stderr raise RuntimeError(f"LMDZ did not run properly in '{runsubdir}'") else: info(f"LMDZ ran successfully in '{runsubdir}'") # Always printing LMDZ stderr if not empty if stderr.strip(): warning(f"LMDZ stderr:\n{stderr}")
[docs] def run( self, runsubdir, mode, workdir, ddi, do_simu=True, approx_transf=False, ref_fwd_dir="", overlap=False, **kwargs, ): """Run LMDZ model in forward or adjoint mode Args: runsubdir (str): working directory for the current run mode (str): forward or backward workdir (str): pycif working directory do_simu (bool): if False, considers that the simulation was already run """ # Reset update obs to start next time with an empty obs.bin self.iniobs[ddi] = False self.reset_obs[ddi] = True if not do_simu: return # Cleaning the directory for pattern in ["infousedsat.txt", "restart.nc", "*_out.bin"]: path.remove(os.path.join(runsubdir, pattern)) if mode != "adj": path.remove(os.path.join(runsubdir, "traj*.bin")) if self.runsimu[ddi] and not approx_transf: # In adjoint mode, setting sensitivities to zero if overlap period if mode == "adj" and overlap: obs_scheme = read_obs(runsubdir) write_obs(obs_scheme, runsubdir) run_dispersion(self, runsubdir) # Dumps to NetCDF if asked if getattr(self, "dump", False): info(f"Dumping LMDZ outputs to a NetCDF file in '{runsubdir}'") dump2nc(self, runsubdir) # Putting all simulated concentrations to zero if overlap period if overlap: obs_scheme = read_obs(runsubdir) write_sim(obs_scheme, runsubdir) # Stop computing transport and approximate with linear chemistry if # species threshold is reach if mode == "tl": check_approx_threshold(self, ddi, mode, runsubdir) else: info( f"NOT running sub-simulation in '{runsubdir}' " "due to approximating operator" ) # Make artificial outputs if mode in ["fwd", "tl"]: # Use end concentrations from reference forward directory make_end(self, runsubdir, ddi, ref_fwd_dir) else: # Mimick mod_out files and restart make_mod_out(self, runsubdir, ddi, ref_fwd_dir) # Process here initial conditions for the next simulation chain_dir = os.path.join(runsubdir, "../chain") date = datetime.datetime.strptime(os.path.basename(runsubdir), "%Y-%m-%d_%H-%M") shutil.move( os.path.join(runsubdir, "restart.nc"), os.path.join(chain_dir, date.strftime("restart_%Y%m%d%H%M.nc")), ) if mode == "tl": restart_tl_path = os.path.join(runsubdir, "restart_tl.bin") if not os.path.isfile(restart_tl_path): raise FileNotFoundError( "LMDZ did not write a 'restart_tl.bin' file. This " "happens when no increment has been prescribed in initial " "conditions nor fluxes. Please check your configuration." ) shutil.move( os.path.join(runsubdir, "restart_tl.bin"), os.path.join(chain_dir, date.strftime("restart_tl_%Y%m%d%H%M.bin")), ) if mode != "adj": # Saves trajectory files for later adjoint simulations list_file = glob.glob(os.path.join(runsubdir, "traj*.bin")) for traj_file in list_file: rad, ext = os.path.splitext(os.path.basename(traj_file)) dest = os.path.join(chain_dir, date.strftime(f"{rad}_%Y%m%d%H%M{ext}")) debug(f"Moving '{traj_file}' to '{dest}'") shutil.move(traj_file, dest) # Keeps the running directory in memory for later adjoint simulations # self.adj_refdir = os.path.join(runsubdir, "..") # Removing input files if self.autoflush: # Observations files path.remove(os.path.join(runsubdir, "obs.bin")) path.remove(os.path.join(runsubdir, "obs.txt")) # Start files path.remove(os.path.join(runsubdir, "start.nc")) path.remove(os.path.join(runsubdir, "start_tl.nc")) path.remove(os.path.join(runsubdir, "start_tl.bin")) # Fluxes files path.remove(os.path.join(runsubdir, "mod_*.bin")) # Chemistry files path.remove(os.path.join(runsubdir, "kinetic.nc")) path.remove(os.path.join(runsubdir, "prescr_*.nc")) path.remove(os.path.join(runsubdir, "scale_*.bin")) path.remove(os.path.join(runsubdir, "prodloss_*.nc")) path.remove(os.path.join(runsubdir, "prodscale_*.bin")) # trajq files path.remove(os.path.join(runsubdir, "traj*.bin")) # Mass fluxes files path.remove(os.path.join(runsubdir, "defstoke.nc")) path.remove(os.path.join(runsubdir, "fluxstoke.nc")) path.remove(os.path.join(runsubdir, "fluxstokev.nc")) path.remove(os.path.join(runsubdir, "phystoke.nc"))
[docs] def check_approx_threshold(self, ddi, mode, runsubdir): # Stop computing transport and approximate with linear chemistry if # species threshold is reach if not hasattr(self, 'approx_thresholds'): return approx = True # Dimensions nspec = len(self.chemistry.acspecies.attributes) nlev = self.domain.nlev nlat = self.domain.nlat nlon = self.domain.nlon # Reading restart binary file filepath = os.path.join(runsubdir, "restart_tl.bin") data = np.fromfile(filepath, dtype="float") data = data.reshape((nlon, nlat, nlev, nspec), order='F') # Converting into NetCDF format var_name_list = [] for spec in self.chemistry.acspecies.attributes: spec_plg = getattr(self.chemistry.acspecies, spec) var_name_list.append(f'q{spec_plg.restart_id:02d}') ds = xr.Dataset( { var_name: (['lon', 'lat', 'lev'], data[:, :, :, i]) for i, var_name in enumerate(var_name_list) } ) spec_condition = [] debug("Checking species maximum concentrations in restart file:") # Checking each species in 'approx_threshold' for spec, threshold in self.approx_thresholds.items(): spec_plg = getattr(self.chemistry.acspecies, spec) var_name = f'q{spec_plg.restart_id:02d}' # kg/kg -> ppm conc = 1e6 * ds[var_name] * DRY_MASS / spec_plg.mass max_conc = conc.max().values if np.isnan(max_conc): raise ValueError("Maximum concentration in restart file is NaN.") spec_condition.append(max_conc < threshold) debug( f" {spec}: {max_conc} ppm (threshold={threshold}), approx={max_conc < threshold}" ) approx = ( all(spec_condition) if self.approx_condition == "and" else any(spec_condition) ) debug(f"Using approximated operator: {approx}") if approx: info( "Maximum concentations in restart file have reach the " "thresholds defined unded 'species_threshold'. " "Next periods will be approximated." ) for date in self.runsimu: if date > ddi: self.runsimu[date] = False