Source code for pycif.plugins.datastreams.meteos.tm5_meteo.fetch

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import pandas as pd
import numpy as np
from .....utils import path
from logging import info, debug

# JvP 20210517: added import statements, MODULE_NAME module level logger
import logging
import sys
import subprocess
from .....utils.dates import date_range
MODULE_NAME = __name__[__name__.index(
    'TM5'):] if 'TM5' in __name__ else __name__
logger = logging.getLogger(MODULE_NAME)

# Original fetch function by A. Berchet


[docs] def fetch_AB( ref_dir, ref_file, input_interval, target_dir, tracer=None, component=None, **kwargs ): """Reads meteorology and links to the working directory Args: meteo (dictionary): dictionary defining the domain. Should include dirmeteo to be able to read the meteorology datei (datetime.datetime): initial date for the inversion window datef (datetime.datetime): end date for the inversion window workdir (str): path to the working directory where meteo files should be copied logfile (str): path to the log file filetypes ([str]): list of file radicals to copy in the working directory **kwargs (dictionary): extra arguments Return: ???????? Notes: At some point, include option to compute mass fluxes for LMDz, with different physics What is needed to do that? Possible only on CCRT? Flexibility to define new domains Can be very heavy and not necessarily relevant """ info("Copying meteo files from {} to {}".format(ref_dir, target_dir)) # File directories and prefixes filedirs = {"an0tr1": ["albedo_%Y%m%d_00p01.nc", "sr_%Y%m%d_00p01.nc", "veg_%Y%m%d_00p01.nc"], "h06h18tr1": ["blh_%Y%m%d_00p01.nc", "ci_%Y%m%d_00p01.nc", "cp_%Y%m%d_00p01.nc"], "h06h18tr3": ["cld_%Y%m%d_00p03.nc", "convec_%Y%m%d_00p03.nc", "mfw_%Y%m%d_00p03.nc"]} # Create the sub-directory to store meteo files path.init_dir(target_dir) # Loop over dates and file types all_dates = ( pd.DatetimeIndex( [dd for di in input_interval for dd in input_interval[di]]) + pd.offsets.MonthBegin(0)).unique() for date in all_dates: for filedir in filedirs: for filetype in filedirs[filedir]: source = date.strftime("{}/{}/{}".format( ref_dir, getattr(component, "dir_{}".format(filedir)), filetype )) target = date.strftime( "{}/{}".format(target_dir, filetype)) try: path.link(source, target) except OSError: debug("Could not find file: {}".format(source)) list_files = {datei: [] for datei in input_interval} list_dates = {datei: [] for datei in input_interval} return list_files, list_dates
# New fetch function by J.C.A. van Peet
[docs] def fetch(ref_dir, ref_file, input_interval, target_dir, tracer=None, component=None, **kwargs): """ PURPOSE Link to meteo directory from datavect/meteo. Use "cp -as ..." to create symbolic links: the directory names are copied, but files in the directories are linked. VERSION HISTORY 2.0 17-05-2021 by J.C.A. van Peet Changed function, it now copies the full meteo directory as links. 1.0 28-04-2021 by A. Berchet Original code (see above) """ # Set the name of this function PROG_NAME = MODULE_NAME+".fetch" # Local logger logger = logging.getLogger(PROG_NAME) logger.setLevel(logging.DEBUG) # Print debug statements logger.debug("") logger.debug("*"*30) logger.debug(PROG_NAME+" => DEBUG:") logger.debug(" ref_dir = %s", ref_dir) logger.debug(" ref_file = %s", ref_file) logger.debug(" input_interval = %s", input_interval) logger.debug(" target_dir = %s", target_dir) logger.debug(" tracer = %s", tracer) logger.debug(" component = %s", component) logger.debug(" kwargs = %s", kwargs) logger.debug("*"*30) logger.debug("") # First clean the target directory if os.path.isdir(target_dir): path.remove(f"{target_dir}/*") # Copy the meteo directory with 'cp -as' so that the files are copied # as symbolic links, while retaining the original directory structure. # The -n option prevents error messages if the target file already exists. with subprocess.Popen('cp -Rsn {}/* {}'.format(ref_dir, target_dir), shell=True, cwd='.', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1) as p: for line in p.stdout: logger.debug(line.strip()) # end with # # Check return code if (p.returncode != 0): print("") print(f"{PROG_NAME} => SOME ERROR IN CMD!") print(f" p.returncode = {p.returncode}") print(" Computer says no!") print("") raise RuntimeError # end if # logger.debug("") # logger.debug("*"*30) # logger.debug("JvP: Computer says no!") # logger.debug("*"*30) # logger.debug("") # try: # raise RuntimeError # except RuntimeError as e: # #logger.exception("OOPS!") # logger.critical(e, exc_info=True) # #raise # => Will display the traceback on screen a second time # sys.exit() # => Just exit. # end try # Apparently, this function is supposed to return two lists: # list_files and list_dates. Not sure what the format is though, # I should ask Antoine at some point... ddi, ddf = input_interval list_files = {ddi: []} list_dates = {ddi: np.append( date_range(ddi, ddf, period="1MS")[:-1, np.newaxis], date_range(ddi, ddf, period="1MS")[1:, np.newaxis], axis=1) } return list_files, list_dates
# end function fetch