Source code for pycif.plugins.datastreams.fluxes.iconart.read
import numpy as np
import xarray as xr
from logging import debug
from calendar import monthrange
import pandas as pd
import datetime as dt
[docs]
def read(self, name, varnames, dates, files, interpol_flx=False, tracer=None, model=None, ddi=None, **kwargs):
"""Get fluxes from pre-computed fluxes and load them into a pyCIF
variables
Args:
self: the fluxes Plugin
name: the name of the component
tracdir, tracfile: flux directory and file format
dates: list of dates to extract
interpol_flx (bool): if True, interpolates fluxes at time t from
values of surrounding available files
"""
var2extract = varnames if varnames != "" else name
# Loop over dates/files and import data
data = []
out_dates = pd.to_datetime([date[0] for date in dates])
debug("OEM: Reading only the first date.")
is_temp_scaled = True
ds = xr.open_dataset(files[0])
da = ds[var2extract]
if hasattr(self, "tfactors_hoy_file"):
debug("OEM: Applying hour-of-year temporal scaling factors")
da_country_ids = ds["country_ids"]
new_da = da.expand_dims(time=out_dates).copy()
tfactors_hoy_file = out_dates[0].strftime(self.tfactors_hoy_file)
da_hoy = xr.open_dataset(tfactors_hoy_file)[self.tfactors_oem_group]
countries = da_hoy.country
ncountries = len(countries)
da_tfactor = xr.DataArray(
np.ones((len(out_dates), ncountries)),
coords={"time": out_dates, "country": da_hoy.country},
dims=("time", "country"),
)
def hour_of_year(current_date):
beginning_of_year = dt.datetime(current_date.year, 1, 1)
return int((current_date - beginning_of_year).total_seconds() // 3600)
for dd in out_dates:
da_tfactor.loc[dd] = da_hoy[hour_of_year(dd)]
elif (
hasattr(self, "tfactors_hod_file")
and hasattr(self, "tfactors_dow_file")
and hasattr(self, "tfactors_moy_file")
):
debug("OEM: Applying month-of-year, day-of-week and hour-of-day temporal scaling factors")
da_country_ids = ds["country_ids"]
new_da = da.expand_dims(time=out_dates).copy()
tfactors_moy_file = out_dates[0].strftime(self.tfactors_moy_file)
tfactors_dow_file = out_dates[0].strftime(self.tfactors_dow_file)
tfactors_hod_file = out_dates[0].strftime(self.tfactors_hod_file)
da_moy = xr.open_dataset(tfactors_moy_file)[self.tfactors_oem_group]
da_dow = xr.open_dataset(tfactors_dow_file)[self.tfactors_oem_group]
da_hod = xr.open_dataset(tfactors_hod_file)[self.tfactors_oem_group]
countries = da_moy.country
ncountries = len(da_moy.country)
da_tfactor = xr.DataArray(
np.zeros((len(out_dates), ncountries)),
coords={"time": out_dates, "country": da_moy.country},
dims=("time", "country"),
)
for dd in out_dates:
tfactor_moy = da_moy[dd.month - 1]
tfactor_dow = da_dow[dd.dayofweek] # Monday = 0, Sunday = 6 here
tfactor_hod = da_hod[dd.hour] # 00:00 = 0 and 23:00 = 23 here
da_tfactor.loc[dd] = tfactor_moy * tfactor_dow * tfactor_hod
else:
debug("OEM: Fetching all the hours in this dataset.")
is_temp_scaled = False
if 'time' not in da.dims:
raise KeyError("No time dimension in the dataset")
try:
data = da.sel(time=out_dates)
except KeyError:
raise KeyError("Some hours are missing in the given file.")
if is_temp_scaled:
ncells = da[da.dims[-1]].size
if ncountries == ncells:
new_da = new_da * da_tfactor.rename({"country": da.dims[-1]})
else:
for icntry, cntry in enumerate(countries):
new_da[:, da_country_ids == icntry] *= da_tfactor.sel(country=cntry)
data = new_da.values
# If only one level for emissions, create the axis:
xmod = xr.DataArray(
np.array(data)[:, np.newaxis, np.newaxis, :],
coords={"time": out_dates},
dims=("time", "lev", "lat", "lon"),
)
return xmod