Source code for pycif.plugins.datastreams.fluxes.iconart.read

import numpy as np
import xarray as xr
from logging import debug
from calendar import monthrange
import pandas as pd
import datetime as dt


[docs] def read(self, name, varnames, dates, files, interpol_flx=False, tracer=None, model=None, ddi=None, **kwargs): """Get fluxes from pre-computed fluxes and load them into a pyCIF variables Args: self: the fluxes Plugin name: the name of the component tracdir, tracfile: flux directory and file format dates: list of dates to extract interpol_flx (bool): if True, interpolates fluxes at time t from values of surrounding available files """ var2extract = varnames if varnames != "" else name # Loop over dates/files and import data data = [] out_dates = pd.to_datetime([date[0] for date in dates]) debug("OEM: Reading only the first date.") is_temp_scaled = True ds = xr.open_dataset(files[0]) da = ds[var2extract] if hasattr(self, "tfactors_hoy_file"): debug("OEM: Applying hour-of-year temporal scaling factors") da_country_ids = ds["country_ids"] new_da = da.expand_dims(time=out_dates).copy() tfactors_hoy_file = out_dates[0].strftime(self.tfactors_hoy_file) da_hoy = xr.open_dataset(tfactors_hoy_file)[self.tfactors_oem_group] countries = da_hoy.country ncountries = len(countries) da_tfactor = xr.DataArray( np.ones((len(out_dates), ncountries)), coords={"time": out_dates, "country": da_hoy.country}, dims=("time", "country"), ) def hour_of_year(current_date): beginning_of_year = dt.datetime(current_date.year, 1, 1) return int((current_date - beginning_of_year).total_seconds() // 3600) for dd in out_dates: da_tfactor.loc[dd] = da_hoy[hour_of_year(dd)] elif ( hasattr(self, "tfactors_hod_file") and hasattr(self, "tfactors_dow_file") and hasattr(self, "tfactors_moy_file") ): debug("OEM: Applying month-of-year, day-of-week and hour-of-day temporal scaling factors") da_country_ids = ds["country_ids"] new_da = da.expand_dims(time=out_dates).copy() tfactors_moy_file = out_dates[0].strftime(self.tfactors_moy_file) tfactors_dow_file = out_dates[0].strftime(self.tfactors_dow_file) tfactors_hod_file = out_dates[0].strftime(self.tfactors_hod_file) da_moy = xr.open_dataset(tfactors_moy_file)[self.tfactors_oem_group] da_dow = xr.open_dataset(tfactors_dow_file)[self.tfactors_oem_group] da_hod = xr.open_dataset(tfactors_hod_file)[self.tfactors_oem_group] countries = da_moy.country ncountries = len(da_moy.country) da_tfactor = xr.DataArray( np.zeros((len(out_dates), ncountries)), coords={"time": out_dates, "country": da_moy.country}, dims=("time", "country"), ) for dd in out_dates: tfactor_moy = da_moy[dd.month - 1] tfactor_dow = da_dow[dd.dayofweek] # Monday = 0, Sunday = 6 here tfactor_hod = da_hod[dd.hour] # 00:00 = 0 and 23:00 = 23 here da_tfactor.loc[dd] = tfactor_moy * tfactor_dow * tfactor_hod else: debug("OEM: Fetching all the hours in this dataset.") is_temp_scaled = False if 'time' not in da.dims: raise KeyError("No time dimension in the dataset") try: data = da.sel(time=out_dates) except KeyError: raise KeyError("Some hours are missing in the given file.") if is_temp_scaled: ncells = da[da.dims[-1]].size if ncountries == ncells: new_da = new_da * da_tfactor.rename({"country": da.dims[-1]}) else: for icntry, cntry in enumerate(countries): new_da[:, da_country_ids == icntry] *= da_tfactor.sel(country=cntry) data = new_da.values # If only one level for emissions, create the axis: xmod = xr.DataArray( np.array(data)[:, np.newaxis, np.newaxis, :], coords={"time": out_dates}, dims=("time", "lev", "lat", "lon"), ) return xmod