Source code for pycif.plugins.obsvects.standard.fetch
import os
import datetime
import numpy as np
from ....utils import path
from logging import info
[docs]
def default_fetch(
    ref_dir, ref_file, input_dates, target_dir, tracer=None, **kwargs
):
    """Locate observation files per sub-simulation and link them locally.

    Every date attached to a sub-simulation is formatted through
    ``strftime`` with *ref_dir* and *ref_file* to build a source path. Each
    source path is then passed to ``path.link`` together with a target of
    the same base name inside *target_dir*. Finally the results are reduced
    to one entry per distinct date, ordered by date.

    Args:
        ref_dir (str): template of the source directory; ``strftime`` codes
            are expanded for each date (e.g. ``/data/%Y/%m``).
        ref_file (str): template of the source file name; ``strftime`` codes
            are expanded for each date (e.g. ``monitor_%Y%m%d.nc``).
        input_dates (dict): sub-simulation start dates mapped to the dates
            for which a file has to be fetched.
        target_dir (str): directory receiving the links; the base name of
            each source file is kept.
        tracer: not used by this function.
        **kwargs: not used by this function.

    Returns:
        tuple[dict, dict]: ``(list_files, list_dates)``, both keyed like
        *input_dates*. ``list_dates`` holds the distinct dates in ascending
        order. ``list_files`` holds, at the same positions, the local path
        recorded for the first occurrence of each date. Paths come out of a
        NumPy array, so they are NumPy string scalars.

    Note:
        When expanding *ref_file* changes the string, the date stored for
        that entry is parsed back from the expanded file name with
        ``strptime`` rather than taken from the input date. Dates that map
        to the same file name therefore collapse to a single entry.
    """
    info("Fetching input files using directory and file format")
    info(f"{ref_dir}/{ref_file}")

    list_files = {}
    list_dates = {}

    for period_start in input_dates:
        # Step 1: resolve every source path and its reference date before
        # touching the file system.
        sources = []
        resolved_dates = []
        for day in input_dates[period_start]:
            folder = day.strftime(ref_dir)
            filename = day.strftime(ref_file)
            sources.append(f"{folder}/{filename}")

            if filename == ref_file:
                # The file name carries no date information: keep the
                # requested date unchanged.
                resolved_dates.append(day)
            else:
                # Use the date encoded in the file name itself.
                resolved_dates.append(
                    datetime.datetime.strptime(filename, ref_file)
                )

        # Step 2: link every source into the target directory.
        # NOTE(review): path.link is a project helper, presumably creating
        # a symbolic link -- confirm against its implementation.
        linked = []
        for source in sources:
            destination = f"{target_dir}/{os.path.basename(source)}"
            path.link(source, destination)
            linked.append(destination)

        # Step 3: keep one entry per distinct date. ``return_index`` gives
        # the position of the first occurrence of each date, which selects
        # the matching local file.
        distinct_dates, first_seen = np.unique(
            resolved_dates, return_index=True
        )
        distinct_files = np.array(linked)[first_seen]

        # Apply one common ordering to both sequences so that files and
        # dates stay aligned.
        order = np.argsort(distinct_dates)
        list_files[period_start] = list(distinct_files[order])
        list_dates[period_start] = list(distinct_dates[order])

    return list_files, list_dates