import numpy as np
import xarray as xr
import glob
import datetime
import os
import pandas as pd
import itertools
from .....utils.classes.setup import Setup
from .....utils.classes.domains import Domain
from logging import info, debug
[docs]
def get_domain(ref_dir, ref_file, input_interval, target_dir, tracer=None):
"""
Args:
--------
ref_dir: directory where the original files are found
ref_file: (template) name of the original files
input_interval: list of the periods to simulate, each item is the list of the dates of
the period
target_dir: directory where the links to the orginal files are created
Ouputs:
--------
setup of the domain in section "Initializes domain"
"""
if not tracer.point_sources:
domain = get_area_domain(tracer)
else:
domain = get_point_domain(tracer)
return domain
[docs]
def get_area_domain(tracer):
# Looking for a reference file to read lon/lat in
domain_file = None
ref_file = list(itertools.chain.from_iterable(tracer.input_files.values()))
if len(ref_file) != 0:
domain_file = ref_file[0][0]
if domain_file is None:
raise Exception(
"TNO domain could not be initialized as no file was found"
)
# Read lon/lat in
nc = xr.open_dataset(domain_file, decode_times=False)
llon = nc['longitude'].values
llat = nc['latitude'].values
llonb = nc['longitude_bounds'].values
llatb = nc['latitude_bounds'].values
# compute the corner matrix
llonc = np.append(llonb[:, 0], llonb[-1, 1])
llatc = np.append(llatb[:, 0], llatb[-1, 1])
lon, lat = np.meshgrid(llon, llat)
lonc, latc = np.meshgrid(llonc, llatc)
nlat, nlon = lat.shape[0], lat.shape[1]
# If no vertical dimension for emissions, provide dummy vertical
punit = "Pa"
nlevs = 1
sigma_a_mid = np.array([0])
sigma_b_mid = np.array([1])
# Put it to a domain Plugin
domain = Domain(nlon=nlon, nlat=nlat,
zlon=lon, zlat=lat,
zlonc=lonc, zlatc=latc,
nlev=nlevs, pressure_unit=punit,
sigma_b_mid=sigma_b_mid, sigma_a_mid=sigma_a_mid)
return domain
[docs]
def get_point_domain(tracer):
# Get vertical extent of point sources
VerticalP_file = tracer.dir_profiles + 'TNO_height-distribution_GNFR.csv'
coef = pd.read_csv(VerticalP_file, sep=';', comment='#')
l = list(coef.columns)
height = l[l.index('GNFR_Category_Name') + 1:]
height_down = np.array([float(x.split('-')[0].replace(" ", ""))
for x in height])
height_top = np.array(
[float(x.split('-')[1].replace("m", "").replace(" ", ""))
for x in height])
nlevs = len(height_top)
info(f'Top heighs: {height_top}')
# Now loop over files to get lon/lat of point sources
dates = tracer.input_dates
files = tracer.input_files
varnames = tracer.varname
TNO_array = []
nc = None
opened_file = ""
for dd_ref in dates:
for ddi, ff in zip(dates[dd_ref], files[dd_ref]):
# Open file if not already processed
if ff != opened_file or nc is None:
debug('Reading of {} in {} for {}'.format([varnames], ff, ddi))
opened_file = ff
ds = pd.DataFrame({})
nc = xr.open_dataset(ff[0], decode_times=False)
list_cat_name = nc['emis_cat_code'].values
list_cat_name = [b.decode("utf-8") for b in list_cat_name]
debug(f"List of category names: {list_cat_name}")
ds['cat_index'] = nc['emission_category_index'].values
ds['source_type'] = nc['source_type_index'].values
ds['emis'] = nc[varnames].values
nlon = nc.dims["longitude"]
nlat = nc.dims["latitude"]
ds["lon"] = nc["longitude_source"]
ds["lat"] = nc["latitude_source"]
if tracer.cat_select:
cat_list = tracer.cat_select
else:
cat_list = np.arange(
ds['cat_index'].min(), ds['cat_index'].max() + 1)
else:
continue
# Loop over point sources
ds_cat = ds[(ds.cat_index.isin(cat_list)) & (ds.source_type == 2)]
mask_nonzero = ds_cat['emis'] != 0
ds_cat = ds_cat.loc[mask_nonzero]
lons = ds_cat["lon"]
lats = ds_cat["lat"]
tmp_ds = pd.DataFrame({
"lon": lons, "lat": lats,
"file": len(lats) * [ff[0]],
})
tmp_ds["file"] = tmp_ds["file"].astype("category")
TNO_array.append(tmp_ds)
TNO_array = pd.concat(TNO_array)
# Now prepare variables for the domain
zlon = TNO_array["lon"].values[np.newaxis, :]
zlat = TNO_array["lat"].values[np.newaxis, :]
zlonc = zlon[:, :]
zlatc = zlat[:, :]
nlon = len(TNO_array["lon"])
nlat = 1
# Put it to a domain Plugin
domain = Domain(
nlon=nlon, nlat=nlat,
zlon=zlon, zlat=zlat,
zlonc=zlonc, zlatc=zlatc,
nlev=nlevs,
heights=height_top,
height_unit="m",
unstructured_domain=True,
value_file = TNO_array["file"],
)
return domain