Source code for pycif.plugins.datastreams.fluxes.becker_ocean.fetch
import datetime
import glob
import os
import pandas as pd
import xarray as xr
from xarray import SerializationWarning
import numpy as np
import warnings
from .....utils import path
from logging import debug
[docs]
def fetch(ref_dir, ref_file, date_interval, target_dir,
tracer=None, component=None, **kwargs):
"""
Fetch files and dates for Becker coastal fluxes
Args:
ref_dir (str): the path to the input files
ref_file (str): format of the input files
input_interval (list): simulation interval (start and end dates)
target_dir (str): where to copy
tracer: the tracer Plugin, corresponding to the paragraph
:bash:`datavect/components/fluxes/parameters/my_species` in the
configuration yaml; can be needed to fetch extra information
given by the user
component: the component Plugin, same as tracer; corresponds to the paragraph
:bash:`datavect/components/fluxes` in the configuration yaml
Return:
(dict, dict): returns two dictionaries: list_files and list_dates
list_files: for each date that begins a period, a list containing
the names of the files that are available for the dates within this period
list_dates: for each date that begins a period, a list containing
the date intervals (in the form of a list of two dates each)
matching the files listed in list_files
"""
# Reshape input interval to include full years
datei, datef = date_interval
datei = datetime.datetime(year=datei.year, month=1, day=1)
datef = datetime.datetime(year=datef.year + 1, month=1, day=1)
list_period_dates = pd.date_range(datei, datef, freq=tracer.file_freq)
list_dates = {}
list_files = {}
valid_files = []
for dd in list_period_dates:
file = dd.strftime("{}/{}".format(ref_dir, ref_file))
if not os.path.isfile(file) or file in valid_files:
continue
# Load times (Ignore warnings due to formating issue)
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=SerializationWarning)
times = xr.open_dataset(file)["time"].values[:, np.newaxis]
freq = np.unique(np.diff(times.flatten()))
out_dates = np.concatenate([times, times + freq[0]],
axis=1)
list_dates[dd] = [list(d) for d in out_dates]
list_files[dd] = len(times) * [file]
# Fetching
target_file = "{}/{}".format(target_dir, os.path.basename(file))
path.link(file, target_file)
valid_files.append(file)
return list_files, list_dates