Source code for pycif.utils.classes.baseclass

import importlib
import os
from inspect import ismethod
from types import MethodType, FunctionType, ModuleType
import sys

import numpy as np
import pandas as pd
import yaml

from ...utils.check.errclass import PluginError
from ...utils.yml import ordered_load, ordered_dump
from ...utils import dates


[docs] class Plugin(object): """Base class for all pyCIF plugin objects. Stores plugin identity (type, name, version, subtype), manages the global registry of available plugins, and provides the machinery for loading, initialising and wiring plugin instances from YAML configuration. Attributes: plugin_types (dict): Mapping of recognised plugin-type names to their module path and class name. plugin_subtypes (dict): Mapping of plugin types to their allowed subtypes. registered (dict): Registry of all registered plugin modules, keyed by (name, version, type, subtype). loaded_instances (dict): Cache of all instantiated plugins. reference_instances (dict): Level-0 plugin instances from the setup file. subreference_instances (dict): All sub-level plugin instances. unauthorized_arguments (dict): Arguments found in the YAML that are not declared in any plugin's ``input_arguments``. ref_config: Reference configuration (set once at load time). """ # Authorized Plugins plugin_types = { "chemistry": [".chemistries", "Chemistry"], "controlvect": [".controlvects", "ControlVect"], "datavect": [".datavects", "DataVect"], "datastream": [".datastreams", "DataStream"], "domain": [".domains", "Domain"], "measurements": [".measurements", "Measurement"], "minimizer": [".minimizers", "Minimizer"], "mode": [".modes", "Mode"], "model": [".models", "Model"], "obsoperator": [".obsoperators", "ObsOperator"], "obsparser": [".obsparsers", "ObsParser"], "obsvect": [".obsvects", "ObsVect"], "platform": [".platforms", "Platform"], "simulator": [".simulators", "Simulator"], "transform": [".transforms", "Transform"], "setup": [".setup", "Setup"], } plugin_subtypes = {t: {"": ""} for t in plugin_types} plugin_subtypes["datastream"] = { "meteo": ".meteos", "flux": ".fluxes", "background": ".backgrounds", "field": ".fields" } plugin_subtypes["transform"] = { "basic": ".basic", "complex": ".complex", "system": ".system" } # Registered and already loaded plugins registered = {} loaded_instances = {} reference_instances = {} subreference_instances = {} # Unauthorized arguments unauthorized_arguments = {} # Reference Setup ref_config = None # Maximum recursive level __maxrecursive__ = 50 def __init__(self, plg_orig=None, orig_name="", **kwargs): """Initialise a Plugin, optionally copying attributes from another plugin. Args: plg_orig (Plugin, optional): If provided, all public attributes of ``plg_orig`` are copied to the new instance. orig_name (str): Name of the plugin as declared in the YAML config. Defaults to ``""``. **kwargs: Additional key/value pairs set as attributes on the new instance. """ # Set the orig_name self.orig_name = orig_name # Update the plugin with attributes from plg_orig attributes = self._get_attribute_list(plg_orig) for attr in attributes: if not callable(getattr(self, attr, None)): setattr(self, attr, getattr(plg_orig, attr)) # Initialize the list of attributes if not already done if not hasattr(self, "attributes"): self.attributes = [] # Update attributes according to args if len(kwargs) > 0: for kw in kwargs: setattr(self, kw, kwargs[kw]) # Setting default requirements self.default_requirements = { key: {"any": True, "empty": True} for key in [ "datei", "datef", "workdir", "logfile", "verbose", "monitor_memory" ] } # Initiating requirements if not specified if not hasattr(self, "requirements"): self.requirements = {} def __repr__(self): """Return a human-readable string representation of the plugin. Returns: str: ``<Plugin type/subtype/name/version at 0x…>`` when plugin metadata is available, otherwise the default Python object repr. """ plg = getattr(self, 'plugin', self) try: if plg.subtype is not None and plg.subtype: name_list = [plg.type, plg.subtype, plg.name, plg.version] else: name_list = [plg.type, plg.name, plg.version] name_list = [str(name) for name in name_list] return f"<Plugin {'/'.join(name_list)} at {hex(id(self))}>" except AttributeError: default_repr = ( f"<{self.__class__.__module__}.{self.__class__.__name__} " f"object at {hex(id(self))}>" ) return default_repr @classmethod def _get_attribute_list(cls, plg): """Return public attributes of a plugin, excluding dunder names. Args: plg (Plugin): Plugin instance or class to inspect. Returns: list[str]: List of attribute names not starting with ``_``. """ return [a for a in dir(plg) if not a.startswith("_")]
[docs] @staticmethod def plugin_key(name, version, plugin_type, subtype): """Creates the key for a name, version and plugin type Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin Returns: set: dict key for a plugin and version """ return name, version, plugin_type, subtype
[docs] @classmethod def print_registered( cls, print_requirement=False, print_rst=False, types=[], names=[], versions=[], stream=None ): """Print in a user-friendly format the list of available plugins Args: print_requirement (bool): for each registered plugin, print its requirements print_rst (bool): print in rst format for automatic use in the documentations types (list): list of types of Plugins to print names (list): list of names of Plugins to print version (list): list of versions of Plugins to print stream: stream to which send the display. By default, stdout is used. A stream to a given file can be used instead. """ if stream is None: stream = sys.stdout keys = [ k for k in list(cls.registered.keys()) if (k[0] in names or names == []) and (k[1] in versions or versions == []) and (k[2] in types or types == []) ] names, versions, types, subtypes = list(zip(*keys)) modules = [ cls.get_registered(n, v, t, st) for n, v, t, st in zip(names, versions, types, subtypes) ] print( "List of all available plugins and requirements " "for each class: \n\n", file=stream ) all_types = sorted(list(set(types))) for tt in all_types: rst_type = cls.plugin_types[tt][0][1:] if not print_rst: print("\t", tt, file=stream) else: print("\t", ":doc:`{}</documentation/plugins/{}/index>`" .format(tt, rst_type), file=stream) plg_type_list = sorted([ (n, v, t, st, mod) for n, v, t, st, mod in zip(names, versions, types, subtypes, modules) if t == tt]) sub_types = sorted(list(set([t[3] for t in plg_type_list]))) for stt in sub_types: rst_subtype = cls.plugin_subtypes[tt][stt][1:] indent = "\t\t" if len(sub_types) > 1: indent = "\t\t\t" if not print_rst: print("\t\t - ", stt, file=stream) else: print("\t\t - ", ":doc:`{}</documentation/plugins/{}/{}/index>`" .format(stt, rst_type, rst_subtype), file=stream) plg_subtype_list = sorted([ (n, v, t, st, mod) for n, v, t, st, mod in plg_type_list if st == stt]) for n, v, t, st, mod in plg_subtype_list: if not print_rst: print("{}- {}, {}".format(indent, n, v), file=stream) else: print("{}- :doc:`{}, {}</documentation/plugins/{}/{}/{}>`" .format(indent, n, v, rst_type, rst_subtype, cls.registered[n, v, t, st].split(".")[-1]), file=stream) # Print requirements if print_requirement and hasattr(mod, "requirements"): print("{}\tRequires:".format(indent), file=stream) for req in mod.requirements: mod_req = mod.requirements[req] name = mod_req.get("name", None) version = mod_req.get("version", None) any = mod_req.get("any", False) empty = mod_req.get("empty", "") req_type = mod_req.get("type", req) req_subtype = mod_req.get("subtype", "") if not print_rst: print("{}\t\t- {}:".format(indent, req), file=stream) else: plg_req = cls.from_dict({ "plugin": { "name": name, "version": version, "type": req_type, "subtype": req_subtype } }) plg_req._load_plugin_type(req) req_rst_type = \ cls.plugin_types[plg_req.plugin.type][0][1:] req_rst_subtype = \ cls.plugin_subtypes[ plg_req.plugin.type][ plg_req.plugin.subtype][1:] print("{}\t\t" "- :doc:`{}</documentation/plugins/{}/{}/index>`:" .format(indent, req, req_rst_type, req_subtype), file=stream) print( "{}\t\t\t- name: {}".format(indent, name), file=stream) print("{}\t\t\t- version: {}".format(indent, version), file=stream) print( "{}\t\t\t- any: {}".format(indent, any), file=stream) print( "{}\t\t\t- empty: {}".format(indent, empty), file=stream) print("\n", file=stream)
[docs] @classmethod def is_registered(cls, name, version, plugin_type, subtype): """Check if a plugin is registered Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin Returns: bool: True if a parser is registered key: the key of the registered Plugin if registered """ if cls.plugin_key(name, version, plugin_type, subtype) in cls.registered: return True, cls.plugin_key(name, version, plugin_type, subtype) elif subtype == "": # If given a sub-type, loop on all types and check if valid sub-type if not cls.is_allowed(plugin_type): matching = [k for k in cls.registered.keys() if (k[0], k[1], k[3]) == (name, version, plugin_type)] # Otherwise loop on sub-types else: matching = [k for k in cls.registered.keys() if (k[0], k[1], k[2]) == (name, version, plugin_type)] if len(matching) == 1: return True, matching[0] elif len(matching) > 1: raise PluginError("Multiple plugins matching required definition for " "{}/{}/{}: \n{}".format( name, version, plugin_type, matching)) return False, cls.plugin_key(name, version, plugin_type, subtype)
[docs] @classmethod def is_allowed(cls, plugin_type): """Check whether a plugin type is allowed in pyCIF or not Args: plugin_type (str): type of plugin Returns: bool: True if allowed plugin """ return plugin_type in cls.plugin_types
[docs] @classmethod def is_allowed_as_subtype(cls, plugin_type): """Check whether a plugin type is allowed as the sub-type of a main type in pyCIF or not Args: plugin_type (str): type of plugin Returns: bool: True if allowed plugin """ matching = [ t for t in cls.plugin_subtypes if plugin_type in cls.plugin_subtypes[t]] if len(matching) == 1: return True, matching[0] return False, matching
[docs] @classmethod def is_loaded(cls, name, version, plugin_type, subtype=""): """Check whether a plugin is loaded Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin Returns: bool """ return ( cls.plugin_key(name, version, plugin_type, subtype) in cls.loaded_instances )
[docs] @classmethod def register_plugin(cls, name, version, module, plugin_type="", subtype="", **kwargs): """Register a module for a plugin and version with possibly options Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin module (types.ModuleType): module defining the interface between pyCIF and the plugin **kwargs (dictionary): default options for module """ registered = cls.is_registered(name, version, plugin_type, subtype) if registered[0]: raise ValueError( "Already created a Module " "for plugin {} (version {}) and type {}".format( name, version, plugin_type ) + ("and sub-type {}".format(subtype) if subtype != "" else "") ) cls.registered[registered[1]] = module.__name__
[docs] @classmethod def get_registered(cls, name, version, plugin_type, subtype=""): """Get the correct registered plugin, given its name, version and type Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin Returns: Plugin: plugin module for plugin version """ registered = cls.is_registered(name, version, plugin_type, subtype) if not registered[0]: raise PluginError( "No {}/{} module found for plugin {} and Version {}".format( plugin_type, subtype, name, version ) ) module = importlib.import_module(cls.registered[registered[1]]) # Get input_arguments from the main class pycif_package = __package__.split(".")[0] class_package = "{}.plugins{}".format( pycif_package, cls.plugin_types[registered[1][2]][0]) class_module = importlib.import_module(class_package) module.input_arguments = getattr(module, "input_arguments", {}) if hasattr(class_module, "input_arguments"): module.input_arguments = dict( class_module.input_arguments, **module.input_arguments) # Clean sub-modules that are also in input argument for arg_in in module.input_arguments: if hasattr(module, arg_in): delattr(module, arg_in) return module
[docs] @classmethod def get_loaded(cls, name, version, plugin_type, subtype=""): """Get the correct loaded plugin, given its name, version and type (and optionally subtype) Args: name (str): name of the plugin version (str): version of the plugin plugin_type (str): type of plugin subtype (str): sub-type of plugin Returns: Plugin: plugin module for plugin version """ if not cls.is_loaded(name, version, plugin_type, subtype=""): raise PluginError( "No {} module loaded for plugin {} and Version {}".format( plugin_type, name, version ) ) return cls.loaded_instances[cls.plugin_key(name, version, plugin_type, subtype)]
[docs] @classmethod def get_subclass(cls, plg_type, plg_subtype=""): """Get the plugin class template from a given type Args: plg_type (str): the plugin type to load plg_subtype (str): the plugin sub-type to load Returns: Empty instance of the correct class type """ # Find subtypes subtypes = [t for t in cls.plugin_subtypes if plg_type in cls.plugin_subtypes[t]] if plg_type in cls.plugin_types: subclass = cls.plugin_types[plg_type] elif plg_subtype in cls.plugin_types: subclass = cls.plugin_types[plg_subtype] elif len(subtypes) == 1: subclass = cls.plugin_types[subtypes[0]] else: raise PluginError("Plugin type {}/{} is not recognized by pyCIF" .format(plg_type, plg_subtype)) return getattr( importlib.import_module(subclass[0], __package__), subclass[1], )
[docs] @classmethod def load_registered(cls, name, version, plg_type, plg_subtype="", plg_orig=None): """Get a sub-class instance of a registered plugin. This can be used to get default required plugins. Args: name (str): name of the plugin version (str): version of the plugin plg_type (str): type of plugin plg_subtype (str): sub-type of plugin plg_orig (Plugin): Original plugin from which to copy attributes to the new plugin Returns: Plugin: a new plugin of correct type """ registered = cls.is_registered(name, version, plg_type, plg_subtype) plgtmp = cls.get_registered(name, version, plg_type, plg_subtype) if plg_orig is not None: for attr in plg_orig.attributes: setattr(plgtmp, attr, getattr(plg_orig, attr)) plgtmp.attributes = plg_orig.attributes[:] # Adding plugin attribute (name, version, plg_type, plg_subtype) = registered[1] plgtmp.plugin = cls.from_dict( {"name": name, "version": version, "type": plg_type, "subtype": plg_subtype} ) # Creating a sub-class instance and initializing it plgtmp = cls.childclass_factory(plg_orig=plgtmp) # Initializing the sub-class instance plgtmp.initiate_template() return plgtmp
def _load_plugin_type(self, key, parent_plg_type=None): """Load plugin type and add it to the plugin if not already specified. Resolves the plugin type from ``key``: if ``key`` is a recognised top-level type it is used directly; if it is a recognised sub-type the parent type is inferred; otherwise ``parent_plg_type`` is used as a fallback. Updates ``self.plugin.type`` and ``self.plugin.subtype`` in place. Args: key (str): The YAML attribute name used as a candidate plugin type. parent_plg_type (str, optional): The last recognised plugin type inherited from a parent plugin. Used as fallback when ``key`` is not a known type or sub-type. Returns: str: The resolved top-level plugin type (e.g. ``"datastream"``). Raises: Exception: If the type/subtype combination cannot be matched to any registered or allowed plugin. """ # Choosing the correct type according to the plugin name, or its last # known authorized parend plugin plg_subtype = "" if self.is_allowed(key): plg_type = key elif self.is_allowed_as_subtype(key)[0]: plg_type = self.is_allowed_as_subtype(key)[1] plg_subtype = key else: plg_type = parent_plg_type # Initializing the attribute plugin to store type, name and version # Update the type if not already given plg = getattr(self, "plugin", None) if plg is None: self.plugin = self.from_dict( {"name": None, "version": None, "type": plg_type, "subtype": plg_subtype} ) else: if getattr(self.plugin, "type", None) is None: self.plugin.type = plg_type if getattr(self.plugin, "subtype", "") == "": self.plugin.subtype = plg_subtype # Check that the definition from the Yaml is consistent if self.is_allowed(self.plugin.type) \ and self.plugin.subtype in self.plugin_subtypes[self.plugin.type]: return plg_type name = self.plugin.name version = self.plugin.version plg_type = self.plugin.type plg_subtype = self.plugin.subtype if self.is_allowed(plg_type): matching = [k[3] for k in self.registered.keys() if (k[0], k[1], k[2]) == (name, version, plg_type)] if len(matching) == 1: self.plugin.subtype = matching[0] return plg_type else: matching = [k for k in self.registered.keys() if (k[0], k[1], k[3]) == (name, version, plg_type)] if len(matching) == 1: self.plugin.subtype = matching[0][3] self.plugin.type = matching[0][2] return plg_type # If plugin is only defined as empty with no name, # just check that it is allowed if name is None and version is None: if self.is_allowed(plg_type): return plg_type else: matching = [t for t in self.plugin_subtypes if plg_type in self.plugin_subtypes[t]] if len(matching) == 1: self.plugin.subtype = self.plugin.type self.plugin.type = matching[0] return self.plugin.type raise Exception( "There is some error in the definition of your Yaml or, if you " "are a developer, in the new plugin you are designing: \n" "Trying to load the following plugin: \n" "{}:\n" " plugin:\n" " name: {}\n" " version: {}\n" " type: {}\n" " subtype: {}\n\n" "Available types and subtypes are: \n{}\n" "Please check spelling in your definition\n\n" "If you are initializing an empty plugin, please be sure " "to define it with name = None AND version = None.".format( key, name, version, plg_type, plg_subtype, "\n".join(["- {}/{}".format(t, st) for t in self.plugin_subtypes for st in self.plugin_subtypes[t]]) ) )
[docs] @classmethod def save_loaded(cls, plg): """Saves all loaded plugins to the class Args: plg (Plugin): plugin to save """ name = plg.plugin.name version = plg.plugin.version plugin_type = plg.plugin.type plugin_subtype = plg.plugin.subtype cls.loaded_instances[ cls.plugin_key(name, version, plugin_type, plugin_subtype)] = plg
@classmethod def _save_refplugins(cls, plg): """Save the level-0 plugin attributes of a setup as reference instances. Iterates over every attribute of ``plg`` that is itself a Plugin, resolves its type, and stores it in ``Plugin.reference_instances`` so it can be retrieved later as a default requirement. Should be called only once at initialisation. Args: plg (Plugin): The root setup plugin to scan. """ for attr in plg.attributes: plg_attr = getattr(plg, attr) if issubclass(type(plg_attr), Plugin): plg_attr._load_plugin_type(attr) name = plg_attr.plugin.name version = plg_attr.plugin.version plugin_type = plg_attr.plugin.type plugin_subtype = plg_attr.plugin.subtype setattr(plg_attr, "isreference", True) else: name = version = plugin_type = plugin_subtype = attr cls.reference_instances[plugin_type] = plg_attr cls.reference_instances["reference_setup"] = plg @classmethod def _save_subrefplugins(cls, plg, parent_plg_type="setup", tree=""): """Recursively save all sub-level plugin attributes as sub-reference instances. Walks the full attribute tree of ``plg`` and registers every Plugin-typed attribute in ``Plugin.subreference_instances`` keyed by its slash-separated YAML path. Should be called only once at initialisation. Args: plg (Plugin): Plugin whose attribute tree is to be scanned. parent_plg_type (str): Plugin type of the nearest recognised ancestor. Defaults to ``"setup"``. tree (str): Slash-separated YAML path accumulated during recursion. Defaults to ``""``. """ if not hasattr(plg, "attributes"): return for attr in plg.attributes: plg_attr = getattr(plg, attr) if issubclass(type(plg_attr), Plugin): plg_attr._load_plugin_type(attr, parent_plg_type) name = plg_attr.plugin.name version = plg_attr.plugin.version plugin_type = plg_attr.plugin.type plugin_subtype = plg_attr.plugin.subtype else: name = version = plugin_type = plugin_subtype = attr plg_tree = "{}/{}".format(tree, attr) cls._save_subrefplugins(plg_attr, parent_plg_type, tree=plg_tree) if (plugin_type, plugin_subtype) in cls.subreference_instances: cls.subreference_instances[ (plugin_type, plugin_subtype)][plg_tree] = plg_attr else: cls.subreference_instances[ (plugin_type, plugin_subtype)] = {plg_tree: plg_attr}
[docs] @classmethod def from_dict(cls, def_dict, orig_name="", convert_none=False, **kwargs): """Load a recursive dictionary structure into a Plugin. Each key of ``def_dict`` becomes an attribute of the returned Plugin. Nested dicts are loaded recursively into child Plugin instances. Args: def_dict (dict): The definition dictionary to load. orig_name (str): Name label assigned to the returned plugin (stored as ``plg.orig_name``). Defaults to ``""``. convert_none (bool): If True, YAML keys whose value is ``None`` are initialised as empty ``Plugin()`` objects instead of ``None``. Defaults to False. **kwargs: Forwarded to recursive calls; not used directly. Returns: Plugin: Plugin instance populated with the dictionary contents. """ # Loop over keys to be loaded # Recursively initialize from dictionary plg = cls() for key in def_dict: if isinstance(def_dict[key], dict): setattr( plg, key, cls.from_dict( def_dict[key], orig_name=key, convert_none=convert_none, **kwargs ), ) # Initializes empty keys as Plugins elif def_dict[key] is None and convert_none: setattr(plg, key, cls()) else: setattr(plg, key, def_dict[key]) # Saves the definition keys of the original dictionary into the # output plugin plg.attributes = list(def_dict.keys()) # Saves the name of the plugin as specified in the configuration file plg.orig_name = orig_name return plg
[docs] @classmethod def to_dict( cls, plg, exclude_patterns=None, full_output=False, large_to_strings=False, exclude_default=True): """Convert a Plugin instance to a plain nested dictionary. Args: plg (Plugin): Plugin instance to serialise. exclude_patterns (list[str], optional): Additional attribute names to exclude. The default exclusion list (``logfile``, ``datei``, ``datef``, ``workdir``, ``verbose``) is always appended unless ``exclude_default`` is False. full_output (bool): If True, include large array-like attributes in full rather than summarising them as type/shape strings. Defaults to False. large_to_strings (bool): If True, replace arrays/dicts/lists with more than 10 elements by a short descriptive string. Defaults to False. exclude_default (bool): If True (default), always exclude the standard set of runtime attributes (dates, workdir, …). Returns: dict: Nested dictionary representation of the plugin. """ default_exclude = [] if exclude_default: default_exclude = [ "logfile", "datei", "datef", "workdir", "verbose" ] if exclude_patterns is None: exclude_patterns = default_exclude else: exclude_patterns.extend(default_exclude) # If the input has no method 'attributes', just return it if not hasattr(plg, "attributes") or isinstance(plg, list): return plg out = {} for attr in plg.attributes: plg_attr = getattr(plg, attr) # Otherwise, save the attribute value if ( not ismethod(plg_attr) and not isinstance(plg_attr, FunctionType) and not isinstance(plg_attr, ModuleType) and not attr[0] == "_" and attr not in dir(cls) and attr not in [ "absolute_import", "loaded_class", "loaded_data", "loaded_attributes", "loaded_requirements", "loaded_template", "orig_name", "requirements", "attributes", "isreference", "Method_type", "MethodType", "default_requirements", "mapper", ] + exclude_patterns and "Feature" not in str(plg_attr) ): # If the attribute is a Plugin sub-class, # recursively call to_dict if hasattr(plg_attr, "to_dict"): out[attr] = cls.to_dict( plg_attr, exclude_patterns, full_output=full_output, large_to_strings=large_to_strings ) continue # Deal with lists try: if type(plg_attr) == str: out[attr] = plg_attr elif len(plg_attr) > 10 and large_to_strings: if type(plg_attr) \ in [np.ndarray, pd.core.frame.DataFrame]: out[attr] = "{} {}".format( type(plg_attr), plg_attr.shape ) elif type(plg_attr) == dict: out[attr] = "{} {} keys".format( type(plg_attr), len(plg_attr.keys()) ) elif type(plg_attr) == list: out[attr] = "{} len({})".format( type(plg_attr), len(plg_attr) ) elif len(plg_attr) < 10 or full_output: out[attr] = plg_attr except TypeError: out[attr] = plg_attr # Removing the __class__ attribute if "__class__" in list(out.keys()): del out["__class__"] return out
[docs] @classmethod def to_yaml(cls, plg, yaml_file, full=True): """Serialise a Plugin to a YAML configuration file. Args: plg (Plugin): Plugin instance to write. yaml_file (str): Destination file path. full (bool): Passed to :meth:`to_dict` (currently unused). Defaults to True. """ plg_dict = cls.to_dict(plg) with open(yaml_file, "w") as f: ordered_dump(f, plg_dict)
[docs] @classmethod def from_yaml(cls, def_file): """Generates a dictionary including all pyCIF parameters Args: def_file (string) : Path to the definition file Handles both absolute and relative paths Returns: config_dict (dictionary): Dictionary populated with all pyCIF parameters """ yml_file = os.path.abspath(os.path.expanduser(def_file)) try: with open(yml_file, "r") as f: config_dict = ordered_load(f) config_dict["def_file"] = yml_file if "datei" in config_dict: # Converting dates to datetime if necessary config_dict["datei"] = dates.date2datetime( config_dict["datei"] ) config_dict["datef"] = dates.date2datetime( config_dict["datef"] ) return config_dict except IOError as e: print("Couldn't find config file: {}".format(yml_file)) print("Please check directories") raise e except yaml.scanner.ScannerError as e: print("Error in the syntax of config file: {}".format(yml_file)) raise e
[docs] @classmethod def print_default(cls, plg): """Print default parameter values for a plugin, if available. Args: plg (Plugin): Plugin instance whose ``default_values`` to display. """ if not hasattr(plg, "default_values"): print( """{} ({}, {}, {}) has no default values""".format( plg, plg.plugin.name, plg.plugin.version, plg.plugin.type ) ) return print( """The default values of {} ({}, {}, {}) are:""".format( plg, plg.plugin.name, plg.plugin.version, plg.plugin.type ) ) for k in plg.default_values: print("- {}:\t{}".format(k, plg.default_values[k]))
[docs] @classmethod def childclass_factory(cls, plg_orig, child_type=None, parent_plg=None, overwrite=False): """Generates an instance of one of Plugin's child classes. Transfers all existing attributes in the argument plugin to the output child-class instance Args: plg_orig (Plugin): the plugin to turn into a child-class instance child_type (str): sub-class type to generate if not available in plg_orig.plugin.type overwrite (bool): overwrite the class type of the origin plugin Return: child_plg: a plugin with all the attributes from plg_orig, but as a child-class instance """ plg_type = getattr(plg_orig.plugin, "type", None) if plg_type is None or plg_type == "setup": if child_type is None and plg_type is None: raise PluginError( "The Child-class factory was called on a plugin " "that was not correctly initialized: {} / {}".format( plg_orig, plg_orig.orig_name ) ) else: plg_type = child_type plg_orig._load_plugin_type(plg_type) else: plg_type = plg_orig.plugin.type # If the type is not referenced, don't do anything if not cls.is_allowed(plg_type): return plg_orig # Load the subclass child_plg = cls.get_subclass(plg_type)(plg_orig=plg_orig) # Replace name and version if empty if parent_plg is not None and hasattr(parent_plg, "plugin"): if child_plg.plugin.name is None: child_plg.plugin.name = parent_plg.plugin.name child_plg.plugin.from_parent = True if child_plg.plugin.version is None: child_plg.plugin.version = parent_plg.plugin.version child_plg.plugin.from_parent = True return child_plg
[docs] def initiate_template(self, plg_type=None, default_functions={}): """Initialise a Plugin template by loading functions from the registered module. Calls :meth:`initiate` to fetch the module, then attaches the functions listed in ``default_functions`` to ``self``, either as bound methods or as plain function references. Also attaches ``set_requirements`` if defined in the module. Args: plg_type (str, optional): Plugin type passed to :meth:`initiate`. Defaults to None (inferred from ``self.plugin.type``). default_functions (dict[str, bool]): Mapping of function name → ``True`` if the function should be bound as a method (receives ``self`` as first argument), ``False`` for a static attachment. """ # First load the module module = self.initiate(plg_type=plg_type) # Attach the list of functions for f in default_functions: if not hasattr(module, f): continue if default_functions[f]: setattr(self, f, MethodType(getattr(module, f), self)) else: setattr(self, f, getattr(module, f)) # Add set_requirements if hasattr(module, "set_requirements"): self.set_requirements = MethodType(module.set_requirements, self)
[docs] def initiate(self, plg_type=None): """Initializes a Plugin, i.e., loads functions from registered plugins Args: plg_type (str): the type of plugin to load; this should correspond to one of the defined child-classes Return: module: a python module as registered in pyCIF """ # It there is no attribute 'plugin', can't initialize anything as # python will not know the plugin type, name and version plugin = getattr(self, "plugin", None) if plugin is None: return # Load plugin IDs name = getattr(plugin, "name", None) version = getattr(plugin, "version", None) plg_subtype = getattr(plugin, "subtype", "") if plg_type is None: plg_type = plugin.type # Load registered module if the Plugin's name and version are not # default empty strings if name is not None: try: module = self.get_registered( name, version, plg_type, plg_subtype) # Attributing all module functions to the plugin functions = [ f for f in dir(module) if f not in getattr(module, "attributes", []) and f[0] != "_" and f != "attributes" ] for attr in functions: f = getattr(module, attr) if not isinstance(f, ModuleType): setattr(self, attr, f) # Ini_data should be define as a Method Type if hasattr(module, "ini_data"): self.ini_data = MethodType(module.ini_data, self) except PluginError as e: # If plugin initialized from parent, ignore if not getattr(plugin, "from_parent", False): raise e else: self.plugin.name = None self.plugin.version = None return return module
[docs] @classmethod def flushall(cls): """Reset all class-level plugin caches. Clears ``loaded_instances``, ``reference_instances`` and ``subreference_instances``. Useful between independent simulation runs or in test suites to avoid cross-contamination between setups. """ cls.subreference_instances = {} cls.loaded_instances = {} cls.reference_instances = {}
# import sys # mod2flush = [mod for mod in sys.modules if "pycif" in mod] # for mod in mod2flush: # del sys.modules[mod]
[docs] def set_requirements(self): """Update requirements depending on the Plugin properties By default, this method does nothing. It can be included in any new Plugin depending on the developer needs. Args: self (Plugin): the Plugin to update """ return
[docs] @classmethod def dump_incorrect(cls, file_dump): """Dump undeclared YAML arguments to a text file for inspection. Writes one line per plugin tree path listing the argument names that were found in the YAML but are not declared in the plugin's ``input_arguments``. Args: file_dump (str): Path to the output file. """ with open(file_dump, "w") as f: for k in cls.unauthorized_arguments: f.write(f"- {k}: {cls.unauthorized_arguments[k]}\n")