Source code for pycif.plugins.models.lmdz_acc.ini_mapper

import numpy as np
import datetime

from ....utils.classes.setup import Setup


[docs] def ini_mapper(model, transform_type, general_mapper={}, backup_comps={}, transforms_order=[], ref_transform="", transform_name="", **kwargs): input_intervals = { ddi: np.append( model.input_dates[ddi][:-1, np.newaxis], model.input_dates[ddi][1:, np.newaxis], axis=1) for ddi in model.input_dates} flx_intervals = { ddi: np.append( model.flx_input_dates[ddi][:-1, np.newaxis], model.flx_input_dates[ddi][1:, np.newaxis], axis=1) for ddi in model.flx_input_dates} output_intervals = { ddi: np.append( model.tstep_dates[ddi][:-1, np.newaxis], model.tstep_dates[ddi][1:, np.newaxis], axis=1) for ddi in model.input_dates} meteo_intervals = { ddi: np.append( model.meteo_dates[ddi][:-1, np.newaxis], model.meteo_dates[ddi][1:, np.newaxis], axis=1) for ddi in model.meteo_dates} default_dict = { "input_dates": input_intervals, "force_dump": True, "sparse_data": False, "sampled": False, "break_fwd_onlyinit_pipe": False, "break_adj_onlyinit_pipe": False, "force_loadin": False } meteo_dict = {"input_dates": meteo_intervals, "force_dump": True, "sparse_data": False, "sampled": False, "break_fwd_onlyinit_pipe": False, "break_adj_onlyinit_pipe": False} dict_surface = dict(default_dict, **{"domain": model.domain, "fixed_domain": True, }) dict_ini = dict( dict_surface, **{"input_dates": { model.datei: np.array([[model.datei, model.datei]])}} ) # For flx, alter the model domain to be consistent with nlevemis domain_in = model.domain domain_out = Setup.load_registered( domain_in.plugin.name, domain_in.plugin.version, "domain", plg_orig=domain_in ) domain_out.nlon = domain_in.nlon domain_out.nlat = domain_in.nlat domain_out.zlon = domain_in.zlon domain_out.zlat = domain_in.zlat domain_out.zlonc = domain_in.zlonc domain_out.zlatc = domain_in.zlatc domain_out.nlon_side = domain_in.nlon_side domain_out.nlat_side = domain_in.nlat_side domain_out.zlonc_side = domain_in.zlonc_side domain_out.zlatc_side = domain_in.zlatc_side domain_out.zlon_side = domain_in.zlon_side domain_out.zlat_side = domain_in.zlat_side domain_out.pressure_unit = domain_in.pressure_unit domain_out.nlev = 1 domain_out.sigma_a = np.array([0, 0]) domain_out.sigma_b = np.array([1, 1]) domain_out.sigma_a_mid = np.array([0]) domain_out.sigma_b_mid = np.array([1]) dict_aemis = dict( dict_surface, **{"domain": domain_out, "input_dates": flx_intervals, "unit": "kg/m2/s" }) # Executable mapper = { "inputs": { ("meteo", ""): meteo_dict, }, "outputs": { (outcomp, s): { "isobs": True, "force_loadout": True, "input_dates": output_intervals, "domain": model.domain, "sampled": True, "sparse_data": False, "continuous_hdomain": False, "continuous_vdomain": False, "break_adj_onlyinit_pipe": False, "break_fwd_onlyinit_pipe": False } for s in model.chemistry.acspecies.attributes for outcomp in ["concs", "pressure", "dpressure", "airm", "hlay"] } } # Inputs dict emis = { ("flux", s): dict_aemis for s in model.chemistry.emis_species.attributes } inicond = { ("inicond", s): dict_ini for s in model.chemistry.acspecies.attributes } prescrcond = {} if hasattr(model.chemistry, "prescrconcs") and model.do_chemistry: prescrcond = { ("prescrconcs", s): dict_surface for s in model.chemistry.prescrconcs.attributes } # End concentrations from previous period for all active species # are needed for later periods endconcs_in = { ("endconcs", s): {"input_dates": { ddi: np.array( [[model.input_dates[ddi][0], model.input_dates[ddi][0]]]) for ddi in list(model.input_dates.keys())[1:]}, "domain": model.domain, "force_dump": True, "sparse_data": False, "sampled": False, "break_fwd_onlyinit_pipe": True, "break_adj_onlyinit_pipe": False } for s in model.chemistry.acspecies.attributes } # End concentrations are saved for all periods endconcs_out = { ("endconcs", s): {"input_dates": { ddi: np.array( [[model.input_dates[ddi][-1], model.input_dates[ddi][-1]]]) for ddi in model.input_dates}, "domain": model.domain, "force_loadout": True, "sparse_data": False, "sampled": False, "break_fwd_onlyinit_pipe": True, "break_adj_onlyinit_pipe": True} for s in model.chemistry.acspecies.attributes } # Photochemistry photj = {} if model.do_chemistry: list_var = ['pmid', 'temp'] if hasattr(model.chemistry, "photo_rates"): list_var.extend( [j[1][1] for j in model.chemistry.photo_rates.iterrows()] ) photj = { ("kinetic", l): dict_surface for l in list_var } model.photj = photj # Production and loss prodloss3d = {} if hasattr(model.chemistry, "prodloss3d") and model.do_chemistry: prodloss3d = { ("prodloss3d", s): dict_surface for s in model.chemistry.prodloss3d.attributes } mapper["inputs"].update( {**emis, **prescrcond, **prodloss3d, **photj, **endconcs_in}) mapper["outputs"].update(endconcs_out) # Different choice regarding inicond whether in a loop of ensemble method if not getattr(model, "ensrf_restart_file", False): mapper["inputs"].update(inicond) else: mapper["inputs"].update( {("restart_inicond", s): dict_ini for s in model.chemistry.acspecies.attributes } ) # Accepts backup components instead of reference ones if hasattr(model, "backup_comps"): backup_comps.update(model.backup_comps) # Force the transformation to be in its own precursors and successors # to propagate end concentrations mapper["precursors"] = {trid: [transform_name] for trid in endconcs_in} mapper["successors"] = {trid: [transform_name] for trid in endconcs_in} # Save propagation of perturbations input_components = ["inicond", "endconcs", "restart_inicond"] mapper["outputs2inputs"] = {} for out_comp in model.output_components + ["endconcs"]: for spec in model.chemistry.acspecies.attributes: inputs = [(comp, spec) for comp in input_components] if spec in model.chemistry.emis_species.attributes: inputs.append(("flux", spec)) mapper["outputs2inputs"][(out_comp, spec)] = inputs return mapper