import warnings
import copy
import numpy as np
import pandas as pd
import xarray as xr
from ....utils import path
from logging import info, warning
[docs]
def run(self, runsubdir, mode, workdir, ddi, do_simu=True, **kwargs):
"""Run dummy_txt Gaussian model in forward or adjoint mode
Args:
runsubdir (str): working directory for the current run
mode (str): forward or backward
workdir (str): pycif working directory
do_simu (bool): if False, considers that the simulation was
already run
"""
if not do_simu:
warning("The local working directory {} was already present, "
"but the dummy model nevertheless needs to run "
"because no output files are saved"
.format(runsubdir))
# Stop here if fwd and no fluxes
if mode == "fwd":
do_fwd = sum([
np.any(self.dataflx[ddi][spec] != 0)
for spec in self.chemistry.acspecies.attributes])
if not do_fwd:
return
elif mode == "tl":
do_fwd = sum([
np.any(self.dataflx[ddi][spec] != 0)
for spec in self.chemistry.acspecies.attributes])
do_tl = sum([
np.any(self.dataflx_tl[ddi][spec] != 0)
for spec in self.chemistry.acspecies.attributes])
if not do_tl and not do_fwd:
return
# Do the simulation
# Linking Pasquill-Gifford classification
source = "{}/model/Pasquill-Gifford.txt".format(workdir)
target = "{}/Pasquill-Gifford.txt".format(runsubdir)
path.link(source, target)
# If H was already computed, take it from previous simulation
info("Running the model Dummy in {}".format(runsubdir))
H = {}
dref = self.meteo.data[ddi].index.min()
if dref in self.H_matrix:
H = self.H_matrix[dref]
# If one of acspecies is not in H, compute H again
if not all(spec in H for spec in self.chemistry.acspecies.attributes):
H = {spec: fortran_exe(self, target, mode, spec, ddi)
for spec in self.chemistry.acspecies.attributes}
# Saving H for later computations if required
if self.save_H:
self.H_matrix[dref] = H
self.dflx = {}
# Compute H.x or H^T.dy depending on the running mode
for spec in self.chemistry.acspecies.attributes:
specobs = self.dataobs[ddi][spec]
meteo_index = specobs["metadata"]["tstep"].astype(int)
if mode in ["fwd", "tl"]:
# In fwd and tl, simple product of flux and footprints
try:
specobs.loc[:, ("maindata", "spec")] = np.nansum(
self.dataflx[ddi][spec].values[meteo_index, 0] * H[spec],
axis=(1, 2)
)
specobs.loc[:, ("maindata", "incr")] = np.nansum(
self.dataflx_tl[ddi][spec].values[meteo_index, 0] * H[spec],
axis=(1, 2)
)
except:
print(__file__)
import code
code.interact(local=dict(locals(), **globals()))
else:
# Return zero sensitivity if no obs
if len(specobs) == 0:
self.dflx[spec] = xr.DataArray(
np.zeros((len(self.meteo.data[ddi].index), 1,
self.domain.nlat, self.domain.nlon)),
coords={
"time": self.meteo.data[ddi].index.values,
"lev": [0],
"lat": list(range(self.domain.nlat)),
"lon": list(range(self.domain.nlon)),
},
dims=("time", "lev", "lat", "lon"),
)
continue
# In adjoint, aggregation of footprints, to model resolution
dflx = \
specobs[("maindata", "adj_out")].values[:, np.newaxis, np.newaxis] \
* H[spec]
dflx[~pd.notnull(dflx)] = 0.0
dflx = xr.DataArray(
dflx[:, np.newaxis],
coords={
"time": meteo_index.values,
"lev": [0],
"lat": list(range(self.domain.nlat)),
"lon": list(range(self.domain.nlon)),
},
dims=("time", "lev", "lat", "lon"),
)
# Summing observations at the same hour and reprojecting to meteo
# resolution if missing observations
dflx = (
dflx.to_dataframe(name="flx")
.groupby(["time", "lev", "lat", "lon"])
.sum()
)
dflx = dflx.unstack(level=["lev", "lat", "lon"])
dflx.index = self.meteo.data[ddi].index[dflx.index]
dflx = dflx.reindex(self.meteo.data[ddi].index).stack(
level=["lev", "lat", "lon"], dropna=False
)
dflx = xr.Dataset.from_dataframe(dflx)["flx"]
# Filling NaNs with 0, as it corresponds to hours with no obs
dflx = dflx.fillna(0.0)
# Saving sensitivity to the model object
# It corresponds to outputs in classical numerical models
self.dflx[spec] = copy.deepcopy(dflx)
# Reset meteo
if do_simu:
del self.meteo.data[ddi]
[docs]
def fortran_exe(self, pg_file, mode, spec, ddi):
"""Mimic the behaviour of a numerical model executable"""
info("Running the 'FORTRAN' executable")
# Species-specific data frame
specobs = self.dataobs[ddi][spec]
# Loading Pasquill Gifford parameterization
pg_params = pd.read_csv(pg_file)
# Init variables
meteo_index = specobs["metadata"]["tstep"]
meteo_data = self.meteo.data[ddi]
winddir = meteo_data["winddir"]
windspeed = meteo_data["windspeed"]
stabclass = meteo_data["stabclass"]
# Computing distance along plume in km
dx = (
self.domain.zlon[np.newaxis]
- specobs["metadata"]["lon"].values.astype(float)[:, np.newaxis, np.newaxis]
)
dy = (
self.domain.zlat[np.newaxis]
- specobs["metadata"]["lat"].values.astype(float)[:, np.newaxis, np.newaxis]
)
dist = np.sqrt(dx ** 2 + dy ** 2) * 1e-3
theta = np.arctan2(dy, dx)
y = dist * np.cos(
theta
- np.radians(
winddir.iloc[meteo_index].values.astype(float)[:, np.newaxis, np.newaxis])
)
x = dist * np.sin(
theta
- np.radians(
winddir.iloc[meteo_index].values.astype(float)[:, np.newaxis, np.newaxis])
)
x[x <= 0] = np.nan
# y is needed in m in later formula
y = np.abs(y) * 1e3
# Select Pasquill-Gifford parameters
mask = (
(pg_params["xmin"].values[:, np.newaxis, np.newaxis, np.newaxis] <= x)
& (pg_params["xmax"].values[:, np.newaxis, np.newaxis, np.newaxis] >= x)
& (pg_params["Class"].values[:, np.newaxis, np.newaxis, np.newaxis]
== stabclass.iloc[meteo_index].values[np.newaxis, :, np.newaxis, np.newaxis])
)
inds = np.argmax(mask, axis=0)
a = pg_params["a"].values[inds]
b = pg_params["b"].values[inds]
c = pg_params["c"].values[inds]
d = pg_params["d"].values[inds]
# Computing sigmas
sigma_z = a * x ** b
sigma_y = np.abs(465.11628 * x * np.tan(0.017653293 * (c - d * np.log(x))))
# Cropping sigma values
mask = (sigma_z > 5000.0) & (
stabclass.iloc[meteo_index].values[:, np.newaxis, np.newaxis] < "D"
)
sigma_z[mask] = 5000.0
# Filtering small values in sigma
sigma_z[sigma_z < 1e-16] = np.nan
sigma_y[sigma_y < 1e-16] = np.nan
# Filling H matrix
hmatrix = (
1 / 2.0 / np.pi / sigma_y / sigma_z
/ windspeed.iloc[meteo_index].values[:, np.newaxis, np.newaxis]
* np.exp(-(y ** 2) / 2.0 / sigma_y ** 2)
* 2
* np.exp(
- specobs["metadata"]["alt"].values.astype(float)[:, np.newaxis, np.newaxis] ** 2
/ 2.0
/ sigma_z ** 2
)
)
hmatrix[np.isnan(hmatrix)] = 0.0
return hmatrix