import datetime
import numpy as np
from logging import warning
from itertools import chain
from ....utils.classes.setup import Setup
from ....utils.classes.domains import Domain
from ....utils.check.errclass import PluginError
[docs]
def ini_mapper(model, general_mapper={}, backup_comps={},
transforms_order=[], ref_transform="",
transform_name="", **kwargs):
input_intervals = {
ddi: np.append(
model.input_dates[ddi][:, np.newaxis],
model.input_dates[ddi][:, np.newaxis]
+ datetime.timedelta(hours=1),
axis=1)
for ddi in model.input_dates}
output_intervals = {
ddi: np.append(
model.tstep_dates[ddi][:-1, np.newaxis],
model.tstep_dates[ddi][1:, np.newaxis],
axis=1)
for ddi in model.input_dates}
basic_dict = {"force_dump": True}
default_dict = {
"input_dates": input_intervals, "force_dump": True,
"sparse_data": False,
"break_fwd_onlyinit_pipe": False,
"break_adj_onlyinit_pipe": False,
"force_loadin": False,
"sampled": False
}
dict_surface = dict(
default_dict,
**{"domain": model.domain,
"fixed_domain": True,
"unit": "molecules/cm2/s",
"recombine_periods": False})
dict_bound = dict(dict_surface, **{"is_lbc": True, "unit": "ppb"})
dict_top = dict(dict_surface, **{"is_top": True, "unit": "ppb"})
dict_ini = dict(
dict_surface,
**{"input_dates": {model.datei: np.array([[model.datei, model.datei]])},
"unit": "ppb"}
)
# For AEMISSIONS, alter the model domain to be consistent with nlevemis
domain_in = model.domain
domain_out = Setup.load_registered(
domain_in.plugin.name, domain_in.plugin.version,
"domain", plg_orig=domain_in
)
domain_out.nlon = domain_in.nlon
domain_out.nlat = domain_in.nlat
domain_out.zlon = domain_in.zlon
domain_out.zlat = domain_in.zlat
domain_out.zlonc = domain_in.zlonc
domain_out.zlatc = domain_in.zlatc
domain_out.nlon_side = domain_in.nlon_side
domain_out.nlat_side = domain_in.nlat_side
domain_out.zlonc_side = domain_in.zlonc_side
domain_out.zlatc_side = domain_in.zlatc_side
domain_out.zlon_side = domain_in.zlon_side
domain_out.zlat_side = domain_in.zlat_side
domain_out.pressure_unit = domain_in.pressure_unit
domain_out.nlev = model.nlevemis
domain_out.sigma_a = domain_in.sigma_a[:domain_out.nlev + 1]
domain_out.sigma_b = domain_in.sigma_b[:domain_out.nlev + 1]
domain_out.sigma_a_mid = domain_in.sigma_a_mid[:domain_out.nlev]
domain_out.sigma_b_mid = domain_in.sigma_b_mid[:domain_out.nlev]
dict_aemis = dict(default_dict,
**{"domain": domain_out,
"tracer_from_previous": True})
# For BEMISSIONS, alter the model domain to be consistent with nlevemis=1
domain_in = model.domain
domain_out = Setup.load_registered(
domain_in.plugin.name, domain_in.plugin.version,
"domain", plg_orig=domain_in
)
domain_out.nlon = domain_in.nlon
domain_out.nlat = domain_in.nlat
domain_out.zlon = domain_in.zlon
domain_out.zlat = domain_in.zlat
domain_out.zlonc = domain_in.zlonc
domain_out.zlatc = domain_in.zlatc
domain_out.nlon_side = domain_in.nlon_side
domain_out.nlat_side = domain_in.nlat_side
domain_out.zlonc_side = domain_in.zlonc_side
domain_out.zlatc_side = domain_in.zlatc_side
domain_out.zlon_side = domain_in.zlon_side
domain_out.zlat_side = domain_in.zlat_side
domain_out.pressure_unit = domain_in.pressure_unit
domain_out.nlev = 1
domain_out.sigma_a = domain_in.sigma_a[:domain_out.nlev + 1]
domain_out.sigma_b = domain_in.sigma_b[:domain_out.nlev + 1]
domain_out.sigma_a_mid = domain_in.sigma_a_mid[:domain_out.nlev]
domain_out.sigma_b_mid = domain_in.sigma_b_mid[:domain_out.nlev]
dict_bemis = dict(default_dict, **{"domain": domain_out})
# Outputs
mapper = {
"inputs": {},
"outputs": {
(outcomp, s): {
"isobs": True, "force_loadout": True,
"input_dates": output_intervals,
"domain": model.domain,
"sampled": True,
"sparse_data": False,
"continuous_hdomain": False,
"continuous_vdomain": False,
"break_adj_onlyinit_pipe": False,
"break_fwd_onlyinit_pipe": False,
"keep_data_after_init": False
}
for s in model.chemistry.outspecies.attributes
for outcomp in model.output_components
},
}
# Emissions for emitted species
emis = {
("flux", s): dict_aemis
for s in model.chemistry.emis_species.attributes
}
bioemis = {
("bioflux", s): dict_bemis
for s in model.chemistry.bio_species.attributes
}
# Check chemical scheme with regard to emitted species
if model.chemistry.emis_species.attributes == []:
raise Exception("WARNING: There is no anthropogenic emitted species "
"in your chemical scheme\n"
"Please check the file ANTHROPIC "
"in the chemical scheme folder: {}/{}"
.format(model.chemistry.dir_precomp,
model.chemistry.schemeid))
if model.chemistry.bio_species.attributes == []:
warning("WARNING: There is no biogenic emitted species "
"in your chemical scheme")
# Initial conditions for all active species (for the first period)
inicond = {
("inicond", s): dict_ini for s in model.chemistry.acspecies.attributes
}
# End concentrations from previous period for all active species
# are needed for later periods
endconcs_in = {
("endconcs", s):
{"input_dates": {
ddi: np.array(
[[model.input_dates[ddi][0],
model.input_dates[ddi][0]]])
for ddi in list(model.input_dates.keys())[1:]},
"domain": model.domain, "force_dump": True,
"sparse_data": False,
"sampled": False,
"break_fwd_onlyinit_pipe": True,
"break_adj_onlyinit_pipe": False
}
for s in model.chemistry.acspecies.attributes
}
# End concentrations are saved for all periods
endconcs_out = {
("endconcs", s):
{"input_dates": {
ddi: np.array([[model.input_dates[ddi][-1],
model.input_dates[ddi][-1]]])
for ddi in model.input_dates},
"domain": model.domain, "force_loadout": True,
"sparse_data": False,
"sampled": False,
"break_fwd_onlyinit_pipe": True,
"break_adj_onlyinit_pipe": True}
for s in model.chemistry.acspecies.attributes
}
# Lateral and top conditions for all active species
lbc = {
("latcond", s): dict_bound
for s in model.chemistry.acspecies.attributes
}
top = {
("topcond", s): dict_top
for s in model.chemistry.acspecies.attributes
}
# Meteo dictionary
if not getattr(model, 'pre-computed-meteo'):
meteo = {**{
("meteo", s): dict_surface for s in model.meteo_parameters_3d
}, **{
("meteo", s): dict_bemis for s in model.meteo_parameters_2d
}}
else:
meteo = {("meteo", ''): dict_surface}
# Put everything in input dictionary, plus outputs for end concentrations
mapper["inputs"].update(
{**emis, **lbc, **top, **endconcs_in, **meteo})
mapper["outputs"].update(endconcs_out)
if model.useemisb:
mapper["inputs"].update(bioemis)
# Different choice regarding inicond whether in a loop of ensemble method
if not getattr(model, "ensrf_restart_file", False):
mapper["inputs"].update(inicond)
else:
mapper["inputs"].update(
{("restart_inicond", s):
dict_ini
for s in model.chemistry.acspecies.attributes
}
)
# Accepts backup components instead of reference ones
backup_comps.update(model.backup_comps)
# Force the transformation to be in its own precursors and successors
# to propagate end concentrations
mapper["precursors"] = {trid: [transform_name] for trid in endconcs_in}
mapper["successors"] = {trid: [transform_name] for trid in endconcs_in}
# Save propagation of perturbations
inputs = ["latcond", "topcond", "endconcs", "inicond"]
if getattr(model, "ensrf_restart_file", False):
inputs[-1] = "restart_inicond"
mapper["outputs2inputs"] = {}
for trid in mapper["outputs"]:
outcomp, s = trid
# Add all species that directly or indirectly produce the species 's'
species_to_link = [s] + [
sin
for sin in model.chemistry.inout_reaction_graph
if s in model.chemistry.inout_reaction_graph[sin]
]
# Now loop in input species and update outputs2inputs
# For species not outputed, just propagate endconcs
mapper["outputs2inputs"][trid] = list(chain.from_iterable(
[(cmp, sin) for cmp in inputs]
+ ([("flux", sin)] if sin in model.chemistry.emis_species.attributes
else [])
+ ([("bioflux", sin)] if sin in model.chemistry.bio_species.attributes
else [])
for sin in species_to_link
))
# Simplify outputs2inputs
mapper["outputs2inputs"] = {
tr: list(set(mapper["outputs2inputs"][tr]))
for tr in mapper["outputs2inputs"]
}
return mapper