Source code for pycif.plugins.datastreams.fluxes.gridded_NetCDF.utils
from logging import debug
import pandas as pd
import xarray as xr
import numpy as np
from .time_coord import convert_calendar, decode_datetimes
OFFSET = pd.offsets.Nano(1) # One nanosecond time offest
[docs]
def get_time_intervals(
ds, tracer, time_varname, date_i, date_f,
calendar,
time_midpoint=False,
time_endpoint=False
):
var_freq = getattr(tracer, 'var_freq', None)
if hasattr(tracer, 'period_varname'):
# Read periods form the NetCDF file
periods = ds[tracer.period_varname].values
time_intervals = np.concatenate([
pd.DatetimeIndex(periods[:, [0]]).to_pydatetime(),
pd.DatetimeIndex(periods[:, [1]]).to_pydatetime()],
axis=1
)
else:
# Compute periods with the NetCDF file time coordinates and the
# optional 'var_freq' argument
if ds[time_varname].size == 1 and var_freq is None:
# Cannot infer period with only one time value, using 'file_freq'
# argument as 'var_freq'
var_freq = tracer.file_freq
# Converting 'Start' offset (not useable as period dtype) to 'End'
# offset (useable as period dtype)
if var_freq[-1] == 'S':
var_freq = var_freq[:-1]
debug(f"single time value in file '{ds.encoding['source']}'\n"
f"using argument 'file_freq' as 'var_freq' ('{var_freq}')")
if var_freq is not None:
if var_freq[-1] == 'S':
raise Exception(
f"The frequency {var_freq} is not accepted by 'to_period'. "
f"The 'S' anchor at the end is not accepted for periods. "
f"Please remove it."
)
try:
dates = pd.to_datetime(ds[time_varname]).values
# Infer periods if argument 'var_freq' is not provided
periods = pd.to_datetime(dates).to_period(freq=var_freq)
if var_freq is not None:
debug(f"Inferred frequency for file '{ds.encoding['source']}': "
f"'{periods.freqstr}'")
except ValueError as e:
if var_freq is None:
msg = ("could not infer frequency from 'time' coordinate "
f"values in file '{ds.encoding['source']}'. "
f"Please use the 'var_freq' argument.")
else:
msg = ("could not parse 'time' coordinate values in file "
f"'{ds.encoding['source']}' with var_freq={var_freq}.")
raise ValueError(msg) from e
time_intervals = np.concatenate([
periods.start_time.to_pydatetime()[:, np.newaxis],
(periods.end_time + OFFSET).to_pydatetime()[:, np.newaxis]
], axis=1)
if len(time_intervals) == 1:
di, df = time_intervals[0]
# Checking if time interval and simulation window are overlaping
if df <= date_i or date_f <= di:
raise ValueError(
f"Simulation window [{date_i.isoformat(), date_f.isoformat()}] "
f"and the single period [{di.isoformat(), df.isoformat()}] "
f"in file '{ds.encoding['source']}' are not overlaping"
)
else:
mask = ((time_intervals[:, 0] <= date_f)
& (time_intervals[:, 1] >= date_i))
time_intervals = time_intervals[mask]
if time_intervals.size == 0:
raise ValueError(
"No period found whithin the simulation window "
f"[{date_i.isoformat(), date_f.isoformat()}] in file "
f"'{ds.encoding['source']}'"
)
return time_intervals.tolist()
[docs]
def get_year_offset(
ds: xr.Dataset,
time_varname: str,
ref_year: int,
is_climatology: bool = False
) -> int:
if is_climatology:
years = ref_year - ds[time_varname].dt.year.values[0]
else:
year_file = ds[time_varname].dt.year.values
if ref_year > year_file.max():
years = ref_year - year_file.max()
elif ref_year < year_file.min():
years = ref_year - year_file.min()
else:
years = 0
return years