Source code for pycif.utils.classes.setup

import logging
import os
import datetime
import shutil
import subprocess
import numpy as np
from ...utils.check import init_log
from logging import info, debug, warning
from ...utils.check.errclass import PluginError
from .baseclass import Plugin
from . import disclaimer_end, disclaimer_beginning


class Setup(Plugin):
    @classmethod
    def run_simu(cls, args):
        # Dealing with relative and variable path
        def_file = os.path.abspath(os.path.expanduser(args["def_file"]))

        # Loading Yaml
        setup = cls.yaml_to_setup(def_file)

        # Save the starting time of the overall job
        setup.job_start_time = datetime.datetime.now()

        # Change verbose level if 'debug' is forced in the command arguments
        if args.get("debug", False):
            setup.verbose = 0

        # Initialize the configuration
        setup = cls.init_config(setup)

        # Copying Yaml file for traceability of simulations
        try:
            shutil.copy(setup.def_file, setup.workdir)
        except shutil.SameFileError:
            debug("Yml already in working directory. Not copying.")
        setup.def_file = os.path.join(
            setup.workdir, os.path.basename(setup.def_file))

        # Saving the branch and commit used to run the present simulation
        try:
            output = str(
                subprocess.check_output(
                    ['git', 'branch'],
                    cwd=os.path.dirname(__file__),
                    universal_newlines=True
                )
            )
            branch = [a for a in output.split('\n') if a.find('*') >= 0][0]
            branch = branch[branch.find('*') + 2:]

            label = subprocess.check_output(
                ["git", "rev-parse", "HEAD"],
                cwd=os.path.dirname(__file__)).strip().decode()

            with open("{}/VERSION".format(setup.workdir), "w") as f:
                f.write("The present run was computed with the branch '{}' \n"
                        "and the commit {}".format(branch, label))

        except subprocess.CalledProcessError:
            warning(
                f"Could not find the branch and version of the pycif library "
                f"(in {os.path.dirname(__file__)}). Check that the folder is "
                f"a git repository. This can happen if pycif was set up with "
                f"a static install."
            )
            with open("{}/VERSION".format(setup.workdir), "w") as f:
                f.write(f"No version could be determined for pycif at "
                        f"{os.path.dirname(__file__)}")

        # Load the set-up
        cls.load_setup(setup, level=0)

        # Dump warning file about incorrect arguments
        cls.dump_incorrect(f"{setup.workdir}/incorrect_arguments.txt")

        # Append run arguments to setup
        for arg in args:
            if arg == "def_file":
                continue

            if not hasattr(setup, arg):
                setattr(setup, arg, args[arg])
            else:
                raise Exception("Trying to set argument value to setup from "
                                "run arguments while already attributed: "
                                "{}".format(arg))

        # Saving the loaded configuration
        if getattr(setup, "dump_config", False):
            cls.to_yaml(
                setup,
                "{}/loaded.{}".format(
                    setup.workdir, os.path.basename(setup.def_file)
                ),
            )

        # Run the mode
        to_return = None
        if getattr(getattr(setup, "mode", None), "loaded_requirements", False):
            output = setup.mode.execute(**args)
            if getattr(setup, "return_config", False):
                to_return = output, setup
            else:
                to_return = output

        else:
            info(
                "pycif has correctly been initialized "
                "but no execution mode was specified"
            )
            if getattr(setup, "return_config", False):
                to_return = setup

        # Post-computation logging
        logging.info(disclaimer_end)

        return to_return

    @classmethod
    def yaml_to_setup(cls, config_file):
        # TODO: Allow for other file types than yaml?
        config_dict = cls.from_yaml(config_file)

        # Looking for other Yaml config files in the main config_file
        config_dict = cls.yaml_subconfigs(config_dict)

        # Print the config dict and exit, if asked to
        if config_dict.get("print_dict_and_leave", False):
            def print_config_dict(conf_dict, level=0):
                if level == 0:
                    print(level * " " + "{")
                for key in conf_dict:
                    if type(conf_dict[key]) == type(conf_dict):
                        print(level * " " + "'{}': ".format(key) + "{")
                        print_config_dict(conf_dict[key], level + 1)
                        print(level * " " + "},")
                    elif type(conf_dict[key]) in [str, datetime.datetime]:
                        print(level * " " + "'{}': '{}',"
                              .format(key, conf_dict[key]))
                    else:
                        print(level * " " + "'{}': {},"
                              .format(key, conf_dict[key]))
                if level == 0:
                    print(level * " " + "},")

            print_config_dict(config_dict)
            exit()

        # Load a dictionary to a Setup recursive object
        setup = cls.from_dict(config_dict, convert_none=True)

        return setup

    @classmethod
    def init_config(cls, setup):
        # Check that mandatory arguments are specified
        mandatory = [
            "datei", "datef", "workdir", "logfile", "verbose"
        ]
        missing = [k for k in mandatory if not hasattr(setup, k)]
        if missing:
            raise Exception(
                "Mandatory yaml arguments were not set up. \n"
                "Please check your Yaml file ({}): \n"
                "Missing arguments: \n".format(setup.def_file)
                + "\n".join([" - {}".format(k) for k in missing])
            )

        # Creates and initializes the log file
        logfile, workdir = init_log(
            setup.logfile, setup.workdir, setup.verbose
        )

        # Write general information
        logging.info(disclaimer_beginning)

        # Write config info
        cls.config_info(setup)

        # Stop here if simulation period is empty
        if setup.datei == setup.datef:
            raise Exception(
                f"The simulation period is void. Stopping here:\n"
                f" - datei: {setup.datei}\n"
                f" - datef: {setup.datef}\n"
                f"Please revise your yml!"
            )

        setup.logfile = logfile
        setup.workdir = workdir

        return setup

    @classmethod
    def load_config(cls, setup):
        # Initializes workdir and config
        cls.init_config(setup)

        # Initialize every plugin, requirements and data
        cls.load_setup(setup, level=0)

        return setup

    @classmethod
    def load_from_dict(cls, config_dict):
        setup = Setup.from_dict(config_dict)
        cls.load_setup(setup, level=1)

        return setup

    @classmethod
    def yaml_subconfigs(cls, config_dict):
        for key, value in config_dict.items():
            if isinstance(value, dict):
                config_dict[key] = cls.yaml_subconfigs(value)
            else:
                if key == "file_yaml":
                    if not os.path.isfile(value):
                        raise OSError(
                            "The Yaml path given is not a file: "
                            "{}".format(value)
                        )
                    if not os.path.exists(value):
                        raise OSError(
                            "The Yaml path given is not valid "
                            "{}".format(value)
                        )
                    config_dict = cls.from_yaml(value)

        return config_dict
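
    # Illustrative sketch, not part of pycif: a minimal configuration
    # dictionary that load_from_dict() above could consume, covering the keys
    # that init_config() treats as mandatory for yml-based runs. All names
    # and values below are hypothetical.
    #
    #     config_dict = {
    #         "workdir": "/tmp/pycif_run",    # hypothetical path
    #         "logfile": "pycif.log",
    #         "verbose": 1,
    #         "datei": datetime.datetime(2010, 1, 1),
    #         "datef": datetime.datetime(2010, 1, 2),
    #     }
    #     setup = Setup.load_from_dict(config_dict)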

    @classmethod
    def config_info(cls, setup):
        """Prints out main input parameters for pyCIF
        """
        verbose_txt = [
            "pyCIF has been initialized with the following parameters:",
            "Yaml configuration file: {}".format(setup.def_file),
            "Log file: {}".format(setup.logfile),
            "Start date: {}".format(setup.datei),
            "End date: {}".format(setup.datef),
            "Working directory: {}".format(setup.workdir),
        ]

        list(map(lambda v: info(v), verbose_txt))

    @classmethod
    def load_setup(
            cls,
            plg,
            parent_plg_type=None,
            level=999,
            tree="",
            list_levels=None,
            ignore_parent_type=False,
            **kwargs
    ):
        """Loads a Setup plugin.

        Loops recursively over all attributes of the setup to load:
        1) sub-plugins are initialized as Plugin child-class templates
        (Domain, ObsVect, Model, etc.);
        2) instances are saved to the Plugin class to be accessible from
        anywhere later on.
        This allows modifications of the data of a given plugin at some place
        of the code to be automatically forwarded to the rest of the code.

        Args:
            plg (Setup): the setup to load
            parent_plg_type (str): the last recognized plugin type that is
                inherited by children
        """
        orig_name = getattr(plg, "orig_name", None)
        plg_name = getattr(getattr(plg, "plugin", None), "name", None)
        plg_version = getattr(getattr(plg, "plugin", None), "version", None)
        plg_type = getattr(getattr(plg, "plugin", None), "type", None)
        debug("Loading setup for {} / {} / {} / {} from {}: {}"
              .format(orig_name, plg_name, plg_version, plg_type,
                      parent_plg_type, plg))

        # Update list of different levels to keep track of the recursion
        if list_levels is None:
            list_levels = [orig_name]
        else:
            list_levels.append(orig_name)

        # Update orig_dict if not yet defined
        if level == 0:
            # Saves level 0 entries as reference plugins in requirements
            cls._save_refplugins(plg)
            cls._save_subrefplugins(plg)

        # Loop over self attributes and load them as other Class if necessary
        # If an argument 'todo_init' was specified, initialize only listed plg
        if "todo_init" in cls._get_attribute_list(plg):
            attributes = plg.todo_init
        else:
            attributes = [
                a for a in cls._get_attribute_list(plg) if a != "plugin"
            ]

        # Keep in memory the root plg_type
        root_plg_type = parent_plg_type
        for attr in attributes:
            plg_attr = getattr(plg, attr)
            plg_tree = "{}/{}".format(tree, attr)

            # Re-initializing parent type to the root
            parent_plg_type = root_plg_type

            # Ignore plugin type to avoid initializing sub-structure
            ignore_plg_type = \
                getattr(plg, "input_arguments", {}).get(attr, {}).get(
                    "ignore_plg_type", True) or ignore_parent_type

            # For reference instances, check whether the Plugin was already
            # initialized as requirement; if so, just take it from reference
            if (
                attr in cls.reference_instances
                and getattr(plg_attr, "isreference", False)
                and getattr(
                    cls.reference_instances.get(attr, None),
                    "loaded_class",
                    False,
                )
            ):
                setattr(plg, attr, cls.reference_instances[attr])
                continue

            # If not a Plugin, continue
            if not issubclass(type(plg_attr), Plugin):
                continue

            # If is still a Setup class, means that it should be processed
            # and initialized
            if isinstance(plg_attr, Setup) and not getattr(
                plg_attr, "loaded_class", False
            ):
                # Load the plugin type depending on the attribute name
                # Do nothing if the attribute is named 'plugin'
                if attr != "plugin":
                    parent_plg_type = plg_attr._load_plugin_type(
                        attr, parent_plg_type
                    )

                # Build a child sub-class and
                # overwrite the Setup class if needed
                plg_attr = cls.childclass_factory(
                    plg_attr, child_type=parent_plg_type,
                    parent_plg=None if ignore_plg_type else plg
                )

                # Keep in memory that the current attribute class is loaded
                plg_attr.loaded_class = True

            # Initializes the plugin from registered module if any
            if hasattr(plg_attr, "initiate_template") \
                    and not getattr(plg_attr, "loaded_template", False):
                plg_attr.initiate_template()

                # Saves the plugin to the class,
                # so it is accessible by everyone anywhere
                # (including its attributes and stored data)
                if hasattr(plg_attr, "plugin"):
                    name = plg_attr.plugin.name
                    version = plg_attr.plugin.version
                    plg_type = plg_attr.plugin.type
                    plg_subtype = plg_attr.plugin.subtype
                    if (not cls.is_loaded(name, version, plg_type,
                                          plg_subtype)
                            and name is not None):
                        cls.save_loaded(plg_attr)

                plg_attr.loaded_template = True

            # Load all attributes recursively if not already done
            if not getattr(plg_attr, "loaded_attributes", False):
                if level >= cls.__maxrecursive__:
                    raise Exception(
                        "Maximum number of recursive levels "
                        f"when initializing {plg_attr}")

                cls.load_setup(
                    plg_attr, parent_plg_type, level=level + 1,
                    tree=plg_tree, list_levels=list_levels[:],
                    ignore_parent_type=ignore_plg_type,
                    **kwargs
                )
                plg_attr.loaded_attributes = True

            # If requirements are not already loaded
            if not getattr(plg_attr, "loaded_requirements", False):
                # Load requirements
                cls._check_requirements(
                    plg_attr, parent_plg_type, level, list_levels[:], **kwargs
                )

                # The plugin has been correctly loaded at this point
                plg_attr.loaded_requirements = True

            # Initializes the plugin data
            if hasattr(plg_attr, "ini_data") \
                    and not getattr(plg_attr, "loaded_data", False):
                plg_attr.ini_data(**kwargs)
                plg_attr.loaded_data = True

            # Linking present plugin to reference level 0 if needed
            if getattr(plg_attr, "isreference", False):
                cls.reference_instances[attr] = plg_attr

            # Updating sub-references if needed
            else:
                plg_type = attr
                plg_subtype = attr
                if hasattr(plg_attr, "plugin"):
                    plg_type = plg_attr.plugin.type
                    plg_subtype = plg_attr.plugin.subtype

                if attr not in super(Setup, cls).subreference_instances:
                    super(Setup, cls).subreference_instances[
                        (plg_type, plg_subtype)
                    ] = {plg_tree: plg_attr}
                else:
                    super(Setup, cls).subreference_instances[
                        (plg_type, plg_subtype)
                    ][plg_tree] = plg_attr

            # Attach plugin to the parent plugin
            setattr(plg, attr, plg_attr)

        # Initialize default values if any
        cls._check_input_arguments(plg, level=level, tree=tree)

    @classmethod
    def _check_input_arguments(cls, plg, level=999, substructure=False,
                               debug=False, tree=""):
        # Do nothing if not at the first level of the yml
        # if level != 1:
        #     return

        if not hasattr(plg, "is_default_value"):
            plg.is_default_value = []

        allowed_attributes = ["plugin"]
        if hasattr(plg, "default_values"):
            for k in plg.default_values:
                if not hasattr(plg, k):
                    setattr(plg, k, plg.default_values[k])
                    plg.is_default_value.append(k)

            allowed_attributes.extend(list(plg.default_values.keys()))

        elif hasattr(plg, "input_arguments"):
            if "any_key" in plg.input_arguments:
                if len(plg.input_arguments) != 1:
                    raise Exception(
                        "The key 'any_key' should be given individually. "
                        "It is not compatible with other keys. ")

                ref_key = plg.input_arguments["any_key"]
                if "structure" not in ref_key:
                    raise Exception(
                        "'any_key' needs a 'structure' to be applied")

                for k in plg.attributes:
                    subplg = getattr(plg, k)
                    subplg_tree = "{}/{}".format(tree, k)
                    subplg.input_arguments = {
                        **ref_key["structure"],
                        **getattr(subplg, "input_arguments", {})
                    }
                    err = cls._check_input_arguments(
                        subplg, substructure=True, tree=subplg_tree)
                    if err is not None:
                        raise Exception(
                            "The plugin {} ('{}') needs the input '{}' "
                            "in sub-structure '{}'"
                            " to run".format(plg, plg.orig_name, err, k))

            for k in plg.input_arguments:
                kargument = plg.input_arguments[k]
                if kargument.get("default", None) is None \
                        and not hasattr(plg, k) \
                        and not kargument.get("optional", False):
                    if not substructure:
                        raise Exception(
                            "The plugin {} needs the input {} to run"
                            .format(plg, k))
                    else:
                        return k

                if not hasattr(plg, k) \
                        and not kargument.get("default", None) is None:
                    setattr(plg, k, kargument["default"])
                    plg.is_default_value.append(k)

                # Check that sub-structure is respected
                if hasattr(plg, k) \
                        and kargument.get("structure", None) is not None:
                    subplg = getattr(plg, k)
                    subplg_tree = "{}/{}".format(tree, k)
                    subplg.input_arguments = {
                        **kargument["structure"],
                        **getattr(subplg, "input_arguments", {})
                    }
                    err = cls._check_input_arguments(
                        subplg, substructure=True, tree=subplg_tree)
                    if err is not None:
                        raise Exception(
                            "The plugin {} ('{}') needs the input '{}' "
                            "in sub-structure '{}'"
                            " to run".format(plg, plg.orig_name, err, k))

            allowed_attributes.extend(list(plg.input_arguments.keys()))

        # Check that attributes are compatible with allowed attributes
        if not hasattr(plg, "plugin"):
            return

        if allowed_attributes == ["plugin"] \
                or "any_key" in allowed_attributes:
            return

        incorrect_attributes = [
            k for k in plg.attributes if k not in allowed_attributes]
        if incorrect_attributes != []:
            cls.unauthorized_arguments[tree] = incorrect_attributes
            warning("The following arguments were prescribed in the Yaml "
                    "file, whereas they are not documented. Please make sure "
                    "you really want to carry on: \n"
                    f"{incorrect_attributes}")
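
    # Illustrative sketch, not part of pycif: the kind of 'input_arguments'
    # declaration that _check_input_arguments() above consumes, using only
    # the keys it reads ('default', 'optional', 'structure', and the special
    # 'any_key' entry). Argument names and values are hypothetical.
    #
    #     input_arguments = {
    #         "ncpus": {"default": 1, "optional": False},
    #         "output": {"default": None, "optional": True,
    #                    "structure": {"format": {"default": "nc"}}},
    #     }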

    @classmethod
    def _check_requirements(
            cls, plg, parent_plg_type=None, level=None, list_levels=None,
            **kwargs
    ):
        """Checking that required modules and plugins are loaded.
        If not, load them.

        Requirements are defined in the __init__.py file of the
        corresponding plugin module.

        Args:
            plg (Plugin): a plugin to initialize

        Notes:
            Some basic parameters are added as requirements to all plugins;
            these are: 'datei', 'datef', 'workdir', 'logfile', 'verbose'
        """
        # Dealing with default requirements supposed to be given at level 0
        for key in plg.default_requirements:
            if key not in cls._get_attribute_list(plg):
                if key in cls.reference_instances:
                    setattr(plg, key, cls.reference_instances[key])
                else:
                    raise PluginError(
                        "The default key '{}' is prescribed "
                        "neither in the plugin {}, nor at the "
                        "level 0 of the configuration file".format(key, plg)
                    )

        # Update requirements if set_requirements is available
        plg.set_requirements()

        # Looping over requirements and including them
        for key in plg.requirements:
            key_req = plg.requirements[key]
            fromany = key_req.get("any", False)
            fromsub = key_req.get("subplug", False)
            preftree = key_req.get("preftree", "")
            empty = key_req.get("empty", False)
            name = key_req.get("name", None)
            version = key_req.get("version", "")
            plg_type = key_req.get("type", key)
            plg_subtype = key_req.get("subtype", "")
            newplg = key_req.get("newplg", False)

            # If not from any plugin, but no default value specified, error
            if not fromany and name is None:
                raise PluginError(
                    "{} needs a specific {}, but none was specified \n"
                    "Please check requirements in your module".format(plg,
                                                                       key)
                )

            # If needs a Plugin explicitly defined,
            # look for it at level 0 of setup, or in children,
            # or in unambiguous level N plugins
            plg_tmp = cls._fetch_requirement(
                plg, key, name, version, plg_type, plg_subtype,
                fromsub, empty, preftree
            )

            # If has a prescribed name
            tmp_plugin = getattr(plg_tmp, "plugin", None)
            tmp_name = getattr(tmp_plugin, "name", None)
            tmp_version = getattr(tmp_plugin, "version", None)
            tmp_type = getattr(tmp_plugin, "type", None)
            tmp_subtype = getattr(tmp_plugin, "subtype", None)
            if (tmp_name is not None and fromany) or (
                    tmp_name is not None
                    and name == tmp_name
                    and version == tmp_version
                    and not fromany
                    and (type(plg_tmp) == cls.get_subclass(plg_type)
                         or tmp_type == plg_type
                         or tmp_subtype == plg_type)
            ):
                plg_out = plg_tmp

            # If a default is defined, load from registered
            elif (name is not None and fromany) or (
                    tmp_name is None and not fromany
            ):
                try:
                    plg_out = cls.load_registered(
                        name, version, plg_type,
                        plg_subtype=plg_subtype, plg_orig=plg_tmp
                    )
                except PluginError:
                    raise PluginError(
                        f"Could not find registered plugin {name}/{version} "
                        f"of type {plg_type} "
                        f"for the plugin {plg.plugin.name}/"
                        f"{plg.plugin.version} of type {plg.plugin.type}"
                    )

            # Otherwise, if accepts empty classes from anywhere
            elif empty and fromany:
                plg_out = plg_tmp

            # Otherwise, empty from default
            elif empty and not fromany:
                plg_out = cls.load_registered(name, version, plg_type,
                                              plg_subtype)

            # Error in the yaml if reaching this point
            else:
                raise PluginError(
                    "Plugin {} ({}/{}/{}) needs a plugin '{}/{}/{}' and an "
                    "inconsistent one was proposed in the Yaml".format(
                        plg, plg.plugin.name, plg.plugin.version,
                        plg.plugin.type, key, name, version
                    )
                )

            if plg_out is None:
                raise Exception(
                    "{} needs a Plugin '{}' to run properly\n"
                    "there is none in its children nor at the level 0 of "
                    "Yaml\n"
                    "Please check your Yaml".format(plg, key)
                )

            # Keep in memory to initialize a new instance of the plugin or not
            if hasattr(plg, "plugin"):
                plg.plugin.newplg = newplg

            # Adding auxiliary attributes if any
            aux_ids = ["name", "version", "type", "any", "subplug",
                       "preftree", "empty", "newplg"]
            for attr in key_req:
                if attr not in aux_ids:
                    setattr(plg_out, attr, key_req[attr])

            # Attaching the requirement to the parent plugin
            setattr(plg, key, plg_out)

        # Load the requirements if not already done
        cls.load_setup(plg, parent_plg_type, level + 1,
                       list_levels=list_levels, **kwargs)
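
    # Illustrative sketch, not part of pycif: the shape of a 'requirements'
    # entry as read by _check_requirements() above, using only the keys it
    # queries ('name', 'version', 'type', 'subtype', 'any', 'subplug',
    # 'preftree', 'empty', 'newplg'); any other key is attached to the
    # resolved plugin as an auxiliary attribute. Plugin names below are
    # hypothetical.
    #
    #     requirements = {
    #         "domain": {"name": "some_domain", "version": "std",
    #                    "any": True, "empty": False, "newplg": False},
    #     }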

    @classmethod
    def _fetch_requirement(
            cls, plg, key, name, version, plg_type, plg_subtype,
            fromsub, empty, preftree
    ):
        possible_keys = [k for k in cls.subreference_instances
                         if plg_type in k
                         and (plg_subtype == "" or plg_subtype in k)]
        possible_plg = [
            cls.subreference_instances[k][s]
            for k in possible_keys
            for s in cls.subreference_instances[k]]
        pref_plg = [
            (s, k)
            for k in possible_keys
            for s in cls.subreference_instances[k]
            if preftree in s
        ]

        # If in children
        if key in cls._get_attribute_list(plg):
            plg_tmp = getattr(plg, key)

        # If not in children but at level 0 of Yaml
        elif key in cls.reference_instances:
            plg_tmp = cls.reference_instances[key]

        # If not at level 0, but no ambiguity
        elif len(possible_plg) == 1 and fromsub:
            plg_tmp = possible_plg[0]

        # If not at level 0, but ambiguity
        # Take the one at highest yml level
        elif (
            len(possible_plg) >= 1
            and fromsub
            and len(pref_plg) >= 1
        ):
            ind_preferred = np.argmin([s for s, k in pref_plg])
            pref_key = pref_plg[ind_preferred][1]
            pref_tree = pref_plg[ind_preferred][0]
            plg_tmp = cls.subreference_instances[pref_key][pref_tree]

        elif empty:
            registered = cls.is_registered(
                name, version, plg_type, plg_subtype)
            if registered[0]:
                plg_tmp = cls.load_registered(name, version, plg_type,
                                              plg_subtype)
            else:
                try:
                    plg_tmp = cls.get_subclass(plg_type, plg_subtype)()
                except PluginError:
                    raise PluginError(
                        "Failed fetching the following requirements for "
                        "plugin {}/{} of type {}: {}"
                        .format(plg.plugin.name, plg.plugin.version,
                                plg.plugin.type, plg_type)
                    )

        # Error in the yaml if reaching this point
        else:
            plg_tmp = None
            raise PluginError(
                "{} ({}/{}) needs a plugin '{}/{}/{}' and an "
                "inconsistent one was proposed in the Yaml".format(
                    plg, plg.plugin.name, plg.plugin.version,
                    key, name, version
                )
            )

        return plg_tmp
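
A minimal usage sketch, not part of the module above: how the run_simu entry point could be called from a driver script, assuming only the "def_file" key read by run_simu() (the yml path is hypothetical; an optional "debug" key forces maximum verbosity).

if __name__ == "__main__":
    # Hypothetical driver: run_simu() expects a dict holding at least the
    # path to the yml configuration under the "def_file" key.
    run_args = {"def_file": "~/my_inversion/config.yml"}
    Setup.run_simu(run_args)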