Source code for pycif.plugins.models.lmdz_ico.run

from __future__ import annotations

import datetime
import os
import shutil
import subprocess
from logging import debug, error, info, warning
from os import PathLike
from pathlib import Path
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr

from .chemistry import DRY_MASS
from .flushrun import flush, remove_file, remove_spec_files
from .io.outputs import make_adjoint_out, make_end


[docs] def run_dispersion(self, runsubdir: Path) -> None: logfile_path = runsubdir / "dispersion.log" # Environment variables env = self.env_variables if self.compiler == "nvfortran" and hasattr(self, "nproc"): env["ACC_NUM_CORES"] = str(self.nproc) if env: env.update(os.environ.copy()) else: env = None if hasattr(self, "modules"): # Writting a script that load the modules and run LMDZ run_script = os.path.join(runsubdir, "run.sh") lines = [ "#!/usr/bin/bash", "", "module purge", f"module load {' '.join(self.modules)}", "module list", "", self.run_command, "", ] with open(run_script, "w") as f: f.write("\n".join(lines)) args = ["bash", run_script] else: # Directly running LMDZ with the same environment as PyCIF args = self.run_command.split() # Running the model info(f"Running sub-simulation in '{runsubdir}'") with open(logfile_path, "w") as logfile: process = subprocess.Popen( args, cwd=runsubdir, env=env, stdout=logfile, stderr=subprocess.PIPE, ) _, stderr = process.communicate() stderr = stderr.decode("utf-8") logfile.write("\n" + stderr) if process.returncode != 0: error("Exception while running LMDZ") error(f"LMDZ stderr:\n{stderr}") # Printing LMDZ stderr raise RuntimeError(f"LMDZ did not run properly in '{runsubdir}'") else: info(f"LMDZ ran successfully in '{runsubdir}'") # Always printing LMDZ stderr if not empty if stderr.strip(): warning(f"LMDZ stderr:\n{stderr}")
[docs] def run( self, runsubdir: str | PathLike, mode: Literal["fwd", "tl", "adj"], workdir: str | PathLike, # pylint: disable=unused-argument ddi: datetime.datetime, do_simu: bool = True, approx_transf: bool = False, ref_fwd_dir: str | PathLike | None = None, overlap: bool = False, **kwargs, ): """Run LMDZ model in forward or adjoint mode Args: runsubdir (str): working directory for the current run mode (str): forward or backward workdir (str): pycif working directory do_simu (bool): if False, considers that the simulation was already run """ runsubdir = Path(runsubdir) # Reset update obs to start next time with an empty obs.bin self.iniobs[ddi] = False self.reset_obs[ddi] = True if not do_simu: return if self.runsimu[ddi] and not approx_transf: # Cleaning the directory remove_file(runsubdir, "restart.nc") remove_file(runsubdir, "obs_out.nc") remove_file(runsubdir, "flux_out.nc") remove_file(runsubdir, "prescrconcs_out.nc") remove_file(runsubdir, "mass.nc") remove_file(runsubdir, "lifetime.nc") if mode in ("fwd", "tl"): remove_spec_files(runsubdir, "trajq.bin", self.chemistry.active_species) # In adjoint mode, setting sensitivities to zero if overlap period if mode == "adj" and overlap: with xr.open_dataset(runsubdir / "obs.nc") as ds: ds["obs_ad"][:] = 0.0 ds.to_netcdf(runsubdir / "obs.nc") run_dispersion(self, runsubdir) # Dumps to NetCDF if asked if mode in ("fwd", "tl") and self.dump: dump_trajq(self, runsubdir, ddi) # Putting all simulated concentrations to zero if overlap period if overlap: with xr.open_dataset(runsubdir / "obs_out.nc") as ds: ds["spec"][:] = 0.0 ds.to_netcdf(runsubdir / "obs_out.nc") # Stop computing transport and approximate with linear chemistry if # species threshold is reach if mode == "tl": check_approx_threshold(self, ddi, mode, runsubdir) else: info( f"NOT running sub-simulation in '{runsubdir}' due to approximating operator" ) if ref_fwd_dir is None: raise ValueError( "A reference forward run of the model is needed to run the approximated operator" ) # Make artificial outputs if mode in ("fwd", "tl"): # Use end concentrations from reference forward directory make_end(self, runsubdir, ddi, ref_fwd_dir) else: # Mimick mod_out files and restart make_adjoint_out(self, runsubdir, ddi, ref_fwd_dir) # Process here initial conditions for the next simulation chain_dir = runsubdir.parent / "chain" date = pd.to_datetime(runsubdir.name, format="%Y-%m-%d_%H-%M") shutil.move( runsubdir / "restart.nc", chain_dir / f"restart_{date:%Y-%m-%d}.nc", ) if mode in ("fwd", "tl") and not self.no_trajectory_file: # Saves trajectory files for later adjoint simulations for spec in self.chemistry.active_species: shutil.move( runsubdir / f"trajq_{spec}.bin", chain_dir / f"trajq_{spec}_{date:%Y-%m-%d}.bin", ) # Keeps the running directory in memory for later adjoint simulations # self.adj_refdir = os.path.join(runsubdir, "..") # Removing all input files immediately after running the sub-simulation if self.autoflush: flush(self, runsubdir, input_only=True)
[docs] def dump_trajq( self, runsubdir: str | PathLike, ddi: datetime.datetime, ) -> None: runsubdir = Path(runsubdir) date = pd.to_datetime(runsubdir.name, format="%Y-%m-%d_%H-%M") nlev = self.domain.nlev di, df = self.subsimu_intervals[ddi] ntime = int((df - di) / self.mass_fluxes_freq) ds = xr.Dataset(coords=self.domain.get_domain_coords()) # fmt: off ds["time"] = (["time"], pd.date_range(date, periods=ntime, freq="3H"), { "standard_name": "time", "long_name": "time", "axis": "T", }) # fmt: on for spec in self.chemistry.active_species: path = runsubdir / f"trajq_{spec}.bin" # TODO: trajq precision 8 or 4 data = np.fromfile(path, dtype="float64") attrs = { "standard_name": f"{spec}_mass_mixing_ratio_in_dry_air", "long_name": f"{spec} mass mixing ratio in dry air", "units": "kg/kg", } if self.grid == "regular": nlon = self.domain.nlon + 1 nlat = self.domain.nlat if data.size != ntime * nlev * nlat * nlon: raise ValueError( f"Error while reading {path}: expected size {ntime * nlev * nlat * nlon} " f"for shape {(ntime, nlev, nlat, nlon)}, got size {data.size}" ) data = data.reshape((ntime, nlev, nlat, nlon), order="C") ds[spec] = (["time", "lev", "lat", "lon"], data[:, :, :, :-1], attrs) elif self.grid == "dynamico": ncell = self.domain.nlon if data.size != ntime * nlev * ncell: raise ValueError( f"Error while reading {path}: expected size {ntime * nlev * ncell} " f"for shape {(ntime, nlev, ncell)}, got size {data.size}" ) data = data.reshape((ntime, nlev, ncell), order="C") ds[spec] = (["time", "lev", "cell"], data, attrs) else: raise ValueError(f"Unknown grid type: '{self.grid}'") ds.to_netcdf(runsubdir.parent / "chain" / f"trajq_{date:%Y-%m-%d}.nc")
# TODO
[docs] def check_approx_threshold(self, ddi, mode, runsubdir): # Stop computing transport and approximate with linear chemistry if # species threshold is reach if not hasattr(self, "approx_thresholds"): return approx = True with xr.open_dataset(runsubdir / "restart.nc") as ds: ds = ds[[f"{spec}_tl" for spec in self.chemistry.active_species]] spec_condition = [] debug("Checking species maximum concentrations in restart file:") # Checking each species in 'approx_threshold' for spec, threshold in self.approx_thresholds.items(): spec_plg = getattr(self.chemistry.acspecies, spec) var_name = f"q{spec_plg.restart_id:02d}" # kg/kg -> ppm conc = 1e6 * ds[var_name] * DRY_MASS / spec_plg.mass max_conc = conc.max().values if np.isnan(max_conc): raise ValueError("Maximum concentration in restart file is NaN.") spec_condition.append(max_conc < threshold) debug( f" {spec}: {max_conc} ppm (threshold={threshold}), approx={max_conc < threshold}" ) approx = ( all(spec_condition) if self.approx_condition == "and" else any(spec_condition) ) debug(f"Using approximated operator: {approx}") if approx: info( "Maximum concentations in restart file have reach the " "thresholds defined unded 'species_threshold'. " "Next periods will be approximated." ) for date in self.runsimu: if date > ddi: self.runsimu[date] = False self.runsimu[date] = False