from __future__ import annotations
import datetime
import os
import shutil
import subprocess
from logging import debug, error, info, warning
from os import PathLike
from pathlib import Path
from typing import Literal
import numpy as np
import pandas as pd
import xarray as xr
from .chemistry import DRY_MASS
from .flushrun import flush, remove_file, remove_spec_files
from .io.outputs import make_adjoint_out, make_end
[docs]
def run_dispersion(self, runsubdir: Path) -> None:
logfile_path = runsubdir / "dispersion.log"
# Environment variables
env = self.env_variables
if self.compiler == "nvfortran" and hasattr(self, "nproc"):
env["ACC_NUM_CORES"] = str(self.nproc)
if env:
env.update(os.environ.copy())
else:
env = None
if hasattr(self, "modules"):
# Writting a script that load the modules and run LMDZ
run_script = os.path.join(runsubdir, "run.sh")
lines = [
"#!/usr/bin/bash",
"",
"module purge",
f"module load {' '.join(self.modules)}",
"module list",
"",
self.run_command,
"",
]
with open(run_script, "w") as f:
f.write("\n".join(lines))
args = ["bash", run_script]
else:
# Directly running LMDZ with the same environment as PyCIF
args = self.run_command.split()
# Running the model
info(f"Running sub-simulation in '{runsubdir}'")
with open(logfile_path, "w") as logfile:
process = subprocess.Popen(
args,
cwd=runsubdir,
env=env,
stdout=logfile,
stderr=subprocess.PIPE,
)
_, stderr = process.communicate()
stderr = stderr.decode("utf-8")
logfile.write("\n" + stderr)
if process.returncode != 0:
error("Exception while running LMDZ")
error(f"LMDZ stderr:\n{stderr}") # Printing LMDZ stderr
raise RuntimeError(f"LMDZ did not run properly in '{runsubdir}'")
else:
info(f"LMDZ ran successfully in '{runsubdir}'")
# Always printing LMDZ stderr if not empty
if stderr.strip():
warning(f"LMDZ stderr:\n{stderr}")
[docs]
def run(
self,
runsubdir: str | PathLike,
mode: Literal["fwd", "tl", "adj"],
workdir: str | PathLike, # pylint: disable=unused-argument
ddi: datetime.datetime,
do_simu: bool = True,
approx_transf: bool = False,
ref_fwd_dir: str | PathLike | None = None,
overlap: bool = False,
**kwargs,
):
"""Run LMDZ model in forward or adjoint mode
Args:
runsubdir (str): working directory for the current run
mode (str): forward or backward
workdir (str): pycif working directory
do_simu (bool): if False, considers that the simulation was
already run
"""
runsubdir = Path(runsubdir)
# Reset update obs to start next time with an empty obs.bin
self.iniobs[ddi] = False
self.reset_obs[ddi] = True
if not do_simu:
return
if self.runsimu[ddi] and not approx_transf:
# Cleaning the directory
remove_file(runsubdir, "restart.nc")
remove_file(runsubdir, "obs_out.nc")
remove_file(runsubdir, "flux_out.nc")
remove_file(runsubdir, "prescrconcs_out.nc")
remove_file(runsubdir, "mass.nc")
remove_file(runsubdir, "lifetime.nc")
if mode in ("fwd", "tl"):
remove_spec_files(runsubdir, "trajq.bin", self.chemistry.active_species)
# In adjoint mode, setting sensitivities to zero if overlap period
if mode == "adj" and overlap:
with xr.open_dataset(runsubdir / "obs.nc") as ds:
ds["obs_ad"][:] = 0.0
ds.to_netcdf(runsubdir / "obs.nc")
run_dispersion(self, runsubdir)
# Dumps to NetCDF if asked
if mode in ("fwd", "tl") and self.dump:
dump_trajq(self, runsubdir, ddi)
# Putting all simulated concentrations to zero if overlap period
if overlap:
with xr.open_dataset(runsubdir / "obs_out.nc") as ds:
ds["spec"][:] = 0.0
ds.to_netcdf(runsubdir / "obs_out.nc")
# Stop computing transport and approximate with linear chemistry if
# species threshold is reach
if mode == "tl":
check_approx_threshold(self, ddi, mode, runsubdir)
else:
info(
f"NOT running sub-simulation in '{runsubdir}' due to approximating operator"
)
if ref_fwd_dir is None:
raise ValueError(
"A reference forward run of the model is needed to run the approximated operator"
)
# Make artificial outputs
if mode in ("fwd", "tl"):
# Use end concentrations from reference forward directory
make_end(self, runsubdir, ddi, ref_fwd_dir)
else:
# Mimick mod_out files and restart
make_adjoint_out(self, runsubdir, ddi, ref_fwd_dir)
# Process here initial conditions for the next simulation
chain_dir = runsubdir.parent / "chain"
date = pd.to_datetime(runsubdir.name, format="%Y-%m-%d_%H-%M")
shutil.move(
runsubdir / "restart.nc",
chain_dir / f"restart_{date:%Y-%m-%d}.nc",
)
if mode in ("fwd", "tl") and not self.no_trajectory_file:
# Saves trajectory files for later adjoint simulations
for spec in self.chemistry.active_species:
shutil.move(
runsubdir / f"trajq_{spec}.bin",
chain_dir / f"trajq_{spec}_{date:%Y-%m-%d}.bin",
)
# Keeps the running directory in memory for later adjoint simulations
# self.adj_refdir = os.path.join(runsubdir, "..")
# Removing all input files immediately after running the sub-simulation
if self.autoflush:
flush(self, runsubdir, input_only=True)
[docs]
def dump_trajq(
self,
runsubdir: str | PathLike,
ddi: datetime.datetime,
) -> None:
runsubdir = Path(runsubdir)
date = pd.to_datetime(runsubdir.name, format="%Y-%m-%d_%H-%M")
nlev = self.domain.nlev
di, df = self.subsimu_intervals[ddi]
ntime = int((df - di) / self.mass_fluxes_freq)
ds = xr.Dataset(coords=self.domain.get_domain_coords())
# fmt: off
ds["time"] = (["time"], pd.date_range(date, periods=ntime, freq="3H"), {
"standard_name": "time",
"long_name": "time",
"axis": "T",
})
# fmt: on
for spec in self.chemistry.active_species:
path = runsubdir / f"trajq_{spec}.bin"
# TODO: trajq precision 8 or 4
data = np.fromfile(path, dtype="float64")
attrs = {
"standard_name": f"{spec}_mass_mixing_ratio_in_dry_air",
"long_name": f"{spec} mass mixing ratio in dry air",
"units": "kg/kg",
}
if self.grid == "regular":
nlon = self.domain.nlon + 1
nlat = self.domain.nlat
if data.size != ntime * nlev * nlat * nlon:
raise ValueError(
f"Error while reading {path}: expected size {ntime * nlev * nlat * nlon} "
f"for shape {(ntime, nlev, nlat, nlon)}, got size {data.size}"
)
data = data.reshape((ntime, nlev, nlat, nlon), order="C")
ds[spec] = (["time", "lev", "lat", "lon"], data[:, :, :, :-1], attrs)
elif self.grid == "dynamico":
ncell = self.domain.nlon
if data.size != ntime * nlev * ncell:
raise ValueError(
f"Error while reading {path}: expected size {ntime * nlev * ncell} "
f"for shape {(ntime, nlev, ncell)}, got size {data.size}"
)
data = data.reshape((ntime, nlev, ncell), order="C")
ds[spec] = (["time", "lev", "cell"], data, attrs)
else:
raise ValueError(f"Unknown grid type: '{self.grid}'")
ds.to_netcdf(runsubdir.parent / "chain" / f"trajq_{date:%Y-%m-%d}.nc")
# TODO
[docs]
def check_approx_threshold(self, ddi, mode, runsubdir):
# Stop computing transport and approximate with linear chemistry if
# species threshold is reach
if not hasattr(self, "approx_thresholds"):
return
approx = True
with xr.open_dataset(runsubdir / "restart.nc") as ds:
ds = ds[[f"{spec}_tl" for spec in self.chemistry.active_species]]
spec_condition = []
debug("Checking species maximum concentrations in restart file:")
# Checking each species in 'approx_threshold'
for spec, threshold in self.approx_thresholds.items():
spec_plg = getattr(self.chemistry.acspecies, spec)
var_name = f"q{spec_plg.restart_id:02d}"
# kg/kg -> ppm
conc = 1e6 * ds[var_name] * DRY_MASS / spec_plg.mass
max_conc = conc.max().values
if np.isnan(max_conc):
raise ValueError("Maximum concentration in restart file is NaN.")
spec_condition.append(max_conc < threshold)
debug(
f" {spec}: {max_conc} ppm (threshold={threshold}), approx={max_conc < threshold}"
)
approx = (
all(spec_condition) if self.approx_condition == "and" else any(spec_condition)
)
debug(f"Using approximated operator: {approx}")
if approx:
info(
"Maximum concentations in restart file have reach the "
"thresholds defined unded 'species_threshold'. "
"Next periods will be approximated."
)
for date in self.runsimu:
if date > ddi:
self.runsimu[date] = False
self.runsimu[date] = False