# Source code for pycif.plugins.obsvects.standard.fetch

import os
import datetime
import numpy as np

from ....utils import path
from logging import info


def default_fetch(
    ref_dir, ref_file, input_dates, target_dir, tracer=None, **kwargs
):
    """Resolve observation monitor files and link them into the run directory.

    For every date listed under each key of *input_dates*, *ref_dir* and
    *ref_file* are expanded with ``strftime``, the resulting source file is
    handed to ``path.link`` together with a target of the same base name
    inside *target_dir*, and the date the file stands for is recorded.

    The date recorded for a file is the input date itself when *ref_file*
    contains nothing that ``strftime`` expands; otherwise it is the date
    parsed back from the expanded file name with ``strptime``, so that all
    input dates falling into the same file collapse onto a single date.

    Args:
        ref_dir (str): directory template for the source files; may contain
            ``strftime`` format codes (e.g. ``/data/%Y/%m``).
        ref_file (str): file name template; may contain ``strftime`` format
            codes (e.g. ``monitor_%Y%m%d.nc``).
        input_dates (dict): mapping from a sub-simulation key to the
            iterable of dates for which files should be fetched. Dates must
            provide ``strftime`` and be hashable and mutually orderable.
        target_dir (str): local run directory in which the links are
            requested; the base name of each source file is preserved.
        tracer: unused; accepted for interface consistency with other fetch
            functions.
        **kwargs: unused; accepted for interface consistency.

    Returns:
        tuple[dict, dict]: a pair ``(list_files, list_dates)``, both keyed
        like *input_dates*. Each value is a list ordered by ascending
        resolved date with one entry per distinct date: the local file path
        (plain ``str``) and the resolved date respectively. When several
        inputs resolve to the same date, the first one in input order
        provides the file.

    Raises:
        ValueError: if an expanded file name cannot be parsed back with
            *ref_file* by ``datetime.datetime.strptime``.
    """
    info("Fetching input files using directory and file format")
    info(f"{ref_dir}/{ref_file}")

    list_files = {}
    list_dates = {}

    # Source most recently linked to each target during this call. Linking
    # the same pair again would be a no-op, so it is skipped; a different
    # source for an already-used target is still re-linked, which keeps the
    # final state identical to linking every input one by one.
    # NOTE(review): assumes path.link(source, target) only depends on its
    # two arguments -- confirm against utils.path.
    last_linked = {}

    for datei in input_dates:
        # Resolve every date first so that a template that cannot be parsed
        # back fails before anything is linked for this sub-simulation.
        resolved = []
        for dd in input_dates[datei]:
            dir_dd = dd.strftime(ref_dir)
            file_dd = dd.strftime(ref_file)
            if file_dd == ref_file:
                # No format code in the file name: keep the input date.
                date_dd = dd
            else:
                date_dd = datetime.datetime.strptime(file_dd, ref_file)
            resolved.append((date_dd, f"{dir_dd}/{file_dd}"))

        # Fetching
        file_by_date = {}
        for date_dd, source_file in resolved:
            target_file = f"{target_dir}/{os.path.basename(source_file)}"
            if last_linked.get(target_file) != source_file:
                path.link(source_file, target_file)
                last_linked[target_file] = source_file

            # Remove duplicates: the first file seen for a date is kept.
            file_by_date.setdefault(date_dd, target_file)

        dates_sorted = sorted(file_by_date)
        list_files[datei] = [file_by_date[date] for date in dates_sorted]
        list_dates[datei] = dates_sorted

    return list_files, list_dates