Source code for pycif.plugins.models.chimere.ini_mapper

import datetime
import numpy as np
from logging import warning
from itertools import chain

from ....utils.classes.setup import Setup
from ....utils.classes.domains import Domain

from ....utils.check.errclass import PluginError


# Horizontal-grid attributes copied verbatim from the model domain onto the
# reduced-level domains used for emissions.
_HORIZONTAL_DOMAIN_ATTRIBUTES = (
    "nlon", "nlat",
    "zlon", "zlat",
    "zlonc", "zlatc",
    "nlon_side", "nlat_side",
    "zlonc_side", "zlatc_side",
    "zlon_side", "zlat_side",
    "pressure_unit",
)


def _domain_with_nlev(domain_in, nlev):
    """Return a new domain sharing ``domain_in``'s horizontal grid but
    restricted to its first ``nlev`` vertical levels.

    The new object is obtained from ``Setup.load_registered`` with the same
    plugin name/version as ``domain_in`` (and ``plg_orig=domain_in``); the
    horizontal attributes are then copied over and the sigma coefficients
    are sliced to ``nlev + 1`` interfaces and ``nlev`` mid-levels.
    """
    domain_out = Setup.load_registered(
        domain_in.plugin.name,
        domain_in.plugin.version,
        "domain",
        plg_orig=domain_in
    )
    for attribute in _HORIZONTAL_DOMAIN_ATTRIBUTES:
        setattr(domain_out, attribute, getattr(domain_in, attribute))

    domain_out.nlev = nlev
    domain_out.sigma_a = domain_in.sigma_a[:domain_out.nlev + 1]
    domain_out.sigma_b = domain_in.sigma_b[:domain_out.nlev + 1]
    domain_out.sigma_a_mid = domain_in.sigma_a_mid[:domain_out.nlev]
    domain_out.sigma_b_mid = domain_in.sigma_b_mid[:domain_out.nlev]

    return domain_out


def ini_mapper(model, general_mapper=None, backup_comps=None,
               transforms_order=None, ref_transform="", transform_name="",
               **kwargs):
    """Build the input/output mapper of the CHIMERE model transform.

    Args:
        model: the model plugin. The attributes read here are
            ``input_dates``, ``tstep_dates``, ``datei``, ``domain``,
            ``nlevemis``, ``chemistry``, ``output_components``,
            ``meteo_parameters_3d``/``meteo_parameters_2d``,
            ``useemisb``, ``backup_comps``, ``pre-computed-meteo`` and,
            optionally, ``ensrf_restart_file``.
        general_mapper: not used in this function.
        backup_comps: dictionary updated **in place** with
            ``model.backup_comps``. When omitted, a fresh dictionary is
            used so that nothing is shared between successive calls.
        transforms_order: not used in this function.
        ref_transform: not used in this function.
        transform_name: name registered as the single precursor and
            successor of every ``("endconcs", species)`` input.
        **kwargs: ignored.

    Returns:
        dict: mapper with the keys ``"inputs"``, ``"outputs"``,
        ``"precursors"``, ``"successors"`` and ``"outputs2inputs"``.

    Raises:
        Exception: if ``model.chemistry.emis_species.attributes`` is an
            empty list.
    """
    # A ``{}`` default would be shared by all calls and polluted by the
    # ``update`` below; only create the dictionary when none is supplied.
    if backup_comps is None:
        backup_comps = {}

    # One-hour input intervals starting at every input date
    input_intervals = {
        ddi: np.append(
            model.input_dates[ddi][:, np.newaxis],
            model.input_dates[ddi][:, np.newaxis]
            + datetime.timedelta(hours=1),
            axis=1)
        for ddi in model.input_dates
    }

    # Output intervals spanning consecutive model time steps
    output_intervals = {
        ddi: np.append(
            model.tstep_dates[ddi][:-1, np.newaxis],
            model.tstep_dates[ddi][1:, np.newaxis],
            axis=1)
        for ddi in model.input_dates
    }

    default_dict = {
        "input_dates": input_intervals,
        "force_dump": True,
        "sparse_data": False,
        "break_fwd_onlyinit_pipe": False,
        "break_adj_onlyinit_pipe": False,
        "force_loadin": False,
        "sampled": False
    }

    dict_surface = dict(
        default_dict,
        **{"domain": model.domain,
           "fixed_domain": True,
           "unit": "molecules/cm2/s",
           "recombine_periods": False})
    dict_bound = dict(dict_surface, **{"is_lbc": True, "unit": "ppb"})
    dict_top = dict(dict_surface, **{"is_top": True, "unit": "ppb"})
    dict_ini = dict(
        dict_surface,
        **{"input_dates": {model.datei: np.array([[model.datei,
                                                   model.datei]])},
           "unit": "ppb"}
    )

    # For AEMISSIONS, alter the model domain to be consistent with nlevemis
    dict_aemis = dict(
        default_dict,
        **{"domain": _domain_with_nlev(model.domain, model.nlevemis),
           "tracer_from_previous": True})

    # For BEMISSIONS, alter the model domain to be consistent with nlevemis=1
    dict_bemis = dict(
        default_dict,
        **{"domain": _domain_with_nlev(model.domain, 1)})

    # Outputs
    mapper = {
        "inputs": {},
        "outputs": {
            (outcomp, s): {
                "isobs": True,
                "force_loadout": True,
                "input_dates": output_intervals,
                "domain": model.domain,
                "sampled": True,
                "sparse_data": False,
                "continuous_hdomain": False,
                "continuous_vdomain": False,
                "break_adj_onlyinit_pipe": False,
                "break_fwd_onlyinit_pipe": False,
                "keep_data_after_init": False
            }
            for s in model.chemistry.outspecies.attributes
            for outcomp in model.output_components
        },
    }

    # Emissions for emitted species
    emis = {
        ("flux", s): dict_aemis
        for s in model.chemistry.emis_species.attributes
    }
    bioemis = {
        ("bioflux", s): dict_bemis
        for s in model.chemistry.bio_species.attributes
    }

    # Check chemical scheme with regard to emitted species
    if model.chemistry.emis_species.attributes == []:
        raise Exception("WARNING: There is no anthropogenic emitted species "
                        "in your chemical scheme\n"
                        "Please check the file ANTHROPIC "
                        "in the chemical scheme folder: {}/{}"
                        .format(model.chemistry.dir_precomp,
                                model.chemistry.schemeid))

    if model.chemistry.bio_species.attributes == []:
        warning("WARNING: There is no biogenic emitted species "
                "in your chemical scheme")

    # Initial conditions for all active species (for the first period)
    inicond = {
        ("inicond", s): dict_ini
        for s in model.chemistry.acspecies.attributes
    }

    # End concentrations from previous period for all active species
    # are needed for later periods (hence the first period is skipped)
    endconcs_in = {
        ("endconcs", s): {
            "input_dates": {
                ddi: np.array(
                    [[model.input_dates[ddi][0],
                      model.input_dates[ddi][0]]])
                for ddi in list(model.input_dates.keys())[1:]},
            "domain": model.domain,
            "force_dump": True,
            "sparse_data": False,
            "sampled": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": False
        }
        for s in model.chemistry.acspecies.attributes
    }

    # End concentrations are saved for all periods
    endconcs_out = {
        ("endconcs", s): {
            "input_dates": {
                ddi: np.array([[model.input_dates[ddi][-1],
                                model.input_dates[ddi][-1]]])
                for ddi in model.input_dates},
            "domain": model.domain,
            "force_loadout": True,
            "sparse_data": False,
            "sampled": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": True
        }
        for s in model.chemistry.acspecies.attributes
    }

    # Lateral and top conditions for all active species
    lbc = {
        ("latcond", s): dict_bound
        for s in model.chemistry.acspecies.attributes
    }
    top = {
        ("topcond", s): dict_top
        for s in model.chemistry.acspecies.attributes
    }

    # Meteo dictionary
    # NOTE(review): no default is given to getattr, so a model without the
    # 'pre-computed-meteo' attribute raises AttributeError here.
    if not getattr(model, 'pre-computed-meteo'):
        meteo = {
            **{("meteo", s): dict_surface
               for s in model.meteo_parameters_3d},
            **{("meteo", s): dict_bemis
               for s in model.meteo_parameters_2d}
        }
    else:
        meteo = {("meteo", ''): dict_surface}

    # Put everything in input dictionary, plus outputs for end concentrations
    mapper["inputs"].update(
        {**emis, **lbc, **top, **endconcs_in, **meteo})
    mapper["outputs"].update(endconcs_out)

    if model.useemisb:
        mapper["inputs"].update(bioemis)

    # Different choice regarding inicond whether in a loop of ensemble method
    if not getattr(model, "ensrf_restart_file", False):
        mapper["inputs"].update(inicond)
    else:
        mapper["inputs"].update(
            {("restart_inicond", s): dict_ini
             for s in model.chemistry.acspecies.attributes}
        )

    # Accepts backup components instead of reference ones
    backup_comps.update(model.backup_comps)

    # Force the transformation to be in its own precursors and successors
    # to propagate end concentrations
    mapper["precursors"] = {trid: [transform_name] for trid in endconcs_in}
    mapper["successors"] = {trid: [transform_name] for trid in endconcs_in}

    # Save propagation of perturbations
    inputs = ["latcond", "topcond", "endconcs", "inicond"]
    if getattr(model, "ensrf_restart_file", False):
        inputs[-1] = "restart_inicond"

    mapper["outputs2inputs"] = {}
    for trid in mapper["outputs"]:
        outcomp, s = trid

        # Add all species listed in the reaction graph as producing 's'
        species_to_link = [s] + [
            sin for sin in model.chemistry.inout_reaction_graph
            if s in model.chemistry.inout_reaction_graph[sin]
        ]

        # Now loop in input species and update outputs2inputs
        # For species not outputed, just propagate endconcs
        mapper["outputs2inputs"][trid] = list(chain.from_iterable(
            [(cmp, sin) for cmp in inputs]
            + ([("flux", sin)]
               if sin in model.chemistry.emis_species.attributes else [])
            + ([("bioflux", sin)]
               if sin in model.chemistry.bio_species.attributes else [])
            for sin in species_to_link
        ))

    # Simplify outputs2inputs (drop duplicated input identifiers)
    mapper["outputs2inputs"] = {
        tr: list(set(mapper["outputs2inputs"][tr]))
        for tr in mapper["outputs2inputs"]
    }

    return mapper