# Source code for pycif.plugins.datastreams.fluxes.wrfchem.fetch
import datetime
import glob
import os
import re
import pandas as pd
from .....utils import path
from .times_in_wrf_file import times_in_wrf_file
# [docs]
def fetch(ref_dir, ref_file, input_interval, target_dir, tracer=None, **kwargs):
"""Links flux files to target_dir, returns file list
Inputs:
---------
ref_dir: directory where the original files are found (yml: dir)
ref_file: (template) name of the original files (yml: file)
input_interval: list of the periods to simulate, each item is the list
of the dates of the period
# FR: I think now it's just start and end date to
simulate (2021-09-20)
target_dir: directory where the links to the orginal files are created
Ouputs:
---------
list_files: for each date that begins a period, an array containing
the names of the files that are available for the dates
within this period
list_dates: for each date that begins a period, an array containing
the names of the dates mathcin the files listed in
list_files
VERSION HISTORY (before incremental git commit messages)
2021-09-22 freum Reverted 2021-09-21 to one item per file
2021-09-21 freum Changed list_dates to one item per subsimu
start. Left the original lines in there but
commented.
2021-09-20 freum After talking to Antoine, copied first part of
chimere's fetch instead, then adapted ctdas'
observationoperator_wrfchem.py into it
2021-08-20 freum Original from flux_template_plugin, raising
NotImplementedError at beginning
"""
# Get names of all flux files in ref_dir
glob_pattern = re.sub("%[a-zA-Z]", "*", ref_file)
all_files = glob.glob(os.path.join(ref_dir, glob_pattern))
all_files.sort()
# Get list of all flux periods and corresponding files in all_files.
# Per advise from Antoine, it's one list for the whole period,
# and the name of the list is the start date of the period.
t0 = input_interval[0]
list_files = {t0: []}
list_dates = {t0: []}
for fp in all_files:
# Get times in this file that fall in the simulation period.
times_file = times_in_wrf_file(fp)
times_add = [time for time in times_file
if (input_interval[0] <= time < input_interval[1])]
n_add = len(times_add)
# If there are any, add to list_dates and list_files
if n_add > 0:
time_file = datetime.datetime.strptime(os.path.basename(fp), ref_file)
periods = [
list(pd.date_range(start=t,
periods=2,
freq=tracer.flux_freq).to_pydatetime())
for t in times_add
]
list_dates[t0].extend(periods)
list_files[t0].extend([fp]*n_add)
# Check that the flux dates that were found are the same as
# requested in the config yml
dates_yml = pd.date_range(start=input_interval[0],
end=input_interval[1],
freq=tracer.flux_freq,
inclusive="left")
dates_found = [d[0]
for dd in list_dates
for d in list_dates[dd]]
# Mod that forces calls to read.py and write.py for development:
# - Rename a flux file so that fetch.py doesn't find it anymore, e.g.
# mv wrfchemi_d01_2015-06-01_03:00:00 bak.wrfchemi_d01_2015-06-01_03:00:00
# - Comment raising error on flux dates below
# - Modify list_dates to cover the simulation period without gaps despite the missing file, e.g.:
# list_dates[datetime.datetime(2015, 6, 1, 0, 0)][2] = [datetime.datetime(2015, 6, 1, 2, 0), datetime.datetime(2015, 6, 1, 4, 0)]
if set(dates_yml.to_pydatetime()) != set(dates_found):
msg = ("Available flux dates in {} do not match those " +
"requested in config yml.").format(ref_dir)
raise ValueError(msg)
# Fetch files
for _, fps in list_files.items():
for fp in fps:
fn = os.path.basename(fp)
fp_target = os.path.join(target_dir, fn)
path.link(fp, fp_target)
return list_files, list_dates