import numpy as np
import pandas as pd
import xarray as xr
import os
import itertools
import copy
from logging import info, debug, warning
from .....utils.path import init_dir
from .....utils.datastores.dump import dump_datastore, read_datastore
from .....utils.datastores.empty import init_empty
from .....utils.datastores.crop_monitor import crop_monitor
from .apply_AK import apply_ak
from .apply_AK import apply_ak_tl
from .vinterp import vertical_interp
[docs]
def forward(
transf,
inout_datastore,
controlvect,
obsvect,
mapper,
di,
df,
mode,
runsubdir,
workdir,
onlyinit=False,
save_debug=False,
**kwargs
):
"""Aggregate simulations at the grid scale to total columns.
Re-interpolate the model pressure levels to the satellite averaging kernel
levels. Average using the averaging kernel formula
"""
if not hasattr(transf, "save_data"):
transf.save_data = {}
ddi = min(di, df)
if onlyinit:
return
ref_parameter = transf.parameter[0]
ref_component = transf.component[0]
ref_datastore = inout_datastore["outputs"][(ref_component, ref_parameter)]
ref_ds = ref_datastore[ddi]
target_datastore = \
inout_datastore["inputs"][("concs", ref_parameter)][ddi].reset_index(drop=True)
# Put auxiliary values into target datastore
target_datastore.loc[:, ("metadata", "pressure")] = \
inout_datastore["inputs"][
("pressure", ref_parameter)][ddi][("maindata", "spec")].values
target_datastore.loc[:, ("metadata", "dp")] = \
inout_datastore["inputs"][
("dpressure", ref_parameter)][ddi][("maindata", "spec")].values
if transf.product == "column":
target_datastore.loc[:, ("metadata", "airm")] = \
inout_datastore["inputs"][
("airm", ref_parameter)][ddi][("maindata", "spec")].values
target_datastore.loc[:, ("metadata", "hlay")] = \
inout_datastore["inputs"][(
"hlay", ref_parameter)][ddi][("maindata", "spec")].values
target_datastore.loc[:, ("metadata", "indorig")] = \
transf.metadata[ddi].loc[:, ("metadata", "indorig")].values
target_datastore.loc[:, ("metadata", "date")] = \
transf.metadata[ddi].loc[:, ("metadata", "date")].values
ddi = min(di, df)
ref_indexes = ~transf.metadata[ddi].loc[
:, ("metadata", "indorig")].duplicated().values
y0 = copy.deepcopy(target_datastore.loc[ref_indexes])
y0.index = y0.loc[:, ("metadata", "indorig")].values
# Re-construct list of file_aks
ref_mapper = mapper["outputs"][(ref_component, ref_parameter)]
ref_tracer = ref_mapper["tracer"]
files_aks = \
set(list(itertools.chain(
*itertools.chain(*ref_tracer.input_files.values()))))
files_aks = sorted(files_aks)
# Use only files relevant for the present dataset
ind_file = transf.metadata[ddi].loc[ref_indexes, ("metadata", "ind_file")]
files_aks = np.array(files_aks)[ind_file.drop_duplicates().values.astype(int)]
# Pass information from previous transforms if debug
if save_debug:
df_debug = pd.DataFrame({
c: inout_datastore["inputs"][trid][ddi][c].loc[ref_indexes]
for trid in inout_datastore["inputs"]
if trid[0] != "stratosphere"
for c in inout_datastore["inputs"][trid][ddi].columns
if c[0] not in y0.columns.get_level_values(level=0)
})
y0 = pd.concat([y0, df_debug], axis=1)
# Exit if empty observation datastore
if target_datastore.size == 0:
return
# Init directory for saving forward datastore for later use by adjoint
dir_fwd = ddi.strftime("{}/../chain/satellites/{}".format(
runsubdir, transf.transform_id))
if not os.path.isdir(dir_fwd):
init_dir(dir_fwd)
# Check that only one satellite was specified in the datastore
iq1 = transf.metadata[ddi].loc[ref_indexes, ("metadata", "station")]
list_satIDs = iq1.unique()
if len(list_satIDs) > 1:
raise Exception(
f"Warning! Several satellite IDs were specified in the monitor file: "
f"{list_satIDs} \n "
"This corresponds to the column 'station'. Please check your monitor file. "
"It is possible to manually exclude IDs from the datastore by using the "
"parameter 'exclude_stations' (list[str]) in the corresponding satellite "
"paragraph of your data vector in the Yaml configuration file."
)
# Deal with case when no ID was specified: assume that all data are concerned
satID = list_satIDs[0]
if type(satID) == str:
pass
elif ~pd.notnull(satID):
satID = 0
iq1[:] = satID
# Building the extended dataframe
ds_p = target_datastore["metadata"].set_index("indorig")[
["pressure", "dp"] + (transf.product == "column") * ["airm", "hlay"]]
nlev_model = mapper["inputs"][("concs", ref_parameter)]["domain"].nlev
satmask = iq1 == satID
nobs = np.sum(satmask)
# Stacking output datastore into levels * nobs
native_ind_stack = (
np.flatnonzero(ref_indexes)[satmask]
+ np.arange(nlev_model)[:, np.newaxis]
)
# If all nans in datasim, meaning that the species was not simulated
sim = target_datastore.loc[
:, ("maindata", "spec")].values[native_ind_stack]
if not np.any(pd.notnull(sim)):
return
# Grouping all data from this satellite
datasim = xr.Dataset(
{
"pressure": (
["level", "index"],
np.log(ds_p["pressure"].values[native_ind_stack]),
),
"dp": (
["level", "index"],
ds_p["dp"].values[native_ind_stack],
),
"sim": (["level", "index"], sim),
},
coords={
"index": np.arange(nobs),
"level": np.arange(nlev_model),
},
)
if transf.product == "column":
datasim = datasim.assign(
{
"airm": (
["level", "index"],
ds_p["airm"].values[native_ind_stack],
),
"hlay": (
["level", "index"],
ds_p["hlay"].values[native_ind_stack],
),
}
)
if mode == "tl":
datasim["sim_tl"] = (
["level", "index"],
target_datastore.loc[:, ("maindata", "incr")
].values[native_ind_stack],
)
# convert ppb fields to the correct unit
# from ppb to molec.cm-2 if the satellite product is a column
if transf.product == "column":
keys = ["sim"] + (["sim_tl"] if mode == "tl" else [])
factor = (datasim["hlay"] * 100) / (1e9 / datasim["airm"])
for k in keys:
datasim[k] *= factor
# alternative conversion
# factor2 = 2.120*1e13*datasim["dp"]/100.
# Crop aks depending on split_freq and original file
ddf = ref_tracer.datef
if hasattr(ref_tracer, "split_freq"):
obsvect_dates = pd.date_range(
ref_tracer.datei, ref_tracer.datef,
freq=ref_tracer.split_freq)
i0 = np.argwhere(obsvect_dates == ddi)[0][0]
ddf = obsvect_dates[i0 + 1]
# Check whether there is some ak
info("Fetching satellite infos from files: {}".format(files_aks))
try:
colsat = ["ak", "pavg0", "date",
"index", "station", "duration"]
if transf.use_prior:
colsat += ["qa0"]
if transf.use_drycols:
colsat += ["dryair"]
if transf.precomputed_pwgt:
colsat += ["pw"]
coord2dump = ["index"]
all_sat_aks = []
for ind_file, file_aks in enumerate(files_aks):
# Crop the monitor to replicate what is done
# in pycif/plugins/obsvects/standard/utils/__init__.py
ds_dates = xr.open_dataset(file_aks)[["date", "duration"]]
crop_date = init_empty()
crop_date[("metadata", "date")] = ds_dates["date"].values
crop_date[("metadata", "duration")] = ds_dates["duration"].values
crop_index = crop_monitor(
crop_date, ref_tracer.datei, ref_tracer.datef,
return_index=True
)
if crop_index.size == 0:
continue
# Now extract only data relevant to present sub-period
# This happens only with split_freq
crop_date = crop_date.iloc[crop_index]
if ref_tracer.datei != ddi or ref_tracer.datef != ddf:
crop_index = crop_monitor(
crop_date, ddi, ddf,
return_index=True,
keep_partial=True
)
crop_index = crop_date.index[crop_index].values
# Now read the data itself
sat_aks = read_datastore(
file_aks,
col2dump=colsat,
coord2dump=coord2dump,
keep_default=False,
to_pandas=False
)
if 'index' in sat_aks:
sat_aks = sat_aks.drop('index')
# Check that all variables are float64 to avoid precision issues
for v in sat_aks:
if sat_aks.dtypes[v] == np.dtype('float32'):
sat_aks[v] = sat_aks[v].astype('float64')
# Crop to sub-date
sat_aks["index"] = np.arange(sat_aks.dims["index"])
sat_aks = sat_aks.isel(index=crop_index)
# Keep in memory the file name
sat_aks = sat_aks.assign(
file_aks=("index", sat_aks.dims["index"] * [ind_file]))
if len(sat_aks) == 0:
warning(f"WARNING: There is no data in file {file_aks}")
continue
if len(all_sat_aks) == 0:
all_sat_aks = sat_aks
else:
all_sat_aks = xr.concat([all_sat_aks, sat_aks], "index")
# # Crop the monitor to replicate what is done
# # in pycif/plugins/obsvects/standard/utils/__init__.py
# if hasattr(ref_tracer, "split_freq"):
# crop_date = init_empty()
# crop_date[("metadata", "date")] = all_sat_aks["date"].values
# crop_date[
# ("metadata", "duration")
# ] = all_sat_aks["duration"].values
# ddf = ref_tracer.datef
# obsvect_dates = pd.date_range(
# ref_tracer.datei, ref_tracer.datef,
# freq=ref_tracer.split_freq)
# i0 = np.argwhere(obsvect_dates == ddi)[0][0]
# ddf = obsvect_dates[i0 + 1]
# crop_index = crop_monitor(
# crop_date, ddi, ddf,
# return_index=True,
# keep_partial=True
# )
# Reset index
all_sat_aks["index"] = np.arange(all_sat_aks.dims["index"])
# all_sat_aks = all_sat_aks.isel(index=crop_index)
# If no station was specified in the original file, just fill with 0
if "station" not in all_sat_aks:
all_sat_aks["station"] = xr.full_like(all_sat_aks.index, satID)
# Crop first observations for debugging purposes if specified in yml
if getattr(ref_tracer, "crop_datastore", False):
crop_istart = getattr(ref_tracer, "crop_istart", 0)
all_sat_aks = all_sat_aks.isel(
index=slice(crop_istart, crop_istart + ref_tracer.nobs2crop))
# Selecting only lines used in simulation
mask = all_sat_aks["date"].isin(
target_datastore['metadata']["date"].drop_duplicates()
) & (all_sat_aks["station"] == satID)
sat_aks = all_sat_aks.loc[{"index": mask}]
except IOError:
# Assumes total columns?
raise IOError("Could not fetch "
"satellite info from {}".format(file_aks))
# datastore['qdp'] = datastore['sim'] * datastore['dp']
# groups = datastore.groupby(['indorig'])
# y0.loc[:, 'sim'] += \
# groups['qdp'].sum().values / groups['dp'].sum().values
#
# if 'sim_tl' in datastore:
# datastore['qdp'] = datastore['sim_tl'] * datastore['dp']
# groups = datastore.groupby(['indorig'])
# y0.loc[:, 'sim_tl'] += \
# groups['qdp'].sum().values / groups['dp'].sum().values
# continue
aks = sat_aks["ak"][:, ::-1].T
pavgs = sat_aks["pavg0"][:, ::-1].T
if transf.pressure == "hPa":
pavgs *= 100
# -- For level-based, given pressures are averages over the partial column (n)
# -- For layer-based, given pressures are the ones at inter-levels (n + 1)
level0_size = aks.level.size + 1
level1_size = aks.level.size
if transf.level_based:
level0_size -= 1
coords0 = {"index": np.arange(nobs),
"level": np.arange(level0_size)}
coords1 = {"index": np.arange(nobs),
"level": np.arange(level1_size)}
dims = ("level", "index")
if transf.use_prior:
qa0 = sat_aks["qa0"][:, ::-1].T
else:
qa0 = 0. * aks
if transf.level_based:
pavgs_mid = xr.DataArray(
np.log(pavgs.values), coords0, dims).bfill("level")
# Assuming goes from surface to the top of the atmosphere
flip_pressure = False
if not np.all(np.diff(pavgs.values, axis=0) > 0):
pavgs.values = np.flip(pavgs.values, axis=0)
flip_pressure = True
# Computes dpavgs from top to bottom
dpavgs = np.diff(pavgs.values, axis=0)
dpavgs = np.concatenate([
np.maximum(0, pavgs.values[[0]] - dpavgs[[0]] / 2),
pavgs.values[1:] - dpavgs / 2,
pavgs.values[[-1]] + dpavgs[[-1]] / 2
], axis=0)
dpavgs = xr.DataArray(np.diff(dpavgs, axis=0), coords1, dims)
# Flip back pressures
if flip_pressure:
pavgs.values = np.flip(pavgs.values, axis=0)
dpavgs.values = np.flip(dpavgs.values, axis=0)
else:
try:
pavgs = xr.DataArray(pavgs, coords0, dims).bfill("level")
except:
print(__file__)
import code
code.interact(local=dict(locals(), **globals()))
dpavgs = np.abs(xr.DataArray(np.diff(-pavgs, axis=0), coords1, dims))
pavgs_mid = xr.DataArray(
np.log(0.5 * (pavgs[:-1].values + pavgs[1:].values)),
coords1, dims)
# Exclude observations where there are all zeros in the simulation
# exclude_zeros = np.where(sim.sum(axis=0) != 0)[0]
exclude_zeros = np.where(pd.notnull(sim.sum(axis=0)))[0]
if exclude_zeros.size == 0:
warning(
"All simulations are invalid. Skipping satellite computations for "
f"{ref_component} / {ref_parameter} and period {ddi}. \n"
f"This can be related to: \n"
f" - all observations are outside the domain of simulation\n"
f" - simulations are 0 for a reasonable cause?\n"
f" - you have a bug somewhere upstream this transformation..."
)
return
datasim = datasim.isel(index=exclude_zeros)
sat_aks = sat_aks.isel(index=exclude_zeros)
aks = aks.isel(index=exclude_zeros)
pavgs_mid = pavgs_mid.isel(index=exclude_zeros)
dpavgs = dpavgs.isel(index=exclude_zeros)
qa0 = qa0.isel(index=exclude_zeros)
nobs = datasim.dims["index"]
# Adding dry air mole fraction if formula 5
if transf.use_drycols:
drycols = sat_aks["dryair"][:, ::-1].T
else:
drycols = qa0 * 0.0 + 1
# Pressure weight to apply on columns
pwgt = dpavgs / dpavgs.sum(axis=0)
if not transf.scale_dpressure:
pwgt = 0 * pwgt + 1
elif transf.precomputed_pwgt:
pwgt = sat_aks["pw"][:, ::-1].T
# Fetch values if fill strato
if transf.fill_strato:
strato_mapper = mapper["inputs"][("stratosphere",
ref_parameter)]
strato_sigma_a = strato_mapper["domain"].sigma_a_mid
strato_sigma_b = strato_mapper["domain"].sigma_b_mid
strato_sigma_a_interface = strato_mapper["domain"].sigma_a
strato_sigma_b_interface = strato_mapper["domain"].sigma_b
strato_nlev = len(strato_sigma_a)
# Expand debug_datastore with info
# from previous transforms in strato
if save_debug:
trid_strato = ("stratosphere", ref_parameter)
df_debug = pd.DataFrame({
c: inout_datastore["inputs"][trid_strato][ddi][c].iloc[::strato_nlev]
for c in inout_datastore["inputs"][trid_strato][ddi].columns
if c[0] not in y0.columns.get_level_values(level=0)
})
y0 = pd.concat([y0, df_debug], axis=1)
# Interpolating simulated values to averaging kernel pressures
sim_ak = 0.0 * pavgs_mid
sim_ak_tl = 0.0 * pavgs_mid
if transf.split_tropo_strato:
sim_ak_tropo = 0.0 * pavgs_mid
sim_ak_tl_tropo = 0.0 * pavgs_mid
sim_ak_strato = 0.0 * pavgs_mid
sim_ak_tl_strato = 0.0 * pavgs_mid
sim_pressure = datasim["pressure"].values
sim_dpressure = datasim["dp"].values
sim = datasim["sim"].values
if mode == "tl":
sim_tl = datasim["sim_tl"].values
# Fetch missing values from stratosphere
if transf.fill_strato:
psurf_strato = np.exp(sim_pressure[0])
pstrato = np.log(
strato_sigma_b * psurf_strato[:, np.newaxis]
+ strato_sigma_a).T
pstrato_interface = (
strato_sigma_b_interface * psurf_strato[:, np.newaxis]
+ strato_sigma_a_interface
).T
dpstrato = np.abs(np.diff(pstrato_interface, axis=0))
# Here merge sim_pressure
# and pstrato properly, then apply vertical_interp
nblevbasic = np.shape(sim_pressure)[0]
cond = pstrato < sim_pressure[-1]
missing_levels = np.argmax(cond, axis=0)
missing_levels[~np.any(cond, axis=0)] = pstrato.shape[0]
nblevremainingstrato = pstrato.shape[0] - missing_levels.min()
sim_pressure_varying = np.full(
(nblevbasic + nblevremainingstrato, len(datasim.index)), 0.)
sim_pressure_varying[:nblevbasic] = sim_pressure[:]
sim_dpressure_varying = np.full(
(nblevbasic + nblevremainingstrato, len(datasim.index)), 0.)
sim_dpressure_varying[:nblevbasic] = sim_dpressure[:]
if ~np.all(np.any(cond, axis=0)):
warning(
"Stratosphere is used with strato pressure too high to complete the "
"troposphere data set. Please check that your yml does the expected "
"configuration."
)
# Filling pressure levels in stratosphere
lev_index_target = [
i for m in missing_levels
for i in list(range(nblevbasic,
nblevbasic + pstrato.shape[0] - m))]
lev_index_orig = [i for m in missing_levels
for i in list(range(m, pstrato.shape[0]))]
obs_index_target = [
j for j, m in enumerate(missing_levels)
for i in list(range(nblevbasic,
nblevbasic + pstrato.shape[0] - m))]
np.add.at(sim_pressure_varying,
(lev_index_target, obs_index_target),
pstrato[lev_index_orig, obs_index_target]
)
sim_pressure = copy.deepcopy(sim_pressure_varying)
# Filling dpressure values in stratosphere
np.add.at(sim_dpressure_varying,
(lev_index_target, obs_index_target),
dpstrato[lev_index_orig, obs_index_target]
)
sim_dpressure = copy.deepcopy(sim_dpressure_varying)
# Stacking strato dataframe to column shape
strato = inout_datastore["inputs"][
("stratosphere", ref_parameter)][ddi]
sim_strato_ref = \
strato["maindata"]["spec"].values.reshape(
(strato_nlev, -1), order="F")[:, exclude_zeros]
incr_strato = 0. * sim_strato_ref
if "incr" in strato["maindata"]:
incr_strato = \
strato["maindata"]["incr"].values.reshape(
(strato_nlev, -1), order="F")
# ppb to molec/cm2 if column product
if transf.product == "column":
dpstrato = \
np.diff(np.concatenate(
[psurf_strato[np.newaxis, :], np.exp(pstrato)],
axis=0), axis=0) / 100 # hPa
G = 9.81
dmass = np.abs(dpstrato / G) # kg/m2
column = dmass * 1e3 # g/m2
column /= 28.96 * 1e4 # mol/cm2
column *= 6.02214076e23 # molec / cm2
column /= 1e9 # scaling from ppb
sim_strato_ref *= column
incr_strato *= column
if transf.split_tropo_strato:
debug("BEWARE, "
"\"if\" not tested in case of varying missing levels")
sim_tropo = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
sim_tropo[:nblevbasic] = sim[:]
sim_strato = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
np.add.at(
sim_strato,
(lev_index_target, obs_index_target),
sim_strato_ref[lev_index_orig, obs_index_target]
)
if mode == "tl":
sim_tropo_tl = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
sim_tropo_tl[:nblevbasic] = sim_tl[:]
sim_strato_tl = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
np.add.at(
sim_strato_tl,
(lev_index_target, obs_index_target),
incr_strato[lev_index_orig, obs_index_target]
)
sim_varying = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
sim_varying[:nblevbasic] = sim[:]
np.add.at(sim_varying,
(lev_index_target, obs_index_target),
sim_strato_ref[lev_index_orig, obs_index_target]
)
sim = copy.deepcopy(sim_varying)
if mode == "tl":
sim_tl_varying = np.full(
(nblevbasic + nblevremainingstrato, len(datasim.index)), 0.)
sim_tl_varying[:nblevbasic, :] += sim_tl[:]
np.add.at(sim_tl_varying,
(lev_index_target, obs_index_target),
incr_strato[lev_index_orig, obs_index_target]
)
sim_tl = copy.deepcopy(sim_tl_varying)
# Forward fill with last top value available
mask = np.full((nblevbasic + nblevremainingstrato,
len(datasim.index)), 0.)
mask[:nblevbasic] = 1
np.add.at(mask,
(lev_index_target, obs_index_target),
np.ones(len(lev_index_target))
)
idx = np.where(mask == 1, np.arange(mask.shape[0])[:, np.newaxis], 0)
np.maximum.accumulate(idx, axis=0, out=idx)
sim = sim[idx, np.arange(nobs)[np.newaxis, :]]
if mode == "tl":
sim_tl = sim_tl[idx, np.arange(nobs)[np.newaxis, :]]
# end fillstrato
# Vertical interpolation
xlow, xhigh, alphalow, alphahigh = vertical_interp(
sim_pressure,
sim_dpressure,
pavgs_mid.values,
dpavgs.values,
transf.cropstrato,
transf.vinterp_type,
transf.weights_nsubsteps
)
# Applying coefficients
meshout_wgt = (
np.arange(pavgs_mid.shape[0])[:, np.newaxis]
* np.ones((1, len(datasim.index)))
).astype(int)
meshout_wgt_iobs = (
np.arange(len(datasim.index))[np.newaxis]
* np.ones((pavgs_mid.shape[0], 1))
).astype(int)
if transf.vinterp_type == "weight":
meshout_wgt = np.floor(
np.linspace(0, pavgs_mid.shape[0],
pavgs_mid.shape[0] * transf.weights_nsubsteps,
endpoint=False
)[:, np.newaxis] * np.ones((1, len(datasim.index)))
).astype(int)
meshout_wgt_iobs = (
np.arange(len(datasim.index))[np.newaxis]
* np.ones((pavgs_mid.shape[0] * transf.weights_nsubsteps, 1))
).astype(int)
np.add.at(
sim_ak.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim[xlow, meshout_wgt_iobs]
+ alphahigh * sim[xhigh, meshout_wgt_iobs]
)
if mode == "tl":
np.add.at(
sim_ak_tl.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim_tl[xlow, meshout_wgt_iobs]
+ alphahigh * sim_tl[xhigh, meshout_wgt_iobs]
)
# Apply coefficient to "partial" columns
if transf.split_tropo_strato:
np.add.at(
sim_ak_tropo.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim_tropo[xlow, meshout_wgt_iobs]
+ alphahigh * sim_tropo[xhigh, meshout_wgt_iobs]
)
np.add.at(
sim_ak_strato.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim_strato[xlow, meshout_wgt_iobs]
+ alphahigh * sim_strato[xhigh, meshout_wgt_iobs]
)
if mode == "tl":
np.add.at(
sim_ak_tl_tropo.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim_tropo_tl[xlow, meshout_wgt_iobs]
+ alphahigh * sim_tropo_tl[xhigh, meshout_wgt_iobs]
)
np.add.at(
sim_ak_tl_strato.values,
(meshout_wgt, meshout_wgt_iobs),
alphalow * sim_strato_tl[xlow, meshout_wgt_iobs]
+ alphahigh * sim_strato_tl[xhigh, meshout_wgt_iobs]
)
# Correction with the pressure thickness
target_datastore[("metadata", "pthick")] = np.nan
if transf.correct_pthick:
if transf.product == "column":
scale_pthick = (sim.sum(axis=0) / sim_ak.sum(axis=0)).values
else:
scale_pthick = (
(sim_dpressure * sim).sum(axis=0)
/ (dpavgs * sim_ak).sum(axis=0).values
* dpavgs.sum(axis=0).values
/ sim_dpressure.sum(axis=0)
)
sim_ak *= scale_pthick
if 'sim_tl' in datasim:
sim_ak_tl *= scale_pthick
target_datastore.iloc[
np.flatnonzero(ref_indexes)[satmask][exclude_zeros],
target_datastore.columns.get_loc(("metadata", "pthick"))
] = scale_pthick
if transf.split_tropo_strato:
sim_ak_tropo *= scale_pthick
sim_ak_strato *= scale_pthick
if 'sim_tl' in datasim:
sim_ak_tl_tropo *= scale_pthick
sim_ak_tl_strato *= scale_pthick
# Applying aks
nbformula = transf.formula
chosenlevel = transf.chosenlev
debug("product: {}".format(transf.product))
debug("nbformula: {}".format(nbformula))
debug("chosenlev: {}".format(chosenlevel))
out_index = satmask.iloc[exclude_zeros].index
y0.loc[out_index, ("maindata", "spec")] = \
apply_ak(
sim_ak.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
if mode == "tl":
y0.loc[out_index, ("maindata", "incr")] = \
apply_ak_tl(
sim_ak_tl.values, sim_ak.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
if transf.split_tropo_strato:
y0.loc[satmask.iloc[exclude_zeros].index,
(transf.transform_id, "tropo")] = \
apply_ak(
sim_ak_tropo.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
y0.loc[satmask.iloc[exclude_zeros].index,
(transf.transform_id, "strato")] = \
apply_ak(
sim_ak_strato.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
if mode == "tl":
y0.loc[satmask.iloc[exclude_zeros].index,
(transf.transform_id, "tropo_tl")] = \
apply_ak_tl(
sim_ak_tl_tropo.values, sim_ak.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
y0.loc[satmask.iloc[exclude_zeros].index,
(transf.transform_id, "strato_tl")] = \
apply_ak_tl(
sim_ak_tl_strato.values, sim_ak.values, aks.values, pwgt.values,
drycols.values, qa0.values,
use_drycols=transf.use_drycols, chosen_level=chosenlevel,
scale_factor=transf.unit_scaling,
normalize_columns=transf.normalize_columns,
log_space=transf.log_space
)
# Saves sim_ak for later use by adjoint
if transf.log_space or transf.force_dump_sim_aks:
file_dump = ddi.strftime(
"{}/sim_ak_{}_%Y%m%d%H%M.nc".format(dir_fwd, satID))
sim_ak.to_netcdf(file_dump)
inout_datastore["outputs"][(ref_component, ref_parameter)][ddi] = y0
# Dump information about what monitor file was used with which observation
infos = sat_aks["file_aks"].to_dataframe().reset_index().rename(
columns={"index": "orig_index", "file_aks": "file_id"})
infos.loc[:, "file_aks"] = pd.DataFrame(
{"files_aks": files_aks.flatten()},
dtype=files_aks.dtype,
index=np.arange(len(files_aks), dtype=int)
).loc[infos["file_id"].values].values.flatten()
del infos["file_id"]
todump = pd.DataFrame(index=y0.index)
todump = todump.reindex(
columns=todump.columns.tolist()
+ [("metadata", "orig_index"), ("metadata", "file_aks")])
todump.loc[satmask.iloc[exclude_zeros].index,
[("metadata", "orig_index"), ("metadata", "file_aks")]] = infos.values
file_infos = ddi.strftime(
"{}/infos_%Y%m%d%H%M.nc".format(dir_fwd)
)
dump_datastore(
todump,
file_monit=file_infos,
dump_default=False,
col2dump=[("metadata", "orig_index"), ("metadata", "file_aks")],
mode="w",
)
# Save exclude_zeros in forward datastore
mask_zeros = target_datastore[("metadata", "indorig")].isin(exclude_zeros)
target_datastore[("metadata", "exclude_zeros")] = True
target_datastore.loc[mask_zeros, ("metadata", "exclude_zeros")] = False
# Save forward datastore for later use by adjoint
file_monit = ddi.strftime(
"{}/monit_%Y%m%d%H%M.nc"
.format(dir_fwd)
)
col2dump = ["pressure", "dp", "indorig",
"hlay", "airm", "sim", "pthick", "exclude_zeros"]
dump_datastore(
target_datastore,
file_monit=file_monit,
dump_default=False,
col2dump=col2dump,
mode="w",
)