# Source code for pycif.plugins.datastreams.fluxes.gridded_NetCDF.fetch

import os
from logging import debug, info

import numpy as np
import pandas as pd
import xarray as xr

from .....utils import path
from .....utils.dates import date_range
from .time_coord import (
    convert_calendar, find_time_coord, get_calendar,
    preprocess_time_coord, shift_years,
    decode_datetimes
)
from .utils import get_time_intervals, get_year_offset

from .....utils.check.errclass import IllegalArgumentError


def fetch(ref_dir, ref_file, input_interval, target_dir,
          tracer=None, component=None, **kwargs):
    """Fetch gridded NetCDF files and the time intervals they cover.

    The files matching ``ref_file`` over ``input_interval`` are linked into
    ``target_dir``. Each file's time coordinate is then read to list the
    time intervals available in it.

    Args:
        ref_dir (str): Directory containing the data files.
        ref_file (str): File name pattern of the data. It is expanded with
            ``strftime`` for each file date, or used verbatim when
            ``tracer.is_climatology`` is True.
        input_interval (list[datetime.datetime]): Start and end dates of the
            period to fetch.
        target_dir (str): Directory in which the files are linked.
        tracer (optional): Datastream configuration. Despite the ``None``
            default it is required. Its attributes drive the fetch:
            ``file_freq``, ``is_climatology``, ``add_time_coord``,
            ``time_midpoint``, ``time_endpoint``, ``remove_duplicates``,
            ``drop_variables`` and ``full_log``. The optional attributes
            ``time_unit``, ``time_format``, ``period_varname`` and
            ``time_calendar`` are also read.
        component (optional): Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        tuple[dict, dict]: ``(list_files, list_dates)``, both keyed by file
        date. ``list_dates[date]`` is the list of ``(start, end)`` intervals
        found in the file. ``list_files[date]`` repeats the linked file path
        once per interval.

    Raises:
        IllegalArgumentError: If incompatible options are set on ``tracer``,
            or if ``file_freq`` is missing for non-climatology data.
        Exception: If a file does not exist, or if xarray cannot open it
            because of a malformed dimension variable.
        ValueError: If a climatology file contains several years.
    """
    date_i, date_f = input_interval

    _check_arguments(tracer)
    file_dates, file_paths = _get_file_dates_and_paths(
        ref_dir, ref_file, date_i, date_f, tracer)

    debug(f"file_dates={file_dates}")
    debug(f"file_paths={file_paths}")

    list_dates = {}
    list_files = {}
    for date, f_path in zip(file_dates, file_paths):
        if not os.path.isfile(f_path):
            raise Exception(
                f"Trying to fetch the file {f_path} that is not available."
            )

        # Linking file to the working directory
        target_file_path = os.path.join(target_dir, os.path.basename(f_path))
        path.link(f_path, target_file_path)

        # The file is opened once and closed when leaving the block, even if
        # 'ds' is rebound to derived datasets inside it
        with _open_dataset(target_file_path, tracer.drop_variables) as ds:
            ds, time_varname = find_time_coord(ds, tracer, date)

            # Dropping all variables other than the time coordinate to
            # avoid useless computations in 'fetch'
            var_to_keep = [time_varname]
            if hasattr(tracer, 'period_varname'):
                var_to_keep.append(tracer.period_varname)
            ds = ds[var_to_keep]

            ds = preprocess_time_coord(
                ds, tracer, time_varname, date,
                time_midpoint=tracer.time_midpoint,
                time_endpoint=tracer.time_endpoint,
                file_freq=tracer.file_freq,
                is_climatology=tracer.is_climatology)

            # Getting calendar before decoding (otherwise it will be lost).
            # A calendar given on the tracer takes precedence. The
            # coordinate's '.dt' accessor is only queried when needed, since
            # it raises on non-datetime coordinates.
            if hasattr(tracer, "time_calendar"):
                calendar = tracer.time_calendar
            else:
                calendar = ds[time_varname].dt.calendar

            # Decoding periods if needed
            if hasattr(tracer, 'period_varname'):
                ds = decode_datetimes(
                    ds, tracer, tracer.period_varname, date,
                    file_freq=tracer.file_freq)

            # Output dictionaries are keyed by built-in datetimes
            if isinstance(date, pd.Timestamp):
                date = date.to_pydatetime()

            if tracer.is_climatology:
                list_dates[date] = _get_climatology_intervals(
                    ds, tracer, time_varname, calendar,
                    date_i, date_f, target_file_path)
            else:
                # Shift year to match simulation period
                # (useful when using a fixed file)
                year_offset = get_year_offset(ds, time_varname, date.year)
                ds = shift_years(ds, time_varname, year_offset)
                # 'convert_calendar' must be called after 'shift_years',
                # otherwise leap years will be incorrect
                ds = convert_calendar(ds, tracer, time_varname, calendar)
                list_dates[date] = get_time_intervals(
                    ds, tracer, time_varname, date_i, date_f, calendar,
                    time_midpoint=tracer.time_midpoint,
                    time_endpoint=tracer.time_endpoint
                )
            list_files[date] = len(list_dates[date]) * [target_file_path]

    debug(_format_fetch_log(list_dates, list_files, tracer.full_log))

    return list_files, list_dates


def _check_arguments(tracer):
    """Raise IllegalArgumentError for incompatible tracer options."""
    if hasattr(tracer, 'time_unit') and hasattr(tracer, 'time_format'):
        raise IllegalArgumentError(
            "'time_unit' and 'time_format' arguments cannot be used together")
    if tracer.file_freq and tracer.is_climatology:
        raise IllegalArgumentError(
            "'file_freq' argument cannot be used when "
            "'is_climatology' argument is set to True"
        )
    if tracer.add_time_coord and tracer.is_climatology:
        raise IllegalArgumentError(
            "'add_time_coord' argument cannot be used when "
            "'is_climatology' argument is set to True"
        )
    if tracer.time_midpoint and tracer.time_endpoint:
        raise IllegalArgumentError(
            "'time_midpoint' and 'time_endpoint' are set to True, "
            "while only one of them can"
        )


def _get_file_dates_and_paths(ref_dir, ref_file, date_i, date_f, tracer):
    """Return the dates and paths of the files covering [date_i, date_f)."""
    if tracer.is_climatology:
        # A climatology is a single fixed file reused for the whole period
        return [date_i], [os.path.join(ref_dir, ref_file)]

    file_freq = tracer.file_freq
    if not file_freq:
        raise IllegalArgumentError("'file_freq' argument is required when "
                                   "'is_climatology' argument is set to False")

    file_dates = pd.date_range(
        date_i, date_f, freq=file_freq, inclusive='left')
    if file_dates.empty:
        file_dates = pd.to_datetime([date_i])
    # When the first generated date falls after date_i (e.g. anchored
    # frequencies such as month starts), prepend date_i so that the start
    # of the interval is covered as well
    if file_dates[0] > date_i:
        file_dates = pd.to_datetime([date_i] + file_dates.to_list())

    file_paths = np.array([os.path.join(ref_dir, file_date.strftime(ref_file))
                           for file_date in file_dates])

    if tracer.remove_duplicates:
        # Keep the first date of each distinct path, in chronological order
        _, index = np.unique(file_paths, return_index=True)
        index.sort()
        file_dates = file_dates[index]
        file_paths = file_paths[index]

    return file_dates, file_paths


def _open_dataset(file_path, drop_variables):
    """Open ``file_path`` with xarray, decoding times when possible.

    Very badly formatted files whose times cannot be decoded (ValueError)
    are reopened with ``decode_times=False``. MissingDimensionsError is a
    ValueError subclass, so it is handled first. Otherwise it would be
    mistaken for a time-decoding failure. It is re-raised with guidance
    for the user.
    """
    missing_dims_error = xr.core.variable.MissingDimensionsError
    try:
        try:
            return xr.open_dataset(
                file_path, decode_times=True, drop_variables=drop_variables)
        except missing_dims_error:
            raise
        except ValueError:
            return xr.open_dataset(
                file_path, decode_times=False, drop_variables=drop_variables)
    except missing_dims_error as err:
        info(err)
        raise Exception(
            f"Could not open {file_path} with xarray. "
            "This happens when one dimension variable has multiple dimensions, "
            "with a name common with one of its own dimensions. "
            "Please consider reformatting your NetCDF properly. "
            "It is possible to use the argument `drop_variables` in your yaml "
            "to avoid reading the erroneous variables. \n"
            "Please check the error message above to find out which variable is the problem."
        ) from err


def _get_climatology_intervals(ds, tracer, time_varname, calendar,
                               date_i, date_f, file_path):
    """Replicate a single-year climatology over every year of the period."""
    years_in_file = np.unique(ds[time_varname].dt.year.values)
    if len(years_in_file) > 1:
        raise ValueError("multiple years found in 'time' coordinate "
                         f"'{time_varname}' in file '{file_path}'")

    years = pd.date_range(date_i, date_f, freq='1YS', inclusive='left')
    if years.empty:
        years = pd.to_datetime([date_i])
    if years[0].year > date_i.year:
        years = pd.to_datetime([date_i] + years.to_list())
    year_offsets = years.year - years_in_file[0]

    intervals = []
    for year_offset in year_offsets:
        # Work on a copy in case 'convert_calendar' modifies the dates
        ds_year = shift_years(ds.copy(), time_varname, year_offset)
        # 'convert_calendar' must be called after 'shift_years',
        # otherwise leap years will be incorrect
        ds_year = convert_calendar(ds_year, tracer, time_varname, calendar)
        intervals.extend(get_time_intervals(
            ds_year, tracer, time_varname, date_i, date_f, calendar,
        ))
    return intervals


def _format_fetch_log(list_dates, list_files, full_log, max_lines=20):
    """Build the debug summary of the fetch output.

    Unless ``full_log`` is set, only the first and last ``max_lines // 2``
    dates are listed, with a single "..." line in between.
    """
    half = max_lines // 2
    n_dates = len(list_dates)
    lines = ["Gridded NetCDF surface flux fetch output:"]
    for i, date in enumerate(list_dates):
        if not full_log and half <= i < n_dates - half:
            if i == half:
                lines.append("...")
            continue
        lines.append(f"- {date.isoformat()}:")
        for (di, df), f_path in zip(list_dates[date], list_files[date]):
            lines.append(f" - [{di.isoformat()}, {df.isoformat()}] -> {f_path}")
    return "\n".join(lines) + "\n"