import copy
import itertools
from pathlib import Path
import numpy as np
from .....utils.classes.domains import Domain
from .utils import GribDataset, get_grid_type, get_jscan
[docs]
def make_regular_grid(lat, lon, jscan):
dx = np.unique(np.round(np.diff(lon), 6))[0]
dy = np.abs(np.unique(np.round(np.diff(lat), 6))[0])
# Flip latitudes if jscan = 0
if jscan == 0:
lat2 = np.flip(lat)
lat = copy.deepcopy(lat2)
# Define lon/lat variables
nlon = lon.size
nlat = lat.size
zlon, zlat = np.meshgrid(lon, lat)
latc = np.append(lat - dy / 2, lat[-1] + dy / 2)
lonc = np.append(lon - dx / 2, lon[-1] + dx / 2)
zlonc, zlatc = np.meshgrid(lonc, latc)
return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs]
def make_gaussian_grid(lat, lon):
ncell = lat.shape[0]
lat_increasing = np.all(np.diff(lat) >= 0)
(indices,) = np.where(np.diff(lat) != 0)
indices = np.concatenate(([0], indices + 1, [ncell]))
lat_vertices = np.zeros((ncell, 4), dtype=lat.dtype)
lon_vertices = np.zeros((ncell, 4), dtype=lon.dtype)
for ind_start, ind_end in zip(indices[:-1], indices[1:]):
lat_block = lat[ind_start:ind_end]
lon_block = lon[ind_start:ind_end]
delta_lon = np.round(np.diff(lon_block), 12)
if len(np.unique(lat_block)) != 1:
raise ValueError("Latitude values are not constant within the block")
if len(np.unique(delta_lon)) != 1:
raise ValueError(
"Longitude values are not regularly spaced within the block"
)
lat_block = lat_block[0]
delta_lon = delta_lon[0]
if ind_start == 0:
if lat_increasing:
lat_bnds = (-90.0, 0.5 * (lat_block + lat[ind_end]))
else:
lat_bnds = (0.5 * (lat_block + lat[ind_end]), 90.0)
elif ind_end == ncell:
if lat_increasing:
lat_bnds = (0.5 * (lat[ind_start - 1] + lat_block), 90.0)
else:
lat_bnds = (-90.0, 0.5 * (lat[ind_start - 1] + lat_block))
else:
lat_bnds = (
0.5 * (lat[ind_start - 1] + lat_block),
0.5 * (lat_block + lat[ind_end]),
)
lon_bnds = np.concatenate(
(lon_block - 0.5 * delta_lon, [lon_block[-1] + 0.5 * delta_lon])
)
lat_vertices[ind_start:ind_end, 0] = lat_bnds[0]
lat_vertices[ind_start:ind_end, 1] = lat_bnds[1]
lat_vertices[ind_start:ind_end, 2] = lat_bnds[1]
lat_vertices[ind_start:ind_end, 3] = lat_bnds[0]
lon_vertices[ind_start:ind_end, 0] = lon_bnds[:-1]
lon_vertices[ind_start:ind_end, 1] = lon_bnds[:-1]
lon_vertices[ind_start:ind_end, 2] = lon_bnds[1:]
lon_vertices[ind_start:ind_end, 3] = lon_bnds[1:]
if np.any(lon_vertices > 180):
lon_vertices[lon_vertices > 180] -= 360
zlonc = lon_vertices.T
zlatc = lat_vertices.T
zlon = lon[np.newaxis, :]
zlat = lat[np.newaxis, :]
nlon = ncell
nlat = 1
return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs]
def make_octahedral_grid(lat, lon, file_corner):
# Read the csv coordinates
with open(file_corner, "r") as f:
lines = f.readlines()
data = np.zeros((len(lines), 4, 2))
for k, ln in enumerate(lines):
l = ln.strip().replace("[", "").replace("]", "").replace(",", "").split()
data[k] = np.array(l).astype(float).reshape(1, 4, 2)
zlonc = data[:, [0, 2, 3, 1], 0].T
zlatc = data[:, [0, 2, 3, 1], 1].T
zlon = lon[np.newaxis, :]
zlat = lat[np.newaxis, :]
nlon = len(lines)
nlat = 1
return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs]
def get_domain(ref_dir, ref_file, input_interval, target_dir, tracer=None):
input_files = list(itertools.chain.from_iterable(tracer.input_files.values()))
domain_file = input_files[0][0]
if hasattr(tracer, "domain_file"):
domain_file = Path(domain_file).parent.joinpath(tracer.domain_file)
ds = GribDataset(
domain_file,
read_keys=["pv"],
filter_by_keys=tracer.filter_by_keys_dict,
)
lon = ds["longitude"]
lat = ds["latitude"]
grid_type = get_grid_type(domain_file, tracer.filter_by_keys_dict)
if grid_type == "octahedral":
jscan = 0
else:
jscan = get_jscan(domain_file, tracer.filter_by_keys_dict)
if grid_type == "octahedral":
path = Path(tracer.dir_hcoord_corner, tracer.file_hcoord_corner)
if not path.is_file():
raise FileNotFoundError(
f"Path to the corner coordinates does not exist, "
"please check that it is properly defined in your yml.\n"
f"Current definition is:\n"
f"\t- dir_hcoord_corner = {tracer.dir_hcoord_corner}\n"
f"\t- file_hcoord_corner = {tracer.file_hcoord_corner}\n"
)
zlonc, zlatc, zlon, zlat, nlon, nlat = make_octahedral_grid(lat, lon, path)
unstructured_domain = True
elif grid_type == "reduced_gaussian":
zlonc, zlatc, zlon, zlat, nlon, nlat = make_gaussian_grid(lat, lon)
unstructured_domain = True
elif grid_type == "regular":
zlonc, zlatc, zlon, zlat, nlon, nlat = make_regular_grid(lat, lon, jscan)
unstructured_domain = False
else:
raise ValueError(f"Unsupported grid type {grid_type}")
# Reconstruct alpha and beta
if tracer.surface:
ecm_nlevs = 1
sigma_a = np.array([0])
sigma_b = np.array([1])
sigma_a_mid = np.array([0])
sigma_b_mid = np.array([1])
# Initializes domain
domain = Domain(
nlon=nlon,
nlat=nlat,
zlon=zlon,
zlat=zlat,
zlonc=zlonc,
zlatc=zlatc,
nlev=ecm_nlevs,
pressure_unit="Pa",
sigma_b=sigma_b,
sigma_a=sigma_a,
sigma_a_mid=sigma_a_mid,
sigma_b_mid=sigma_b_mid,
unstructured_domain=unstructured_domain,
)
else:
pv = ds.get_attr("pv")
hybrid = np.array(ds["hybrid"]).flatten().astype(int)
ecm_nlevs = int(len(pv) / 2 - 1)
sigma_a = pv[: ecm_nlevs + 1][::-1]
sigma_b = pv[ecm_nlevs + 1:][::-1]
# sigma_a = np.zeros(ecm_nlevs + 1)
# sigma_b = np.zeros(ecm_nlevs + 1)
# for ii in range(ecm_nlevs - 1):
# sigma_a[ii] = (pv[ecm_nlevs - ii] + pv[ecm_nlevs - ii - 1]) / 2
# sigma_b[ii] = \
# (pv[1 + 2 * ecm_nlevs - ii] + pv[1 + 2 * ecm_nlevs - ii - 1]) / 2
# Vertical crop if all hybrid levels are not available
ind_crop = np.concatenate(
[ecm_nlevs - hybrid[::-1], ecm_nlevs + 1 - hybrid[[0]]]
)
sigma_a = sigma_a[ind_crop]
sigma_b = sigma_b[ind_crop]
ecm_nlevs = hybrid.size
# Initializes domain
domain = Domain(
nlon=nlon,
nlat=nlat,
zlon=zlon,
zlat=zlat,
zlonc=zlonc,
zlatc=zlatc,
nlev=ecm_nlevs,
pressure_unit="Pa",
sigma_b=sigma_b,
sigma_a=sigma_a,
unstructured_domain=unstructured_domain,
)
return domain