from logging import debug, warning, info
import tracemalloc
import numpy as np
import pandas as pd
import itertools
from ....utils.classes.domains import Domain
from ....utils.path import init_dir
from .fetch import default_fetch
from .utils import split_dates, clean_filenames
[docs]
def init_components(plugin):
    """Initialise every component and tracer declared in a datavect plugin.

    Each name listed in ``plugin.components.attributes`` is processed
    together with each of its parameters (``component.parameters.attributes``
    when the component has parameters). The empty name ``""`` is always
    appended to the list of parameters: ``getattr(params, "", component)``
    then falls back on the component itself, so the component is initialised
    last, like any other tracer.

    For every tracer the function:

    * sets default ``iscontrol``/``isobs`` flags and inherits ``dir``,
      ``file``, ``file_freq``, ``varname``, ``read``, ``write`` and ``fetch``
      from the component when the tracer did not define them explicitly;
    * creates ``<workdir>/datavect/<comp>/<trcr>/`` and stores it in
      ``tracer.dir`` (the previous value is kept in ``tracer.orig_dir``);
    * calls ``tracer.fetch`` and stores the returned files and periods,
      cropped to ``[tracer.datei, tracer.datef]``, in ``tracer.input_files``
      and ``tracer.input_dates``, together with ``tracer.min_date`` and
      ``tracer.max_date``;
    * optionally dumps the file/period listing to a text file
      (``plugin.dump_debug``) and logs memory usage
      (``plugin.monitor_memory``);
    * attaches a domain to the tracer and copies the remaining component
      attributes to it.

    Args:
        plugin: datavect plugin. The attributes read here are ``components``
            (optional), ``workdir``, ``dump_debug``, ``monitor_memory``,
            ``domain`` and, optionally, ``model``.

    Returns:
        None. Tracers and components are modified in place.

    Raises:
        Exception: if ``fetch`` returns one-dimensional, non-empty dates, or
            if ``get_domain`` does not return a ``Domain`` instance. Any
            exception raised by ``tracer.fetch`` is logged and re-raised.
    """
    # Do nothing if components not specified in datavect
    if not hasattr(plugin, "components"):
        return

    # Otherwise loop on components
    components = plugin.components
    for comp in components.attributes:
        component = getattr(components, comp)

        # Fetch parameters
        # If no parameters, handle the component as a whole
        if not hasattr(component, "parameters"):
            params = component
            parameters = [""]
        else:
            params = component.parameters
            parameters = params.attributes[:] + [""]

        # Loop over parameters to fetch information
        for trcr in parameters:
            debug("Initializing {}/{}".format(comp, trcr))
            tracer = getattr(params, trcr, component)

            _inherit_from_component(tracer, component)
            tracer.varname = getattr(tracer, "varname", "")

            # Initializes target directory and pass info to tracer
            target_dir = "{}/datavect/{}/{}/".format(
                plugin.workdir, comp, trcr
            )
            init_dir(target_dir)
            tracer.orig_dir = tracer.dir
            tracer.dir = target_dir
            tracer.model = getattr(plugin, "model", None)

            tmp_datei = tracer.datei
            tmp_datef = tracer.datef

            # Start from empty mappings for every tracer: without this, a
            # tracer whose fetch is None would either hit a NameError or
            # silently reuse the files/dates of the previous tracer.
            list_files, list_dates = {}, {}

            # Fetch files and dates
            if tracer.fetch is not None:
                try:
                    list_files, list_dates = tracer.fetch(
                        tracer.orig_dir,
                        tracer.file,
                        [tmp_datei, tmp_datef],
                        target_dir,
                        component=component,
                        tracer=tracer,
                    )
                except Exception:
                    warning("An exception was raised in tracer.fetch "
                            f"for the component/tracer {comp}/{trcr}")
                    # Bare raise keeps the original traceback untouched
                    raise

            # Check list_date format to be sure that the fetch function
            # is well written
            _check_dates_format(list_dates, tracer, comp, trcr)

            # Make sure that periods are datetime objects
            list_files = {
                pd.DatetimeIndex([ddi]).to_pydatetime()[0]: list_files[ddi]
                for ddi in list_files
            }
            list_dates = {
                pd.DatetimeIndex([ddi]).to_pydatetime()[0]:
                    _to_period_frame(list_dates[ddi])
                for ddi in list_dates
            }

            # Crop files and dates depending on global datei and datef
            list_files, list_dates = _crop_to_period(
                list_files, list_dates, tmp_datei, tmp_datef)

            # Re-organise list_files and list_dates if split_freq specified
            if getattr(tracer, "split_freq", None) is not None \
                    and list_dates != {}:
                list_dates, list_files = split_dates(
                    plugin, tracer, list_dates, list_files)

            # Clean file names
            list_files = clean_filenames(list_files)

            # Attach info to the tracer
            tracer.input_dates = list_dates
            tracer.input_files = list_files

            # Save tracer min and max dates
            if list_dates != {}:
                # Flatten all start/end dates once and reuse for min and max
                all_dates = pd.concat(
                    list(list_dates.values()), ignore_index=True
                ).stack().reset_index(drop=True)
                tracer.min_date = np.min(all_dates).to_pydatetime()
                tracer.max_date = np.max(all_dates).to_pydatetime()
            else:
                tracer.min_date = tmp_datei
                tracer.max_date = tmp_datef

            # Saving tracer_dir into component dir
            # if not already available
            if not hasattr(component, "input_dates"):
                component.input_dates = list_dates
            if not hasattr(component, "input_files"):
                component.input_files = list_files

            # Dump list_dates and list_files to a user friendly text file
            # for debugging
            if plugin.dump_debug:
                file_debug = "{}/datavect/{}.{}.txt".format(
                    plugin.workdir, comp, trcr)
                _dump_debug_listing(file_debug, list_dates, list_files)

            # Check memory if requested
            if plugin.monitor_memory:
                current, peak = tracemalloc.get_traced_memory()
                debug(f"Current memory usage is {current / 1024 ** 2}MB; "
                      f"Peak was {peak / 1024 ** 2}MB")

            # Get the domain and
            # change it to the domain side if lateral conditions
            # NOTE(review): a tracer that already has a domain also skips
            # get_sides() and the attribute propagation below; this is the
            # existing behaviour, kept as is -- confirm it is intended.
            if hasattr(tracer, "domain"):
                continue

            _attach_domain(plugin, tracer, comp, trcr, target_dir)

            # Force getting sides if not already done
            if not hasattr(tracer.domain, "zlonc_side") \
                    and hasattr(tracer.domain, "zlonc"):
                tracer.domain.get_sides()

            # Pass arguments from root component to tracers if not
            # explicitly defined
            for attr in component.attributes:
                if attr != "plugin" and not hasattr(tracer, attr):
                    setattr(tracer, attr, getattr(component, attr))


def _inherit_from_component(tracer, component):
    """Fill tracer flags, paths and I/O functions from its component."""
    # By default is not in the target vector, nor in the obs vector
    tracer.iscontrol = getattr(tracer, "iscontrol", False)
    tracer.isobs = getattr(tracer, "isobs", False)

    # Fetch reference directory and file format
    # Replace value in parameter by value in component if no
    # value was specified explicitly in the yaml (= is_default)
    attributes2propagate = ["dir", "file", "file_freq", "varname"]
    attributes2empty = ["dir", "file", "varname"]
    for attr in attributes2propagate:
        if hasattr(tracer, attr) and attr in tracer.is_default_value:
            # The tracer only holds a default: an explicit component value
            # takes precedence over it
            if hasattr(component, attr) \
                    and attr not in component.is_default_value:
                setattr(tracer, attr, getattr(component, attr))
        elif not hasattr(tracer, attr):
            if hasattr(component, attr):
                setattr(tracer, attr, getattr(component, attr))
            elif attr in attributes2empty:
                setattr(tracer, attr, "")

    # Fetch reference functions from the component
    for attr in ["read", "write", "fetch"]:
        if hasattr(tracer, attr) and attr not in tracer.is_default_value:
            # Explicitly defined on the tracer: keep it
            continue
        if hasattr(component, attr):
            setattr(tracer, attr, getattr(component, attr))
        elif attr == "fetch":
            setattr(tracer, attr, default_fetch)
        else:
            setattr(tracer, attr, None)


def _check_dates_format(list_dates, tracer, comp, trcr):
    """Raise if a non-empty entry of ``list_dates`` is one-dimensional."""
    for ddi in list_dates:
        # len() rather than truthiness: the entry may be a numpy array
        if len(list_dates[ddi]) == 0:
            continue

        # If dates have only one dimension, bad format
        if np.array(list_dates[ddi]).ndim == 1:
            raise Exception(
                "WARNING! `list_dates` does not have a proper format. "
                "The expected format is a list of 2-element tuples, "
                "for the range of validity of a given value.\n"
                "This can happen if the plugin you are using is out-dated, or "
                "if you are coding a new plugin.\n"
                f"The issue concerns the tracer {comp}/{trcr} "
                f"and the corresponding plugin is {tracer.fetch.__module__}"
            )


def _to_period_frame(periods):
    """Convert (start, end) pairs to a ``start_date``/``end_date`` frame."""
    columns = ["start_date", "end_date"]
    values = np.array(periods)
    if values.size == 0:
        # An empty array has no second dimension to map onto two columns;
        # build an explicitly typed empty frame so later date comparisons
        # and cropping still work.
        return pd.DataFrame({col: pd.to_datetime([]) for col in columns})
    return pd.DataFrame(values, columns=columns).apply(pd.to_datetime)


def _crop_to_period(list_files, list_dates, datei, datef):
    """Keep only the files and periods overlapping ``[datei, datef]``."""
    try:
        # One boolean mask per sub-period, shared by files and dates so
        # that both stay aligned
        masks = {
            ddi: (frame["start_date"] <= datef) & (frame["end_date"] >= datei)
            for ddi, frame in list_dates.items()
        }
        cropped_files = {
            ddi: [files[i] for i in np.where(masks[ddi])[0]]
            for ddi, files in list_files.items()
        }
        cropped_dates = {
            ddi: frame.loc[masks[ddi]] for ddi, frame in list_dates.items()
        }
    except Exception as e:
        # Best effort: cropping is skipped altogether when it fails, and
        # the un-cropped files and dates are used as they are
        warning(e)
        return list_files, list_dates
    return cropped_files, cropped_dates


def _dump_debug_listing(file_debug, list_dates, list_files):
    """Write a human-readable listing of periods and files to a text file."""
    sections = []
    for dd in list_dates:
        lines = [dd.strftime("%Y%m%d %H:%M")]
        # iterrows() yields (index, row) pairs; rows are paired with files
        # by position
        for d, f in zip(list_dates[dd].iterrows(), list_files[dd]):
            if isinstance(d[1], pd.Series):
                lines.append(" {} -> {}: {}".format(
                    d[1]["start_date"], d[1]["end_date"], f))
            else:
                lines.append(" {}: {}".format(d, f))
        sections.append("\n".join(lines))

    with open(file_debug, "w") as f:
        f.write("\n".join(sections))


def _attach_domain(plugin, tracer, comp, trcr, target_dir):
    """Set ``tracer.domain`` from ``get_domain`` or from the plugin domain."""
    if not hasattr(tracer, "get_domain"):
        tracer.domain = plugin.domain
        return

    tracer.domain = tracer.get_domain(
        tracer.orig_dir,
        tracer.file,
        tracer.input_dates,
        target_dir,
        tracer=tracer,
    )

    # Check that the returned object is actually a domain
    if not isinstance(tracer.domain, Domain):
        plg = tracer.plugin
        tracer_module = tracer.get_registered(
            plg.name, plg.version, plg.type)
        raise Exception(
            f"The get_domain function for the tracer "
            f"{comp}/{trcr}, as defined in the Plugin "
            f"{plg.name}/{plg.version}/{plg.type} is incorrect. \n"
            f"Please check the code in the folder: {tracer_module}"
        )