import logging
import os
import datetime
import shutil
import subprocess
import numpy as np
from ...utils.check import init_log
from logging import info, debug, warning
from ...utils.check.errclass import PluginError
from .baseclass import Plugin
from . import disclaimer_end, disclaimer_beginning
[docs]class Setup(Plugin):
@classmethod
def run_simu(cls, args):
# Dealing with relative and variable path
def_file = os.path.abspath(os.path.expanduser(args["def_file"]))
# Loading Yaml
setup = cls.yaml_to_setup(def_file)
# Save the starting time of the overall job
setup.job_start_time = datetime.datetime.now()
# Change verbose level if 'debug' is forced in the command arguments
if args.get("debug", False):
setup.verbose = 0
# Initialize the configuration
setup = cls.init_config(setup)
# Copying Yaml file for traceability of simulations
try:
shutil.copy(setup.def_file, setup.workdir)
except shutil.SameFileError:
debug("Yml already in working directory. Not copying.")
setup.def_file = os.path.join(
setup.workdir,
os.path.basename(setup.def_file))
# Saving the branch and commit used to run the present simulation
try:
output = str(
subprocess.check_output(
['git', 'branch'], cwd=os.path.dirname(__file__),
universal_newlines=True
)
)
branch = [a for a in output.split('\n') if a.find('*') >= 0][0]
branch = branch[branch.find('*') + 2:]
label = subprocess.check_output(
["git", "rev-parse", "HEAD"],
cwd=os.path.dirname(__file__)).strip().decode()
with open("{}/VERSION".format(setup.workdir), "w") as f:
f.write("The present run was computed with the branch '{}' \n"
"and the commit {}".format(branch, label))
except subprocess.CalledProcessError:
warning(
f"Could not find the branch and version of the pycif library "
f"(in {os.path.dirname(__file__)}). Check that the folder is a git "
f"repository. This can happen if pycif was set-up with a static install."
)
with open("{}/VERSION".format(setup.workdir), "w") as f:
f.write(f"No version could be determined for pycif at "
f"{os.path.dirname(__file__)}")
# Load the set-up
cls.load_setup(setup, level=0)
# Dump warning file about incorrect arguments
cls.dump_incorrect(f"{setup.workdir}/incorrect_arguments.txt")
# Append run argument to setup
for arg in args:
if arg == "def_file":
continue
if not hasattr(setup, arg):
setattr(setup, arg, args[arg])
else:
raise Exception("Trying to set argument value to setup from "
"run arguments while already attributed: "
"{}".format(arg))
# Saving the loaded configuration
if getattr(setup, "dump_config", False):
cls.to_yaml(
setup,
"{}/loaded.{}".format(
setup.workdir, os.path.basename(setup.def_file)
),
)
# Run the mode
to_return = None
if getattr(getattr(setup, "mode", None), "loaded_requirements", False):
output = setup.mode.execute(**args)
if getattr(setup, "return_config", False):
to_return = output, setup
else:
to_return = output
else:
info(
"pycif has correctly been initialized "
"but no execution mode was specified"
)
if getattr(setup, "return_config", False):
to_return = setup
# Post-computation logging
logging.info(disclaimer_end)
return to_return
@classmethod
def yaml_to_setup(cls, config_file):
# TODO: Allow for another type of files than yaml?
config_dict = cls.from_yaml(config_file)
# Looking for others Yaml configs files in the main config_file
config_dict = cls.yaml_subconfigs(config_dict)
# Print the config dict and exit, if asked to
if config_dict.get("print_dict_and_leave", False):
def print_config_dict(conf_dict, level=0):
if level == 0:
print(level * " " + "{")
for key in conf_dict:
if type(conf_dict[key]) == type(conf_dict):
print(level * " " + "'{}': ".format(key) + "{")
print_config_dict(conf_dict[key], level + 1)
print(level * " " + "},")
elif type(conf_dict[key]) in [str, datetime.datetime]:
print(level * " " + "'{}': '{}',"
.format(key, conf_dict[key]))
else:
print(level * " " + "'{}': {},"
.format(key, conf_dict[key]))
if level == 0:
print(level * " " + "},")
print_config_dict(config_dict)
exit()
# Load a dictionary to a Setup recursive object
setup = cls.from_dict(config_dict, convert_none=True)
return setup
@classmethod
def init_config(cls, setup):
# Check that mandatory arguments are specified
mandatory = [
"datei", "datef", "workdir", "logfile", "verbose"
]
missing = [k for k in mandatory if not hasattr(setup, k)]
if missing:
raise Exception(
"Mandatory yaml arguments were not setup. \n"
"Please check your Yaml file ({}): \n"
"Missing arguments: \n".format(setup.def_file)
+ "\n".join([" - {}".format(k) for k in missing])
)
# Creates and initializes the log file
logfile, workdir = init_log(
setup.logfile, setup.workdir, setup.verbose
)
# Write general information
logging.info(disclaimer_beginning)
# Write config info
cls.config_info(setup)
# Stop here if simulation period is empty
if setup.datei == setup.datef:
raise Exception(
f"The simulation period is void. Stopping here:\n"
f" - datei: {setup.datei}\n"
f" - datef: {setup.datef}\n"
f"Please revise you yml!"
)
setup.logfile = logfile
setup.workdir = workdir
return setup
@classmethod
def load_config(cls, setup):
# Initializes workdir and config
cls.init_config(setup)
# Initialize every plugin, requirements and data
cls.load_setup(setup, level=0)
return setup
@classmethod
def load_from_dict(cls, config_dict):
setup = Setup.from_dict(config_dict)
cls.load_setup(setup, level=1)
return setup
@classmethod
def yaml_subconfigs(cls, config_dict):
for key, value in config_dict.items():
if isinstance(value, dict):
config_dict[key] = cls.yaml_subconfigs(value)
else:
if key == "file_yaml":
if not os.path.isfile(value):
raise OSError(
"The Yaml path given is not a file : "
"{}".format(value)
)
if not os.path.exists(value):
raise OSError(
"The Yaml path given is not valid "
"{}".format(value)
)
config_dict = cls.from_yaml(value)
return config_dict
[docs] @classmethod
def config_info(cls, setup):
"""Prints out main input parameters for pyCIF
"""
verbose_txt = [
"pyCIF has been initialized with the following parameters:",
"Yaml configuration file: {}".format(setup.def_file),
"Log file: {}".format(setup.logfile),
"Start date: {}".format(setup.datei),
"End date: {}".format(setup.datef),
"Working directory: {}".format(setup.workdir),
]
list(map(lambda v: info(v), verbose_txt))
[docs] @classmethod
def load_setup(
cls, plg, parent_plg_type=None, level=999, tree="",
list_levels=None, ignore_parent_type=False, **kwargs
):
"""Loads a Setup plugin.
Loops recursively over all attributes of the setup to load:
1) sub-plugins are initialized as Plugin child-class templates (
Domain, ObsVect, Model, etc);
2) instances are saved to the Plugin class to be accessible for
anywhere later one.
This allows modifications of the data of a given plugin at some place
of the code to be automatically forwarded to the rest of the code
Args:
self (Setup): the setup to load
parent_plg_type (str): the last recognized plugin type that is
inherited by children
"""
orig_name = getattr(plg, "orig_name", None)
plg_name = getattr(getattr(plg, "plugin", None), "name", None)
plg_version = getattr(getattr(plg, "plugin", None), "version", None)
plg_type = getattr(getattr(plg, "plugin", None), "type", None)
debug("Loading setup for {} / {} / {} / {} from {}: {}"
.format(orig_name, plg_name, plg_version, plg_type,
parent_plg_type, plg))
# Update list of different levels to keep track of the recursion
if list_levels is None:
list_levels = [orig_name]
else:
list_levels.append(orig_name)
# Update orig_dict if not yet defined
if level == 0:
# Saves level 0 entries as reference plugins in requirements
cls._save_refplugins(plg)
cls._save_subrefplugins(plg)
# Loop over self attributes and load them as other Class if necessary
# If an argument 'todo_init' was specified, initialize only listed plg
if "todo_init" in cls._get_attribute_list(plg):
attributes = plg.todo_init
else:
attributes = [
a for a in cls._get_attribute_list(plg) if a != "plugin"
]
# Keep in memory the root plg_type
root_plg_type = parent_plg_type
for attr in attributes:
plg_attr = getattr(plg, attr)
plg_tree = "{}/{}".format(tree, attr)
# Re-initializing parent type to the root
parent_plg_type = root_plg_type
# Ignore plugin type to avoid initializing sub-structure
ignore_plg_type = \
getattr(plg, "input_arguments",
{}).get(attr, {}).get(
"ignore_plg_type", True) or ignore_parent_type
# For reference instances, check whether the Plugin was already
# initialized as requirement; if so, just take it from reference
if (
attr in cls.reference_instances
and getattr(plg_attr, "isreference", False)
and getattr(
cls.reference_instances.get(attr, None),
"loaded_class",
False,
)
):
setattr(plg, attr, cls.reference_instances[attr])
continue
# If not a Plugin, continue
if not issubclass(type(plg_attr), Plugin):
continue
# If is still a Setup class, means that should be processed and
# Initialized
if isinstance(plg_attr, Setup) and not getattr(
plg_attr, "loaded_class", False
):
# Load the plugin type depending on the attribute name
# Do nothing if the attribute is named 'plugin'
if attr != "plugin":
parent_plg_type = plg_attr._load_plugin_type(
attr, parent_plg_type
)
# Build a child sub-class and
# overwrite the Setup class if needed
plg_attr = cls.childclass_factory(
plg_attr, child_type=parent_plg_type,
parent_plg=None if ignore_plg_type else plg
)
# Keep in memory that the current attribute class is loaded
plg_attr.loaded_class = True
# Initializes the plugin from registered module if any
if hasattr(plg_attr, "initiate_template") \
and not getattr(plg_attr, "loaded_template", False):
plg_attr.initiate_template()
# Saves the plugin to the class,
# so it is accessible by everyone anywhere
# (including its attributes and stored data)
if hasattr(plg_attr, "plugin"):
name = plg_attr.plugin.name
version = plg_attr.plugin.version
plg_type = plg_attr.plugin.type
plg_subtype = plg_attr.plugin.subtype
if (not cls.is_loaded(name, version, plg_type, plg_subtype)
and name is not None):
cls.save_loaded(plg_attr)
plg_attr.loaded_template = True
# Load all attributes recursively if not already done
if not getattr(plg_attr, "loaded_attributes", False):
if level >= cls.__maxrecursive__:
raise Exception(
"Maximum number of recursive levels "
f"when initializing {plg_attr}")
print(__file__)
import code
code.interact(local=dict(locals(), **globals()))
cls.load_setup(
plg_attr,
parent_plg_type,
level=level + 1,
tree=plg_tree,
list_levels=list_levels[:],
ignore_parent_type=ignore_plg_type,
**kwargs
)
plg_attr.loaded_attributes = True
# If requirements are not already loaded
if not getattr(plg_attr, "loaded_requirements", False):
# Load requirements
cls._check_requirements(
plg_attr, parent_plg_type, level,
list_levels[:], **kwargs
)
# The plugin has been correctly loaded at this point
plg_attr.loaded_requirements = True
# Initializes the plugin data
if hasattr(plg_attr, "ini_data") \
and not getattr(plg_attr, "loaded_data", False):
plg_attr.ini_data(**kwargs)
plg_attr.loaded_data = True
# Linking present plugin to reference level 0 if needed
if getattr(plg_attr, "isreference", False):
cls.reference_instances[attr] = plg_attr
# Updating sub-references if needed
else:
plg_type = attr
plg_subtype = attr
if hasattr(plg_attr, "plugin"):
plg_type = plg_attr.plugin.type
plg_subtype = plg_attr.plugin.subtype
if attr not in super(Setup, cls).subreference_instances:
super(Setup, cls).subreference_instances[
(plg_type, plg_subtype)
] = {plg_tree: plg_attr}
else:
super(Setup, cls).subreference_instances[
(plg_type, plg_subtype)
][plg_tree] = plg_attr
# Attach plugin to the parent plugin
setattr(plg, attr, plg_attr)
# Initialize default values if any
cls._check_input_arguments(plg, level=level, tree=tree)
@classmethod
def _check_input_arguments(cls, plg, level=999, substructure=False,
debug=False, tree=""):
# Do nothing if not at the first level of the yml
# if level != 1:
# return
if not hasattr(plg, "is_default_value"):
plg.is_default_value = []
allowed_attributes = ["plugin"]
if hasattr(plg, "default_values"):
for k in plg.default_values:
if not hasattr(plg, k):
setattr(plg, k, plg.default_values[k])
plg.is_default_value.append(k)
allowed_attributes.extend(list(plg.default_values.keys()))
elif hasattr(plg, "input_arguments"):
if "any_key" in plg.input_arguments:
if len(plg.input_arguments) != 1:
raise Exception(
"The key 'any_key' should be given individually. "
"It is not compatible with other keys. ")
ref_key = plg.input_arguments["any_key"]
if "structure" not in ref_key:
raise Exception(
"'any_key' need a 'structure' to be applied")
for k in plg.attributes:
subplg = getattr(plg, k)
subplg_tree = "{}/{}".format(tree, k)
subplg.input_arguments = {
**ref_key["structure"],
**getattr(subplg, "input_arguments", {})
}
err = cls._check_input_arguments(
subplg, substructure=True, tree=subplg_tree)
if err is not None:
raise Exception(
"The plugin {} ('{}') needs the input '{}' "
"in sub-structure '{}'"
" to run".format(plg, plg.orig_name, err, k))
for k in plg.input_arguments:
kargument = plg.input_arguments[k]
if kargument.get("default", None) is None \
and not hasattr(plg, k) \
and not kargument.get("optional", False):
if not substructure:
raise Exception(
"The plugin {} needs the input {} to run"
.format(plg, k))
else:
return k
if not hasattr(plg, k) \
and not kargument.get("default", None) is None:
setattr(plg, k, kargument["default"])
plg.is_default_value.append(k)
# Check that sub-structure is respected
if hasattr(plg, k) and kargument.get("structure", None) is not None:
subplg = getattr(plg, k)
subplg_tree = "{}/{}".format(tree, k)
subplg.input_arguments = {
**kargument["structure"],
**getattr(subplg, "input_arguments", {})
}
# print("BBBBB", k)
err = cls._check_input_arguments(
subplg, substructure=True, tree=subplg_tree)
if err is not None:
raise Exception(
"The plugin {} ('{}') needs the input '{}' "
"in sub-structure '{}'"
" to run".format(plg, plg.orig_name, err, k))
# subplg.is_default_value = []
# for subkey in kargument["structure"]:
# subkargument = kargument["structure"][subkey]
# if subkargument.get("default", None) is None \
# and not hasattr(subplg, subkey) \
# and not subkargument.get("optional", False):
# raise Exception(
# "The plugin {} needs the input {} "
# "in the sub-structure {} to run"
# .format(plg, k, subkey))
#
# if not hasattr(subplg, subkey) \
# and not subkargument.get("default", None) is None:
# setattr(subplg, subkey, subkargument["default"])
# subplg.is_default_value.append(k)
allowed_attributes.extend(list(plg.input_arguments.keys()))
# Check that attributes are compatible with allowed attributes
if not hasattr(plg, "plugin"):
return
if allowed_attributes == ["plugin"] or "any_key" in allowed_attributes:
return
incorrect_attributes = [
k for k in plg.attributes if k not in allowed_attributes]
if incorrect_attributes != []:
cls.unauthorized_arguments[tree] = incorrect_attributes
warning("The following arguments were prescribed in the Yaml file, "
"whereas they are not documented. Please make sure you really "
"want to carry on: \n"
f"{incorrect_attributes}")
@classmethod
def _check_requirements(
cls, plg, parent_plg_type=None, level=None,
list_levels=None, **kwargs
):
"""Checking that required modules and plugins are loaded.
If not, load them.
Requirements are defined in the __init__.py file of the
corresponding plugin module.
Args:
plg (Plugin): a plugin to initialize
Notes: Some basic parameters are added as requirements to all plugins;
These are:
'datei', 'datef', 'workdir', 'logfile', 'verbose'
"""
# Dealing with default requirements supposed to be given at level 0
for key in plg.default_requirements:
if key not in cls._get_attribute_list(plg):
if key in cls.reference_instances:
setattr(plg, key, cls.reference_instances[key])
else:
raise PluginError(
"The default key '{}' is not prescribed "
"neither in the plugin {}, nor in the "
"level 0 of the configuration file".format(key, plg)
)
# Update requirements if set_requirements is available
plg.set_requirements()
# Looping over requirements and including them
for key in plg.requirements:
key_req = plg.requirements[key]
fromany = key_req.get("any", False)
fromsub = key_req.get("subplug", False)
preftree = key_req.get("preftree", "")
empty = key_req.get("empty", False)
name = key_req.get("name", None)
version = key_req.get("version", "")
plg_type = key_req.get("type", key)
plg_subtype = key_req.get("subtype", "")
newplg = key_req.get("newplg", False)
# If not from any plugin, but no default value specified, error
if not fromany and name is None:
raise PluginError(
"{} needs a specific {}, but none was specified \n"
"Please check requirements in your module".format(plg, key)
)
# If needs a Plugin explicitly defined,
# look for it at level 0 of setup, or in children,
# or in unambiguous level N plugins
plg_tmp = cls._fetch_requirement(
plg, key, name, version, plg_type, plg_subtype, fromsub, empty,
preftree
)
# If has a prescribed name
tmp_plugin = getattr(plg_tmp, "plugin", None)
tmp_name = getattr(tmp_plugin, "name", None)
tmp_version = getattr(tmp_plugin, "version", None)
tmp_type = getattr(tmp_plugin, "type", None)
tmp_subtype = getattr(tmp_plugin, "subtype", None)
if (tmp_name is not None and fromany) or (
tmp_name is not None
and name == tmp_name
and version == tmp_version
and not fromany
and (type(plg_tmp) == cls.get_subclass(plg_type)
or tmp_type == plg_type or tmp_subtype == plg_type)
):
plg_out = plg_tmp
# If a default is defined, load from registered
elif (name is not None and fromany) or (
tmp_name is None and not fromany
):
try:
plg_out = cls.load_registered(
name, version, plg_type, plg_subtype=plg_subtype,
plg_orig=plg_tmp
)
except PluginError:
raise PluginError(
f"Could not find registered plugin {name}/{version} "
f"of type {plg_type} "
f"for the plugin {plg.plugin.name}/{plg.plugin.version}"
f" of type {plg.plugin.type}"
)
# Otherwise, if accepts empty classes from anywhere
elif empty and fromany:
plg_out = plg_tmp
# Otherwise, empty from default
elif empty and not fromany:
plg_out = cls.load_registered(name, version, plg_type,
plg_subtype)
# Error in the yaml if reaching this point
else:
raise PluginError(
"Plugin {} ({}/{}/{}) needs a plugin '{}/{}/{}' and an "
"inconsistent one was proposed in the Yaml".format(
plg, plg.plugin.name, plg.plugin.version,
plg.plugin.type, key, name, version
)
)
if plg_out is None:
raise Exception(
"{} needs a Plugin '{}' to run properly\n"
"there is none in its children nor at the level 0 of "
"Yaml\n"
"Please check your Yaml".format(plg, key)
)
# Keep in memory to initialize a new instance of the plugin or not
if hasattr(plg, "plugin"):
plg.plugin.newplg = newplg
# Adding auxiliary attributes if any
aux_ids = ["name", "version", "type", "any", "subplug", "preftree",
"empty", "newplg"]
for attr in key_req:
if attr not in aux_ids:
setattr(plg_out, attr, key_req[attr])
# Attaching the requirement to the parent plugin
setattr(plg, key, plg_out)
# Load the requirements if not already done
cls.load_setup(plg, parent_plg_type, level + 1,
list_levels=list_levels, **kwargs)
@classmethod
def _fetch_requirement(
cls, plg, key, name, version, plg_type, plg_subtype, fromsub, empty,
preftree
):
possible_keys = [k for k in cls.subreference_instances
if plg_type in k and (
plg_subtype == "" or plg_subtype in k)]
possible_plg = [
cls.subreference_instances[k][s]
for k in possible_keys for s in cls.subreference_instances[k]]
pref_plg = [
(s, k)
for k in possible_keys for s in cls.subreference_instances[k]
if preftree in s
]
# If in children
if key in cls._get_attribute_list(plg):
plg_tmp = getattr(plg, key)
# If not in children but at level 0 of Yaml
elif key in cls.reference_instances:
plg_tmp = cls.reference_instances[key]
# If not at level 0, but no ambiguity
elif len(possible_plg) == 1 and fromsub:
plg_tmp = possible_plg[0]
# If not at level 0, but ambiguity
# Take the one at highest yml level
elif (
len(possible_plg) >= 1
and fromsub
and len(pref_plg) >= 1
):
ind_preferred = np.argmin([s for s, k in pref_plg])
pref_key = pref_plg[ind_preferred][1]
pref_tree = pref_plg[ind_preferred][0]
plg_tmp = cls.subreference_instances[pref_key][pref_tree]
elif empty:
registered = cls.is_registered(
name, version, plg_type, plg_subtype)
if registered[0]:
plg_tmp = cls.load_registered(name, version, plg_type,
plg_subtype)
else:
try:
plg_tmp = cls.get_subclass(plg_type, plg_subtype)()
except PluginError:
raise PluginError(
"Failed fetching the following requirements for "
"plugin {}/{} of type {}: {}"
.format(plg.plugin.name,
plg.plugin.version,
plg.plugin.type,
plg_type)
)
# Error in the yaml if reaching this point
else:
plg_tmp = None
raise PluginError(
"{} ({}/{}) needs a plugin '{}/{}/{}' and an "
"inconsistent one was proposed in the Yaml".format(
plg, plg.plugin.name, plg.plugin.version, key, name, version
)
)
return plg_tmp