from logging import info, debug
from pathlib import Path
from netCDF4 import Dataset
import numpy as np
import pandas as pd
import xarray as xr
import time
try:
from pytz import country_timezones # Not in pycif requirements.txt
import pycountry # Not in pycif requirements.txt
import zoneinfo as zi # Not in the standard library for Python 3.8 and below
all_imported = True
except ModuleNotFoundError:
country_timezones = None
pycountry = None
zi = None
all_imported = False
try:
import pycountry
except ModuleNotFoundError:
pycountry = None
try:
import zoneinfo as zi
except ModuleNotFoundError:
zi = None
[docs]
def read(
self,
name,
varnames,
dates,
files,
interpol_flx=False,
comp_type=None,
tracer=None,
**kwargs,
):
"""Get fluxes from raw files and load them into a pyCIF variables
"""
if not all_imported:
raise ImportError(
"pycountry and zoneinfo are required for this plugin")
Pdir = tracer.dir_profils
# day type correspondances
weekday_file = Path(Pdir, 'weekdays.csv')
weekday_type = pd.read_csv(weekday_file, sep=';')
weekendtype_file = Path(Pdir, 'weekenddays.csv')
weekend_type = pd.read_csv(weekendtype_file, sep=';')
# Time profiles
TPweek_file = Path(Pdir, 'weekly_profiles.csv')
coef_day = pd.read_csv(TPweek_file, sep=',')
TPhour_file = Path(Pdir, 'hourly_profiles.csv', sep=',')
coef_hour = pd.read_csv(TPhour_file, sep=',')
# Read total number of country in the temporal profils
country_list = weekend_type['Country_code_A3'].unique()
nreg_total = len(country_list)
debug(f" {nreg_total} countries with temporal profils, use debug to display")
debug(country_list)
# recommandation EYECLIMA MS2 d'Equivalences secteurs-profils temporels et alternatives
profils_eq = {'RCO':['RCO'],
'REF':['REF'],
'REF_TRF':['REF'],
'IND':['IND'],
'NMM':['NMM'],
'CHE':['NMM','CHE'],
'IRO':['NMM','IRO'],
'NFE':['NMM','NFE'],
'NEU':['NMM','NEU'],
'ENE':['ENE'],
'TNR':['TRO','TNR'],
'TRO':['TRO','TNR'],
'SWD_IND':['SWD'],
'SWD':['SWD']}
info("WARNING recommandations EYECLIMA MS2 d'Equivalences secteurs-profils temporels, use debug to display")
debug(profils_eq)
# Read EDGAR country mask
mask_countries = xr.open_dataset(tracer.global_mask)
country_list_glob = mask_countries['regions_name'].values.tolist()
nreg_mask = len(country_list_glob)
info(f"{nreg_mask} countries in edgar global mask, use debug to display")
debug(country_list_glob)
if nreg_total != nreg_mask:
debug("WARNING not same number of countries between EDGAR temporal profil dataset and GLOBAL mask")
# read mask country to select
country_list_select = country_list
debug('Force loop on country list with available temporal profiles')
nreg_select = len(country_list_select)
debug(f" {nreg_select} countries to select, use debug to display")
debug(country_list_select)
country_code_A2_list = {}
mask = {}
country_list_select_inmask = []
if tracer.truncated :
# imin, imax = 1210,1651
# jmin, jmax = 1638, 2161
imin, imax = 1651,mask_countries['regions'].values.shape[0]
jmin, jmax = 1638,mask_countries['regions'].values.shape[1]
info((imin, imax))
info((jmin, jmax))
else:
imin, imax = 0,mask_countries['regions'].values.shape[0]
jmin, jmax = 0,mask_countries['regions'].values.shape[1]
for country_code_A3 in country_list_select:
idx = np.where(mask_countries['regions_name'].values == country_code_A3)
mask[country_code_A3] = np.where(mask_countries['regions'].values == idx, 1, 0)
if (np.sum(mask[country_code_A3])==0.):
debug(f" WARNING no country {country_code_A3} in the mask ; emissions will probably be taken into account with flat temporal profiles")
continue
if (np.sum(mask[country_code_A3][imin:imax,jmin:jmax])==0.):
debug(f" WARNING no country {country_code_A3} in the truncature ; emissions will probably be taken into account with flat temporal profiles")
continue
if pycountry.countries.get(alpha_3=country_code_A3):
country_code_A2_list[country_code_A3] = country_timezones(
pycountry.countries.get(alpha_3=country_code_A3).alpha_2)
elif country_code_A3 == 'SEA':
country_code_A2_list[country_code_A3] = ['UTC']
debug(f" WARNING use UTC for SEA ")
else:
debug(f" WARNING {country_code_A3} no timezone, use UTC")
#continue
country_code_A2_list[country_code_A3] = ['UTC']
country_list_select_inmask.append(country_code_A3)
debug(country_list_select)
debug(len(country_list_select))
debug('after trunc : ')
debug(country_list_select_inmask)
debug(len(country_list_select_inmask))
outdate = []
nc = None
opened_file = ""
idate = 0
data = []
for ddi, ff in zip(dates, files):
info(ddi)
if ff != opened_file or nc is None:
info(' Reading of {} in {} for {}'.format([varnames], ff, ddi))
opened_file = ff
if nc is not None:
nc.close()
nc = xr.open_dataset(ff[0], decode_times=False)
nlon = nc.dims["lon"]
nlat = nc.dims["lat"]
#ntime = nc.dims["time"]
emis = nc[varnames].values # kg/m2/s
EDGAR_array = np.zeros((1, nlat, nlon))
control_mask = np.zeros((nlat, nlon))
debug(f" loop on {nreg_select} countries with available temporal profile")
#for country, country_code_A3 in enumerate(country_list_select):
t0 = time.process_time()
for country_code_A3 in country_list_select_inmask:
country_code_A2 = country_code_A2_list[country_code_A3]
dd_local = ddi[0].astimezone(zi.ZoneInfo(country_code_A2[0]))
yr = dd_local.year
mm = dd_local.month
# method to get the weekday of a given date as an integer, where Monday is 1 and Sunday is 7
weekday = dd_local.isoweekday()
# Find weekend type
Weekend_type_id = weekend_type['Weekend_type_id'][weekend_type['Country_code_A3']
== country_code_A3].values[0]
# Find day type
idx = (
(weekday_type['Weekend_type_id'] == Weekend_type_id)
& (weekday_type['Weekday_id'] == weekday)
)
Weekday_id = weekday_type.loc[idx, 'Daytype_id'].values[0]
# daily coef extraction
profil_type = profils_eq[tracer.profil_select][0]
idx = (
(coef_day['Country_code_A3'] == country_code_A3)
& (coef_day['activity_code'] == profil_type)
& (coef_day['Weekday_id'] == weekday)
)
if idx.sum() == 0:
debug(f" WARNING no daily profil for {country_code_A3}, {profil_type}, {weekday} ")
if len(profils_eq[tracer.profil_select])>1:
profil_type = profils_eq[tracer.profil_select][1]
debug(f" Try alternative {country_code_A3}, {profil_type}, {weekday} ")
idx = (
(coef_day['Country_code_A3'] == country_code_A3)
& (coef_day['activity_code'] == profil_type)
& (coef_day['Weekday_id'] == weekday)
)
if idx.sum() == 0:
debug(f" no daily profil for alternative {country_code_A3}, {profil_type}, {weekday} ")
debug(" emissions will probably be taken into account with flat temporal profiles")
continue
else:
debug(f" no alternative ")
continue
local_wday_coef = coef_day.loc[idx, 'daily_factor'].values[0]
idx = (
(coef_hour['Country_code_A3'] == country_code_A3)
& (coef_hour['activity_code'] == profil_type)
& (coef_hour['Daytype_id'] == Weekday_id)
& (coef_hour['month_id'] == mm)
)
local_hour_coef = coef_hour.loc[idx,
'h' + str(dd_local.hour+1)].values[0]
control_mask += mask[country_code_A3]
EDGAR_array += (emis[mm-1, :] * 24. * 7 *
local_wday_coef) * local_hour_coef * mask[country_code_A3] # kg/m2/s
# deal with areas without temporal profil
debug(' flat profile for other countries')
other = np.where(control_mask > 0, 0., 1.)
tot = other.size
cells = np.sum(other)
pc = cells/tot*100
debug(f"{cells} gridcells on {tot}, i.e. {pc}% are using flat temporal profiles")
EDGAR_array += emis[ddi[0].month-1, :] * other
data.append(EDGAR_array)
outdate.append(ddi[0])
idate += 1
if nc is not None:
nc.close()
return xr.DataArray(
data,
coords={"time": outdate},
dims=("time", "lev", "lat", "lon"),
)
# else:
#
# return pd.concat(data)