Source code for pycif.utils.classes.domains

import traceback
from logging import debug
from types import MethodType

import numpy as np

from ...utils.check.errclass import PluginError
from ...utils.geometry.areas import calc_areas
from .baseclass import Plugin


[docs] class Domain(Plugin): """Plugin type for spatial model domains. Stores horizontal (lon/lat grid centres and corners) and vertical (hybrid sigma-pressure or height-above-ground) grid definitions, and provides utilities for computing cell areas and boundary coordinates. Concrete implementations live in ``pycif/plugins/domains/``. """ def __init__(self, **kwargs): """Initialise a Domain plugin with default structured-grid settings.""" super(Domain, self).__init__(**kwargs) # By default domains are structured self.unstructured_domain = getattr(self, "unstructured_domain", False) # Get middle vertical coordinates self.get_vmiddle() self.vflip = False
[docs] @classmethod def register_plugin(cls, name, version, module, subtype="", **kwargs): """Register a module for a plugin and version with possibly options Args: name (str): name of the plugin version (str): version of the plugin module (types.ModuleType): module defining the interface between pyCIF and the plugin plugin_type (str): type of plugin **kwargs (dictionary): default options for module """ super(Domain, cls).register_plugin( name, version, module, plugin_type="domain", subtype=subtype )
@property def vdomain_type(self): """Type of vertical coordinate system used by this domain. Returns: str: ``"sigma"`` if hybrid sigma-pressure coordinates are defined, ``"height"`` if AGL heights are defined, ``"none"`` otherwise. """ if hasattr(self, "sigma_a"): return "sigma" elif hasattr(self, "heights"): return "height" else: return "none"
[docs] def read_grid(self, *args, **kwargs): """Read a grid from an existing file Args: self (Domain): plugin defining the domain. Should include filegrid to be able to read the grid from a file Return: Grid domain with meshgrids for center lon/lat and corner lon/lat """ raise PluginError("The function read_grid was not defined")
[docs] def create_domain(self, *args, **kwargs): """Creates a grid if needed Args: domain (dictionary): dictionary defining the domain. """ raise PluginError("The function create_domain was not defined")
[docs] def calc_areas(self, *args, **kwargs): """Compute the area of each grid cell in the domain. Delegates to :func:`~pycif.utils.geometry.areas.calc_areas`. Returns: numpy.ndarray: Array of cell areas in m². """ return calc_areas(self, **kwargs)
[docs] def initiate_template(self): """Initialise the Domain plugin template. Loads the registered domain module and attaches ``read_grid``, ``create_domain``, ``calc_areas`` and ``get_sides`` as bound methods. Also wraps ``ini_data`` to automatically call :meth:`read_grid` (or :meth:`create_domain` on failure), :meth:`get_sides` and :meth:`get_vmiddle` after data initialisation. """ default_functions = ["read_grid", "create_domain", "calc_areas", "get_sides"] super(Domain, self).initiate_template( plg_type="domain", default_functions={name: True for name in default_functions}, ) def get_domain(self, **kwargs): try: self.read_grid(**kwargs) # Read domain except (IOError, AttributeError) as e1: debug( "Failed to read the domain due to the following exception:\n" f"{traceback.format_exc()}\n" "Generating it." ) try: self.create_domain(**kwargs) # Generate a domain except Exception as e2: raise e2 from e1 # Keep track of both exceptions self.get_sides() # Get side coordinates self.get_vmiddle() # Get sigma middle # Extend the plugin 'ini_data' method if hasattr(self, "ini_data"): self.ini_data_orig = self.ini_data def ini_data(self, **kwargs): self.ini_data_orig(**kwargs) if hasattr(self, "plugin"): get_domain(self, **kwargs) else: def ini_data(self, **kwargs): if hasattr(self, "plugin"): get_domain(self, **kwargs) self.ini_data = MethodType(ini_data, self)
[docs] def get_sides(self): """Compute and store the boundary (side) coordinates of the domain. Concatenates the West, East, South and North edge lon/lat corner coordinates into ``zlonc_side`` / ``zlatc_side``, and the corresponding midpoint coordinates into ``zlon_side`` / ``zlat_side``. Sets ``nlon_side`` and ``nlat_side``. """ # Manage exception of empty domain if self.nlon == 0 or self.nlat == 0: self.zlonc_side = np.zeros((self.nlat, self.nlon)) self.zlatc_side = np.zeros((self.nlat, self.nlon)) self.nlon_side = 0 self.nlat_side = 1 return # Concatenating together the longitudes and latitudes of the sides # Orders: West, East, South, North zlonc = np.rad2deg(np.unwrap(np.deg2rad(self.zlonc), axis=1)) self.zlonc_side = np.concatenate( [ zlonc[:, 0], zlonc[:, -1], zlonc[0, :], zlonc[-1, :], ] )[:, np.newaxis] self.zlatc_side = np.concatenate( [ self.zlatc[:, 0], self.zlatc[:, -1], self.zlatc[0, :], self.zlatc[-1, :], ] )[:, np.newaxis] self.zlon_side = np.concatenate( [ 0.5 * (zlonc[:-1, 0] + zlonc[1:, 0]), 0.5 * (zlonc[:-1, -1] + zlonc[1:, -1]), 0.5 * (zlonc[0, :-1] + zlonc[0, 1:]), 0.5 * (zlonc[-1, :-1] + zlonc[-1, 1:]), ] )[np.newaxis, :] self.zlat_side = np.concatenate( [ 0.5 * (self.zlatc[:-1, 0] + self.zlatc[1:, 0]), 0.5 * (self.zlatc[:-1, -1] + self.zlatc[1:, -1]), 0.5 * (self.zlatc[0, :-1] + self.zlatc[0, 1:]), 0.5 * (self.zlatc[-1, :-1] + self.zlatc[-1, 1:]), ] )[np.newaxis, :] self.nlon_side = self.zlon_side.size self.nlat_side = 1
[docs] def get_vmiddle(self): """Derive mid-level vertical coordinates from layer interfaces, or vice-versa. - If ``sigma_a`` / ``sigma_b`` hybrid-sigma interfaces are defined but mid-levels are not, computes ``sigma_a_mid`` and ``sigma_b_mid`` as arithmetic means of adjacent levels. - If mid-levels are defined but interfaces are not, reconstructs the interface arrays. - The same logic applies symmetrically for ``heights`` / ``heights_mid`` (heights above ground level). """ # Deal with sigmas if not hasattr(self, "sigma_a_mid") and hasattr(self, "sigma_a"): self.sigma_a_mid = 0.5 * (self.sigma_a[1:] + self.sigma_a[:-1]) self.sigma_b_mid = 0.5 * (self.sigma_b[1:] + self.sigma_b[:-1]) if not hasattr(self, "sigma_a") and hasattr(self, "sigma_a_mid"): self.sigma_a = [0] self.sigma_b = [1] for a, b in zip(self.sigma_a_mid, self.sigma_b_mid): self.sigma_a.append( self.sigma_a[-1] + 2 * (a - self.sigma_a[-1])) self.sigma_b.append( self.sigma_b[-1] + 2 * (b - self.sigma_b[-1])) self.sigma_a = np.maximum(0, self.sigma_a) self.sigma_b = np.maximum(0, self.sigma_b) # Deal with heights if not hasattr(self, "heights_mid") and hasattr(self, "heights"): self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1]) self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1]) if not hasattr(self, "heights") and hasattr(self, "heights_mid"): self.heights = [0] for a in self.heights_mid: self.heights.append( self.heights[-1] + 2 * (a - self.heights[-1])) self.heights = np.maximum(0, self.heights)
def __eq__(self, other): """Test whether two Domain instances cover the same grid. Args: other: Object to compare with. Returns: bool: True if ``zlon``, ``zlat`` and the vertical coordinate arrays (sigma or height) are element-wise equal. """ if self is other: return True if not isinstance(other, Domain): return False same_zlon = np.array_equal(self.zlon, other.zlon) same_zlat = np.array_equal(self.zlat, other.zlat) same_vdomain = False if self.vdomain_type == other.vdomain_type: if self.vdomain_type == "sigma": same_vdomain = np.array_equal(self.sigma_a, other.sigma_a) elif self.vdomain_type == "height": same_vdomain = np.array_equal(self.heights, other.heights) return same_zlon and same_zlat and same_vdomain