import datetime
import glob
import os
import shutil
import subprocess
from logging import debug, error, info, warning
import numpy as np
import xarray as xr
from ....utils import path
from ..lmdz_old.run import dump2nc
from .chemistry import DRY_MASS
from .io.outputs.fake_end import make_end
from .io.outputs.fake_sim import write_obs, write_sim
from .io.outputs.make_mod_out import make_mod_out
from .io.outputs.read_sim import read_obs
[docs]
def run_dispersion(self, runsubdir):
logfile_path = os.path.join(runsubdir, "dispersion.log")
# Environment variables
env = None
if hasattr(self, 'nproc'):
env = {**os.environ.copy(), 'ACC_NUM_CORES': str(self.nproc)}
if hasattr(self, 'modules'):
# Writting a script that load the modules and run LMDZ
run_script = os.path.join(runsubdir, "run.sh")
lines = [
"#!/usr/bin/bash",
"",
"module purge",
f"module load {' '.join(self.modules)}",
"module list",
"",
self.run_command,
"",
]
with open(run_script, 'w') as f:
f.write('\n'.join(lines))
args = ["bash", run_script]
else:
# Directly running LMDZ with the same environment as PyCIF
args = self.run_command.split()
# Running the model
info(f"Running sub-simulation in '{runsubdir}'")
debug(f"using command: '{self.run_command}'")
with open(logfile_path, "w") as logfile:
process = subprocess.Popen(
args, cwd=runsubdir, env=env, stdout=logfile, stderr=subprocess.PIPE
)
_, stderr = process.communicate()
stderr = stderr.decode('utf-8')
logfile.write("\n" + stderr)
if process.returncode != 0:
error("Exception while running LMDZ")
error(f"LMDZ stderr:\n{stderr}") # Printing LMDZ stderr
raise RuntimeError(f"LMDZ did not run properly in '{runsubdir}'")
else:
info(f"LMDZ ran successfully in '{runsubdir}'")
# Always printing LMDZ stderr if not empty
if stderr.strip():
warning(f"LMDZ stderr:\n{stderr}")
[docs]
def run(
self,
runsubdir,
mode,
workdir,
ddi,
do_simu=True,
approx_transf=False,
ref_fwd_dir="",
overlap=False,
**kwargs,
):
"""Run LMDZ model in forward or adjoint mode
Args:
runsubdir (str): working directory for the current run
mode (str): forward or backward
workdir (str): pycif working directory
do_simu (bool): if False, considers that the simulation was
already run
"""
# Reset update obs to start next time with an empty obs.bin
self.iniobs[ddi] = False
self.reset_obs[ddi] = True
if not do_simu:
return
# Cleaning the directory
for pattern in ["infousedsat.txt", "restart.nc", "*_out.bin"]:
path.remove(os.path.join(runsubdir, pattern))
if mode != "adj":
path.remove(os.path.join(runsubdir, "traj*.bin"))
if self.runsimu[ddi] and not approx_transf:
# In adjoint mode, setting sensitivities to zero if overlap period
if mode == "adj" and overlap:
obs_scheme = read_obs(runsubdir)
write_obs(obs_scheme, runsubdir)
run_dispersion(self, runsubdir)
# Dumps to NetCDF if asked
if getattr(self, "dump", False):
info(f"Dumping LMDZ outputs to a NetCDF file in '{runsubdir}'")
dump2nc(self, runsubdir)
# Putting all simulated concentrations to zero if overlap period
if overlap:
obs_scheme = read_obs(runsubdir)
write_sim(obs_scheme, runsubdir)
# Stop computing transport and approximate with linear chemistry if
# species threshold is reach
if mode == "tl":
check_approx_threshold(self, ddi, mode, runsubdir)
else:
info(
f"NOT running sub-simulation in '{runsubdir}' "
"due to approximating operator"
)
# Make artificial outputs
if mode in ["fwd", "tl"]:
# Use end concentrations from reference forward directory
make_end(self, runsubdir, ddi, ref_fwd_dir)
else:
# Mimick mod_out files and restart
make_mod_out(self, runsubdir, ddi, ref_fwd_dir)
# Process here initial conditions for the next simulation
chain_dir = os.path.join(runsubdir, "../chain")
date = datetime.datetime.strptime(os.path.basename(runsubdir), "%Y-%m-%d_%H-%M")
shutil.move(
os.path.join(runsubdir, "restart.nc"),
os.path.join(chain_dir, date.strftime("restart_%Y%m%d%H%M.nc")),
)
if mode == "tl":
restart_tl_path = os.path.join(runsubdir, "restart_tl.bin")
if not os.path.isfile(restart_tl_path):
raise FileNotFoundError(
"LMDZ did not write a 'restart_tl.bin' file. This "
"happens when no increment has been prescribed in initial "
"conditions nor fluxes. Please check your configuration."
)
shutil.move(
os.path.join(runsubdir, "restart_tl.bin"),
os.path.join(chain_dir, date.strftime("restart_tl_%Y%m%d%H%M.bin")),
)
if mode != "adj":
# Saves trajectory files for later adjoint simulations
list_file = glob.glob(os.path.join(runsubdir, "traj*.bin"))
for traj_file in list_file:
rad, ext = os.path.splitext(os.path.basename(traj_file))
dest = os.path.join(chain_dir, date.strftime(f"{rad}_%Y%m%d%H%M{ext}"))
debug(f"Moving '{traj_file}' to '{dest}'")
shutil.move(traj_file, dest)
# Keeps the running directory in memory for later adjoint simulations
# self.adj_refdir = os.path.join(runsubdir, "..")
# Removing input files
if self.autoflush:
# Observations files
path.remove(os.path.join(runsubdir, "obs.bin"))
path.remove(os.path.join(runsubdir, "obs.txt"))
# Start files
path.remove(os.path.join(runsubdir, "start.nc"))
path.remove(os.path.join(runsubdir, "start_tl.nc"))
path.remove(os.path.join(runsubdir, "start_tl.bin"))
# Fluxes files
path.remove(os.path.join(runsubdir, "mod_*.bin"))
# Chemistry files
path.remove(os.path.join(runsubdir, "kinetic.nc"))
path.remove(os.path.join(runsubdir, "prescr_*.nc"))
path.remove(os.path.join(runsubdir, "scale_*.bin"))
path.remove(os.path.join(runsubdir, "prodloss_*.nc"))
path.remove(os.path.join(runsubdir, "prodscale_*.bin"))
# trajq files
path.remove(os.path.join(runsubdir, "traj*.bin"))
# Mass fluxes files
path.remove(os.path.join(runsubdir, "defstoke.nc"))
path.remove(os.path.join(runsubdir, "fluxstoke.nc"))
path.remove(os.path.join(runsubdir, "fluxstokev.nc"))
path.remove(os.path.join(runsubdir, "phystoke.nc"))
[docs]
def check_approx_threshold(self, ddi, mode, runsubdir):
# Stop computing transport and approximate with linear chemistry if
# species threshold is reach
if not hasattr(self, 'approx_thresholds'):
return
approx = True
# Dimensions
nspec = len(self.chemistry.acspecies.attributes)
nlev = self.domain.nlev
nlat = self.domain.nlat
nlon = self.domain.nlon
# Reading restart binary file
filepath = os.path.join(runsubdir, "restart_tl.bin")
data = np.fromfile(filepath, dtype="float")
data = data.reshape((nlon, nlat, nlev, nspec), order='F')
# Converting into NetCDF format
var_name_list = []
for spec in self.chemistry.acspecies.attributes:
spec_plg = getattr(self.chemistry.acspecies, spec)
var_name_list.append(f'q{spec_plg.restart_id:02d}')
ds = xr.Dataset(
{
var_name: (['lon', 'lat', 'lev'], data[:, :, :, i])
for i, var_name in enumerate(var_name_list)
}
)
spec_condition = []
debug("Checking species maximum concentrations in restart file:")
# Checking each species in 'approx_threshold'
for spec, threshold in self.approx_thresholds.items():
spec_plg = getattr(self.chemistry.acspecies, spec)
var_name = f'q{spec_plg.restart_id:02d}'
# kg/kg -> ppm
conc = 1e6 * ds[var_name] * DRY_MASS / spec_plg.mass
max_conc = conc.max().values
if np.isnan(max_conc):
raise ValueError("Maximum concentration in restart file is NaN.")
spec_condition.append(max_conc < threshold)
debug(
f" {spec}: {max_conc} ppm (threshold={threshold}), approx={max_conc < threshold}"
)
approx = (
all(spec_condition) if self.approx_condition == "and" else any(spec_condition)
)
debug(f"Using approximated operator: {approx}")
if approx:
info(
"Maximum concentations in restart file have reach the "
"thresholds defined unded 'species_threshold'. "
"Next periods will be approximated."
)
for date in self.runsimu:
if date > ddi:
self.runsimu[date] = False