import traceback
from logging import debug
from types import MethodType
import numpy as np
from ...utils.check.errclass import PluginError
from ...utils.geometry.areas import calc_areas
from .baseclass import Plugin
[docs]
class Domain(Plugin):
vshifted = False
def __init__(self, **kwargs):
"""Create a Model Class"""
super(Domain, self).__init__(**kwargs)
# By default domains are structured
self.unstructured_domain = getattr(self, "unstructured_domain", False)
# Get middle vertical coordinates
self.get_vmiddle()
@classmethod
def register_plugin(cls, name, version, module, subtype="", **kwargs):
"""Register a module for a plugin and version with possibly options
Args:
name (str): name of the plugin
version (str): version of the plugin
module (types.ModuleType): module defining the interface
between pyCIF and the plugin
plugin_type (str): type of plugin
**kwargs (dictionary): default options for module
"""
super(Domain, cls).register_plugin(
name, version, module, plugin_type="domain", subtype=subtype
)
def read_grid(self, *args, **kwargs):
"""Read a grid from an existing file
Args:
self (Domain): plugin defining the domain. Should include
filegrid to be able to read the grid from a file
Return:
Grid domain with meshgrids for center lon/lat and corner lon/lat
"""
raise PluginError("The function read_grid was not defined")
def create_domain(self, *args, **kwargs):
"""Creates a grid if needed
Args:
domain (dictionary): dictionary defining the domain.
"""
raise PluginError("The function create_domain was not defined")
def calc_areas(self, *args, **kwargs):
"""Computes the area of each grid cell in your domain."""
return calc_areas(self, **kwargs)
def initiate_template(self):
default_functions = ["read_grid", "create_domain", "calc_areas", "get_sides"]
super(Domain, self).initiate_template(
plg_type="domain",
default_functions={name: True for name in default_functions},
)
def get_domain(self, **kwargs):
try:
self.read_grid(**kwargs) # Read domain
except (IOError, AttributeError) as e1:
debug(
"Failed to read the domain due to the following exception:\n"
f"{traceback.format_exc()}\n"
"Generating it."
)
try:
self.create_domain(**kwargs) # Generate a domain
except Exception as e2:
raise e2 from e1 # Keep track of both exceptions
self.get_sides() # Get side coordinates
self.get_vmiddle() # Get sigma middle
# Extend the plugin 'ini_data' method
if hasattr(self, "ini_data"):
self.ini_data_orig = self.ini_data
def ini_data(self, **kwargs):
self.ini_data_orig(**kwargs)
if hasattr(self, "plugin"):
get_domain(self, **kwargs)
else:
def ini_data(self, **kwargs):
if hasattr(self, "plugin"):
get_domain(self, **kwargs)
self.ini_data = MethodType(ini_data, self)
[docs]
def get_sides(self):
"""Gets the sides of the domain"""
# Concatenating together the longitudes and latitudes of the sides
# Orders: West, East, South, North
zlonc = np.rad2deg(np.unwrap(np.deg2rad(self.zlonc), axis=1))
self.zlonc_side = np.concatenate(
[
zlonc[:, 0],
zlonc[:, -1],
zlonc[0, :],
zlonc[-1, :],
]
)[:, np.newaxis]
self.zlatc_side = np.concatenate(
[
self.zlatc[:, 0],
self.zlatc[:, -1],
self.zlatc[0, :],
self.zlatc[-1, :],
]
)[:, np.newaxis]
self.zlon_side = np.concatenate(
[
0.5 * (zlonc[:-1, 0] + zlonc[1:, 0]),
0.5 * (zlonc[:-1, -1] + zlonc[1:, -1]),
0.5 * (zlonc[0, :-1] + zlonc[0, 1:]),
0.5 * (zlonc[-1, :-1] + zlonc[-1, 1:]),
]
)[np.newaxis, :]
self.zlat_side = np.concatenate(
[
0.5 * (self.zlatc[:-1, 0] + self.zlatc[1:, 0]),
0.5 * (self.zlatc[:-1, -1] + self.zlatc[1:, -1]),
0.5 * (self.zlatc[0, :-1] + self.zlatc[0, 1:]),
0.5 * (self.zlatc[-1, :-1] + self.zlatc[-1, 1:]),
]
)[np.newaxis, :]
self.nlon_side = self.zlon_side.size
self.nlat_side = 1
def get_vmiddle(self):
# Deal with sigmas
if not hasattr(self, "sigma_a_mid") and hasattr(self, "sigma_a"):
self.sigma_a_mid = 0.5 * (self.sigma_a[1:] + self.sigma_a[:-1])
self.sigma_b_mid = 0.5 * (self.sigma_b[1:] + self.sigma_b[:-1])
if not hasattr(self, "sigma_a") and hasattr(self, "sigma_a_mid"):
self.sigma_a = [0]
self.sigma_b = [1]
for a, b in zip(self.sigma_a_mid, self.sigma_b_mid):
self.sigma_a.append(self.sigma_a[-1] + 2 * (a - self.sigma_a[-1]))
self.sigma_b.append(self.sigma_b[-1] + 2 * (b - self.sigma_b[-1]))
self.sigma_a = np.maximum(0, self.sigma_a)
self.sigma_b = np.maximum(0, self.sigma_b)
# Deal with heights
if not hasattr(self, "heights_mid") and hasattr(self, "heights"):
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
self.heights_mid = 0.5 * (self.heights[1:] + self.heights[:-1])
if not hasattr(self, "heights") and hasattr(self, "heights_mid"):
self.heights = [0]
for a in self.heights_mid:
self.heights.append(self.heights[-1] + 2 * (a - self.heights[-1]))
self.heights = np.maximum(0, self.heights)
def compare_to_another(self, other_domain):
if type(other_domain) != Domain:
return False
if self == other_domain:
return True
same_zlon = np.all(self.zlon == other_domain.zlon)
same_zlat = np.all(self.zlat == other_domain.zlat)
return same_zlon and same_zlat
# print(__file__)
# import code
# code.interact(local=dict(locals(), **globals()))