"""Source code for pycif.utils.classes.baseclass."""

import importlib
import os
from inspect import ismethod
from types import MethodType, FunctionType, ModuleType
import sys

import numpy as np
import pandas as pd
import yaml

from ...utils.check.errclass import PluginError
from ...utils.yml import ordered_load, ordered_dump
from ...utils import dates


class Plugin(object):
    """Parent class of all other pyCIF classes.

    It is used to store all information about sub-parts of pyCIF and loads
    sub-modules if needed.

    The class also acts as a global registry: the dictionaries declared
    below are class attributes and are therefore shared by every instance.
    """

    # Authorized Plugins: type -> [relative module of the child class,
    # name of the child class]
    plugin_types = {
        "chemistry": [".chemistries", "Chemistry"],
        "controlvect": [".controlvects", "ControlVect"],
        "datavect": [".datavects", "DataVect"],
        "datastream": [".datastreams", "DataStream"],
        "domain": [".domains", "Domain"],
        "measurements": [".measurements", "Measurement"],
        "minimizer": [".minimizers", "Minimizer"],
        "mode": [".modes", "Mode"],
        "model": [".models", "Model"],
        "obsoperator": [".obsoperators", "ObsOperator"],
        "obsparser": [".obsparsers", "ObsParser"],
        "obsvect": [".obsvects", "ObsVect"],
        "platform": [".platforms", "Platform"],
        "simulator": [".simulators", "Simulator"],
        "transform": [".transforms", "Transform"],
        "setup": [".setup", "Setup"],
    }

    # Authorized sub-types: type -> {sub-type: relative sub-module}.
    # Every type accepts the empty sub-type by default.
    plugin_subtypes = {t: {"": ""} for t in plugin_types}
    plugin_subtypes["datastream"] = {
        "meteo": ".meteos",
        "flux": ".fluxes",
        "background": ".backgrounds",
        "field": ".fields"
    }
    plugin_subtypes["transform"] = {
        "basic": ".basic",
        "complex": ".complex",
        "system": ".system"
    }

    # Registered and already loaded plugins
    # (keys of `registered` and `loaded_instances` come from `plugin_key`)
    registered = {}
    loaded_instances = {}
    reference_instances = {}
    subreference_instances = {}

    # Unauthorized arguments
    unauthorized_arguments = {}

    # Reference Setup
    ref_config = None

    # Maximum recursive level
    __maxrecursive__ = 50

    def __init__(self, plg_orig=None, orig_name="", **kwargs):
        """Initializes the Plugin.

        Args:
            plg_orig (Plugin): if specified, initialize a new plugin from
                attributes of the origin plugin
            orig_name (str): name stored in ``self.orig_name``
            **kwargs: extra attributes set on the new plugin; they are
                applied after the attributes copied from ``plg_orig``
        """

        # Set the orig_name
        self.orig_name = orig_name

        # Update the plugin with attributes from plg_orig; names that
        # already resolve to a callable on self (e.g. methods) are kept
        for attr in self._get_attribute_list(plg_orig):
            if not callable(getattr(self, attr, None)):
                setattr(self, attr, getattr(plg_orig, attr))

        # Initialize the list of attributes if not already done
        if not hasattr(self, "attributes"):
            self.attributes = []

        # Update attributes according to args
        for kw, value in kwargs.items():
            setattr(self, kw, value)

        # Updating requirements in any
        self.default_requirements = {
            key: {"any": True, "empty": True}
            for key in ["datei", "datef", "workdir", "logfile", "verbose"]
        }

        if not hasattr(self, "requirements"):
            self.requirements = {}

    @classmethod
    def _get_attribute_list(cls, plg):
        """Get the list of attributes excluding special methods"""
        return [a for a in dir(plg) if not a.startswith("_")]

    @staticmethod
    def plugin_key(name, version, plugin_type, subtype):
        """Creates the key for a name, version and plugin type

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin

        Returns:
            tuple: dict key ``(name, version, plugin_type, subtype)``
        """
        return name, version, plugin_type, subtype

    @classmethod
    def print_registered(
            cls,
            print_requirement=False,
            print_rst=False,
            types=None,
            names=None,
            versions=None,
            stream=None
    ):
        """Print in a user-friendly format the list of available plugins

        Args:
            print_requirement (bool): for each registered plugin, print its
                requirements
            print_rst (bool): print in rst format for automatic use in the
                documentations
            types (list): list of types of Plugins to print; None or an
                empty list means no filtering
            names (list): list of names of Plugins to print; None or an
                empty list means no filtering
            versions (list): list of versions of Plugins to print; None or
                an empty list means no filtering
            stream: stream to which send the display. By default, stdout is
                used. A stream to a given file can be used instead.
        """
        if stream is None:
            stream = sys.stdout

        keys = [
            k for k in cls.registered
            if (not names or k[0] in names)
            and (not versions or k[1] in versions)
            and (not types or k[2] in types)
        ]

        # One entry per plugin; built without unpacking zip(*keys) so that
        # an empty selection only prints the header instead of raising
        entries = [
            (n, v, t, st, cls.get_registered(n, v, t, st))
            for n, v, t, st in keys
        ]

        print(
            "List of all available plugins and requirements "
            "for each class: \n\n",
            file=stream
        )

        for tt in sorted({entry[2] for entry in entries}):
            rst_type = cls.plugin_types[tt][0][1:]
            if not print_rst:
                print("\t", tt, file=stream)
            else:
                print("\t",
                      ":doc:`{}</documentation/plugins/{}/index>`"
                      .format(tt, rst_type), file=stream)

            plg_type_list = sorted(
                entry for entry in entries if entry[2] == tt)
            sub_types = sorted({entry[3] for entry in plg_type_list})

            for stt in sub_types:
                rst_subtype = cls.plugin_subtypes[tt][stt][1:]

                # Sub-type headers (and one more indentation level) are
                # only used when the type has several sub-types
                indent = "\t\t"
                if len(sub_types) > 1:
                    indent = "\t\t\t"
                    if not print_rst:
                        print("\t\t - ", stt, file=stream)
                    else:
                        print(
                            "\t\t - ",
                            ":doc:`{}</documentation/plugins/{}/{}/index>`"
                            .format(stt, rst_type, rst_subtype),
                            file=stream)

                for n, v, t, st, mod in plg_type_list:
                    if st != stt:
                        continue

                    if not print_rst:
                        print("{}- {}, {}".format(indent, n, v),
                              file=stream)
                    else:
                        print(
                            "{}- :doc:`{}, {}</documentation/plugins/{}/{}/{}>`"
                            .format(
                                indent, n, v, rst_type, rst_subtype,
                                cls.registered[n, v, t, st].split(".")[-1]),
                            file=stream)

                    # Print requirements
                    if print_requirement and hasattr(mod, "requirements"):
                        cls._print_requirements(
                            mod, indent, print_rst, stream)

            print("\n", file=stream)

    @classmethod
    def _print_requirements(cls, mod, indent, print_rst, stream):
        """Print the ``requirements`` dictionary of one registered module.

        Args:
            mod (module): registered module with a ``requirements`` attribute
            indent (str): indentation prefix of the parent plugin line
            print_rst (bool): print rst links instead of plain names
            stream: stream to which send the display
        """
        print("{}\tRequires:".format(indent), file=stream)
        for req, mod_req in mod.requirements.items():
            req_name = mod_req.get("name", None)
            req_version = mod_req.get("version", None)
            req_any = mod_req.get("any", False)
            req_empty = mod_req.get("empty", "")
            req_type = mod_req.get("type", req)
            req_subtype = mod_req.get("subtype", "")

            if not print_rst:
                print("{}\t\t- {}:".format(indent, req), file=stream)
            else:
                # Resolve the actual type/sub-type of the requirement to
                # build the documentation path
                plg_req = cls.from_dict({
                    "plugin": {
                        "name": req_name,
                        "version": req_version,
                        "type": req_type,
                        "subtype": req_subtype
                    }
                })
                plg_req._load_plugin_type(req)
                req_rst_type = \
                    cls.plugin_types[plg_req.plugin.type][0][1:]
                req_rst_subtype = \
                    cls.plugin_subtypes[
                        plg_req.plugin.type][
                        plg_req.plugin.subtype][1:]

                # The link uses the sub-type directory name, as done for
                # the plugin links themselves
                print("{}\t\t"
                      "- :doc:`{}</documentation/plugins/{}/{}/index>`:"
                      .format(indent, req, req_rst_type, req_rst_subtype),
                      file=stream)

            print("{}\t\t\t- name: {}".format(indent, req_name),
                  file=stream)
            print("{}\t\t\t- version: {}".format(indent, req_version),
                  file=stream)
            print("{}\t\t\t- any: {}".format(indent, req_any),
                  file=stream)
            print("{}\t\t\t- empty: {}".format(indent, req_empty),
                  file=stream)

    @classmethod
    def is_registered(cls, name, version, plugin_type, subtype):
        """Check if a plugin is registered

        When ``subtype`` is empty, ``plugin_type`` is also tried as a
        sub-type (if it is not an authorized type) or matched against any
        sub-type of that type.

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin

        Returns:
            tuple: ``(found, key)`` where ``found`` is True if a plugin is
            registered and ``key`` is the key of the registered Plugin if
            found, the key built from the arguments otherwise

        Raises:
            PluginError: if several registered plugins match the definition
        """
        key = cls.plugin_key(name, version, plugin_type, subtype)
        if key in cls.registered:
            return True, key

        if subtype == "":
            # plugin_type is not a main type: interpret it as a sub-type
            if not cls.is_allowed(plugin_type):
                matching = [
                    k for k in cls.registered
                    if (k[0], k[1], k[3]) == (name, version, plugin_type)]

            # Otherwise accept any sub-type of the given type
            else:
                matching = [
                    k for k in cls.registered
                    if (k[0], k[1], k[2]) == (name, version, plugin_type)]

            if len(matching) == 1:
                return True, matching[0]

            if len(matching) > 1:
                raise PluginError(
                    "Multiple plugins matching required definition for "
                    "{}/{}/{}: \n{}".format(
                        name, version, plugin_type, matching))

        return False, key

    @classmethod
    def is_allowed(cls, plugin_type):
        """Check whether a plugin type is allowed in pyCIF or not

        Args:
            plugin_type (str): type of plugin

        Returns:
            bool: True if allowed plugin
        """
        return plugin_type in cls.plugin_types

    @classmethod
    def is_allowed_as_subtype(cls, plugin_type):
        """Check whether a plugin type is allowed as the sub-type of a main
        type in pyCIF or not

        Args:
            plugin_type (str): type of plugin

        Returns:
            tuple: ``(True, main_type)`` if exactly one main type accepts
            ``plugin_type`` as a sub-type, ``(False, matching_types)``
            otherwise
        """
        matching = [
            t for t in cls.plugin_subtypes
            if plugin_type in cls.plugin_subtypes[t]]

        if len(matching) == 1:
            return True, matching[0]

        return False, matching

    @classmethod
    def is_loaded(cls, name, version, plugin_type, subtype=""):
        """Check whether a plugin is loaded

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin

        Returns:
            bool
        """
        return (
            cls.plugin_key(name, version, plugin_type, subtype)
            in cls.loaded_instances
        )

    @classmethod
    def register_plugin(cls, name, version, module,
                        plugin_type="", subtype="", **kwargs):
        """Register a module for a plugin and version with possibly options

        Only the dotted name of the module is stored; the module is imported
        again on demand by ``get_registered``.

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin
            module (types.ModuleType): module defining the interface
                between pyCIF and the plugin
            **kwargs (dictionary): default options for module
                (currently not used by this method)

        Raises:
            ValueError: if a matching plugin is already registered
        """
        found, key = cls.is_registered(name, version, plugin_type, subtype)
        if found:
            raise ValueError(
                "Already created a Module "
                "for plugin {} (version {}) and type {}".format(
                    name, version, plugin_type
                ) + (" and sub-type {}".format(subtype)
                     if subtype != "" else "")
            )

        cls.registered[key] = module.__name__

    @classmethod
    def get_registered(cls, name, version, plugin_type, subtype=""):
        """Get the correct registered plugin, given its name, version and
        type

        The module's ``input_arguments`` are completed with the ones of the
        package of the plugin type; module attributes whose name is also an
        input argument are removed from the module.

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin

        Returns:
            module: plugin module for plugin version

        Raises:
            PluginError: if no matching plugin is registered
        """
        found, key = cls.is_registered(name, version, plugin_type, subtype)
        if not found:
            raise PluginError(
                "No {}/{} module found for plugin {} and Version {}".format(
                    plugin_type, subtype, name, version
                )
            )

        module = importlib.import_module(cls.registered[key])

        # Get input_arguments from the main class
        pycif_package = __package__.split(".")[0]
        class_package = "{}.plugins{}".format(
            pycif_package, cls.plugin_types[key[2]][0])
        class_module = importlib.import_module(class_package)

        # Arguments defined by the module take precedence over the ones of
        # the type package
        module.input_arguments = getattr(module, "input_arguments", {})
        if hasattr(class_module, "input_arguments"):
            module.input_arguments = dict(
                class_module.input_arguments, **module.input_arguments)

        # Clean sub-modules that are also in input argument
        for arg_in in module.input_arguments:
            if hasattr(module, arg_in):
                delattr(module, arg_in)

        return module

    @classmethod
    def get_loaded(cls, name, version, plugin_type, subtype=""):
        """Get the correct loaded plugin, given its name, version and type
        (and optionally subtype)

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plugin_type (str): type of plugin
            subtype (str): sub-type of plugin

        Returns:
            Plugin: loaded plugin for plugin version

        Raises:
            PluginError: if no such plugin was saved with ``save_loaded``
        """
        # The sub-type is part of the key: it must be forwarded, otherwise
        # plugins loaded with a sub-type are never found
        if not cls.is_loaded(name, version, plugin_type, subtype=subtype):
            raise PluginError(
                "No {} module loaded for plugin {} and Version {}".format(
                    plugin_type, name, version
                )
            )

        return cls.loaded_instances[
            cls.plugin_key(name, version, plugin_type, subtype)]

    @classmethod
    def get_subclass(cls, plg_type, plg_subtype=""):
        """Get the plugin class template from a given type

        Args:
            plg_type (str): the plugin type to load
            plg_subtype (str): the plugin sub-type to load

        Returns:
            type: the child class matching the type (not an instance)

        Raises:
            PluginError: if neither the type nor the sub-type is recognized
        """
        # Main types accepting plg_type as one of their sub-types
        subtypes = [t for t in cls.plugin_subtypes
                    if plg_type in cls.plugin_subtypes[t]]

        if plg_type in cls.plugin_types:
            subclass = cls.plugin_types[plg_type]

        elif plg_subtype in cls.plugin_types:
            subclass = cls.plugin_types[plg_subtype]

        elif len(subtypes) == 1:
            subclass = cls.plugin_types[subtypes[0]]

        else:
            raise PluginError(
                "Plugin type {}/{} is not recognized by pyCIF"
                .format(plg_type, plg_subtype))

        # subclass[0] is a relative module name, resolved from this package
        return getattr(
            importlib.import_module(subclass[0], __package__),
            subclass[1],
        )

    @classmethod
    def load_registered(cls, name, version, plg_type, plg_subtype="",
                        plg_orig=None):
        """Get a sub-class instance of a registered plugin. This can be used
        to get default required plugins.

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            plg_type (str): type of plugin
            plg_subtype (str): sub-type of plugin
            plg_orig (Plugin): Original plugin from which to copy
                attributes to the new plugin

        Returns:
            Plugin: a new plugin of correct type
        """
        registered = cls.is_registered(name, version, plg_type, plg_subtype)

        # At this stage, plgtmp is the registered python module
        plgtmp = cls.get_registered(name, version, plg_type, plg_subtype)

        if plg_orig is not None:
            for attr in plg_orig.attributes:
                setattr(plgtmp, attr, getattr(plg_orig, attr))

            plgtmp.attributes = plg_orig.attributes[:]

        # Adding plugin attribute, using the key actually registered
        (name, version, plg_type, plg_subtype) = registered[1]
        plgtmp.plugin = cls.from_dict(
            {"name": name, "version": version,
             "type": plg_type, "subtype": plg_subtype}
        )

        # Creating a sub-class instance and initializing it
        plgtmp = cls.childclass_factory(plg_orig=plgtmp)

        # Initializing the sub-class instance
        plgtmp.initiate_template()

        return plgtmp

    def _load_plugin_type(self, key, parent_plg_type=None):
        """Load plugin type and add it to the plugin if not already specified

        Args:
            key (str): the plugin type
            parent_plg_type (str): the parent plugin type that is inherited
                from higher level plugins

        Returns:
            str: the plugin type

        Raises:
            Exception: if the type/sub-type cannot be matched to any
                authorized or registered definition
        """

        # Choosing the correct type according to the plugin name, or its
        # last known authorized parent plugin
        plg_subtype = ""
        if self.is_allowed(key):
            plg_type = key
        else:
            is_subtype, main_type = self.is_allowed_as_subtype(key)
            if is_subtype:
                plg_type = main_type
                plg_subtype = key
            else:
                plg_type = parent_plg_type

        # Initializing the attribute plugin to store type, name and version
        # Update the type if not already given
        if getattr(self, "plugin", None) is None:
            self.plugin = self.from_dict(
                {"name": None, "version": None,
                 "type": plg_type, "subtype": plg_subtype}
            )

        else:
            if getattr(self.plugin, "type", None) is None:
                self.plugin.type = plg_type

            if getattr(self.plugin, "subtype", "") == "":
                self.plugin.subtype = plg_subtype

        # Check that the definition from the Yaml is consistent
        if self.is_allowed(self.plugin.type) \
                and self.plugin.subtype in \
                self.plugin_subtypes[self.plugin.type]:
            return plg_type

        name = self.plugin.name
        version = self.plugin.version
        plg_type = self.plugin.type
        plg_subtype = self.plugin.subtype

        # Try to complete the definition from the registered plugins
        if self.is_allowed(plg_type):
            matching = [k[3] for k in self.registered
                        if (k[0], k[1], k[2]) == (name, version, plg_type)]
            if len(matching) == 1:
                self.plugin.subtype = matching[0]
                return plg_type

        else:
            # The given type is possibly a sub-type
            matching = [k for k in self.registered
                        if (k[0], k[1], k[3]) == (name, version, plg_type)]
            if len(matching) == 1:
                self.plugin.subtype = matching[0][3]
                self.plugin.type = matching[0][2]
                return plg_type

        # If plugin is only defined as empty with no name,
        # just check that it is allowed
        if name is None and version is None:
            if self.is_allowed(plg_type):
                return plg_type

            matching = [t for t in self.plugin_subtypes
                        if plg_type in self.plugin_subtypes[t]]
            if len(matching) == 1:
                self.plugin.subtype = self.plugin.type
                self.plugin.type = matching[0]
                return self.plugin.type

        raise Exception(
            "There is some error in the definition of your Yaml or, if you "
            "are a developer, in the new plugin you are designing: \n"
            "Trying to load the following plugin: \n"
            "{}:\n"
            " plugin:\n"
            " name: {}\n"
            " version: {}\n"
            " type: {}\n"
            " subtype: {}\n\n"
            "Available types and subtypes are: \n{}\n"
            "Please check spelling in your definition\n\n"
            "If you are initializing an empty plugin, please be sure "
            "to define it with name = None AND version = None.".format(
                key, name, version, plg_type, plg_subtype,
                "\n".join(["- {}/{}".format(t, st)
                           for t in self.plugin_subtypes
                           for st in self.plugin_subtypes[t]])
            )
        )

    @classmethod
    def save_loaded(cls, plg):
        """Saves all loaded plugins to the class

        Args:
            plg (Plugin): plugin to save
        """
        key = cls.plugin_key(
            plg.plugin.name,
            plg.plugin.version,
            plg.plugin.type,
            plg.plugin.subtype,
        )
        cls.loaded_instances[key] = plg

    @classmethod
    def _save_refplugins(cls, plg):
        """Save all level 0 attributes to the class.
        Should be called only once

        Plugin attributes are stored under their plugin type, other
        attributes under their own name; ``plg`` itself is stored under
        ``"reference_setup"``.

        Args:
            plg (Plugin): plugin to save
        """
        for attr in plg.attributes:
            plg_attr = getattr(plg, attr)

            if isinstance(plg_attr, Plugin):
                plg_attr._load_plugin_type(attr)
                plugin_type = plg_attr.plugin.type
                setattr(plg_attr, "isreference", True)

            else:
                plugin_type = attr

            cls.reference_instances[plugin_type] = plg_attr

        cls.reference_instances["reference_setup"] = plg

    @classmethod
    def _save_subrefplugins(cls, plg, parent_plg_type="setup", tree=""):
        """Save all attributes to the class.
        Should be called only once

        Attributes are stored recursively in ``subreference_instances``,
        indexed by ``(type, sub-type)`` then by their path in the tree.

        Args:
            plg (Plugin): plugin to save
            parent_plg_type (str): plugin type passed to
                ``_load_plugin_type`` as the parent type
            tree (str): "/"-separated path of ``plg`` in the tree
        """
        if not hasattr(plg, "attributes"):
            return

        for attr in plg.attributes:
            plg_attr = getattr(plg, attr)

            if isinstance(plg_attr, Plugin):
                plg_attr._load_plugin_type(attr, parent_plg_type)
                plugin_type = plg_attr.plugin.type
                plugin_subtype = plg_attr.plugin.subtype

            else:
                plugin_type = plugin_subtype = attr

            plg_tree = "{}/{}".format(tree, attr)
            cls._save_subrefplugins(plg_attr, parent_plg_type, tree=plg_tree)

            cls.subreference_instances.setdefault(
                (plugin_type, plugin_subtype), {})[plg_tree] = plg_attr

    @classmethod
    def from_dict(cls, def_dict, orig_name="", convert_none=False, **kwargs):
        """Loads a recursive dictionary structure into a Plugin

        Args:
            def_dict (dict): the definition dictionary
            orig_name (str): name stored in the ``orig_name`` attribute of
                the output plugin; sub-plugins get their key as name
            convert_none (bool): if True, None values are turned into
                empty Plugins
            **kwargs: forwarded to the recursive calls

        Returns:
            Plugin
        """

        # Loop over keys to be loaded
        # Recursively initialize from dictionary
        plg = cls()
        for key, value in def_dict.items():
            if isinstance(value, dict):
                value = cls.from_dict(
                    value, orig_name=key,
                    convert_none=convert_none, **kwargs
                )

            # Initializes empty keys as Plugins
            elif value is None and convert_none:
                value = cls()

            setattr(plg, key, value)

        # Saves the definition keys of the original dictionary into the
        # output plugin
        plg.attributes = list(def_dict.keys())

        # Saves the name of the plugin as specified in the configuration
        # file
        plg.orig_name = orig_name

        return plg

    @classmethod
    def to_dict(cls, plg, exclude_patterns=None, full=False):
        """Turns a Plugin to a dictionary for easier saving.

        Args:
            plg (Plugin): a Plugin instance to be saved as a dictionary
            exclude_patterns (list): attribute names to skip, on top of the
                default ones; the list is not modified
            full (bool): if True, arrays, data frames, dictionaries and
                lists with more than 10 elements are replaced by a short
                description; if False, they are skipped

        Returns:
            dict: the dictionary, or ``plg`` itself if it is a list or has
            no attribute ``attributes``
        """

        # If the input has no method 'attributes', just return it
        if not hasattr(plg, "attributes") or isinstance(plg, list):
            return plg

        # Names never dumped; built once, without touching the list
        # provided by the caller
        skipped = {
            "logfile", "datei", "datef", "workdir", "verbose",
            "absolute_import",
            "loaded_class",
            "loaded_data",
            "loaded_attributes",
            "loaded_requirements",
            "loaded_template",
            "orig_name",
            "requirements",
            "attributes",
            "isreference",
            "Method_type",
            "MethodType",
            "default_requirements",
            "mapper",
        }
        if exclude_patterns is not None:
            skipped.update(exclude_patterns)

        class_names = set(dir(cls))

        out = {}
        for attr in plg.attributes:
            plg_attr = getattr(plg, attr)

            if (
                ismethod(plg_attr)
                or isinstance(plg_attr, (FunctionType, ModuleType))
                or attr.startswith("_")
                or attr in class_names
                or attr in skipped
                or "Feature" in str(plg_attr)
            ):
                continue

            # If the attribute is a Plugin sub-class,
            # recursively call to_dict
            if hasattr(plg_attr, "to_dict"):
                out[attr] = cls.to_dict(plg_attr, exclude_patterns, full)
                continue

            # Deal with lists
            try:
                if isinstance(plg_attr, str):
                    out[attr] = plg_attr

                elif len(plg_attr) > 10:
                    # Large objects are only summarized, and only if full
                    if not full:
                        continue

                    if type(plg_attr) \
                            in [np.ndarray, pd.core.frame.DataFrame]:
                        out[attr] = "{} {}".format(
                            type(plg_attr), plg_attr.shape
                        )
                    elif type(plg_attr) == dict:
                        out[attr] = "{} {} keys".format(
                            type(plg_attr), len(plg_attr.keys())
                        )
                    elif type(plg_attr) == list:
                        out[attr] = "{} len({})".format(
                            type(plg_attr), len(plg_attr)
                        )

                else:
                    # Up to 10 elements (included): dump as is
                    out[attr] = plg_attr

            # Objects without length (numbers, None, etc.)
            except TypeError:
                out[attr] = plg_attr

        # Removing the __class__ attribute
        out.pop("__class__", None)

        return out

    @classmethod
    def to_yaml(cls, plg, yaml_file, full=True):
        """Write a Yaml from a loaded plugin

        Args:
            plg (Plugin): the plugin to dump
            yaml_file (str): path to the file to write
            full (bool): NOTE(review): currently not forwarded to
                ``to_dict``, which is thus called with ``full=False`` —
                confirm whether this is intended
        """
        plg_dict = cls.to_dict(plg)

        with open(yaml_file, "w") as f:
            ordered_dump(f, plg_dict)

    @classmethod
    def from_yaml(cls, def_file):
        """Generates a dictionary including all pyCIF parameters

        Args:
            def_file (string) : Path to the definition file
                Handles both absolute and relative paths

        Returns:
            config_dict (dictionary): Dictionary populated with all
            pyCIF parameters

        Raises:
            IOError: if the file cannot be opened
            yaml.scanner.ScannerError: if the Yaml syntax is incorrect
        """
        yml_file = os.path.abspath(os.path.expanduser(def_file))

        try:
            with open(yml_file, "r") as f:
                config_dict = ordered_load(f)

            config_dict["def_file"] = yml_file

            # Converting dates to datetime if necessary; each date is
            # converted on its own so that one can be given without the
            # other
            for date_key in ("datei", "datef"):
                if date_key in config_dict:
                    config_dict[date_key] = dates.date2datetime(
                        config_dict[date_key]
                    )

            return config_dict

        except IOError:
            print("Couldn't find config file: {}".format(yml_file))
            print("Please check directories")
            raise

        except yaml.scanner.ScannerError:
            print("Error in the syntax of config file: {}".format(yml_file))
            raise

    @classmethod
    def print_default(cls, plg):
        """Print default values if available

        Args:
            plg (Plugin): the plugin whose ``default_values`` are printed
        """
        plg_ids = (plg, plg.plugin.name, plg.plugin.version, plg.plugin.type)

        if not hasattr(plg, "default_values"):
            print("""{} ({}, {}, {}) has no default values""".format(
                *plg_ids))
            return

        print("""The default values of {} ({}, {}, {}) are:""".format(
            *plg_ids))
        for k in plg.default_values:
            print("- {}:\t{}".format(k, plg.default_values[k]))

    @classmethod
    def childclass_factory(cls, plg_orig, child_type=None, parent_plg=None,
                           overwrite=False):
        """Generates an instance of one of Plugin's child classes.
        Transfers all existing attributes in the argument plugin to the
        output child-class instance

        Args:
            plg_orig (Plugin): the plugin to turn into a child-class
                instance
            child_type (str): sub-class type to generate if not available
                in plg_orig.plugin.type
            parent_plg (Plugin): plugin from which name and version are
                taken when they are None in the child
            overwrite (bool): overwrite the class type of the origin plugin
                (currently not used by this method)

        Return:
            child_plg: a plugin with all the attributes from plg_orig, but
            as a child-class instance; plg_orig itself if its type is not
            an authorized one

        Raises:
            PluginError: if no type is available in the plugin nor given
                with child_type
        """
        plg_type = getattr(plg_orig.plugin, "type", None)

        # Types that are missing or "setup" are replaced by child_type
        if plg_type is None or plg_type == "setup":
            if child_type is None and plg_type is None:
                raise PluginError(
                    "The Child-class factory was called on a plugin "
                    "that was not correctly initialized: {} / {}".format(
                        plg_orig, plg_orig.orig_name
                    )
                )

            plg_type = child_type
            plg_orig._load_plugin_type(plg_type)

        # If the type is not referenced, don't do anything
        if not cls.is_allowed(plg_type):
            return plg_orig

        # Load the subclass
        child_plg = cls.get_subclass(plg_type)(plg_orig=plg_orig)

        # Replace name and version if empty
        if parent_plg is not None and hasattr(parent_plg, "plugin"):
            if child_plg.plugin.name is None:
                child_plg.plugin.name = parent_plg.plugin.name
                child_plg.plugin.from_parent = True

            if child_plg.plugin.version is None:
                child_plg.plugin.version = parent_plg.plugin.version
                child_plg.plugin.from_parent = True

        return child_plg

    def initiate_template(self, plg_type=None, default_functions=None):
        """Initializes a Plugin template, with methods from the
        corresponding module.

        Args:
            self: the plugin to initialize
            plg_type: the type of the plugin to initialize
            default_functions (dict[str, bool]): functions to load from the
                module and to attach to the plugin. Each key names the
                function and each value is a boolean to determine whether
                the corresponding function is a class method (with a
                reflective self as argument) or a classical static function
        """
        if default_functions is None:
            default_functions = {}

        # First load the module (None if nothing could be loaded)
        module = self.initiate(plg_type=plg_type)

        # Attach the list of functions
        for fname, as_method in default_functions.items():
            if not hasattr(module, fname):
                continue

            function = getattr(module, fname)
            if as_method:
                function = MethodType(function, self)

            setattr(self, fname, function)

        # Add set_requirements
        if hasattr(module, "set_requirements"):
            self.set_requirements = MethodType(module.set_requirements, self)

    def initiate(self, plg_type=None):
        """Initializes a Plugin, i.e., loads functions from registered
        plugins

        Args:
            plg_type (str): the type of plugin to load; this should
                correspond to one of the defined child-classes

        Return:
            module: a python module as registered in pyCIF, or None if the
            plugin has no attribute ``plugin``, has no name, or could not
            be found while its name was inherited from a parent plugin

        Raises:
            PluginError: if the plugin is not registered and its name was
                not inherited from a parent plugin
        """

        # It there is no attribute 'plugin', can't initialize anything as
        # python will not know the plugin type, name and version
        plugin = getattr(self, "plugin", None)
        if plugin is None:
            return None

        # Load plugin IDs
        name = getattr(plugin, "name", None)
        version = getattr(plugin, "version", None)
        plg_subtype = getattr(plugin, "subtype", "")

        if plg_type is None:
            plg_type = plugin.type

        # Load registered module only if the Plugin has a name
        if name is None:
            return None

        try:
            module = self.get_registered(
                name, version, plg_type, plg_subtype)

        except PluginError:
            # If plugin initialized from parent, ignore
            if not getattr(plugin, "from_parent", False):
                raise

            self.plugin.name = None
            self.plugin.version = None
            return None

        # Attributing all public module members to the plugin, except
        # sub-modules and names listed in the module 'attributes'
        module_attributes = getattr(module, "attributes", [])
        for attr in dir(module):
            if (
                attr in module_attributes
                or attr.startswith("_")
                or attr == "attributes"
            ):
                continue

            member = getattr(module, attr)
            if not isinstance(member, ModuleType):
                setattr(self, attr, member)

        # Ini_data should be define as a Method Type
        if hasattr(module, "ini_data"):
            self.ini_data = MethodType(module.ini_data, self)

        return module

    @classmethod
    def flushall(cls):
        """Reset the loaded, reference and sub-reference instances.

        The dictionary of registered plugins is left untouched.
        """
        cls.subreference_instances = {}
        cls.loaded_instances = {}
        cls.reference_instances = {}

    def set_requirements(self):
        """Update requirements depending on the Plugin properties

        By default, this method does nothing. It can be included in any
        new Plugin depending on the developer needs.

        Args:
            self (Plugin): the Plugin to update
        """
        return

    @classmethod
    def dump_incorrect(cls, file_dump):
        """Dump incorrect arguments from the Yaml into a file

        Args:
            file_dump (str): File where to dump
        """
        with open(file_dump, "w") as f:
            for k in cls.unauthorized_arguments:
                f.write(f"- {k}: {cls.unauthorized_arguments[k]}\n")