# Source code for pycif.plugins.datastreams.fluxes.wrfchem.fetch

import datetime
import glob
import os
import re

import pandas as pd

from .....utils import path
from .times_in_wrf_file import times_in_wrf_file


def fetch(ref_dir, ref_file, input_interval, target_dir, tracer=None, **kwargs):
    """Link the flux files covering the simulation period into target_dir.

    Inputs:
    ---------
    ref_dir: directory where the original files are found (yml: dir)
    ref_file: (template) name of the original files (yml: file). Its
        ``%X`` date directives are replaced by ``*`` to glob ref_dir, and
        the template is also used to parse the name of every file that
        contributes flux times.
    input_interval: start and end date of the period to simulate. Only
        items 0 and 1 are read; flux times are kept when
        ``input_interval[0] <= time < input_interval[1]``.
    target_dir: directory where the links to the original files are
        created
    tracer: object providing ``flux_freq``, which is handed to pandas as
        the ``freq`` of the flux periods. Required in practice, despite
        the ``None`` default.
    **kwargs: accepted and ignored.

    Outputs:
    ---------
    list_files: dict with the single key ``input_interval[0]``; its value
        lists one file path per flux time found, so a file holding
        several flux times appears several times.
    list_dates: dict with the same key; its value lists, for each entry
        of list_files, the two-item list ``[start, end]`` of the flux
        period, where end is one ``flux_freq`` step after start.

    Raises:
    ---------
    AttributeError: tracer is None or has no ``flux_freq`` attribute.
    ValueError: the flux start dates found in ref_dir differ from the
        dates expected from input_interval and ``flux_freq``, or a
        contributing file name cannot be parsed with ref_file.

    VERSION HISTORY (before incremental git commit messages)
    2021-09-22 freum Reverted 2021-09-21 to one item per file
    2021-09-21 freum Changed list_dates to one item per subsimu start.
    2021-09-20 freum After talking to Antoine, copied first part of
                     chimere's fetch instead, then adapted ctdas'
                     observationoperator_wrfchem.py into it
    2021-08-20 freum Original from flux_template_plugin, raising
                     NotImplementedError at beginning
    """
    # Resolve the flux frequency first so that a missing tracer fails
    # immediately, before any file is opened, and with a readable message.
    try:
        flux_freq = tracer.flux_freq
    except AttributeError as err:
        raise AttributeError(
            "fetch() needs a tracer with a 'flux_freq' attribute; "
            "got {!r}".format(tracer)
        ) from err

    date_start, date_end = input_interval[0], input_interval[1]

    # Get names of all flux files in ref_dir
    glob_pattern = re.sub(r"%[a-zA-Z]", "*", ref_file)
    all_files = sorted(glob.glob(os.path.join(ref_dir, glob_pattern)))

    # One list for the whole period, keyed by the start date of the period.
    list_files = {date_start: []}
    list_dates = {date_start: []}

    for fp in all_files:
        # Keep the times of this file that fall in the simulation period.
        # NOTE(review): assumes times_in_wrf_file returns values comparable
        # with the items of input_interval (datetimes) - confirm.
        times_add = [
            t for t in times_in_wrf_file(fp) if date_start <= t < date_end
        ]
        if not times_add:
            continue

        # The glob pattern is looser than the template: this raises
        # ValueError for a file whose name does not parse with ref_file.
        datetime.datetime.strptime(os.path.basename(fp), ref_file)

        list_dates[date_start].extend(
            list(
                pd.date_range(
                    start=t, periods=2, freq=flux_freq
                ).to_pydatetime()
            )
            for t in times_add
        )
        list_files[date_start].extend([fp] * len(times_add))

    # Check that the flux dates that were found are the same as
    # requested in the config yml
    dates_yml = pd.date_range(
        start=date_start, end=date_end, freq=flux_freq, inclusive="left"
    )
    dates_found = [
        period[0] for periods in list_dates.values() for period in periods
    ]

    # Development hint: to force calls to read.py and write.py, rename a
    # flux file so that it is no longer found, comment out the check below
    # and patch list_dates so that it covers the period without gaps.
    if set(dates_yml.to_pydatetime()) != set(dates_found):
        msg = (
            "Available flux dates in {} do not match those "
            "requested in config yml."
        ).format(ref_dir)
        raise ValueError(msg)

    # Fetch files. A file is listed once per flux time it holds, so drop
    # the repeats (keeping order) and link each file only once.
    for fps in list_files.values():
        for fp in dict.fromkeys(fps):
            fp_target = os.path.join(target_dir, os.path.basename(fp))
            path.link(fp, fp_target)

    return list_files, list_dates