from logging import warning
import cftime
import numpy as np
import pandas as pd
import xarray as xr
from .....utils.check.errclass import IllegalArgumentError
from ....domains.gridded_NetCDF.utils import find_coord
OFFSET = pd.offsets.Nano(1) # One nanosecond time offest
[docs]
def find_time_coord(ds: xr.Dataset, tracer, ref_date):
if tracer.add_time_coord:
# Impose var_freq if specified
var_freq = getattr(tracer, "var_freq", None)
# time dimension may be missing, in this case, add it
if tracer.time_dimname not in ds.dims:
ds = ds.expand_dims(tracer.time_dimname)
if var_freq is None:
# Adding 'time' coordinate with a single element
time = [ref_date]
if ds.sizes[tracer.time_dimname] != 1:
raise ValueError(
f"time dimension '{tracer.time_dimname}' size > 1. "
"When using the 'add_time_coord' argument, for files with a "
"time dimension of size greater than 1, please also use the "
"'var_freq' argument."
)
else:
time = pd.date_range(ref_date, periods=ds.sizes[tracer.time_dimname], freq=var_freq)
if tracer.time_dimname != "time":
ds = ds.rename_dims({tracer.time_dimname: "time"})
ds = ds.assign_coords({"time": time})
return ds, "time"
else:
time_varname = getattr(tracer, "time_varname", None)
if time_varname is None:
try:
time_varname = find_coord(ds, "time", "time")
except ValueError as e:
raise ValueError(
"Could not find the coordinate `time` (see Exception message above)\n"
"This can be fixed by explicitly specifying the time variable name "
"using `time_varname`, or by adding a time_coord and a var_freq"
) from e
return ds, time_varname
[docs]
def get_calendar(ds: xr.Dataset, time_varname: str):
if 'calendar' in ds[time_varname].attrs:
calendar = ds[time_varname].attrs.get('calendar')
else:
calendar = getattr(ds[time_varname].values[0], 'calendar', None)
return calendar
[docs]
def preprocess_time_coord(
ds: xr.Dataset,
tracer,
time_varname: str,
ref_date,
time_midpoint: bool = False,
time_endpoint: bool = False,
file_freq: str = "",
is_climatology: bool = False
) -> xr.Dataset:
"""Preprocesses the time coordinate
"""
time_dimname, = ds[time_varname].dims
# Ensuring that time's coordinate and dimension have the
# same name (necessary for slicing below)
if time_dimname != time_varname:
ds = ds.rename({time_dimname: time_varname})
ds = ds.assign_coords({time_varname: ds[time_varname]})
# Decode datetimes with time units if necessary
if ds[time_varname].dtype.kind != np.dtype('datetime64').kind or tracer.force_var_freq:
ds = decode_datetimes(ds, tracer, time_varname,
ref_date, file_freq=file_freq,
is_climatology=is_climatology)
# Shift dates if needed
if time_midpoint or time_endpoint:
dt = np.unique(np.diff(ds[time_varname].values))
# Shifting dates can be applied only with uniform delta
if len(dt) != 1:
# Non-uniform deltas can happen for leap-years
dt = pd.DatetimeIndex(ds[time_varname]).to_series().diff().iloc[1:]
dt_days = np.unique(dt.dt.days)
dt_seconds = np.unique(dt.dt.seconds)
if len(dt_seconds) == 1:
dt = np.timedelta64(
np.int64(1e9 * (dt_seconds[0] + dt_days.min() * 86400)))
# Otherwise, raise error
else:
raise Exception(
"Trying to deduce periods from midpoints with "
"non-constant time interval")
if time_midpoint:
ds[time_varname] = (
time_varname, ds[time_varname].values - 0.5 * dt)
else:
ds[time_varname] = (
time_varname, ds[time_varname].values - dt)
return ds
[docs]
def decode_datetimes(
ds: xr.Dataset,
tracer,
var_name: str,
ref_date,
file_freq: str = "",
is_climatology: bool = False
) -> xr.Dataset:
"""Decode datetimes from variable 'var_name' in the dataset 'ds'
"""
# If dates were already properly decoded, just skip
if isinstance(ds[var_name].values.flatten()[0], cftime.datetime):
ds[var_name] = ds[var_name].astype('datetime64[ns]')
return ds
if hasattr(tracer, "time_unit"):
try:
time_units = ref_date.strftime(tracer.time_unit)
if 'units' in ds[var_name].attrs:
warning(
f"Replacing time units '{ds[var_name].attrs['units']}'"
f" in file '{ds.encoding['source']}' with '{time_units}"
)
ds[var_name].attrs['units'] = time_units
ds = xr.decode_cf(ds, decode_times=True)
# If did not work, force decoding
if ds[var_name].dtype.kind != np.dtype('datetime64').kind:
raise Exception
except Exception:
ds = decode_datetimes_with_units(ds, var_name, time_units)
elif hasattr(tracer, "time_format"):
ds = decode_datetimes_with_format(ds, var_name, tracer.time_format)
# Impose var_freq if specified
if getattr(tracer, "var_freq", None) is not None:
# Check that time coord is consistent with var_freq
if ds[var_name].dtype.kind == np.dtype('datetime64').kind and not tracer.force_var_freq:
pass
else:
# Check that frequency is compatible with time lenght if is_climatology
if is_climatology:
time_length = ds[var_name].values.shape[0]
climato_length = pd.date_range(
"20000101", "20010101", freq=tracer.var_freq).size
if time_length > climato_length + 1:
raise Exception(
"Trying to process time variable for a climatology, "
f"but time dimension in file is not compatible (len = {time_length}) with "
f"expected time length with var_freq (var_freq={tracer.var_freq} -> time_length={climato_length}).\n"
"This can happen when you think you are processing a climatological file while the variable is time dependent."
)
# Manage if file_freq is empty
if file_freq == "":
raise Exception(
"Trying to interpret dates from a file while no file_freq was specified.\n"
"This should not happen. Please contact support."
)
# First take floor of ref_date according to file_freq
delta_ref = pd.date_range(
ref_date, freq=file_freq, periods=2).to_series().diff()[1]
start_date = pd.date_range(ref_date - 2 * delta_ref,
ref_date + 2 * delta_ref,
freq=file_freq)
start_date = start_date[start_date <= ref_date][-1]
# Set time to midnight if not hourly frequency
if "H" in file_freq:
try:
if int(file_freq[:-1]) % 24 == 0:
start_date = start_date.normalize()
except:
pass
else:
start_date = start_date.normalize()
ds[var_name] = pd.date_range(
start_date, freq=tracer.var_freq, periods=len(ds[var_name])
)
# Raise an exception if time coordinate is still not datetime-like
if ds[var_name].dtype.kind != np.dtype('datetime64').kind:
raise ValueError(
"Could not decode time coordinate in file "
f"'{ds.encoding['source']}'\n"
f"Please check the time units and/or calendar in your files.\n"
f"Consider adding a 'time_unit' manually in your yml.")
return ds
[docs]
def decode_datetimes_with_units(
ds: xr.Dataset,
var_name: str,
units: str
) -> xr.Dataset:
"""Decode datetimes from variable 'var_name' in the dataset 'ds' using
its 'units' attribute
"""
time = ds[var_name].values
units_elements = units.split(' ')
# TODO: change tin 'in' by something more specifiic
if "years" in units:
new_time = [pd.to_datetime(f"{year}-01-01") for year in time]
elif units_elements[1] == "since":
reference_date = np.datetime64(units_elements[2])
# units == 'months since {reference_date}'
if units_elements[0] == "months":
months = time.astype('int')
new_time = pd.date_range(
reference_date, periods=np.max(months) + 1, freq='MS')[months]
# units == 'years since {reference_date}'
elif units_elements[0] == "years":
years = np.floor(time)
leap_years = pd.to_datetime(years, format='%Y').is_leap_year
delta_years = np.array(
years, dtype='timedelta64[Y]').astype('timedelta64[D]')
delta_days = np.array((time % 1) *
(365 + leap_years), dtype='timedelta64[D]')
new_time = reference_date + \
delta_years.astype('timedelta64[D]') + delta_days
else:
raise ValueError(
f"unknown unit '{units_elements[0]}' in '{units}'")
# units == 'as {date_format}' with date_format = '%Y-%m' for example
elif units_elements[1] == "as":
return decode_datetimes_with_format(ds, var_name, units_elements[2])
else:
raise ValueError(f"unknown time units '{units}'")
ds[var_name] = (ds[var_name].dims, new_time)
return ds
[docs]
def shift_years(
ds: xr.Dataset,
var_name: str,
year_offset: int
) -> xr.Dataset:
"""Offset years in files if necessary then converts to standard calendar
"""
if year_offset == 0:
return ds
year_offset = pd.offsets.DateOffset(years=year_offset)
# Shifting years
offseted_time = np.array(pd.to_datetime(ds[var_name].values) + year_offset)
ds[var_name] = (ds[var_name].dims, offseted_time)
# Dropping duplicated days (happens when shifting from a leap to non-leap year)
ds = ds.drop_duplicates(dim=ds[var_name].dims[0])
return ds
[docs]
def convert_calendar(
ds: xr.Dataset,
tracer,
var_name: str,
calendar: str,
) -> xr.Dataset:
"""Converts to standard calendar
"""
if calendar in ["noleap", "365_day"]:
# var_freq has to be '1D' (one day) in this case
if hasattr(tracer, 'var_freq') \
and tracer.var_freq != '1D' \
and "H" not in tracer.var_freq:
raise IllegalArgumentError(
f"'var_freq' argument is set to '{tracer.var_freq}' while "
f"the time coordinate calendar is '{calendar}'. "
"Please set the 'var_freq' argument to '1D' or a multiple of 'H' in your YAML "
"configuration file. Other frequencies do not work."
)
ds = expand_leap_years(ds, tracer, var_name)
return ds
[docs]
def expand_leap_years(ds: xr.Dataset, tracer, var_name: str) -> xr.Dataset:
time = pd.to_datetime(ds[var_name].values)
# # Skipping if any February 29th is already present in time coordinate
# if np.any((time.month == 2) & (time.day == 29)):
# return ds
# Get leap year's February 28th index
index, = np.where((time.is_leap_year) &
(time.month == 2) &
(time.day == 28))
# Skipping if no February 28th in time coordinate
if len(index) == 0:
return ds
has_period = hasattr(tracer, 'period_varname')
if has_period:
period_varname = tracer.period_varname
period = ds[period_varname].values
if isinstance(period[0, 0], cftime.datetime):
period = period.astype('datetime64[ns]')
period_start = pd.to_datetime(period[:, 0])
period_stop = pd.to_datetime(period[:, 1])
# New index and dimensions
new_index = []
new_time = [] # Time coordinate
new_time_pd = []
if has_period:
new_period_start = [] # Period variable
new_period_stop = [] # Period variable
# Loop over years to extend
list_years = np.unique(time.year)
for yyyy in list_years:
mask_year, = np.where(time.year == yyyy)
time_year = time[mask_year]
if has_period:
period_year_start = period_start[mask_year]
period_year_stop = period_stop[mask_year]
# Extend target index with all values from this year
new_index.append(mask_year)
new_time.append(time_year.to_numpy())
if has_period:
new_period_start.append(period_year_start)
new_period_stop.append(period_year_stop)
# Skip if not a leap year
if yyyy % 4 != 0:
continue
# Get values for the 29th and 28th of February
mask_28, = np.where(
(time_year.month == 2)
& (time_year.day == 28))
mask_29, = np.where(
(time_year.month == 2)
& (time_year.day == 29))
# If as many values for 29th and 28th, can skip this year
if mask_28.size == mask_29.size:
continue
# Otherwise, remove values for the 29th
index_year = np.arange(len(time_year))
mask_no29 = ~np.isin(index_year, mask_29)
index_year = index_year[mask_no29]
time_year = time_year[mask_no29]
if has_period:
period_year_start = period_year_start[mask_no29]
period_year_stop = period_year_stop[mask_no29]
# Now replace values by those of 28th of February
index_year = np.concatenate([
index_year[:mask_28.max() + 1],
mask_28,
index_year[mask_28.max() + 1:]
])
time_year = np.concatenate([
time_year[:mask_28.max() + 1],
(time_year[mask_28] + pd.offsets.Day()).to_numpy(),
time_year[mask_28.max() + 1:]
])
if has_period:
period_year_start = np.concatenate([
period_year_start[:mask_28.max() + 1],
(period_year_start[mask_28] + pd.offsets.Day()),
period_year_start[mask_28.max() + 1:]
])
period_year_stop = np.concatenate([
period_year_stop[:mask_28.max() + 1],
(period_year_stop[mask_28] + pd.offsets.Day()),
period_year_stop[mask_28.max() + 1:]
])
# Now replace the year values for the index
new_index[-1] = mask_year[index_year]
new_time[-1] = time_year
if has_period:
new_period_start[-1] = period_year_start
new_period_stop[-1] = period_year_stop
# Concatenate years
new_index = np.concatenate(new_index)
new_time = np.concatenate(new_time)
if has_period:
new_period_start = np.concatenate(new_period_start)
new_period_stop = np.concatenate(new_period_stop)
# Now generate the output datastore
ds = ds.isel({var_name: new_index})
ds[var_name] = ([var_name], new_time)
if has_period:
new_period = np.concatenate([new_period_start[:, np.newaxis],
new_period_stop[:, np.newaxis]], axis=1)
ds[period_varname] = (ds[period_varname].dims, new_period)
return ds