Source code for pycif.plugins.datavects.standard.init_components

from logging import debug, warning, info
import tracemalloc
import numpy as np
import pandas as pd
import itertools

from ....utils.classes.domains import Domain
from ....utils.path import init_dir
from .fetch import default_fetch
from .utils import split_dates, clean_filenames


def _as_datetime(key):
    """Convert a date-like dictionary key to a ``datetime.datetime``."""
    return pd.DatetimeIndex([key]).to_pydatetime()[0]


def _inherit_component_settings(tracer, component):
    """Fill the settings a tracer did not set explicitly from its component.

    ``dir``, ``file``, ``file_freq`` and ``varname`` are copied from
    ``component`` when the tracer lacks them, or when the tracer only holds a
    default value (listed in ``tracer.is_default_value``) while the component
    holds an explicit one. Missing ``dir``, ``file`` and ``varname`` with no
    component value fall back to ``""``.

    The ``read``, ``write`` and ``fetch`` functions are taken from the
    component when missing or default on the tracer; without a component
    value, ``fetch`` falls back to ``default_fetch`` and the others to
    ``None``.
    """
    for attr in ("dir", "file", "file_freq", "varname"):
        if hasattr(tracer, attr):
            # Only an explicit (non-default) component value may override a
            # default tracer value
            if attr in tracer.is_default_value \
                    and hasattr(component, attr) \
                    and attr not in component.is_default_value:
                setattr(tracer, attr, getattr(component, attr))
        elif hasattr(component, attr):
            setattr(tracer, attr, getattr(component, attr))
        elif attr in ("dir", "file", "varname"):
            setattr(tracer, attr, "")

    for attr in ("read", "write", "fetch"):
        # Keep functions explicitly defined on the tracer
        if hasattr(tracer, attr) and attr not in tracer.is_default_value:
            continue
        if hasattr(component, attr):
            setattr(tracer, attr, getattr(component, attr))
        else:
            setattr(tracer, attr, default_fetch if attr == "fetch" else None)

    tracer.varname = getattr(tracer, "varname", "")


def _periods_to_frame(periods):
    """Convert a sequence of ``(start, end)`` pairs to a datetime DataFrame."""
    columns = ["start_date", "end_date"]
    if len(periods) == 0:
        # np.array([]) is 1-D and cannot be labelled with two columns, so
        # build an explicitly empty frame with datetime columns instead
        return pd.DataFrame(columns=columns, dtype="datetime64[ns]")
    return pd.DataFrame(
        np.array(periods), columns=columns).apply(pd.to_datetime)


def _dates_to_frames(list_files, list_dates, comp, trcr, fetch):
    """Validate the output of a fetch function and normalise it.

    Returns ``(list_files, list_dates)`` re-keyed by ``datetime`` objects,
    each ``list_dates`` value being a DataFrame with ``start_date`` and
    ``end_date`` columns.

    Raises:
        Exception: if a non-empty entry of ``list_dates`` is one-dimensional,
            i.e. not a list of 2-element ``(start, end)`` tuples.
    """
    # Check list_date format to be sure that the fetch function is well
    # written
    for periods in list_dates.values():
        if len(periods) == 0:
            continue
        # If dates have only one dimension, bad format
        if np.ndim(np.array(periods)) == 1:
            raise Exception(
                "WARNING! `list_dates` does not have a proper format. "
                "The expected format is a list of 2-element tuples, "
                "for the range of validity of a given value.\n"
                "This can happen if the plugin you are using is out-dated, or "
                "if you are coding a new plugin.\n"
                f"The issue concerns the tracer {comp}/{trcr} "
                f"and the corresponding plugin is {fetch.__module__}"
            )

    # Make sure that periods are datetime objects
    files = {_as_datetime(ddi): fls for ddi, fls in list_files.items()}
    dates = {
        _as_datetime(ddi): _periods_to_frame(periods)
        for ddi, periods in list_dates.items()
    }
    return files, dates


def _crop_to_window(list_files, list_dates, datei, datef):
    """Keep only the periods (and matching files) overlapping the window.

    A period is kept when ``start_date <= datef`` and ``end_date >= datei``.
    Cropping is best effort: on failure a warning is logged and both inputs
    are returned unchanged, so files and dates always stay consistent.
    """
    try:
        masks = {
            ddi: ((dates["start_date"] <= datef)
                  & (dates["end_date"] >= datei))
            for ddi, dates in list_dates.items()
        }
        cropped_files = {
            ddi: [fls[i] for i in np.where(masks[ddi])[0]]
            for ddi, fls in list_files.items()
        }
        cropped_dates = {
            ddi: dates.loc[masks[ddi]] for ddi, dates in list_dates.items()
        }
    except Exception as e:
        warning(e)
        return list_files, list_dates
    return cropped_files, cropped_dates


def _dump_debug(plugin, comp, trcr, list_dates, list_files):
    """Write the periods and files of a tracer to a human-readable file.

    The file is ``{plugin.workdir}/datavect/{comp}.{trcr}.txt``; for each key
    of ``list_dates`` it holds the key's date followed by one line per
    period with its associated file.
    """
    lines = []
    for dd in list_dates:
        lines.append(dd.strftime("%Y%m%d %H:%M"))
        for period, fname in zip(list_dates[dd].iterrows(), list_files[dd]):
            row = period[1]
            if isinstance(row, pd.Series):
                lines.append(" {} -> {}: {}".format(
                    row["start_date"], row["end_date"], fname))
            else:
                lines.append(" {}: {}".format(period, fname))

    file_debug = "{}/datavect/{}.{}.txt".format(plugin.workdir, comp, trcr)
    with open(file_debug, "w") as handle:
        handle.write("\n".join(lines))


def init_components(plugin):
    """Initialize the components and parameters of a datavect plugin.

    Nothing is done if ``plugin`` has no ``components``. Otherwise, for
    every component and each of its parameters (the component itself is
    processed last, as parameter ``""``; a component without ``parameters``
    is processed on its own), the corresponding "tracer" object is prepared:

    * ``iscontrol`` / ``isobs`` default to ``False``;
    * ``dir``, ``file``, ``file_freq``, ``varname`` and the ``read``,
      ``write`` and ``fetch`` functions are inherited from the component when
      not set explicitly;
    * ``{plugin.workdir}/datavect/{comp}/{trcr}/`` is initialised and becomes
      ``tracer.dir``; the previous value is kept in ``tracer.orig_dir``;
    * ``tracer.fetch`` is called and the returned files and periods are
      validated, converted to DataFrames (``start_date`` / ``end_date``),
      cropped to ``[tracer.datei, tracer.datef]``, optionally split
      (``split_freq``) and stored in ``tracer.input_files`` /
      ``tracer.input_dates``, together with ``tracer.min_date`` and
      ``tracer.max_date``;
    * a domain is attached (``tracer.get_domain`` when available, otherwise
      ``plugin.domain``) and missing component attributes are copied.

    Args:
        plugin: the datavect plugin. ``workdir``, ``dump_debug`` and
            ``monitor_memory`` are read from it, ``model`` if present, and
            ``domain`` for tracers without ``get_domain``.

    Raises:
        Exception: if a fetch function returns badly formatted periods, or
            ``get_domain`` does not return a ``Domain``. Exceptions raised by
            ``fetch`` are logged and re-raised.
    """
    # Do nothing if components not specified in datavect
    if not hasattr(plugin, "components"):
        return

    components = plugin.components
    for comp in components.attributes:
        component = getattr(components, comp)

        # If no parameters, handle the component as a whole
        if hasattr(component, "parameters"):
            params = component.parameters
            parameters = [*params.attributes, ""]
        else:
            params = component
            parameters = [""]

        for trcr in parameters:
            debug("Initializing {}/{}".format(comp, trcr))
            # The "" entry resolves to the component itself
            tracer = getattr(params, trcr, component)

            # By default is not in the target vector, nor in the obs vector
            tracer.iscontrol = getattr(tracer, "iscontrol", False)
            tracer.isobs = getattr(tracer, "isobs", False)

            _inherit_component_settings(tracer, component)

            # Initializes target directory and pass info to tracer
            target_dir = "{}/datavect/{}/{}/".format(
                plugin.workdir, comp, trcr)
            init_dir(target_dir)
            tracer.orig_dir = tracer.dir
            tracer.dir = target_dir
            tracer.model = getattr(plugin, "model", None)
            tmp_datei = tracer.datei
            tmp_datef = tracer.datef

            # Fetch files and dates
            if tracer.fetch is None:
                # No fetch function means no input files; reset explicitly
                # so the previous tracer's results are never reused
                list_files, list_dates = {}, {}
            else:
                try:
                    list_files, list_dates = tracer.fetch(
                        tracer.orig_dir,
                        tracer.file,
                        [tmp_datei, tmp_datef],
                        target_dir,
                        component=component,
                        tracer=tracer,
                    )
                except Exception:
                    warning("An exception was raised in tracer.fetch "
                            f"for the component/tracer {comp}/{trcr}")
                    raise

            list_files, list_dates = _dates_to_frames(
                list_files, list_dates, comp, trcr, tracer.fetch)

            # Crop files and dates depending on global datei and datef
            list_files, list_dates = _crop_to_window(
                list_files, list_dates, tmp_datei, tmp_datef)

            # Re-organise list_files and list_dates if split_freq specified
            if getattr(tracer, "split_freq", None) is not None and list_dates:
                list_dates, list_files = split_dates(
                    plugin, tracer, list_dates, list_files)

            # Clean file names
            list_files = clean_filenames(list_files)

            # Attach info to the tracer
            tracer.input_dates = list_dates
            tracer.input_files = list_files

            # Save tracer min and max dates; fall back to the requested
            # window when there is no period at all (e.g. all cropped out)
            bounds = None
            if list_dates:
                bounds = pd.concat(
                    list_dates.values(), ignore_index=True
                ).stack().reset_index(drop=True)
            if bounds is not None and len(bounds) > 0:
                tracer.min_date = np.min(bounds).to_pydatetime()
                tracer.max_date = np.max(bounds).to_pydatetime()
            else:
                tracer.min_date = tmp_datei
                tracer.max_date = tmp_datef

            # Saving tracer dates/files into component if not already
            # available
            if not hasattr(component, "input_dates"):
                component.input_dates = list_dates
            if not hasattr(component, "input_files"):
                component.input_files = list_files

            # Dump list_dates and list_files to a user friendly text file
            # for debugging
            if plugin.dump_debug:
                _dump_debug(plugin, comp, trcr, list_dates, list_files)

            # Check memory if requested
            if plugin.monitor_memory:
                current, peak = tracemalloc.get_traced_memory()
                debug(f"Current memory usage is {current / 1024 ** 2}MB; "
                      f"Peak was {peak / 1024 ** 2}MB")

            # Get the domain and
            # change it to the domain side if lateral conditions
            # NOTE(review): a tracer that already has a domain skips the rest
            # of this iteration, including the copy of component attributes
            # below — confirm this is intended.
            if hasattr(tracer, "domain"):
                continue

            if hasattr(tracer, "get_domain"):
                tracer.domain = tracer.get_domain(
                    tracer.orig_dir,
                    tracer.file,
                    tracer.input_dates,
                    target_dir,
                    tracer=tracer,
                )

                # Check that the returned object is actually a domain
                if not isinstance(tracer.domain, Domain):
                    plg = tracer.plugin
                    tracer_module = tracer.get_registered(
                        plg.name, plg.version, plg.type)
                    raise Exception(
                        f"The get_domain function for the tracer "
                        f"{comp}/{trcr}, as defined in the Plugin "
                        f"{plg.name}/{plg.version}/{plg.type} is incorrect. \n"
                        f"Please check the code in the folder: {tracer_module}"
                    )
            else:
                tracer.domain = plugin.domain

            # Force getting sides if not already done
            if not hasattr(tracer.domain, "zlonc_side") \
                    and hasattr(tracer.domain, "zlonc"):
                tracer.domain.get_sides()

            # Pass arguments from root component to tracers if not
            # explicitly defined
            for attr in component.attributes:
                if attr != "plugin" and not hasattr(tracer, attr):
                    setattr(tracer, attr, getattr(component, attr))