Source code for pycif.plugins.datastreams.fluxes.gridded_NetCDF.utils

from logging import debug

import pandas as pd
import xarray as xr
import numpy as np

from .time_coord import convert_calendar, decode_datetimes

OFFSET = pd.offsets.Nano(1)  # One nanosecond time offest


[docs] def get_time_intervals( ds, tracer, time_varname, date_i, date_f, calendar, time_midpoint=False, time_endpoint=False ): var_freq = getattr(tracer, 'var_freq', None) if hasattr(tracer, 'period_varname'): # Read periods form the NetCDF file periods = ds[tracer.period_varname].values time_intervals = np.concatenate([ pd.DatetimeIndex(periods[:, [0]]).to_pydatetime(), pd.DatetimeIndex(periods[:, [1]]).to_pydatetime()], axis=1 ) else: # Compute periods with the NetCDF file time coordinates and the # optional 'var_freq' argument if ds[time_varname].size == 1 and var_freq is None: # Cannot infer period with only one time value, using 'file_freq' # argument as 'var_freq' var_freq = tracer.file_freq # Converting 'Start' offset (not useable as period dtype) to 'End' # offset (useable as period dtype) if var_freq[-1] == 'S': var_freq = var_freq[:-1] debug(f"single time value in file '{ds.encoding['source']}'\n" f"using argument 'file_freq' as 'var_freq' ('{var_freq}')") if var_freq is not None: if var_freq[-1] == 'S': raise Exception( f"The frequency {var_freq} is not accepted by 'to_period'. " f"The 'S' anchor at the end is not accepted for periods. " f"Please remove it." ) try: dates = pd.to_datetime(ds[time_varname]).values # Infer periods if argument 'var_freq' is not provided periods = pd.to_datetime(dates).to_period(freq=var_freq) if var_freq is not None: debug(f"Inferred frequency for file '{ds.encoding['source']}': " f"'{periods.freqstr}'") except ValueError as e: if var_freq is None: msg = ("could not infer frequency from 'time' coordinate " f"values in file '{ds.encoding['source']}'. " f"Please use the 'var_freq' argument.") else: msg = ("could not parse 'time' coordinate values in file " f"'{ds.encoding['source']}' with var_freq={var_freq}.") raise ValueError(msg) from e time_intervals = np.concatenate([ periods.start_time.to_pydatetime()[:, np.newaxis], (periods.end_time + OFFSET).to_pydatetime()[:, np.newaxis] ], axis=1) if len(time_intervals) == 1: di, df = time_intervals[0] # Checking if time interval and simulation window are overlaping if df <= date_i or date_f <= di: raise ValueError( f"Simulation window [{date_i.isoformat(), date_f.isoformat()}] " f"and the single period [{di.isoformat(), df.isoformat()}] " f"in file '{ds.encoding['source']}' are not overlaping" ) else: mask = ((time_intervals[:, 0] <= date_f) & (time_intervals[:, 1] >= date_i)) time_intervals = time_intervals[mask] if time_intervals.size == 0: raise ValueError( "No period found whithin the simulation window " f"[{date_i.isoformat(), date_f.isoformat()}] in file " f"'{ds.encoding['source']}'" ) return time_intervals.tolist()
[docs] def get_year_offset( ds: xr.Dataset, time_varname: str, ref_year: int, is_climatology: bool = False ) -> int: if is_climatology: years = ref_year - ds[time_varname].dt.year.values[0] else: year_file = ds[time_varname].dt.year.values if ref_year > year_file.max(): years = ref_year - year_file.max() elif ref_year < year_file.min(): years = ref_year - year_file.min() else: years = 0 return years