Source code for pycif.plugins.datavects.standard.utils

import numpy as np
import itertools
import datetime
import os
import pathlib
import pandas as pd
from ....utils.dates import date_range


def split_dates(plugin, tracer, list_dates, list_files):
    """Regroup dates and files according to the tracer's splitting frequency.

    The target periods are ``date_range(plugin.datei, plugin.datef,
    period=tracer.split_freq)``. If ``list_dates`` is already keyed by exactly
    these periods, the inputs are returned untouched. Otherwise all dates and
    all files are flattened and redistributed, period by period.

    A row is assigned to the period ``[dd0, dd1)`` in which its
    ``start_date`` falls. Rows whose ``start_date`` equals their ``end_date``
    are also accepted when they sit exactly on ``dd1``. Rows that are
    not assigned to any period are stored under the last period.

    Args:
        plugin: object providing the ``datei`` and ``datef`` attributes.
        tracer: object providing the ``split_freq`` attribute.
        list_dates (dict): values are concatenated with ``pandas.concat``;
            the result must be two-dimensional with ``start_date`` and
            ``end_date`` columns.
        list_files (dict): values are iterables of files, chained in
            dictionary order. Files are matched to date rows by position
            (assumes both dictionaries are ordered consistently -- to be
            confirmed against callers).

    Returns:
        tuple(dict, dict): ``(out_dates, out_files)`` keyed by period start.

    Raises:
        Exception: if the concatenated dates are not two-dimensional.
    """
    # NOTE(review): the code below relies on ``date_range`` returning a NumPy
    # array (``.shape``, ``.min()``, slicing) -- confirm in utils.dates.
    out_periods = date_range(
        plugin.datei, plugin.datef, period=tracer.split_freq
    )

    # Nothing to do when the inputs already follow the target periods
    in_periods = np.array(list(list_dates.keys()))
    if in_periods.shape == out_periods.shape and np.all(
        in_periods == out_periods
    ):
        return list_dates, list_files

    all_dates = pd.concat(list_dates.values(), ignore_index=True)
    all_files = list(itertools.chain.from_iterable(list_files.values()))

    # Validate the layout before the first use of ``all_dates``: a 1-D Series
    # would otherwise fail on ``.stack()`` below with an unrelated
    # AttributeError and never reach these messages.
    if all_dates.ndim == 1:
        raise Exception(
            "all_dates has only one dimension, but should have two (start_date and end_date)")
    if all_dates.ndim != 2:
        raise Exception("all_dates has too many dimensions")

    # Extend the periods backwards when some dates precede the first period.
    # Dates after the last period need no extension: whatever is left after
    # the loop is stored under the last period.
    first_date = all_dates.stack().min()
    if first_date < out_periods.min():
        out_periods = np.concatenate([[first_date], out_periods])

    out_dates = {}
    out_files = {}
    for dd0, dd1 in zip(out_periods[:-1], out_periods[1:]):
        starts = all_dates["start_date"]
        ends = all_dates["end_date"]

        from_dd0 = (dd0 <= starts).values
        instantaneous = (starts == ends).values
        spanning = (starts != ends).values

        # Instantaneous rows may sit on the upper bound, spanning rows must
        # start strictly before it
        selected = (
            (from_dd0 & (starts <= dd1).values & instantaneous)
            | (from_dd0 & (starts < dd1).values & spanning)
        )
        ids = np.where(selected)[0]

        out_dates[dd0] = all_dates.iloc[ids]
        out_files[dd0] = [all_files[i] for i in ids]

        # Remove what has just been assigned. The index is reset first so
        # that the positional ``ids`` are valid labels for ``drop``.
        all_dates = all_dates.reset_index(drop=True).drop(ids)
        taken = set(ids.tolist())
        all_files = [ff for i, ff in enumerate(all_files) if i not in taken]

    # Everything not assigned above ends up in the last period
    out_dates[out_periods[-1]] = all_dates
    out_files[out_periods[-1]] = list(all_files)

    return out_dates, out_files
def clean_filenames(list_files):
    """Clean file names to avoid double slashes and similar combinations.

    Names are replaced by ``str(pathlib.Path(name).resolve())``. The
    dictionary and the lists it contains are modified in place.

    Args:
        list_files (dict[datetime.datetime]): dictionary of input files.
            Each entry can be either a single file name (string) or a list
            whose elements are file names or lists of file names.

            - A string entry or string element is always resolved; an
              empty string entry is left untouched.
            - Inside a nested list, only names for which
              ``os.path.isfile`` is true are resolved; the others are kept
              as they are.

    Returns:
        dict[datetime.datetime]: the same ``list_files`` object, curated.
    """
    for ddi in list_files:
        entry = list_files[ddi]

        # A bare string cannot be updated element by element: enumerating it
        # walks over characters and item assignment raises a TypeError.
        if isinstance(entry, str):
            # Resolving an empty name would silently yield the current
            # working directory, so it is kept as it is.
            if entry:
                list_files[ddi] = str(pathlib.Path(entry).resolve())
            continue

        for k, files in enumerate(entry):
            # Different treatment if list or string
            if isinstance(files, list):
                # NOTE(review): only existing files are resolved here while
                # plain strings below are resolved unconditionally -- kept
                # as in the original, confirm this asymmetry is intended.
                for i, ff in enumerate(files):
                    if os.path.isfile(ff):
                        files[i] = str(pathlib.Path(ff).resolve())
                continue

            # Now for strings
            entry[k] = str(pathlib.Path(files).resolve())

    return list_files