Source code for pycif.plugins.datavects.standard.utils
import numpy as np
import itertools
import datetime
import os
import pathlib
import pandas as pd
from ....utils.dates import date_range
[docs]
def split_dates(plugin, tracer, list_dates, list_files):
    """Re-distribute dates and files over the periods given by ``tracer.split_freq``.

    The target periods are obtained from ``date_range(plugin.datei,
    plugin.datef, period=tracer.split_freq)``. When the keys of ``list_dates``
    already match these periods exactly, the inputs are returned untouched.
    Otherwise all dates and files are flattened and re-assigned period by
    period.

    Args:
        plugin: object providing the ``datei`` and ``datef`` attributes.
        tracer: object providing the ``split_freq`` attribute.
        list_dates (dict): for each input period, a table with
            ``start_date`` and ``end_date`` columns (the values are
            concatenated with ``pandas.concat``).
        list_files (dict): for each input period, an iterable of files.
            NOTE(review): once flattened, files are indexed with the row
            positions of the flattened dates, so both are presumably aligned
            one-to-one -- confirm against callers.

    Returns:
        tuple: ``(out_dates, out_files)``, two dictionaries keyed by the
        elements of the output periods. If the inputs were already split as
        requested, ``(list_dates, list_files)`` is returned as is.

    Raises:
        Exception: if the concatenated dates are not two-dimensional.
    """
    out_periods = date_range(
        plugin.datei, plugin.datef, period=tracer.split_freq
    )

    # Nothing to do when the inputs are already split on the target periods
    in_periods = np.array(list(list_dates.keys()))
    if in_periods.shape == out_periods.shape and np.all(
        in_periods == out_periods
    ):
        return list_dates, list_files

    all_dates = pd.concat(list_dates.values(), ignore_index=True)
    all_files = list(itertools.chain.from_iterable(list_files.values()))

    # Prepend the earliest available date when data start before the first
    # period. Nothing is appended at the end: whatever is left after the loop
    # below goes to the last period.
    earliest = all_dates.stack().min()
    if earliest < out_periods.min():
        out_periods = np.concatenate([[earliest], out_periods])

    out_dates = {}
    out_files = {}
    for dd0, dd1 in zip(out_periods[:-1], out_periods[1:]):
        if all_dates.ndim == 1:
            raise Exception(
                "all_dates has only one dimension, but should have two (start_date and end_date)")
        if all_dates.ndim != 2:
            raise Exception("all_dates has too many dimensions")

        start = all_dates["start_date"]
        end = all_dates["end_date"]
        after_dd0 = (dd0 <= start).values

        # Rows with start_date == end_date are accepted up to and including
        # dd1; rows spanning an interval must start strictly before dd1.
        ids = np.where(
            (after_dd0 & (start <= dd1).values & (start == end).values)
            | (after_dd0 & (start < dd1).values & (start != end).values)
        )[0]

        out_dates[dd0] = all_dates.iloc[ids]
        out_files[dd0] = [all_files[i] for i in ids]

        # Remove what was just assigned so it cannot be picked up by a later
        # period. The index is reset first so that labels equal positions.
        all_dates = all_dates.reset_index(drop=True).drop(ids)

        # Set membership instead of scanning the ids array for every file
        assigned = set(ids.tolist())
        all_files = [
            ff for i, ff in enumerate(all_files) if i not in assigned
        ]

    # Everything that was not assigned goes to the last period
    out_dates[out_periods[-1]] = all_dates
    out_files[out_periods[-1]] = list(all_files)

    return out_dates, out_files
[docs]
def clean_filenames(list_files):
    """Normalize file names to get rid of double slashes and similar oddities.

    The dictionary is modified in place and also returned.

    Args:
        list_files (dict[datetime.datetime]): dictionary of input files.
            Each value is a list whose items are either a file name (string)
            or a list of file names.

    Returns:
        dict[datetime.datetime]: the same ``list_files`` object, curated
    """

    def _resolved(path):
        # Absolute, normalized version of the path, as a plain string
        return str(pathlib.Path(path).resolve())

    for period in list_files:
        entries = list_files[period]
        for pos, entry in enumerate(entries):
            if isinstance(entry, list):
                # Nested list: only names pointing to an existing file
                # are rewritten, the others are left as they are
                for idx, fname in enumerate(entry):
                    if os.path.isfile(fname):
                        entry[idx] = _resolved(fname)
            else:
                # Single name: always rewritten
                entries[pos] = _resolved(entry)

    return list_files