from types import MethodType
import traceback
import numpy as np
from logging import info, debug
from ...utils.check.errclass import PluginError
from ...utils.geometry.areas import calc_areas
from .baseclass import Plugin
[docs]
class Domain(Plugin):
vshifted = False
def __init__(self, **kwargs):
"""Create a Model Class"""
super(Domain, self).__init__(**kwargs)
# By default domains are structured
self.unstructured_domain = getattr(self, "unstructured_domain", False)
# Get middle vertical coordinates
self.get_vmiddle()
@classmethod
def register_plugin(cls, name, version, module, subtype="", **kwargs):
"""Register a module for a plugin and version with possibly options
Args:
name (str): name of the plugin
version (str): version of the plugin
module (types.ModuleType): module defining the interface
between pyCIF and the plugin
plugin_type (str): type of plugin
**kwargs (dictionary): default options for module
"""
super(Domain, cls).register_plugin(
name, version, module, plugin_type="domain", subtype=subtype
)
def read_grid(self, *args, **kwargs):
"""Read a grid from an existing file
Args:
self (Domain): plugin defining the domain. Should include
filegrid to be able to read the grid from a file
Return:
Grid domain with meshgrids for center lon/lat and corner lon/lat
"""
raise PluginError("The function read_grid was not defined")
def create_domain(self, *args, **kwargs):
"""Creates a grid if needed
Args:
domain (dictionary): dictionary defining the domain.
"""
raise PluginError("The function create_domain was not defined")
def calc_areas(self, *args, **kwargs):
"""Computes the area of each grid cell in your domain."""
return calc_areas(self, **kwargs)
def initiate_template(self):
super(Domain, self).initiate_template(
plg_type="domain",
default_functions={"read_grid": True, "create_domain": True,
"calc_areas": True, "get_sides": True}
)
def ini_data(self, **kwargs):
"""Initializes the domain depending on the model used for the inversion.
Defines a domain grid from a grid file or a set of parameters if the
domain was not already defined. Domains are model dependant, so the
outputs can be different depending
on the model.
Args:
plugin (DomainPlugin): domain definition
Returns:
Updates on the fly the domain
"""
if not hasattr(self, "plugin"):
return
# Read or create a domain
try:
# Read domain
self.read_grid(**kwargs)
except (IOError, AttributeError) as e:
# Print the exception
debug("Failed to read the domain due to the following exception:\n")
debug(traceback.format_exc())
# Generate a domain
debug("\nGenerating it.\n")
self.create_domain(**kwargs)
# Compute areas that can be needed for emissions or diagnostics
# if not hasattr(self, "areas") and getattr(
# self, "compute_areas", False
# ):
# info("Computing areas")
# self.calc_areas(**kwargs)
# Get side coordinates
self.get_sides()
# Get sigma middle
self.get_vmiddle()
# Check vertical order
[docs]
def get_sides(self):
"""Gets the sides of the domain
"""
# Concatenating together the longitudes and latitudes of the sides
# Orders: West, East, South, North
zlonc = np.rad2deg(np.unwrap(np.deg2rad(self.zlonc), axis=1))
self.zlonc_side = np.concatenate(
[
zlonc[:, 0],
zlonc[:, -1],
zlonc[0, :],
zlonc[-1, :],
]
)[:, np.newaxis]
self.zlatc_side = np.concatenate(
[
self.zlatc[:, 0],
self.zlatc[:, -1],
self.zlatc[0, :],
self.zlatc[-1, :],
]
)[:, np.newaxis]
self.zlon_side = np.concatenate(
[
0.5 * (zlonc[:-1, 0] + zlonc[1:, 0]),
0.5 * (zlonc[:-1, -1] + zlonc[1:, -1]),
0.5 * (zlonc[0, :-1] + zlonc[0, 1:]),
0.5 * (zlonc[-1, :-1] + zlonc[-1, 1:]),
]
)[np.newaxis, :]
self.zlat_side = np.concatenate(
[
0.5 * (self.zlatc[:-1, 0] + self.zlatc[1:, 0]),
0.5 * (self.zlatc[:-1, -1] + self.zlatc[1:, -1]),
0.5 * (self.zlatc[0, :-1] + self.zlatc[0, 1:]),
0.5 * (self.zlatc[-1, :-1] + self.zlatc[-1, 1:]),
]
)[np.newaxis, :]
self.nlon_side = self.zlon_side.size
self.nlat_side = 1
def get_vmiddle(self):
# Deal with sigmas
if not hasattr(self, "sigma_a_mid") and hasattr(self, "sigma_a"):
self.sigma_a_mid = 0.5 * (self.sigma_a[1:] + self.sigma_a[:-1])
self.sigma_b_mid = 0.5 * (self.sigma_b[1:] + self.sigma_b[:-1])
if not hasattr(self, "sigma_a") and hasattr(self, "sigma_a_mid"):
self.sigma_a = [0]
self.sigma_b = [1]
for a, b in zip(self.sigma_a_mid, self.sigma_b_mid):
self.sigma_a.append(
self.sigma_a[-1] + 2 * (a - self.sigma_a[-1]))
self.sigma_b.append(
self.sigma_b[-1] + 2 * (b - self.sigma_b[-1]))
self.sigma_a = np.maximum(0, self.sigma_a)
self.sigma_b = np.maximum(0, self.sigma_b)
# Deal with heights
if not hasattr(self, "heights_mid") and hasattr(self, "heights"):
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
if not hasattr(self, "heights") and hasattr(self, "heights_mid"):
self.heights = [0]
for a in self.heights_mid:
self.heights.append(
self.heights[-1] + 2 * (a - self.heights[-1]))
self.heights = np.maximum(0, self.heights)