Source code for pycif.plugins.datastreams.fields.grib2_ecmwf.get_domain

import copy
import itertools
from pathlib import Path

import numpy as np

from .....utils.classes.domains import Domain
from .utils import GribDataset, get_grid_type, get_jscan


[docs] def make_regular_grid(lat, lon, jscan): dx = np.unique(np.round(np.diff(lon), 6))[0] dy = np.abs(np.unique(np.round(np.diff(lat), 6))[0]) # Flip latitudes if jscan = 0 if jscan == 0: lat2 = np.flip(lat) lat = copy.deepcopy(lat2) # Define lon/lat variables nlon = lon.size nlat = lat.size zlon, zlat = np.meshgrid(lon, lat) latc = np.append(lat - dy / 2, lat[-1] + dy / 2) lonc = np.append(lon - dx / 2, lon[-1] + dx / 2) zlonc, zlatc = np.meshgrid(lonc, latc) return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs] def make_gaussian_grid(lat, lon): ncell = lat.shape[0] lat_increasing = np.all(np.diff(lat) >= 0) (indices,) = np.where(np.diff(lat) != 0) indices = np.concatenate(([0], indices + 1, [ncell])) lat_vertices = np.zeros((ncell, 4), dtype=lat.dtype) lon_vertices = np.zeros((ncell, 4), dtype=lon.dtype) for ind_start, ind_end in zip(indices[:-1], indices[1:]): lat_block = lat[ind_start:ind_end] lon_block = lon[ind_start:ind_end] delta_lon = np.round(np.diff(lon_block), 12) if len(np.unique(lat_block)) != 1: raise ValueError("Latitude values are not constant within the block") if len(np.unique(delta_lon)) != 1: raise ValueError( "Longitude values are not regularly spaced within the block" ) lat_block = lat_block[0] delta_lon = delta_lon[0] if ind_start == 0: if lat_increasing: lat_bnds = (-90.0, 0.5 * (lat_block + lat[ind_end])) else: lat_bnds = (0.5 * (lat_block + lat[ind_end]), 90.0) elif ind_end == ncell: if lat_increasing: lat_bnds = (0.5 * (lat[ind_start - 1] + lat_block), 90.0) else: lat_bnds = (-90.0, 0.5 * (lat[ind_start - 1] + lat_block)) else: lat_bnds = ( 0.5 * (lat[ind_start - 1] + lat_block), 0.5 * (lat_block + lat[ind_end]), ) lon_bnds = np.concatenate( (lon_block - 0.5 * delta_lon, [lon_block[-1] + 0.5 * delta_lon]) ) lat_vertices[ind_start:ind_end, 0] = lat_bnds[0] lat_vertices[ind_start:ind_end, 1] = lat_bnds[1] lat_vertices[ind_start:ind_end, 2] = lat_bnds[1] lat_vertices[ind_start:ind_end, 3] = lat_bnds[0] lon_vertices[ind_start:ind_end, 0] = lon_bnds[:-1] lon_vertices[ind_start:ind_end, 1] = lon_bnds[:-1] lon_vertices[ind_start:ind_end, 2] = lon_bnds[1:] lon_vertices[ind_start:ind_end, 3] = lon_bnds[1:] if np.any(lon_vertices > 180): lon_vertices[lon_vertices > 180] -= 360 zlonc = lon_vertices.T zlatc = lat_vertices.T zlon = lon[np.newaxis, :] zlat = lat[np.newaxis, :] nlon = ncell nlat = 1 return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs] def make_octahedral_grid(lat, lon, file_corner): # Read the csv coordinates with open(file_corner, "r") as f: lines = f.readlines() data = np.zeros((len(lines), 4, 2)) for k, ln in enumerate(lines): l = ln.strip().replace("[", "").replace("]", "").replace(",", "").split() data[k] = np.array(l).astype(float).reshape(1, 4, 2) zlonc = data[:, [0, 2, 3, 1], 0].T zlatc = data[:, [0, 2, 3, 1], 1].T zlon = lon[np.newaxis, :] zlat = lat[np.newaxis, :] nlon = len(lines) nlat = 1 return zlonc, zlatc, zlon, zlat, nlon, nlat
[docs] def get_domain(ref_dir, ref_file, input_interval, target_dir, tracer=None): input_files = list(itertools.chain.from_iterable(tracer.input_files.values())) domain_file = input_files[0][0] if hasattr(tracer, "domain_file"): domain_file = Path(domain_file).parent.joinpath(tracer.domain_file) ds = GribDataset( domain_file, read_keys=["pv"], filter_by_keys=tracer.filter_by_keys_dict, ) lon = ds["longitude"] lat = ds["latitude"] grid_type = get_grid_type(domain_file, tracer.filter_by_keys_dict) if grid_type == "octahedral": jscan = 0 else: jscan = get_jscan(domain_file, tracer.filter_by_keys_dict) if grid_type == "octahedral": path = Path(tracer.dir_hcoord_corner, tracer.file_hcoord_corner) if not path.is_file(): raise FileNotFoundError( f"Path to the corner coordinates does not exist, " "please check that it is properly defined in your yml.\n" f"Current definition is:\n" f"\t- dir_hcoord_corner = {tracer.dir_hcoord_corner}\n" f"\t- file_hcoord_corner = {tracer.file_hcoord_corner}\n" ) zlonc, zlatc, zlon, zlat, nlon, nlat = make_octahedral_grid(lat, lon, path) unstructured_domain = True elif grid_type == "reduced_gaussian": zlonc, zlatc, zlon, zlat, nlon, nlat = make_gaussian_grid(lat, lon) unstructured_domain = True elif grid_type == "regular": zlonc, zlatc, zlon, zlat, nlon, nlat = make_regular_grid(lat, lon, jscan) unstructured_domain = False else: raise ValueError(f"Unsupported grid type {grid_type}") # Reconstruct alpha and beta if tracer.surface: ecm_nlevs = 1 sigma_a = np.array([0]) sigma_b = np.array([1]) sigma_a_mid = np.array([0]) sigma_b_mid = np.array([1]) # Initializes domain domain = Domain( nlon=nlon, nlat=nlat, zlon=zlon, zlat=zlat, zlonc=zlonc, zlatc=zlatc, nlev=ecm_nlevs, pressure_unit="Pa", sigma_b=sigma_b, sigma_a=sigma_a, sigma_a_mid=sigma_a_mid, sigma_b_mid=sigma_b_mid, unstructured_domain=unstructured_domain, ) else: pv = ds.get_attr("pv") hybrid = np.array(ds["hybrid"]).flatten().astype(int) ecm_nlevs = int(len(pv) / 2 - 1) sigma_a = pv[: ecm_nlevs + 1][::-1] sigma_b = pv[ecm_nlevs + 1:][::-1] # sigma_a = np.zeros(ecm_nlevs + 1) # sigma_b = np.zeros(ecm_nlevs + 1) # for ii in range(ecm_nlevs - 1): # sigma_a[ii] = (pv[ecm_nlevs - ii] + pv[ecm_nlevs - ii - 1]) / 2 # sigma_b[ii] = \ # (pv[1 + 2 * ecm_nlevs - ii] + pv[1 + 2 * ecm_nlevs - ii - 1]) / 2 # Vertical crop if all hybrid levels are not available ind_crop = np.concatenate( [ecm_nlevs - hybrid[::-1], ecm_nlevs + 1 - hybrid[[0]]] ) sigma_a = sigma_a[ind_crop] sigma_b = sigma_b[ind_crop] ecm_nlevs = hybrid.size # Initializes domain domain = Domain( nlon=nlon, nlat=nlat, zlon=zlon, zlat=zlat, zlonc=zlonc, zlatc=zlatc, nlev=ecm_nlevs, pressure_unit="Pa", sigma_b=sigma_b, sigma_a=sigma_a, unstructured_domain=unstructured_domain, ) return domain