Source code for pycif.plugins.datastreams.fluxes.flexpart.read

import os
import xarray as xr
import datetime
import numpy as np
from .....utils.netcdf import readnc
from .....utils.dataarrays.reindex import reindex
from logging import debug
import pandas as pd

# from .....utils.dates import j2d


[docs] def read(self, name, varnames, dates, files, interpol_flx=False, tracer=None, **kwargs): """Get fluxes from pre-computed fluxes and load them into a pycif variables Args: self: the model Plugin name: the name of the component tracdir, tracfic: flux directory and file format dates: list of dates to extract interpol_flx (bool): if True, interpolates fluxes at time t from values of surrounding available files """ # Reading fluxes for periods within the simulation window trcr_flx = np.empty((0, *tracer.domain.zlat.shape), dtype=float) times = [] ref_file_in = None ref_file_out = None for dd, file_flx in zip(dates, files): if file_flx != ref_file_in: debug("Read {} for FLEXPART fluxes".format(file_flx)) # First load inside domain # data_in, lat_in, lon_in, time_jd_in = \ data_in, lat_in, lon_in = \ readnc(file_flx, [varnames, tracer.latname_flx, tracer.lonname_flx]) # self.lonname_flx, self.timename_flx]) ddi = datetime.datetime(year=dd[0].year, month=1, day=1) ddf = datetime.datetime(year=ddi.year + 1, month=1, day=1) file_dates = pd.date_range(ddi, ddf, freq="1MS").to_list() # Take only correct time index if dates are present if len(data_in.shape) == 3: idata_in = file_dates.index(dd[0]) if dd[0] in file_dates else 0 data_in = data_in[idata_in] # Convert julian day (since 1-1-1900) to datetime times.append(dd[0]) # Extract only data covering the inversion region if tracer.crop_region: ix0 = np.argmin(np.abs(lon_in - tracer.domain.lon_in[0])) iy0 = np.argmin(np.abs(lat_in - tracer.domain.lat_in[0])) data_in = data_in[iy0:iy0 + tracer.domain.nlat, ix0:ix0 + tracer.domain.nlon] flx_reg_in = data_in.flatten() # Loading outside data if available if tracer.domain.nested and hasattr(tracer, "file_glob"): out_file = os.path.join( os.path.dirname(file_flx), dd[0].strftime(tracer.file_glob) ) if ref_file_out != out_file: debug("Read {} for FLEXPART fluxes".format(out_file)) time_jd_out = None with xr.open_dataset(out_file) as ds: time_jd_out = file_dates if tracer.timename_flx in ds: time_raw = ds[tracer.timename_flx].to_dataframe()[ tracer.timename_flx] if hasattr(time_raw, "dt"): time_jd_out = list( time_raw.dt.to_pydatetime() ) idata_out = ( time_jd_out.index(dd[0]) if dd[0] in time_jd_out else 0 ) data_out = ds[varnames].values # data_out, time_jd_out = \ # readnc(out_file, # [varnames, self.timename_flx]) if time_jd_out is not None and len(data_out.shape) == 3: data_out = data_out[idata_out] # # Extract data outside nest domain # flx_reg_out = \ # np.delete(data_out.flatten(), # self.domain.raveled_indexes_glob) flx_reg_out = data_out.flatten() else: flx_reg_out = \ np.zeros((1, tracer.domain.zlat.size - flx_reg_in.size)) + np.nan # Concatenate nest and global fluxes out_flx = np.append(flx_reg_in, flx_reg_out)[np.newaxis, np.newaxis] # Convert to ng/m2/s if tracer.convert_to_flux: numscale = float(getattr(tracer, 'numscale', 1.E12)) out_flx *= numscale / 3600. # Concatenate trcr_flx = np.append(trcr_flx, out_flx, axis=0) # Put data into dataarray xmod = xr.DataArray(trcr_flx[:, np.newaxis], coords={'time': np.array(times)}, dims=('time', 'lev', 'lat', 'lon')) # Reindex to required dates # xmod = reindex(xmod, levels={"time": np.array(dates).astype(np.datetime64)}) # # # # TODO: take care if several files are read # # TODO: scale flux contribution by area weight for boxes # # TODO: consider storing fluxes at original time resolution and # # interpolate as needed # # flx = np.ndarray((self.ndates, self.domain.nlat, self.domain.nlon)) # # # Interpolate fluxes to start time of control period # for ddt in range(self.ndates): # if interpol_flx: # flx[ddt, :, :] = xmod.interp(time=self.dates[ddt]) # else: # flx[ddt, :, :] = xmod.sel(time=self.dates[ddt], method='nearest') return xmod