Source code for pycif.plugins.datastreams.fluxes.chimere.fetch

import os
import datetime
import pandas as pd
import xarray as xr
import itertools
from .....utils import path
from .....utils.classes.setup import Setup


def fetch(ref_dir, ref_file, input_interval, target_dir, tracer=None, **kwargs):
    """Find the flux files covering a period and link them into target_dir.

    The period ``input_interval`` is split into sub-periods of length
    ``tracer.file_freq``. For each sub-period start date, the expected file
    name is built by expanding ``"<ref_dir>/<ref_file>"`` with ``strftime``.
    Every file that exists is linked into ``target_dir`` through
    ``path.link`` and registered in the returned dictionaries.

    If the first file found has a ``bottom_top`` dimension whose length
    differs from ``tracer.domain.nlev``, ``tracer.domain`` is replaced by a
    new domain object restricted to that number of levels.

    Args:
        ref_dir: directory of the reference files; may contain ``strftime``
            directives since the whole path is expanded.
        ref_file: file name pattern with ``strftime`` directives.
        input_interval: sequence whose first two items are the start and
            end of the period (anything ``pandas.date_range`` accepts).
        target_dir: directory in which the links are created.
        tracer: object providing ``file_freq`` and ``domain``. Although the
            default is ``None``, it is dereferenced unconditionally, so a
            real tracer must be passed.
        **kwargs: accepted for interface compatibility; unused here.

    Returns:
        tuple: ``(list_files, list_dates)``, two dictionaries keyed by the
        start date of each sub-period for which a file exists.
        ``list_dates[dd]`` is a list of ``[start, start + 1 hour]`` pairs
        and ``list_files[dd]`` repeats the file path once per pair.

    Raises:
        Exception: if no file matching the requested structure exists.
    """
    list_period_dates = pd.date_range(
        input_interval[0], input_interval[1], freq=tracer.file_freq
    )

    list_dates = {}
    list_files = {}
    for dd in list_period_dates:
        file_path = dd.strftime("{}/{}".format(ref_dir, ref_file))
        if not os.path.isfile(file_path):
            continue

        # date_range includes both end points, so a file covering N hours
        # yields N + 1 hourly entries
        file_hours = pd.date_range(
            dd, dd + pd.to_timedelta(tracer.file_freq), freq="1H"
        )
        list_dates[dd] = [
            [hh, hh + datetime.timedelta(hours=1)] for hh in file_hours
        ]
        list_files[dd] = len(file_hours) * [file_path]

        # Fetching
        target_file = "{}/{}".format(target_dir, os.path.basename(file_path))
        path.link(file_path, target_file)

    # Flatten once; used both for the emptiness check and to pick a
    # reference file below
    all_files = list(itertools.chain.from_iterable(list_files.values()))

    # Raise exception if no file was found
    if not all_files:
        header = (
            "Could not find any file fitting the requested structure:\n"
            "Please check your informations: \n"
            " - ref_dir: {}\n"
            " - ref_file: {}\n"
            " - date_range: {}\n"
            " - file_freq: {}\n"
            "This set-up leads to the following dates and files being looped on:\n"
        ).format(ref_dir, ref_file, input_interval, tracer.file_freq)
        looped_on = "\n".join(
            dd.strftime(" - {}: {}/{}".format(dd, ref_dir, ref_file))
            for dd in list_period_dates
        )
        raise Exception(header + looped_on)

    # Alter vertical domain depending on input files
    # if bottom_top dimension is available
    file_ref = all_files[0]
    try:
        # The context manager closes the dataset once the size is read;
        # `sizes` is the mapping from dimension names to lengths
        with xr.open_dataset(file_ref) as dataset:
            nlevemis = dataset.sizes["bottom_top"]
    except KeyError:
        nlevemis = 0

    if nlevemis > 0:
        domain_in = tracer.domain
        if domain_in.nlev != nlevemis:
            domain_out = Setup.load_registered(
                domain_in.plugin.name,
                domain_in.plugin.version,
                "domain",
                plg_orig=domain_in,
            )

            # Horizontal description is copied unchanged, in the same order
            # as the original explicit assignments
            horizontal_attributes = (
                "nlon",
                "nlat",
                "zlon",
                "zlat",
                "zlonc",
                "zlatc",
                "nlon_side",
                "nlat_side",
                "zlonc_side",
                "zlatc_side",
                "zlon_side",
                "zlat_side",
            )
            for attribute in horizontal_attributes:
                setattr(domain_out, attribute, getattr(domain_in, attribute))

            # Vertical description is truncated to the number of levels
            # read from the reference file: nlev + 1 values for sigma_a and
            # sigma_b, nlev values for the *_mid arrays
            domain_out.pressure_unit = domain_in.pressure_unit
            domain_out.nlev = nlevemis
            domain_out.sigma_a = domain_in.sigma_a[:domain_out.nlev + 1]
            domain_out.sigma_b = domain_in.sigma_b[:domain_out.nlev + 1]
            domain_out.sigma_a_mid = domain_in.sigma_a_mid[:domain_out.nlev]
            domain_out.sigma_b_mid = domain_in.sigma_b_mid[:domain_out.nlev]

            tracer.domain = domain_out

    return list_files, list_dates