Source code for pycif.plugins.datastreams.fluxes.EDGARv8_EYECLIMA_nc.read

from logging import info, debug
from pathlib import Path

from netCDF4 import Dataset
import numpy as np
import pandas as pd
import xarray as xr

import time

try:
    from pytz import country_timezones  # Not in pycif requirements.txt
    import pycountry  # Not in pycif requirements.txt
    import zoneinfo as zi  # Not in the standard library for Python 3.8 and below
    all_imported = True
except ModuleNotFoundError:
    country_timezones = None
    pycountry = None
    zi = None
    all_imported = False

try:
    import pycountry
except ModuleNotFoundError:
    pycountry = None
try:
    import zoneinfo as zi
except ModuleNotFoundError:
    zi = None


[docs] def read( self, name, varnames, dates, files, interpol_flx=False, comp_type=None, tracer=None, **kwargs, ): """Get fluxes from raw files and load them into a pyCIF variables """ if not all_imported: raise ImportError( "pycountry and zoneinfo are required for this plugin") Pdir = tracer.dir_profils # day type correspondances weekday_file = Path(Pdir, 'weekdays.csv') weekday_type = pd.read_csv(weekday_file, sep=';') weekendtype_file = Path(Pdir, 'weekenddays.csv') weekend_type = pd.read_csv(weekendtype_file, sep=';') # Time profiles TPweek_file = Path(Pdir, 'weekly_profiles.csv') coef_day = pd.read_csv(TPweek_file, sep=',') TPhour_file = Path(Pdir, 'hourly_profiles.csv', sep=',') coef_hour = pd.read_csv(TPhour_file, sep=',') # Read total number of country in the temporal profils country_list = weekend_type['Country_code_A3'].unique() nreg_total = len(country_list) debug(f" {nreg_total} countries with temporal profils, use debug to display") debug(country_list) # recommandation EYECLIMA MS2 d'Equivalences secteurs-profils temporels et alternatives profils_eq = {'RCO':['RCO'], 'REF':['REF'], 'REF_TRF':['REF'], 'IND':['IND'], 'NMM':['NMM'], 'CHE':['NMM','CHE'], 'IRO':['NMM','IRO'], 'NFE':['NMM','NFE'], 'NEU':['NMM','NEU'], 'ENE':['ENE'], 'TNR':['TRO','TNR'], 'TRO':['TRO','TNR'], 'SWD_IND':['SWD'], 'SWD':['SWD']} info("WARNING recommandations EYECLIMA MS2 d'Equivalences secteurs-profils temporels, use debug to display") debug(profils_eq) # Read EDGAR country mask mask_countries = xr.open_dataset(tracer.global_mask) country_list_glob = mask_countries['regions_name'].values.tolist() nreg_mask = len(country_list_glob) info(f"{nreg_mask} countries in edgar global mask, use debug to display") debug(country_list_glob) if nreg_total != nreg_mask: debug("WARNING not same number of countries between EDGAR temporal profil dataset and GLOBAL mask") # read mask country to select country_list_select = country_list debug('Force loop on country list with available temporal profiles') nreg_select = len(country_list_select) debug(f" {nreg_select} countries to select, use debug to display") debug(country_list_select) country_code_A2_list = {} mask = {} country_list_select_inmask = [] if tracer.truncated : # imin, imax = 1210,1651 # jmin, jmax = 1638, 2161 imin, imax = 1651,mask_countries['regions'].values.shape[0] jmin, jmax = 1638,mask_countries['regions'].values.shape[1] info((imin, imax)) info((jmin, jmax)) else: imin, imax = 0,mask_countries['regions'].values.shape[0] jmin, jmax = 0,mask_countries['regions'].values.shape[1] for country_code_A3 in country_list_select: idx = np.where(mask_countries['regions_name'].values == country_code_A3) mask[country_code_A3] = np.where(mask_countries['regions'].values == idx, 1, 0) if (np.sum(mask[country_code_A3])==0.): debug(f" WARNING no country {country_code_A3} in the mask ; emissions will probably be taken into account with flat temporal profiles") continue if (np.sum(mask[country_code_A3][imin:imax,jmin:jmax])==0.): debug(f" WARNING no country {country_code_A3} in the truncature ; emissions will probably be taken into account with flat temporal profiles") continue if pycountry.countries.get(alpha_3=country_code_A3): country_code_A2_list[country_code_A3] = country_timezones( pycountry.countries.get(alpha_3=country_code_A3).alpha_2) elif country_code_A3 == 'SEA': country_code_A2_list[country_code_A3] = ['UTC'] debug(f" WARNING use UTC for SEA ") else: debug(f" WARNING {country_code_A3} no timezone, use UTC") #continue country_code_A2_list[country_code_A3] = ['UTC'] country_list_select_inmask.append(country_code_A3) debug(country_list_select) debug(len(country_list_select)) debug('after trunc : ') debug(country_list_select_inmask) debug(len(country_list_select_inmask)) outdate = [] nc = None opened_file = "" idate = 0 data = [] for ddi, ff in zip(dates, files): info(ddi) if ff != opened_file or nc is None: info(' Reading of {} in {} for {}'.format([varnames], ff, ddi)) opened_file = ff if nc is not None: nc.close() nc = xr.open_dataset(ff[0], decode_times=False) nlon = nc.dims["lon"] nlat = nc.dims["lat"] #ntime = nc.dims["time"] emis = nc[varnames].values # kg/m2/s EDGAR_array = np.zeros((1, nlat, nlon)) control_mask = np.zeros((nlat, nlon)) debug(f" loop on {nreg_select} countries with available temporal profile") #for country, country_code_A3 in enumerate(country_list_select): t0 = time.process_time() for country_code_A3 in country_list_select_inmask: country_code_A2 = country_code_A2_list[country_code_A3] dd_local = ddi[0].astimezone(zi.ZoneInfo(country_code_A2[0])) yr = dd_local.year mm = dd_local.month # method to get the weekday of a given date as an integer, where Monday is 1 and Sunday is 7 weekday = dd_local.isoweekday() # Find weekend type Weekend_type_id = weekend_type['Weekend_type_id'][weekend_type['Country_code_A3'] == country_code_A3].values[0] # Find day type idx = ( (weekday_type['Weekend_type_id'] == Weekend_type_id) & (weekday_type['Weekday_id'] == weekday) ) Weekday_id = weekday_type.loc[idx, 'Daytype_id'].values[0] # daily coef extraction profil_type = profils_eq[tracer.profil_select][0] idx = ( (coef_day['Country_code_A3'] == country_code_A3) & (coef_day['activity_code'] == profil_type) & (coef_day['Weekday_id'] == weekday) ) if idx.sum() == 0: debug(f" WARNING no daily profil for {country_code_A3}, {profil_type}, {weekday} ") if len(profils_eq[tracer.profil_select])>1: profil_type = profils_eq[tracer.profil_select][1] debug(f" Try alternative {country_code_A3}, {profil_type}, {weekday} ") idx = ( (coef_day['Country_code_A3'] == country_code_A3) & (coef_day['activity_code'] == profil_type) & (coef_day['Weekday_id'] == weekday) ) if idx.sum() == 0: debug(f" no daily profil for alternative {country_code_A3}, {profil_type}, {weekday} ") debug(" emissions will probably be taken into account with flat temporal profiles") continue else: debug(f" no alternative ") continue local_wday_coef = coef_day.loc[idx, 'daily_factor'].values[0] idx = ( (coef_hour['Country_code_A3'] == country_code_A3) & (coef_hour['activity_code'] == profil_type) & (coef_hour['Daytype_id'] == Weekday_id) & (coef_hour['month_id'] == mm) ) local_hour_coef = coef_hour.loc[idx, 'h' + str(dd_local.hour+1)].values[0] control_mask += mask[country_code_A3] EDGAR_array += (emis[mm-1, :] * 24. * 7 * local_wday_coef) * local_hour_coef * mask[country_code_A3] # kg/m2/s # deal with areas without temporal profil debug(' flat profile for other countries') other = np.where(control_mask > 0, 0., 1.) tot = other.size cells = np.sum(other) pc = cells/tot*100 debug(f"{cells} gridcells on {tot}, i.e. {pc}% are using flat temporal profiles") EDGAR_array += emis[ddi[0].month-1, :] * other data.append(EDGAR_array) outdate.append(ddi[0]) idate += 1 if nc is not None: nc.close() return xr.DataArray( data, coords={"time": outdate}, dims=("time", "lev", "lat", "lon"), )
# else: # # return pd.concat(data)