# Source code for pycif.plugins.models.lmdz_ico.ini_mapper

from __future__ import annotations

from datetime import datetime
from logging import debug
import pandas as pd
from typing import Any

import numpy as np


def get_input_intervals(
    self,
    input_dates: dict[datetime, np.ndarray],
) -> dict[datetime, np.ndarray]:
    """Convert per-period date arrays to ``[start, end]`` interval pairs.

    Consecutive dates of each array are paired into intervals, and only the
    intervals that overlap the full simulation window
    ``[self.datei, self.datef]`` are kept. An interval that merely touches
    the window at one of its bounds counts as overlapping, and so does an
    interval that fully encloses the window.

    Args:
        self: object exposing the simulation bounds ``datei`` and ``datef``.
        input_dates: mapping of period-start to sorted date arrays.

    Returns:
        dict: mapping of period-start to ``(N, 2)`` arrays of date pairs.
        Periods without any overlapping interval map to an empty ``(0, 2)``
        array.
    """
    input_intervals = {}
    for date, date_array in input_dates.items():
        pairs = [
            [di, df]
            for di, df in zip(date_array[:-1], date_array[1:])
            # Standard closed-interval overlap test; unlike checking whether
            # one endpoint lies inside the window, it also keeps an interval
            # that starts before ``datei`` and ends after ``datef``.
            if di <= self.datef and df >= self.datei
        ]
        # Keep the documented (N, 2) shape even when nothing overlaps
        input_intervals[date] = (
            np.array(pairs) if pairs else np.empty((0, 2), dtype=object)
        )

    return input_intervals
def ini_mapper(
    self,
    transform_type,
    general_mapper=None,
    backup_comps=None,
    transforms_order=None,
    ref_transform="",
    transform_name="",
    **kwargs,
) -> dict[  # noqa: E501
    str, dict[tuple[str, str], dict[str, Any] | list[str] | list[tuple[str, str]]]
]:
    """Build the data-flow mapper for the LMDZ-ico (icosahedral-grid) model.

    Registers:

    * **Meteo input** under ``("meteo", "")``.
    * **Flux inputs** for every emitted species, on the surface-only domain
      returned by ``self.domain.squeeze_vertical_dim()``.
    * **Chemistry inputs** (``kinetic``, ``prescrconcs``, ``prodloss3d``,
      ``photorates``, ``deposition``) when ``self.do_chemistry`` is true.
    * **Initial-condition** inputs (``restart_inicond`` instead of
      ``inicond`` when ``self.ensrf_restart_file`` is set) and
      **end-concentration** inputs/outputs used to chain periods.
    * **Outputs** for every output component and active species.
    * **outputs2inputs** linking each output tracer to its source inputs.
    * **precursors** / **successors** forcing the transform to depend on
      itself for the ``endconcs`` component.

    Args:
        self: LMDZ-ico model plugin instance. The date attributes read here
            (``meteo_input_dates``, ``flux_input_dates``,
            ``chem_input_dates``, ``input_dates``, ``tstep_dates``,
            ``datei``) must already be set.
        transform_type (str): unused; kept for API compatibility.
        general_mapper (dict | None): unused.
        backup_comps (dict | None): when given and ``self`` defines
            ``backup_comps``, updated in place with ``self.backup_comps``.
        transforms_order (list | None): unused.
        ref_transform (str): unused.
        transform_name (str): name registered as precursor and successor of
            the ``endconcs`` components.
        **kwargs: unused.

    Returns:
        dict: mapper with ``inputs``, ``outputs``, ``outputs2inputs``,
        ``precursors`` and ``successors`` entries.
    """
    domain_surface = self.domain.squeeze_vertical_dim()

    default_options = {
        "force_dump": True,
        "sparse_data": False,
        "sampled": False,
        "break_fwd_onlyinit_pipe": False,
        "break_adj_onlyinit_pipe": False,
        "force_loadin": False,
        "tracer_from_previous": True,
    }

    mapper = {
        "inputs": {},
        "outputs": {},
        "outputs2inputs": {},
    }

    mapper["inputs"][("meteo", "")] = {
        "input_dates": get_input_intervals(self, self.meteo_input_dates),
        **default_options,
    }

    for spec in self.chemistry.active_species:
        # End concentrations from previous period for all active species
        # are needed for later periods (hence the first period is skipped)
        mapper["inputs"][("endconcs", spec)] = {
            "input_dates": {
                ddi: np.array(
                    [[self.input_dates[ddi][0], self.input_dates[ddi][0]]]
                )
                for ddi in list(self.input_dates)[1:]
            },
            "domain": self.domain,
            "force_dump": True,
            "sparse_data": False,
            "sampled": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": False,
        }

        # End concentrations are saved for all periods
        mapper["outputs"][("endconcs", spec)] = {
            "input_dates": {
                ddi: np.array(
                    [[self.input_dates[ddi][-1], self.input_dates[ddi][-1]]]
                )
                for ddi in self.input_dates
            },
            "domain": self.domain,
            "force_loadout": True,
            "sparse_data": False,
            "sampled": False,
            "break_fwd_onlyinit_pipe": True,
            "break_adj_onlyinit_pipe": True,
        }

        for out_comp in self.output_components:
            mapper["outputs"][(out_comp, spec)] = {
                "isobs": True,
                "force_loadout": True,
                # One [start, end] pair per model time step
                "input_dates": {
                    ddi: np.append(
                        self.tstep_dates[ddi][:-1, np.newaxis],
                        self.tstep_dates[ddi][1:, np.newaxis],
                        axis=1,
                    )
                    for ddi in self.input_dates
                },
                "domain": self.domain,
                "sampled": True,
                "continuous_hdomain": False,
                "continuous_vdomain": False,
                "sparse_data": False,
                "break_adj_onlyinit_pipe": False,
                "break_fwd_onlyinit_pipe": False,
            }

    for spec in self.chemistry.emitted_species:
        mapper["inputs"][("flux", spec)] = {
            "input_dates": get_input_intervals(self, self.flux_input_dates),
            "domain": domain_surface,
            "fixed_domain": True,
            "unit": "kg/m2/s",
            **default_options,
        }

    if self.do_chemistry:
        mapper_dict = {
            "input_dates": get_input_intervals(self, self.chem_input_dates),
            "domain": self.domain,
            "fixed_domain": True,
            **default_options,
        }

        mapper["inputs"][("kinetic", "pmid")] = mapper_dict.copy()
        mapper["inputs"][("kinetic", "temp")] = mapper_dict.copy()

        for spec in self.chemistry.prescribed_species:
            mapper["inputs"][("prescrconcs", spec)] = mapper_dict.copy()

        for spec in self.chemistry.prodloss_species:
            mapper["inputs"][("prodloss3d", spec)] = mapper_dict.copy()

        for var_name in self.chemistry.jrates_varname:
            mapper["inputs"][("photorates", var_name)] = mapper_dict.copy()

        # Deposition uses the surface-only domain; earlier entries hold
        # their own copies and are not affected by this change
        mapper_dict["domain"] = domain_surface
        for spec in self.chemistry.deposition_species:
            mapper["inputs"][("deposition", spec)] = mapper_dict.copy()

    # Different choice regarding inicond whether in a loop of ensemble method
    ensrf_restart_file = getattr(self, "ensrf_restart_file", False)
    ini_component = "restart_inicond" if ensrf_restart_file else "inicond"
    for spec in self.chemistry.active_species:
        mapper["inputs"][(ini_component, spec)] = {
            "input_dates": {self.datei: np.array([[self.datei, self.datei]])},
            "domain": self.domain,
            "fixed_domain": True,
            **default_options,
        }

    # Accepts backup components instead of reference ones.
    # Only a caller-provided dict is updated: there is no shared default
    # dict that could leak entries from one call to the next.
    if backup_comps is not None and hasattr(self, "backup_comps"):
        backup_comps.update(self.backup_comps)

    # Force the transformation to be in its own precursors and successors
    # to propagate end concentrations
    mapper["precursors"] = {
        ("endconcs", spec): [transform_name]
        for spec in self.chemistry.active_species
    }
    mapper["successors"] = {
        ("endconcs", spec): [transform_name]
        for spec in self.chemistry.active_species
    }

    # Save propagation of perturbations; the initial-condition component
    # must be the one actually registered in the inputs above
    input_components = [ini_component, "endconcs"]
    for out_comp in self.output_components + ["endconcs"]:
        for spec in self.chemistry.active_species:
            inputs = [(comp, spec) for comp in input_components]
            if spec in self.chemistry.emitted_species:
                inputs.append(("flux", spec))

            mapper["outputs2inputs"][(out_comp, spec)] = inputs

    return mapper