Source code for pycif.plugins.datastreams.fluxes.GFEDv4.read

import numpy as np
from logging import debug
import xarray as xr
import pandas as pd
import netCDF4
import h5py


[docs] def read( self, name, varnames, dates, files, interpol_flx=False, tracer=None, model=None, ddi=None, **kwargs ): """Get fluxes from raw files and load them into a pyCIF variables. Args: name (str): name of the component varnames (list[str]): original names of variables to read; use `name` if `varnames` is empty dates (list): list of the date intervals to extract files (list): list of the files matching dates Return: xr.DataArray: the actual data with dimension: time, levels, latitudes, longitudes """ var2extract = varnames if varnames != "" else name # Read emission factors ecosystems = ["SAVA", "BORF", "TEMF", "DEFO", "PEAT", "AGRI"] emis_factors = {k: 1000 for k in ecosystems} if hasattr(tracer, "file_emisfactors"): emis_factors = pd.read_csv(tracer.file_emisfactors, comment="#", header=None, sep=r"\s+", index_col=0, names=ecosystems) # Raise exception if var2extract not in emis_factors.index: raise Exception( "Could not find the species {} in the emission factor file: {}\n" "Please consider specifying the variable name " "explicitly using 'varname' in your yaml if the " "parameter name is not an existing species" .format(var2extract, tracer.file_emisfactors) ) emis_factors = emis_factors.loc[var2extract].to_dict() # Loop over dates/files and import data data = [] out_dates = [] ref_file = "" ref_month = -1 ref_day = -1 for dd, ff in zip(dates, files): debug( "Reading the file {} for the date interval {}".format( ff, dd ) ) # Open HDF file if ff != ref_file: f = h5py.File(ff, "r") ref_file = ff # read in dry matter emissions month = dd[0].month year = dd[0].year month_str = '{:02d}'.format(month) string = '/emissions/' + month_str + '/DM' if month != ref_month: ref_month = month # Dry matter emissions in kgDM/m2/month DM_emissions = f[string][:] # Convert to emissions per hour DM_emissions /= 24 * pd.DatetimeIndex([dd[0]]).days_in_month[0] # Compute conversion factor depending on contributions contrib_correction = 0 for source in tracer.ecosystems: # Exception if required ecosystems not in emission factors if source not in emis_factors: raise Exception("According to the yaml, I am expected to extract the " "contribution from the ecosystem {}, whereas I have " "only the following ecosystems available: {}\n" "Please check your yaml".format(source, emis_factors.keys())) # read in the fractional contribution of each source string = '/emissions/{}/partitioning/DM_{}'.format( month_str, source) contribution = f[string][:] # Apply the emission factor per ecosystem from kgDM to g-species contrib_correction += contribution * emis_factors[source] DM_emissions *= contrib_correction # Load finer temporal resolution if tfrac = tracer.temporal_fraction if tfrac in ["daily", "diurnal"]: if ref_day != dd[0].day: daily_frac_str = f'emissions/{month_str}/daily_fraction/day_{dd[0].day}' if daily_frac_str not in f: raise KeyError( f"The variable {daily_frac_str} is not available in {ff} " "whereas it is needed to extract daily fractions. " "Use the parameter `temporal_fraction = monthly` to use " "monthly values only, or fix your input file." ) daily_frac = \ f['emissions/{}/daily_fraction/day_{}' .format(month_str, dd[0].day)][:] DM_emissions *= daily_frac ref_day = dd[0].day if tfrac == "diurnal": dirunal_frac_str = f'emissions/{month_str}/diurnal_cycle/UTC_{dd[0].hour}-{dd[0].hour + 3}h' if dirunal_frac_str not in f: raise KeyError( f"The variable {dirunal_frac_str} is not available in {ff} " "whereas it is needed to extract diurnal fractions. " "Use the parameter `temporal_fraction = monthly` to use " "monthly values only, or fix your input file." ) diurnal_frac = \ f['emissions/{}/diurnal_cycle/UTC_{}-{}h' .format(month_str, dd[0].hour, dd[0].hour + 3)][:] DM_emissions *= diurnal_frac # Save data and swap latitudes data.append(DM_emissions[::-1]) out_dates.append(dd[0]) # if only one level for emissions, create the axis dataout = np.array(data)[:, np.newaxis] dataout[np.isnan(dataout)] = 0 xmod = xr.DataArray( dataout, coords={"time": out_dates}, dims=("time", "lev", "lat", "lon"), ) return xmod