Source code for pycif.plugins.datastreams.fluxes.GCP_1x1.fetch

import os

import pandas as pd
import xarray as xr

from .....utils import path
from .....utils.dates import date_range


def fetch(
    ref_dir, ref_file, input_interval, target_dir,
    tracer=None, component=None, **kwargs
):
    """Collect the GCP flux files covering an interval and link them locally.

    Candidate file names are built by formatting ``ref_file`` with
    ``strftime`` for every period start returned by ``date_range``. Each
    existing file is opened once to read its ``time`` axis, which is turned
    into consecutive ``[start, end]`` intervals restricted to the simulation
    window. The file is then linked into ``target_dir``.

    Args:
        ref_dir (str): the path to the input files
        ref_file (str): format of the input files; passed to ``strftime``
        input_interval (list): simulation interval (start and end dates)
        target_dir (str): where the files are linked
        tracer: the tracer Plugin, corresponding to the paragraph
            :bash:`datavect/components/fluxes/parameters/my_species` in the
            configuration yaml. Its ``file_freq`` and ``is_climato``
            attributes are read here, so despite the ``None`` default it
            must be provided.
        component: the component Plugin, corresponding to the paragraph
            :bash:`datavect/components/fluxes` in the configuration yaml.
            Not used by this function.
        **kwargs: accepted for interface compatibility and ignored.

    Return:
        tuple: ``(list_files, list_dates)``, two dictionaries keyed by the
        period start dates for which a file was found and has time steps
        inside the simulation window.

        - ``list_files[date]`` repeats the file name once per interval.
        - ``list_dates[date]`` is the matching list of ``[start, end]``
          pairs of ``datetime`` objects.
    """
    sim_start, sim_end = input_interval

    files_by_period = {}
    dates_by_period = {}
    # Files already processed; a file is read and linked at most once even
    # when several period starts format to the same name.
    linked = set()

    for period_start in date_range(
        sim_start, sim_end, period=tracer.file_freq, close=""
    ):
        source_file = os.path.join(ref_dir, period_start.strftime(ref_file))
        if source_file in linked or not os.path.isfile(source_file):
            continue

        # Only the time axis is needed; the dataset is closed right after.
        with xr.open_dataset(source_file) as dataset:
            time_axis = pd.to_datetime(dataset["time"].values)

        # Climatological files carry an arbitrary year: shift the whole
        # axis so that its first time step falls in the requested year.
        if tracer.is_climato:
            year_shift = period_start.year - time_axis.year[0]
            time_axis = time_axis + pd.DateOffset(years=year_shift)

        # Add a closing bound after the last time step, offset by the number
        # of days in that time step's month, so the last step gets an
        # interval as well.
        last_month_length = int(time_axis.days_in_month[-1])
        closing_bound = time_axis[-1] + pd.DateOffset(days=last_month_length)
        time_axis = time_axis.append(pd.DatetimeIndex([closing_bound]))

        # Keep only the bounds lying inside the simulation window.
        in_window = (sim_start <= time_axis) & (time_axis <= sim_end)
        time_axis = time_axis[in_window]
        if time_axis.empty:
            continue

        # Consecutive bounds form the [start, end] intervals.
        intervals = [
            [start.to_pydatetime(), stop.to_pydatetime()]
            for start, stop in zip(time_axis[:-1], time_axis[1:])
        ]
        dates_by_period[period_start] = intervals
        files_by_period[period_start] = [source_file] * len(intervals)

        # Fetching: expose the file under its own base name in target_dir.
        link_target = os.path.join(target_dir, os.path.basename(source_file))
        path.link(source_file, link_target)
        linked.add(source_file)

    return files_by_period, dates_by_period