import traceback
from logging import debug
from types import MethodType
import numpy as np
from ...utils.check.errclass import PluginError
from ...utils.geometry.areas import calc_areas
from .baseclass import Plugin
[docs]
class Domain(Plugin):
"""Plugin type for spatial model domains.
Stores horizontal (lon/lat grid centres and corners) and vertical
(hybrid sigma-pressure or height-above-ground) grid definitions, and
provides utilities for computing cell areas and boundary coordinates.
Concrete implementations live in ``pycif/plugins/domains/``.
"""
def __init__(self, **kwargs):
"""Initialise a Domain plugin with default structured-grid settings."""
super(Domain, self).__init__(**kwargs)
# By default domains are structured
self.unstructured_domain = getattr(self, "unstructured_domain", False)
# Get middle vertical coordinates
self.get_vmiddle()
self.vflip = False
[docs]
@classmethod
def register_plugin(cls, name, version, module, subtype="", **kwargs):
"""Register a module for a plugin and version with possibly options
Args:
name (str): name of the plugin
version (str): version of the plugin
module (types.ModuleType): module defining the interface
between pyCIF and the plugin
plugin_type (str): type of plugin
**kwargs (dictionary): default options for module
"""
super(Domain, cls).register_plugin(
name, version, module, plugin_type="domain", subtype=subtype
)
@property
def vdomain_type(self):
"""Type of vertical coordinate system used by this domain.
Returns:
str: ``"sigma"`` if hybrid sigma-pressure coordinates are defined,
``"height"`` if AGL heights are defined, ``"none"`` otherwise.
"""
if hasattr(self, "sigma_a"):
return "sigma"
elif hasattr(self, "heights"):
return "height"
else:
return "none"
[docs]
def read_grid(self, *args, **kwargs):
"""Read a grid from an existing file
Args:
self (Domain): plugin defining the domain. Should include
filegrid to be able to read the grid from a file
Return:
Grid domain with meshgrids for center lon/lat and corner lon/lat
"""
raise PluginError("The function read_grid was not defined")
[docs]
def create_domain(self, *args, **kwargs):
"""Creates a grid if needed
Args:
domain (dictionary): dictionary defining the domain.
"""
raise PluginError("The function create_domain was not defined")
[docs]
def calc_areas(self, *args, **kwargs):
"""Compute the area of each grid cell in the domain.
Delegates to :func:`~pycif.utils.geometry.areas.calc_areas`.
Returns:
numpy.ndarray: Array of cell areas in m².
"""
return calc_areas(self, **kwargs)
[docs]
def initiate_template(self):
"""Initialise the Domain plugin template.
Loads the registered domain module and attaches ``read_grid``,
``create_domain``, ``calc_areas`` and ``get_sides`` as bound methods.
Also wraps ``ini_data`` to automatically call :meth:`read_grid` (or
:meth:`create_domain` on failure), :meth:`get_sides` and
:meth:`get_vmiddle` after data initialisation.
"""
default_functions = ["read_grid",
"create_domain", "calc_areas", "get_sides"]
super(Domain, self).initiate_template(
plg_type="domain",
default_functions={name: True for name in default_functions},
)
def get_domain(self, **kwargs):
try:
self.read_grid(**kwargs) # Read domain
except (IOError, AttributeError) as e1:
debug(
"Failed to read the domain due to the following exception:\n"
f"{traceback.format_exc()}\n"
"Generating it."
)
try:
self.create_domain(**kwargs) # Generate a domain
except Exception as e2:
raise e2 from e1 # Keep track of both exceptions
self.get_sides() # Get side coordinates
self.get_vmiddle() # Get sigma middle
# Extend the plugin 'ini_data' method
if hasattr(self, "ini_data"):
self.ini_data_orig = self.ini_data
def ini_data(self, **kwargs):
self.ini_data_orig(**kwargs)
if hasattr(self, "plugin"):
get_domain(self, **kwargs)
else:
def ini_data(self, **kwargs):
if hasattr(self, "plugin"):
get_domain(self, **kwargs)
self.ini_data = MethodType(ini_data, self)
[docs]
def get_sides(self):
"""Compute and store the boundary (side) coordinates of the domain.
Concatenates the West, East, South and North edge lon/lat corner
coordinates into ``zlonc_side`` / ``zlatc_side``, and the
corresponding midpoint coordinates into ``zlon_side`` / ``zlat_side``.
Sets ``nlon_side`` and ``nlat_side``.
"""
# Manage exception of empty domain
if self.nlon == 0 or self.nlat == 0:
self.zlonc_side = np.zeros((self.nlat, self.nlon))
self.zlatc_side = np.zeros((self.nlat, self.nlon))
self.nlon_side = 0
self.nlat_side = 1
return
# Concatenating together the longitudes and latitudes of the sides
# Orders: West, East, South, North
zlonc = np.rad2deg(np.unwrap(np.deg2rad(self.zlonc), axis=1))
self.zlonc_side = np.concatenate(
[
zlonc[:, 0],
zlonc[:, -1],
zlonc[0, :],
zlonc[-1, :],
]
)[:, np.newaxis]
self.zlatc_side = np.concatenate(
[
self.zlatc[:, 0],
self.zlatc[:, -1],
self.zlatc[0, :],
self.zlatc[-1, :],
]
)[:, np.newaxis]
self.zlon_side = np.concatenate(
[
0.5 * (zlonc[:-1, 0] + zlonc[1:, 0]),
0.5 * (zlonc[:-1, -1] + zlonc[1:, -1]),
0.5 * (zlonc[0, :-1] + zlonc[0, 1:]),
0.5 * (zlonc[-1, :-1] + zlonc[-1, 1:]),
]
)[np.newaxis, :]
self.zlat_side = np.concatenate(
[
0.5 * (self.zlatc[:-1, 0] + self.zlatc[1:, 0]),
0.5 * (self.zlatc[:-1, -1] + self.zlatc[1:, -1]),
0.5 * (self.zlatc[0, :-1] + self.zlatc[0, 1:]),
0.5 * (self.zlatc[-1, :-1] + self.zlatc[-1, 1:]),
]
)[np.newaxis, :]
self.nlon_side = self.zlon_side.size
self.nlat_side = 1
[docs]
def get_vmiddle(self):
"""Derive mid-level vertical coordinates from layer interfaces, or vice-versa.
- If ``sigma_a`` / ``sigma_b`` hybrid-sigma interfaces are defined but
mid-levels are not, computes ``sigma_a_mid`` and ``sigma_b_mid`` as
arithmetic means of adjacent levels.
- If mid-levels are defined but interfaces are not, reconstructs the
interface arrays.
- The same logic applies symmetrically for ``heights`` / ``heights_mid``
(heights above ground level).
"""
# Deal with sigmas
if not hasattr(self, "sigma_a_mid") and hasattr(self, "sigma_a"):
self.sigma_a_mid = 0.5 * (self.sigma_a[1:] + self.sigma_a[:-1])
self.sigma_b_mid = 0.5 * (self.sigma_b[1:] + self.sigma_b[:-1])
if not hasattr(self, "sigma_a") and hasattr(self, "sigma_a_mid"):
self.sigma_a = [0]
self.sigma_b = [1]
for a, b in zip(self.sigma_a_mid, self.sigma_b_mid):
self.sigma_a.append(
self.sigma_a[-1] + 2 * (a - self.sigma_a[-1]))
self.sigma_b.append(
self.sigma_b[-1] + 2 * (b - self.sigma_b[-1]))
self.sigma_a = np.maximum(0, self.sigma_a)
self.sigma_b = np.maximum(0, self.sigma_b)
# Deal with heights
if not hasattr(self, "heights_mid") and hasattr(self, "heights"):
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
if not hasattr(self, "heights") and hasattr(self, "heights_mid"):
self.heights = [0]
for a in self.heights_mid:
self.heights.append(
self.heights[-1] + 2 * (a - self.heights[-1]))
self.heights = np.maximum(0, self.heights)
def __eq__(self, other):
"""Test whether two Domain instances cover the same grid.
Args:
other: Object to compare with.
Returns:
bool: True if ``zlon``, ``zlat`` and the vertical coordinate arrays
(sigma or height) are element-wise equal.
"""
if self is other:
return True
if not isinstance(other, Domain):
return False
same_zlon = np.array_equal(self.zlon, other.zlon)
same_zlat = np.array_equal(self.zlat, other.zlat)
same_vdomain = False
if self.vdomain_type == other.vdomain_type:
if self.vdomain_type == "sigma":
same_vdomain = np.array_equal(self.sigma_a, other.sigma_a)
elif self.vdomain_type == "height":
same_vdomain = np.array_equal(self.heights, other.heights)
return same_zlon and same_zlat and same_vdomain