import numpy as np
import datetime
from ....utils.classes.setup import Setup
# [docs]  (Sphinx "viewcode" link marker left by HTML extraction; not code)
# Attributes copied verbatim from the model domain onto the flux domain.
_COPIED_DOMAIN_ATTRS = (
    "nlon", "nlat",
    "zlon", "zlat",
    "zlonc", "zlatc",
    "nlon_side", "nlat_side",
    "zlonc_side", "zlatc_side",
    "zlon_side", "zlat_side",
    "pressure_unit",
)


def _consecutive_intervals(dates_by_period):
    """Pair every date with its successor, period by period.

    Args:
        dates_by_period: mapping from a period key to a NumPy array of dates
            (assumed 1-D; it is sliced along its first axis).

    Returns:
        dict with the same keys; each value stacks ``dates[:-1]`` and
        ``dates[1:]`` as two columns, i.e. row ``i`` is
        ``[dates[i], dates[i + 1]]``.
    """
    return {
        ddi: np.append(
            dates[:-1, np.newaxis], dates[1:, np.newaxis], axis=1)
        for ddi, dates in dates_by_period.items()
    }


def _single_level_domain(domain_in):
    """Build a one-level copy of ``domain_in`` for the flux inputs.

    A new domain is loaded through ``Setup.load_registered`` using the plugin
    name and version of ``domain_in``; the attributes listed in
    ``_COPIED_DOMAIN_ATTRS`` are copied over and the vertical description is
    overwritten with a single level (``nlev = 1``).
    """
    domain_out = Setup.load_registered(
        domain_in.plugin.name, domain_in.plugin.version,
        "domain", plg_orig=domain_in
    )
    for attr in _COPIED_DOMAIN_ATTRS:
        setattr(domain_out, attr, getattr(domain_in, attr))

    # Single vertical level
    domain_out.nlev = 1
    domain_out.sigma_a = np.array([0, 0])
    domain_out.sigma_b = np.array([1, 1])
    domain_out.sigma_a_mid = np.array([0])
    domain_out.sigma_b_mid = np.array([1])
    return domain_out


def ini_mapper(model, transform_type, general_mapper=None, backup_comps=None,
               transforms_order=None, ref_transform="", transform_name="",
               **kwargs):
    """Build the mapper describing the inputs and outputs of the model.

    Args:
        model: model plugin. Read here: ``input_dates``, ``flx_input_dates``,
            ``tstep_dates``, ``meteo_dates``, ``datei``, ``domain``,
            ``chemistry``, ``do_chemistry``, ``output_components`` and,
            optionally, ``ensrf_restart_file`` and ``backup_comps``.
        transform_type: not used by this function.
        general_mapper: not used by this function.
        backup_comps: optional dict; when given and the model defines
            ``backup_comps``, it is updated **in place** with the model's
            entries.
        transforms_order: not used by this function.
        ref_transform: not used by this function.
        transform_name: name registered as the precursor and successor of
            every ``endconcs`` input.
        **kwargs: ignored.

    Returns:
        dict with the keys ``"inputs"``, ``"outputs"``, ``"precursors"``,
        ``"successors"`` and ``"outputs2inputs"``. Inputs and outputs are
        keyed by ``(component, species)`` tuples.

    Side effects:
        Sets ``model.photj`` and, as described above, may update
        ``backup_comps``.

    Note:
        The option dictionaries for surface fields, initial conditions and
        fluxes are shared between all the species that use them (the same
        dict object is stored under several keys).
    """
    input_intervals = _consecutive_intervals(model.input_dates)
    flx_intervals = _consecutive_intervals(model.flx_input_dates)
    meteo_intervals = _consecutive_intervals(model.meteo_dates)
    # NOTE(review): output intervals read ``tstep_dates`` but are keyed on the
    # periods of ``input_dates`` (unlike the three mappings above); kept as
    # found -- confirm this is intended.
    output_intervals = _consecutive_intervals(
        {ddi: model.tstep_dates[ddi] for ddi in model.input_dates})

    default_dict = {
        "input_dates": input_intervals, "force_dump": True,
        "sparse_data": False,
        "break_fwd_onlyinit_pipe": False,
        "break_adj_onlyinit_pipe": False
    }
    meteo_dict = dict(default_dict, input_dates=meteo_intervals)
    dict_surface = dict(default_dict, domain=model.domain, fixed_domain=True)
    # Initial conditions: a single zero-length interval at the start date
    dict_ini = dict(
        dict_surface,
        input_dates={model.datei: np.array([[model.datei, model.datei]])},
    )

    # For flx, alter the model domain to be consistent with nlevemis
    dict_aemis = dict(
        dict_surface,
        domain=_single_level_domain(model.domain),
        input_dates=flx_intervals,
        unit="kg/m2/s",
    )

    active_species = model.chemistry.acspecies.attributes

    # Executable
    mapper = {
        "inputs": {
            ("meteo", ""): meteo_dict,
        },
        "outputs": {
            (outcomp, s): {
                "isobs": True, "force_loadout": True,
                "input_dates": output_intervals,
                "domain": model.domain,
                "sampled": True,
                "sparse_data": False,
                "break_adj_onlyinit_pipe": False,
                "break_fwd_onlyinit_pipe": False
            }
            for s in active_species
            for outcomp in ["concs", "pressure", "dpressure", "airm", "hlay"]
        }
    }

    # Inputs dict
    emis = {
        ("flux", s): dict_aemis
        for s in model.chemistry.emis_species.attributes
    }
    inicond = {("inicond", s): dict_ini for s in active_species}

    prescrcond = {}
    if hasattr(model.chemistry, "prescrconcs") and model.do_chemistry:
        prescrcond = {
            ("prescrconcs", s): dict_surface
            for s in model.chemistry.prescrconcs.attributes
        }

    # End concentrations from previous period for all active species
    # are needed for later periods (hence the first period is skipped)
    later_periods = list(model.input_dates)[1:]
    endconcs_in = {
        ("endconcs", s): {
            "input_dates": {
                ddi: np.array(
                    [[model.input_dates[ddi][0],
                      model.input_dates[ddi][0]]])
                for ddi in later_periods},
            "domain": model.domain, "force_dump": True,
            "sparse_data": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": False
        }
        for s in active_species
    }

    # End concentrations are saved for all periods
    endconcs_out = {
        ("endconcs", s): {
            "input_dates": {
                ddi: np.array(
                    [[model.input_dates[ddi][-1],
                      model.input_dates[ddi][-1]]])
                for ddi in model.input_dates},
            "domain": model.domain, "force_loadout": True,
            "sparse_data": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": True
        }
        for s in active_species
    }

    # Photochemistry
    photj = {}
    if model.do_chemistry:
        kinetic_vars = ['pmid', 'temp']
        if hasattr(model.chemistry, "photo_rates"):
            # iterrows() yields (index, row) pairs; element 1 of each row is
            # used as the variable name (presumably a pandas DataFrame --
            # TODO confirm).
            kinetic_vars.extend(
                [row[1][1]
                 for row in model.chemistry.photo_rates.iterrows()]
            )
        photj = {("kinetic", var): dict_surface for var in kinetic_vars}
    # NOTE(review): stored on the model whether or not chemistry is enabled
    # (empty dict when disabled); confirm against the upstream layout.
    model.photj = photj

    # Production and loss
    prodloss3d = {}
    if hasattr(model.chemistry, "prodloss3d") and model.do_chemistry:
        prodloss3d = {
            ("prodloss3d", s): dict_surface
            for s in model.chemistry.prodloss3d.attributes
        }

    mapper["inputs"].update(
        {**emis, **prescrcond, **prodloss3d, **photj, **endconcs_in})
    mapper["outputs"].update(endconcs_out)

    # Different choice regarding inicond whether in a loop of ensemble method
    if not getattr(model, "ensrf_restart_file", False):
        mapper["inputs"].update(inicond)
    else:
        mapper["inputs"].update(
            {("restart_inicond", s): dict_ini for s in active_species}
        )

    # Accepts backup components instead of reference ones.
    # Only a caller-supplied dict is updated: writing into a default argument
    # would leak state from one call to the next.
    if backup_comps is not None and hasattr(model, "backup_comps"):
        backup_comps.update(model.backup_comps)

    # Force the transformation to be in its own precursors and successors
    # to propagate end concentrations
    mapper["precursors"] = {trid: [transform_name] for trid in endconcs_in}
    mapper["successors"] = {trid: [transform_name] for trid in endconcs_in}

    # Save propagation of perturbations
    inputs = ["inicond", "endconcs", "restart_inicond"]
    mapper["outputs2inputs"] = {
        (outcomp, s):
            [(cmp, s) for cmp in inputs]
            + ([("flux", s)] if s in model.chemistry.emis_species.attributes
               else [])
        for s in active_species
        for outcomp in model.output_components
    }

    return mapper