import os
from logging import debug, info
import numpy as np
import pandas as pd
import xarray as xr
from .....utils import path
from .....utils.dates import date_range
from .time_coord import (
convert_calendar, find_time_coord, get_calendar,
preprocess_time_coord, shift_years,
decode_datetimes
)
from .utils import get_time_intervals, get_year_offset
from .....utils.check.errclass import IllegalArgumentError
[docs]
def _check_fetch_arguments(tracer):
    """Reject combinations of tracer options that cannot be used together.

    Raises:
        IllegalArgumentError: if two mutually exclusive options are set.
    """
    file_freq = tracer.file_freq

    if hasattr(tracer, 'time_unit') and hasattr(tracer, 'time_format'):
        raise IllegalArgumentError(
            "'time_unit' and 'time_format' arguments cannot be used together")

    if file_freq and tracer.is_climatology:
        raise IllegalArgumentError(
            "'file_freq' argument cannot be used when "
            "'is_climatology' argument is set to True"
        )

    if tracer.add_time_coord and tracer.is_climatology:
        raise IllegalArgumentError(
            "'add_time_coord' argument cannot be used when "
            "'is_climatology' argument is set to True"
        )

    if tracer.time_midpoint and tracer.time_endpoint:
        raise IllegalArgumentError(
            "'time_midpoint' and 'time_endpoint' are set to True, "
            "while only one of them can"
        )


def _list_input_files(ref_dir, ref_file, date_i, date_f, tracer):
    """Return the reference dates and paths of the files to fetch.

    A climatology is a single file ``ref_dir/ref_file`` associated with
    ``date_i``. Otherwise one path is built per ``tracer.file_freq`` step of
    ``[date_i, date_f)`` by formatting ``ref_file`` with ``strftime``.

    Raises:
        IllegalArgumentError: if the data is not a climatology and
            ``tracer.file_freq`` is not set.
    """
    if tracer.is_climatology:
        return [date_i], [os.path.join(ref_dir, ref_file)]

    file_freq = tracer.file_freq
    if not file_freq:
        raise IllegalArgumentError("'file_freq' argument is required when "
                                   "'is_climatology' argument is set to False")

    file_dates = pd.date_range(
        date_i, date_f, freq=file_freq, inclusive='left')
    if file_dates.empty:
        file_dates = pd.to_datetime([date_i])
    # Make sure the beginning of the interval is covered by a file date
    if file_dates[0] > date_i:
        file_dates = pd.to_datetime([date_i] + file_dates.to_list())

    file_paths = np.array([os.path.join(ref_dir, date.strftime(ref_file))
                           for date in file_dates])

    if tracer.remove_duplicates:
        # 'np.unique' returns the indexes of the first occurrences ordered by
        # path value; sorting them restores the chronological order
        _, index = np.unique(file_paths, return_index=True)
        index.sort()
        file_dates = file_dates[index]
        file_paths = file_paths[index]

    return file_dates, file_paths


def _can_decode_times(file_path):
    """Tell whether xarray opens ``file_path`` with its default time decoding.

    Returns False when opening the file raises a ValueError (very badly
    formatted files), True otherwise.
    """
    try:
        with xr.open_dataset(file_path):
            return True
    except ValueError:
        return False


def _log_fetch_summary(list_dates, list_files, tracer, max_lines=20):
    """Send a summary of the fetched intervals and files to the debug log.

    Unless ``tracer.full_log`` is set, only the first and last
    ``max_lines / 2`` file dates are listed; the others are replaced
    by a single "..." line.
    """
    half = max_lines / 2
    n_dates = len(list_dates)

    chunks = ["Gridded NetDF surface flux fetch output:\n"]
    for i, date in enumerate(list_dates):
        # 'tracer.full_log' is only read when there are entries to hide
        if half <= i < n_dates - half and not tracer.full_log:
            if i == half:
                chunks.append("...\n")
            continue

        chunks.append(f"- {date.isoformat()}:\n")
        for (di, df), f_path in zip(list_dates[date], list_files[date]):
            chunks.append(
                f" - [{di.isoformat()}, {df.isoformat()}] -> {f_path}\n")

    debug("".join(chunks))


def fetch(ref_dir, ref_file, input_interval, target_dir,
          tracer=None, component=None, **kwargs):
    """Fetch files and corresponding dates in Gridded NetCDF files.

    Every input file is linked into ``target_dir``, then its time coordinate
    is read to determine which time intervals of the file are needed for
    ``input_interval``.

    Args:
        ref_dir (str): Path to the data
        ref_file (str): File format of the data; formatted with ``strftime``
            for each file date when the data is not a climatology
        input_interval (list[datetime.datetime]): Date range, as a
            ``(date_i, date_f)`` pair
        target_dir (str): Where to link the data
        tracer: Object holding the options of the data stream. Required
            despite its ``None`` default. The attributes ``file_freq``,
            ``is_climatology``, ``add_time_coord``, ``time_midpoint``,
            ``time_endpoint``, ``remove_duplicates``, ``drop_variables`` and
            ``full_log`` are read here; ``time_unit``, ``time_format``,
            ``period_varname`` and ``time_calendar`` are optional.
        component (optional): Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        tuple[dict, dict]: ``(list_files, list_dates)``, both keyed by file
        date. ``list_dates[date]`` holds the time intervals returned by
        ``get_time_intervals`` and ``list_files[date]`` holds the path of
        the linked file, repeated once per interval.

    Raises:
        IllegalArgumentError: if ``tracer`` is missing or if its options are
            inconsistent.
        ValueError: if a climatology file spans more than one year.
        Exception: if a file is not available or cannot be opened by xarray.
    """
    if tracer is None:
        raise IllegalArgumentError("'tracer' argument is required")

    date_i, date_f = input_interval

    # Checking for illegal argument combinations
    _check_fetch_arguments(tracer)

    # Getting files dates and paths
    file_dates, file_paths = _list_input_files(
        ref_dir, ref_file, date_i, date_f, tracer)

    debug(f"file_dates={file_dates}")
    debug(f"file_paths={file_paths}")

    list_dates = {}
    list_files = {}
    for date, f_path in zip(file_dates, file_paths):
        # Raise Exception if file does not exist
        if not os.path.isfile(f_path):
            raise Exception(
                f"Trying to fetch the file {f_path} that is not available."
            )

        # Linking file to the working directory
        target_file_path = os.path.join(target_dir, os.path.basename(f_path))
        path.link(f_path, target_file_path)

        # Decode times or not for very badly formatted files...
        decode_times = _can_decode_times(target_file_path)

        # Filter MissingDimensionErrors. The file is opened only once: the
        # handle obtained here is the one used (and closed) by the 'with'
        # statement below, so that no dataset is left open.
        try:
            opened_ds = xr.open_dataset(
                target_file_path,
                decode_times=decode_times,
                drop_variables=tracer.drop_variables
            )
        except xr.core.variable.MissingDimensionsError as err:
            info(err)
            raise Exception(
                f"Could not open {target_file_path} with xarray. "
                "This happens when one dimension variable has multiple dimensions, "
                "with a name common with one of its own dimensions. "
                "Please consider reformatting your NetCDF properly. "
                "It is possible to use the argument `drop_variables` in your yaml "
                "to avoid reading the erroneous variables. \n"
                "Please check the error message above to find out which variable is the problem."
            ) from err

        # Getting time periods in the file
        with opened_ds as ds:
            ds, time_varname = find_time_coord(ds, tracer, date)

            # Dropping all variables other than the time coordinate to
            # avoid useless computations in 'fetch'
            var_to_keep = [time_varname]
            if hasattr(tracer, 'period_varname'):
                var_to_keep.append(tracer.period_varname)
            ds = ds[var_to_keep]

            # Getting calendar before decoding (otherwise it will be lost)
            ds = preprocess_time_coord(
                ds, tracer, time_varname, date,
                time_midpoint=tracer.time_midpoint,
                time_endpoint=tracer.time_endpoint,
                file_freq=tracer.file_freq,
                is_climatology=tracer.is_climatology)
            calendar = getattr(
                tracer, "time_calendar",
                ds[time_varname].dt.calendar
            )

            # Decoding periods if needed
            if hasattr(tracer, 'period_varname'):
                ds = decode_datetimes(
                    ds, tracer, tracer.period_varname, date,
                    file_freq=tracer.file_freq)

            # convert from pandas datetime to python built-in datetime
            if isinstance(date, pd.Timestamp):
                date = date.to_pydatetime()

            # Handling climatology data if necessary
            if tracer.is_climatology:
                years_in_file = np.unique(ds[time_varname].dt.year.values)
                if len(years_in_file) > 1:
                    raise ValueError(
                        "multiple years found in 'time' coordinate "
                        f"'{time_varname}' in file '{target_file_path}'")

                years = pd.date_range(
                    date_i, date_f, freq='1YS', inclusive='left')
                if years.empty:
                    years = pd.to_datetime([date_i])
                if years[0].year > date_i.year:
                    years = pd.to_datetime([date_i] + years.to_list())
                year_offsets = years.year - years_in_file[0]

                # Climatology fixed year to multiple years
                ds_ref = ds
                list_dates[date] = []
                for yo in year_offsets:
                    # using a copy in case convert calendar modifies the dates
                    ds = ds_ref.copy()
                    ds = shift_years(ds, time_varname, yo)

                    # 'convert_calendar' must be called after 'shift_years',
                    # otherwise leap years will be incorrect
                    ds = convert_calendar(ds, tracer, time_varname, calendar)

                    # NOTE(review): unlike the non-climatology branch,
                    # 'time_midpoint' and 'time_endpoint' are not forwarded
                    # here -- confirm this is intended.
                    time_intervals = get_time_intervals(
                        ds, tracer, time_varname, date_i, date_f, calendar,
                    )

                    # Fill output dictionary
                    list_dates[date].extend(time_intervals)

            else:
                # Shift year to match simulation period
                # (useful when using a fixed file)
                year_offset = get_year_offset(ds, time_varname, date.year)
                ds = shift_years(ds, time_varname, year_offset)

                # 'convert_calendar' must be called after 'shift_years',
                # otherwise leap years will be incorrect
                ds = convert_calendar(ds, tracer, time_varname, calendar)

                # Fill output dictionary
                list_dates[date] = get_time_intervals(
                    ds, tracer, time_varname, date_i, date_f, calendar,
                    time_midpoint=tracer.time_midpoint,
                    time_endpoint=tracer.time_endpoint
                )

        list_files[date] = len(list_dates[date]) * [target_file_path]

    # Print results for debugging
    _log_fetch_summary(list_dates, list_files, tracer)

    return list_files, list_dates