Source code for pycif.plugins.datastreams.fluxes.gridded_NetCDF.time_coord

from logging import warning

import cftime
import numpy as np
import pandas as pd
import xarray as xr

from .....utils.check.errclass import IllegalArgumentError
from ....domains.gridded_NetCDF.utils import find_coord

OFFSET = pd.offsets.Nano(1)  # One nanosecond time offest


[docs] def find_time_coord(ds: xr.Dataset, tracer, ref_date): if tracer.add_time_coord: # Impose var_freq if specified var_freq = getattr(tracer, "var_freq", None) # time dimension may be missing, in this case, add it if tracer.time_dimname not in ds.dims: ds = ds.expand_dims(tracer.time_dimname) if var_freq is None: # Adding 'time' coordinate with a single element time = [ref_date] if ds.sizes[tracer.time_dimname] != 1: raise ValueError( f"time dimension '{tracer.time_dimname}' size > 1. " "When using the 'add_time_coord' argument, for files with a " "time dimension of size greater than 1, please also use the " "'var_freq' argument." ) else: time = pd.date_range(ref_date, periods=ds.sizes[tracer.time_dimname], freq=var_freq) if tracer.time_dimname != "time": ds = ds.rename_dims({tracer.time_dimname: "time"}) ds = ds.assign_coords({"time": time}) return ds, "time" else: time_varname = getattr(tracer, "time_varname", None) if time_varname is None: try: time_varname = find_coord(ds, "time", "time") except ValueError as e: raise ValueError( "Could not find the coordinate `time` (see Exception message above)\n" "This can be fixed by explicitly specifying the time variable name " "using `time_varname`, or by adding a time_coord and a var_freq" ) from e return ds, time_varname
[docs] def get_calendar(ds: xr.Dataset, time_varname: str): if 'calendar' in ds[time_varname].attrs: calendar = ds[time_varname].attrs.get('calendar') else: calendar = getattr(ds[time_varname].values[0], 'calendar', None) return calendar
[docs] def preprocess_time_coord( ds: xr.Dataset, tracer, time_varname: str, ref_date, time_midpoint: bool = False, time_endpoint: bool = False, file_freq: str = "", is_climatology: bool = False ) -> xr.Dataset: """Preprocesses the time coordinate """ time_dimname, = ds[time_varname].dims # Ensuring that time's coordinate and dimension have the # same name (necessary for slicing below) if time_dimname != time_varname: ds = ds.rename({time_dimname: time_varname}) ds = ds.assign_coords({time_varname: ds[time_varname]}) # Decode datetimes with time units if necessary if ds[time_varname].dtype.kind != np.dtype('datetime64').kind or tracer.force_var_freq: ds = decode_datetimes(ds, tracer, time_varname, ref_date, file_freq=file_freq, is_climatology=is_climatology) # Shift dates if needed if time_midpoint or time_endpoint: dt = np.unique(np.diff(ds[time_varname].values)) # Shifting dates can be applied only with uniform delta if len(dt) != 1: # Non-uniform deltas can happen for leap-years dt = pd.DatetimeIndex(ds[time_varname]).to_series().diff().iloc[1:] dt_days = np.unique(dt.dt.days) dt_seconds = np.unique(dt.dt.seconds) if len(dt_seconds) == 1: dt = np.timedelta64( np.int64(1e9 * (dt_seconds[0] + dt_days.min() * 86400))) # Otherwise, raise error else: raise Exception( "Trying to deduce periods from midpoints with " "non-constant time interval") if time_midpoint: ds[time_varname] = ( time_varname, ds[time_varname].values - 0.5 * dt) else: ds[time_varname] = ( time_varname, ds[time_varname].values - dt) return ds
[docs] def decode_datetimes( ds: xr.Dataset, tracer, var_name: str, ref_date, file_freq: str = "", is_climatology: bool = False ) -> xr.Dataset: """Decode datetimes from variable 'var_name' in the dataset 'ds' """ # If dates were already properly decoded, just skip if isinstance(ds[var_name].values.flatten()[0], cftime.datetime): ds[var_name] = ds[var_name].astype('datetime64[ns]') return ds if hasattr(tracer, "time_unit"): try: time_units = ref_date.strftime(tracer.time_unit) if 'units' in ds[var_name].attrs: warning( f"Replacing time units '{ds[var_name].attrs['units']}'" f" in file '{ds.encoding['source']}' with '{time_units}" ) ds[var_name].attrs['units'] = time_units ds = xr.decode_cf(ds, decode_times=True) # If did not work, force decoding if ds[var_name].dtype.kind != np.dtype('datetime64').kind: raise Exception except Exception: ds = decode_datetimes_with_units(ds, var_name, time_units) elif hasattr(tracer, "time_format"): ds = decode_datetimes_with_format(ds, var_name, tracer.time_format) # Impose var_freq if specified if getattr(tracer, "var_freq", None) is not None: # Check that time coord is consistent with var_freq if ds[var_name].dtype.kind == np.dtype('datetime64').kind and not tracer.force_var_freq: pass else: # Check that frequency is compatible with time lenght if is_climatology if is_climatology: time_length = ds[var_name].values.shape[0] climato_length = pd.date_range( "20000101", "20010101", freq=tracer.var_freq).size if time_length > climato_length + 1: raise Exception( "Trying to process time variable for a climatology, " f"but time dimension in file is not compatible (len = {time_length}) with " f"expected time length with var_freq (var_freq={tracer.var_freq} -> time_length={climato_length}).\n" "This can happen when you think you are processing a climatological file while the variable is time dependent." ) # Manage if file_freq is empty if file_freq == "": raise Exception( "Trying to interpret dates from a file while no file_freq was specified.\n" "This should not happen. Please contact support." ) # First take floor of ref_date according to file_freq delta_ref = pd.date_range( ref_date, freq=file_freq, periods=2).to_series().diff()[1] start_date = pd.date_range(ref_date - 2 * delta_ref, ref_date + 2 * delta_ref, freq=file_freq) start_date = start_date[start_date <= ref_date][-1] # Set time to midnight if not hourly frequency if "H" in file_freq: try: if int(file_freq[:-1]) % 24 == 0: start_date = start_date.normalize() except: pass else: start_date = start_date.normalize() ds[var_name] = pd.date_range( start_date, freq=tracer.var_freq, periods=len(ds[var_name]) ) # Raise an exception if time coordinate is still not datetime-like if ds[var_name].dtype.kind != np.dtype('datetime64').kind: raise ValueError( "Could not decode time coordinate in file " f"'{ds.encoding['source']}'\n" f"Please check the time units and/or calendar in your files.\n" f"Consider adding a 'time_unit' manually in your yml.") return ds
[docs] def decode_datetimes_with_units( ds: xr.Dataset, var_name: str, units: str ) -> xr.Dataset: """Decode datetimes from variable 'var_name' in the dataset 'ds' using its 'units' attribute """ time = ds[var_name].values units_elements = units.split(' ') # TODO: change tin 'in' by something more specifiic if "years" in units: new_time = [pd.to_datetime(f"{year}-01-01") for year in time] elif units_elements[1] == "since": reference_date = np.datetime64(units_elements[2]) # units == 'months since {reference_date}' if units_elements[0] == "months": months = time.astype('int') new_time = pd.date_range( reference_date, periods=np.max(months) + 1, freq='MS')[months] # units == 'years since {reference_date}' elif units_elements[0] == "years": years = np.floor(time) leap_years = pd.to_datetime(years, format='%Y').is_leap_year delta_years = np.array( years, dtype='timedelta64[Y]').astype('timedelta64[D]') delta_days = np.array((time % 1) * (365 + leap_years), dtype='timedelta64[D]') new_time = reference_date + \ delta_years.astype('timedelta64[D]') + delta_days else: raise ValueError( f"unknown unit '{units_elements[0]}' in '{units}'") # units == 'as {date_format}' with date_format = '%Y-%m' for example elif units_elements[1] == "as": return decode_datetimes_with_format(ds, var_name, units_elements[2]) else: raise ValueError(f"unknown time units '{units}'") ds[var_name] = (ds[var_name].dims, new_time) return ds
[docs] def decode_datetimes_with_format( ds: xr.Dataset, var_name: str, time_format: str ) -> xr.Dataset: """Decode datetimes from variable 'var_name' in the dataset 'ds' using a time format string """ new_time = pd.to_datetime( ds[var_name].values.astype(str), format=time_format) ds[var_name] = (ds[var_name].dims, new_time) return ds
[docs] def shift_years( ds: xr.Dataset, var_name: str, year_offset: int ) -> xr.Dataset: """Offset years in files if necessary then converts to standard calendar """ if year_offset == 0: return ds year_offset = pd.offsets.DateOffset(years=year_offset) # Shifting years offseted_time = np.array(pd.to_datetime(ds[var_name].values) + year_offset) ds[var_name] = (ds[var_name].dims, offseted_time) # Dropping duplicated days (happens when shifting from a leap to non-leap year) ds = ds.drop_duplicates(dim=ds[var_name].dims[0]) return ds
[docs] def convert_calendar( ds: xr.Dataset, tracer, var_name: str, calendar: str, ) -> xr.Dataset: """Converts to standard calendar """ if calendar in ["noleap", "365_day"]: # var_freq has to be '1D' (one day) in this case if hasattr(tracer, 'var_freq') \ and tracer.var_freq != '1D' \ and "H" not in tracer.var_freq: raise IllegalArgumentError( f"'var_freq' argument is set to '{tracer.var_freq}' while " f"the time coordinate calendar is '{calendar}'. " "Please set the 'var_freq' argument to '1D' or a multiple of 'H' in your YAML " "configuration file. Other frequencies do not work." ) ds = expand_leap_years(ds, tracer, var_name) return ds
[docs] def expand_leap_years(ds: xr.Dataset, tracer, var_name: str) -> xr.Dataset: time = pd.to_datetime(ds[var_name].values) # # Skipping if any February 29th is already present in time coordinate # if np.any((time.month == 2) & (time.day == 29)): # return ds # Get leap year's February 28th index index, = np.where((time.is_leap_year) & (time.month == 2) & (time.day == 28)) # Skipping if no February 28th in time coordinate if len(index) == 0: return ds has_period = hasattr(tracer, 'period_varname') if has_period: period_varname = tracer.period_varname period = ds[period_varname].values if isinstance(period[0, 0], cftime.datetime): period = period.astype('datetime64[ns]') period_start = pd.to_datetime(period[:, 0]) period_stop = pd.to_datetime(period[:, 1]) # New index and dimensions new_index = [] new_time = [] # Time coordinate new_time_pd = [] if has_period: new_period_start = [] # Period variable new_period_stop = [] # Period variable # Loop over years to extend list_years = np.unique(time.year) for yyyy in list_years: mask_year, = np.where(time.year == yyyy) time_year = time[mask_year] if has_period: period_year_start = period_start[mask_year] period_year_stop = period_stop[mask_year] # Extend target index with all values from this year new_index.append(mask_year) new_time.append(time_year.to_numpy()) if has_period: new_period_start.append(period_year_start) new_period_stop.append(period_year_stop) # Skip if not a leap year if yyyy % 4 != 0: continue # Get values for the 29th and 28th of February mask_28, = np.where( (time_year.month == 2) & (time_year.day == 28)) mask_29, = np.where( (time_year.month == 2) & (time_year.day == 29)) # If as many values for 29th and 28th, can skip this year if mask_28.size == mask_29.size: continue # Otherwise, remove values for the 29th index_year = np.arange(len(time_year)) mask_no29 = ~np.isin(index_year, mask_29) index_year = index_year[mask_no29] time_year = time_year[mask_no29] if has_period: period_year_start = period_year_start[mask_no29] period_year_stop = period_year_stop[mask_no29] # Now replace values by those of 28th of February index_year = np.concatenate([ index_year[:mask_28.max() + 1], mask_28, index_year[mask_28.max() + 1:] ]) time_year = np.concatenate([ time_year[:mask_28.max() + 1], (time_year[mask_28] + pd.offsets.Day()).to_numpy(), time_year[mask_28.max() + 1:] ]) if has_period: period_year_start = np.concatenate([ period_year_start[:mask_28.max() + 1], (period_year_start[mask_28] + pd.offsets.Day()), period_year_start[mask_28.max() + 1:] ]) period_year_stop = np.concatenate([ period_year_stop[:mask_28.max() + 1], (period_year_stop[mask_28] + pd.offsets.Day()), period_year_stop[mask_28.max() + 1:] ]) # Now replace the year values for the index new_index[-1] = mask_year[index_year] new_time[-1] = time_year if has_period: new_period_start[-1] = period_year_start new_period_stop[-1] = period_year_stop # Concatenate years new_index = np.concatenate(new_index) new_time = np.concatenate(new_time) if has_period: new_period_start = np.concatenate(new_period_start) new_period_stop = np.concatenate(new_period_stop) # Now generate the output datastore ds = ds.isel({var_name: new_index}) ds[var_name] = ([var_name], new_time) if has_period: new_period = np.concatenate([new_period_start[:, np.newaxis], new_period_stop[:, np.newaxis]], axis=1) ds[period_varname] = (ds[period_varname].dims, new_period) return ds