import importlib
import os
from inspect import ismethod
from types import MethodType, FunctionType, ModuleType
import sys
import numpy as np
import pandas as pd
import yaml
from ...utils.check.errclass import PluginError
from ...utils.yml import ordered_load, ordered_dump
from ...utils import dates
class Plugin:
    """Parent class of all other pyCIF classes.

    It stores the information about sub-parts of pyCIF and loads
    sub-modules if needed.
    """

    # Authorized plugin types: type name -> [relative module, class name]
    plugin_types = {
        "chemistry": [".chemistries", "Chemistry"],
        "controlvect": [".controlvects", "ControlVect"],
        "datavect": [".datavects", "DataVect"],
        "datastream": [".datastreams", "DataStream"],
        "domain": [".domains", "Domain"],
        "measurements": [".measurements", "Measurement"],
        "minimizer": [".minimizers", "Minimizer"],
        "mode": [".modes", "Mode"],
        "model": [".models", "Model"],
        "obsoperator": [".obsoperators", "ObsOperator"],
        "obsparser": [".obsparsers", "ObsParser"],
        "obsvect": [".obsvects", "ObsVect"],
        "platform": [".platforms", "Platform"],
        "simulator": [".simulators", "Simulator"],
        "transform": [".transforms", "Transform"],
        "setup": [".setup", "Setup"],
    }

    # Authorized sub-types per type: sub-type name -> relative sub-module.
    # Every type starts with the empty sub-type only; the types that have
    # dedicated sub-types are then overridden.
    plugin_subtypes = {plg_type: {"": ""} for plg_type in plugin_types}
    plugin_subtypes.update(
        datastream={
            "meteo": ".meteos",
            "flux": ".fluxes",
            "background": ".backgrounds",
            "field": ".fields",
        },
        transform={
            "basic": ".basic",
            "complex": ".complex",
            "system": ".system",
        },
    )

    # Registered and already loaded plugins
    registered = {}
    loaded_instances = {}
    reference_instances = {}
    subreference_instances = {}

    # Unauthorized arguments
    unauthorized_arguments = {}

    # Reference Setup
    ref_config = None

    # Maximum recursive level
    __maxrecursive__ = 50
def __init__(self, plg_orig=None, orig_name="", **kwargs):
    """Initialize the Plugin.

    Args:
        plg_orig (Plugin): if specified, the new plugin is initialized
            from the public attributes of this origin plugin
        orig_name (str): stored as ``self.orig_name``
        **kwargs: every keyword is set as an attribute of the new plugin
    """
    self.orig_name = orig_name

    # Copy the public attributes of the origin plugin, except those
    # whose name already resolves to a callable on the new instance
    for attr_name in self._get_attribute_list(plg_orig):
        if callable(getattr(self, attr_name, None)):
            continue
        setattr(self, attr_name, getattr(plg_orig, attr_name))

    # Initialize the list of attributes if not already done
    if not hasattr(self, "attributes"):
        self.attributes = []

    # Keyword arguments override whatever was copied above
    for kw, value in kwargs.items():
        setattr(self, kw, value)

    # Requirements that every plugin gets by default
    self.default_requirements = dict(
        (key, {"any": True, "empty": True})
        for key in ("datei", "datef", "workdir", "logfile", "verbose")
    )
    if not hasattr(self, "requirements"):
        self.requirements = {}
@classmethod
def _get_attribute_list(cls, plg):
    """List the attribute names of an object, excluding special methods.

    Args:
        plg: the object to inspect

    Returns:
        list: the names from ``dir(plg)`` that do not start with an
            underscore
    """
    public_names = []
    for attr_name in dir(plg):
        if attr_name[:1] != "_":
            public_names.append(attr_name)
    return public_names
@staticmethod
def plugin_key(name, version, plugin_type, subtype):
    """Create the key for a name, version, plugin type and sub-type.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        tuple: ``(name, version, plugin_type, subtype)``, used as dict key
            for a plugin and version
    """
    key = (name, version, plugin_type, subtype)
    return key
@classmethod
def print_registered(
    cls, print_requirement=False, print_rst=False,
    types=None, names=None, versions=None, stream=None
):
    """Print in a user-friendly format the list of available plugins.

    Args:
        print_requirement (bool): for each registered plugin, print its
            requirements
        print_rst (bool): print in rst format for automatic use in the
            documentation
        types (list): types of Plugins to print; no filtering if empty
            or None
        names (list): names of Plugins to print; no filtering if empty
            or None
        versions (list): versions of Plugins to print; no filtering if
            empty or None
        stream: stream to which the display is sent. By default, stdout
            is used. A stream to a given file can be used instead.
    """
    if stream is None:
        stream = sys.stdout

    keys = [
        k
        for k in cls.registered
        if (not names or k[0] in names)
        and (not versions or k[1] in versions)
        and (not types or k[2] in types)
    ]

    # Load every selected module once. Sorting on the key part only means
    # module objects never have to be compared with each other.
    entries = sorted(
        (
            (n, v, t, st, cls.get_registered(n, v, t, st))
            for n, v, t, st in keys
        ),
        key=lambda entry: entry[:4],
    )

    print(
        "List of all available plugins and requirements "
        "for each class: \n\n", file=stream
    )

    # Nothing registered, or nothing matching the filters
    if not entries:
        return

    for tt in sorted({entry[2] for entry in entries}):
        rst_type = cls.plugin_types[tt][0][1:]
        if not print_rst:
            print("\t", tt, file=stream)
        else:
            print("\t", ":doc:`{}</documentation/plugins/{}/index>`"
                  .format(tt, rst_type), file=stream)

        type_entries = [entry for entry in entries if entry[2] == tt]
        sub_types = sorted({entry[3] for entry in type_entries})

        # Sub-type headers are only displayed when the type has several
        # sub-types; plugins are then indented one level deeper
        several_subtypes = len(sub_types) > 1
        indent = "\t\t\t" if several_subtypes else "\t\t"

        for stt in sub_types:
            rst_subtype = cls.plugin_subtypes[tt][stt][1:]
            if several_subtypes:
                if not print_rst:
                    print("\t\t - ", stt, file=stream)
                else:
                    print("\t\t - ",
                          ":doc:`{}</documentation/plugins/{}/{}/index>`"
                          .format(stt, rst_type, rst_subtype), file=stream)

            for n, v, t, st, mod in type_entries:
                if st != stt:
                    continue

                if not print_rst:
                    print("{}- {}, {}".format(indent, n, v), file=stream)
                else:
                    print("{}- :doc:`{}, {}</documentation/plugins/{}/{}/{}>`"
                          .format(indent, n, v, rst_type, rst_subtype,
                                  cls.registered[n, v, t, st].split(".")[-1]),
                          file=stream)

                # Print requirements
                if not (print_requirement and hasattr(mod, "requirements")):
                    continue

                print("{}\tRequires:".format(indent), file=stream)
                for req, mod_req in mod.requirements.items():
                    name = mod_req.get("name", None)
                    version = mod_req.get("version", None)
                    any_flag = mod_req.get("any", False)
                    empty = mod_req.get("empty", "")
                    req_type = mod_req.get("type", req)
                    req_subtype = mod_req.get("subtype", "")

                    if not print_rst:
                        print("{}\t\t- {}:".format(indent, req),
                              file=stream)
                    else:
                        # Resolve the actual type/sub-type of the
                        # requirement to build its documentation path
                        plg_req = cls.from_dict({
                            "plugin": {
                                "name": name,
                                "version": version,
                                "type": req_type,
                                "subtype": req_subtype
                            }
                        })
                        plg_req._load_plugin_type(req)
                        req_rst_type = \
                            cls.plugin_types[plg_req.plugin.type][0][1:]
                        req_rst_subtype = \
                            cls.plugin_subtypes[
                                plg_req.plugin.type][
                                plg_req.plugin.subtype][1:]
                        # The link uses the resolved sub-module name,
                        # like the sub-type headers above
                        print("{}\t\t"
                              "- :doc:`{}</documentation/plugins/{}/{}/index>`:"
                              .format(indent, req,
                                      req_rst_type, req_rst_subtype),
                              file=stream)

                    print(
                        "{}\t\t\t- name: {}".format(indent, name),
                        file=stream)
                    print(
                        "{}\t\t\t- version: {}".format(indent, version),
                        file=stream)
                    print(
                        "{}\t\t\t- any: {}".format(indent, any_flag),
                        file=stream)
                    print(
                        "{}\t\t\t- empty: {}".format(indent, empty),
                        file=stream)

                print("\n", file=stream)
@classmethod
def is_registered(cls, name, version, plugin_type, subtype):
    """Check if a plugin is registered.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        tuple: ``(found, key)`` where ``found`` is True if the plugin is
            registered and ``key`` is the key of the registered plugin
            (or the key built from the arguments when not found)

    Raises:
        PluginError: if no sub-type is given and several registered
            plugins match the definition
    """
    exact_key = cls.plugin_key(name, version, plugin_type, subtype)
    if exact_key in cls.registered:
        return True, exact_key

    if subtype == "":
        # When plugin_type is a known type, compare it with the type slot
        # of the registered keys (index 2); otherwise it may have been
        # given as a sub-type, so compare with the sub-type slot (index 3)
        slot = 2 if cls.is_allowed(plugin_type) else 3
        wanted = (name, version, plugin_type)
        matching = [k for k in cls.registered.keys()
                    if (k[0], k[1], k[slot]) == wanted]

        if len(matching) > 1:
            raise PluginError("Multiple plugins matching required definition for "
                              "{}/{}/{}: \n{}".format(
                                  name, version, plugin_type, matching))
        if len(matching) == 1:
            return True, matching[0]

    return False, exact_key
@classmethod
def is_allowed(cls, plugin_type):
    """Check whether a plugin type is allowed in pyCIF or not.

    Args:
        plugin_type (str): type of plugin

    Returns:
        bool: True if ``plugin_type`` is a key of ``plugin_types``
    """
    known_types = cls.plugin_types
    return plugin_type in known_types
@classmethod
def is_allowed_as_subtype(cls, plugin_type):
    """Check whether a plugin type is allowed as the sub-type of a main
    type in pyCIF or not.

    Args:
        plugin_type (str): type of plugin

    Returns:
        tuple: ``(True, main_type)`` if exactly one main type accepts
            ``plugin_type`` as a sub-type; otherwise ``(False, matching)``
            with the list of all main types accepting it
    """
    parents = []
    for main_type, subtypes in cls.plugin_subtypes.items():
        if plugin_type in subtypes:
            parents.append(main_type)

    if len(parents) != 1:
        return False, parents
    return True, parents[0]
@classmethod
def is_loaded(cls, name, version, plugin_type, subtype=""):
    """Check whether a plugin is loaded.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        bool: True if the corresponding key is in ``loaded_instances``
    """
    key = cls.plugin_key(name, version, plugin_type, subtype)
    return key in cls.loaded_instances
@classmethod
def register_plugin(cls, name, version, module,
                    plugin_type="", subtype="", **kwargs):
    """Register a module for a plugin and version.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        module (types.ModuleType): module defining the interface
            between pyCIF and the plugin; only its ``__name__`` is stored
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin
        **kwargs (dictionary): accepted for compatibility; not used by
            this method

    Raises:
        ValueError: if a module is already registered for this plugin
    """
    already_registered, key = cls.is_registered(
        name, version, plugin_type, subtype)

    if already_registered:
        message = (
            "Already created a Module "
            "for plugin {} (version {}) and type {}".format(
                name, version, plugin_type
            )
        )
        if subtype != "":
            # Leading space so that the sub-type part is not glued to
            # the type name
            message += " and sub-type {}".format(subtype)
        raise ValueError(message)

    cls.registered[key] = module.__name__
@classmethod
def get_registered(cls, name, version, plugin_type, subtype=""):
    """Get the correct registered plugin, given its name, version and type.

    The module is imported, its ``input_arguments`` are completed with
    those of the module of its plugin type, and module attributes named
    like an input argument are removed.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        module: plugin module for plugin version

    Raises:
        PluginError: if no matching plugin is registered
    """
    found, key = cls.is_registered(name, version, plugin_type, subtype)
    if not found:
        raise PluginError(
            "No {}/{} module found for plugin {} and Version {}".format(
                plugin_type, subtype, name, version
            )
        )

    module = importlib.import_module(cls.registered[key])

    # Module of the plugin type (third element of the key), looked up
    # under the "plugins" sub-package of the top-level package
    root_package = __package__.split(".")[0]
    type_module = importlib.import_module(
        "{}.plugins{}".format(root_package, cls.plugin_types[key[2]][0]))

    # Arguments of the plugin itself take precedence over those defined
    # at the level of its type
    own_arguments = getattr(module, "input_arguments", {})
    module.input_arguments = own_arguments
    if hasattr(type_module, "input_arguments"):
        module.input_arguments = dict(
            type_module.input_arguments, **own_arguments)

    # Clean sub-modules that are also in input arguments
    for arg_name in module.input_arguments:
        if hasattr(module, arg_name):
            delattr(module, arg_name)

    return module
@classmethod
def get_loaded(cls, name, version, plugin_type, subtype=""):
    """Get the correct loaded plugin, given its name, version and type
    (and optionally sub-type).

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        Plugin: the instance stored in ``loaded_instances`` for this key

    Raises:
        PluginError: if no such plugin is loaded
    """
    # The sub-type must be forwarded: the check has to be done on the
    # same key as the one used for the look-up below
    if not cls.is_loaded(name, version, plugin_type, subtype=subtype):
        raise PluginError(
            "No {} module loaded for plugin {} and Version {}".format(
                plugin_type, name, version
            )
        )

    return cls.loaded_instances[
        cls.plugin_key(name, version, plugin_type, subtype)]
@classmethod
def get_subclass(cls, plg_type, plg_subtype=""):
    """Get the plugin class template from a given type.

    The class is looked up, in this order, from ``plg_type`` as a main
    type, from ``plg_subtype`` as a main type, then from the unique main
    type accepting ``plg_type`` as a sub-type.

    Args:
        plg_type (str): the plugin type to load
        plg_subtype (str): the plugin sub-type to load

    Returns:
        type: the class corresponding to the plugin type

    Raises:
        PluginError: if the type cannot be resolved
    """
    if plg_type in cls.plugin_types:
        target = cls.plugin_types[plg_type]
    elif plg_subtype in cls.plugin_types:
        target = cls.plugin_types[plg_subtype]
    else:
        # Main types for which plg_type is a valid sub-type
        parents = [t for t in cls.plugin_subtypes
                   if plg_type in cls.plugin_subtypes[t]]
        if len(parents) != 1:
            raise PluginError("Plugin type {}/{} is not recognized by pyCIF"
                              .format(plg_type, plg_subtype))
        target = cls.plugin_types[parents[0]]

    class_module = importlib.import_module(target[0], __package__)
    return getattr(class_module, target[1])
@classmethod
def load_registered(cls, name, version, plg_type,
                    plg_subtype="", plg_orig=None):
    """Get a sub-class instance of a registered plugin.

    This can be used to get default required plugins.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plg_type (str): type of plugin
        plg_subtype (str): sub-type of plugin
        plg_orig (Plugin): original plugin from which to copy attributes
            to the new plugin

    Returns:
        Plugin: a new plugin of correct type
    """
    _, registered_key = cls.is_registered(
        name, version, plg_type, plg_subtype)
    template = cls.get_registered(name, version, plg_type, plg_subtype)

    # Transfer the attributes listed by the origin plugin, and a copy of
    # the list itself
    if plg_orig is not None:
        for attr in plg_orig.attributes:
            setattr(template, attr, getattr(plg_orig, attr))
        template.attributes = plg_orig.attributes[:]

    # Adding plugin attribute, from the key as actually registered
    reg_name, reg_version, reg_type, reg_subtype = registered_key
    template.plugin = cls.from_dict(
        {"name": reg_name, "version": reg_version, "type": reg_type,
         "subtype": reg_subtype}
    )

    # Creating a sub-class instance and initializing it
    new_plg = cls.childclass_factory(plg_orig=template)
    new_plg.initiate_template()

    return new_plg
def _load_plugin_type(self, key, parent_plg_type=None):
    """Load plugin type and add it to the plugin if not already specified.

    Args:
        key (str): the plugin type
        parent_plg_type (str): the parent plugin type that is inherited
            from higher level plugins

    Returns:
        str: the plugin type that was resolved

    Raises:
        Exception: if the type / sub-type definition cannot be resolved
    """
    # Guess type and sub-type from the key: either the key is a main
    # type, or a sub-type of a single main type, or the type is
    # inherited from the parent plugin
    guessed_type, guessed_subtype = parent_plg_type, ""
    if self.is_allowed(key):
        guessed_type = key
    else:
        is_subtype, main_type = self.is_allowed_as_subtype(key)
        if is_subtype:
            guessed_type, guessed_subtype = main_type, key

    # Initializing the attribute plugin to store type, name and version,
    # or completing it if type / sub-type are not already given
    if getattr(self, "plugin", None) is None:
        self.plugin = self.from_dict(
            {"name": None, "version": None,
             "type": guessed_type, "subtype": guessed_subtype}
        )
    else:
        if getattr(self.plugin, "type", None) is None:
            self.plugin.type = guessed_type
        if getattr(self.plugin, "subtype", "") == "":
            self.plugin.subtype = guessed_subtype

    plugin = self.plugin

    # Check that the definition from the Yaml is consistent
    if self.is_allowed(plugin.type) \
            and plugin.subtype in self.plugin_subtypes[plugin.type]:
        return guessed_type

    name, version = plugin.name, plugin.version
    plg_type, plg_subtype = plugin.type, plugin.subtype

    # Otherwise, try to deduce the missing information from the
    # registered plugins with the same name and version
    if self.is_allowed(plg_type):
        candidates = [k[3] for k in self.registered.keys()
                      if (k[0], k[1], k[2]) == (name, version, plg_type)]
        if len(candidates) == 1:
            plugin.subtype = candidates[0]
            return plg_type
    else:
        # The declared type may actually be a sub-type
        candidates = [k for k in self.registered.keys()
                      if (k[0], k[1], k[3]) == (name, version, plg_type)]
        if len(candidates) == 1:
            plugin.subtype = candidates[0][3]
            plugin.type = candidates[0][2]
            return plg_type

    # If plugin is only defined as empty with no name,
    # just check that it is allowed
    if name is None and version is None:
        if self.is_allowed(plg_type):
            return plg_type

        parents = [t for t in self.plugin_subtypes
                   if plg_type in self.plugin_subtypes[t]]
        if len(parents) == 1:
            plugin.subtype = plugin.type
            plugin.type = parents[0]
            return plugin.type

    raise Exception(
        "There is some error in the definition of your Yaml or, if you "
        "are a developer, in the new plugin you are designing: \n"
        "Trying to load the following plugin: \n"
        "{}:\n"
        "  plugin:\n"
        "    name: {}\n"
        "    version: {}\n"
        "    type: {}\n"
        "    subtype: {}\n\n"
        "Available types and subtypes are: \n{}\n"
        "Please check spelling in your definition\n\n"
        "If you are initializing an empty plugin, please be sure "
        "to define it with name = None AND version = None.".format(
            key, name, version, plg_type, plg_subtype,
            "\n".join(["- {}/{}".format(t, st)
                       for t in self.plugin_subtypes
                       for st in self.plugin_subtypes[t]])
        )
    )
@classmethod
def save_loaded(cls, plg):
    """Save a loaded plugin to the class.

    The plugin is stored in ``loaded_instances`` under the key built from
    its name, version, type and sub-type.

    Args:
        plg (Plugin): plugin to save
    """
    info = plg.plugin
    key = cls.plugin_key(info.name, info.version, info.type, info.subtype)
    cls.loaded_instances[key] = plg
@classmethod
def _save_refplugins(cls, plg):
    """Save all level 0 attributes to the class.

    Every attribute listed in ``plg.attributes`` is stored in
    ``reference_instances``: Plugin attributes under their plugin type
    (and flagged with ``isreference``), other attributes under their own
    name. The plugin itself is stored as ``"reference_setup"``.

    Should be called only once.

    Args:
        plg (Plugin): plugin to save
    """
    for attr in plg.attributes:
        plg_attr = getattr(plg, attr)

        if isinstance(plg_attr, Plugin):
            # Make sure the type of the plugin is resolved before using
            # it as reference key
            plg_attr._load_plugin_type(attr)
            reference_key = plg_attr.plugin.type
            setattr(plg_attr, "isreference", True)
        else:
            reference_key = attr

        cls.reference_instances[reference_key] = plg_attr

    cls.reference_instances["reference_setup"] = plg
@classmethod
def _save_subrefplugins(cls, plg, parent_plg_type="setup", tree=""):
    """Save all attributes to the class, recursively.

    Every attribute is stored in ``subreference_instances`` under the key
    ``(type, subtype)`` for Plugin attributes, or ``(attr, attr)`` for
    other attributes, in a dictionary indexed by its path in the tree.

    Should be called only once.

    Args:
        plg (Plugin): plugin to save
        parent_plg_type (str): parent plugin type forwarded when
            resolving the type of the sub-plugins
        tree (str): path of ``plg`` in the tree, with "/" as separator
    """
    # Objects that do not list attributes are leaves of the tree
    if not hasattr(plg, "attributes"):
        return

    for attr in plg.attributes:
        plg_attr = getattr(plg, attr)

        if isinstance(plg_attr, Plugin):
            plg_attr._load_plugin_type(attr, parent_plg_type)
            reference_key = (plg_attr.plugin.type, plg_attr.plugin.subtype)
        else:
            reference_key = (attr, attr)

        # Children are saved before the attribute itself
        plg_tree = "{}/{}".format(tree, attr)
        cls._save_subrefplugins(plg_attr, parent_plg_type, tree=plg_tree)

        cls.subreference_instances.setdefault(
            reference_key, {})[plg_tree] = plg_attr
@classmethod
def from_dict(cls, def_dict, orig_name="", convert_none=False, **kwargs):
    """Load a recursive dictionary structure into a Plugin.

    Args:
        def_dict (dict): the definition dictionary
        orig_name (str): name saved as ``orig_name`` in the output plugin
        convert_none (bool): if True, keys whose value is None are
            initialized as empty Plugins
        **kwargs: forwarded to the recursive calls

    Returns:
        Plugin: plugin with one attribute per key of ``def_dict``
    """
    plg = cls()

    for key, value in def_dict.items():
        if isinstance(value, dict):
            # Recursively initialize from dictionary
            value = cls.from_dict(
                value,
                orig_name=key,
                convert_none=convert_none,
                **kwargs
            )
        elif value is None and convert_none:
            # Initializes empty keys as Plugins
            value = cls()
        setattr(plg, key, value)

    # Saves the definition keys of the original dictionary into the
    # output plugin
    plg.attributes = list(def_dict)

    # Saves the name of the plugin as specified in the configuration file
    plg.orig_name = orig_name

    return plg
@classmethod
def to_dict(
        cls,
        plg,
        exclude_patterns=None,
        full=False):
    """Turn a Plugin into a dictionary for easier saving.

    Args:
        plg (Plugin): a Plugin instance to be saved as a dictionary
        exclude_patterns (list): extra attribute names to leave out of
            the dictionary; the list is not modified
        full (bool): if True, sized values with more than 10 elements are
            replaced by a short description (arrays, data frames,
            dictionaries and lists only); if False they are left out

    Returns:
        dict: the saved attributes; ``plg`` itself if it is a list or has
            no ``attributes``
    """
    # If the input has no 'attributes', just return it
    if not hasattr(plg, "attributes") or isinstance(plg, list):
        return plg

    # Names that are never saved. A new set is built at each call so that
    # the list given by the caller is left untouched, including through
    # the recursive calls below.
    excluded = {
        "absolute_import",
        "loaded_class",
        "loaded_data",
        "loaded_attributes",
        "loaded_requirements",
        "loaded_template",
        "orig_name",
        "requirements",
        "attributes",
        "isreference",
        "Method_type",
        "MethodType",
        "default_requirements",
        "mapper",
        "logfile",
        "datei",
        "datef",
        "workdir",
        "verbose",
    }
    if exclude_patterns is not None:
        excluded.update(exclude_patterns)

    # Names defined at class level, computed once for all attributes
    class_names = set(dir(cls))

    out = {}
    for attr in plg.attributes:
        plg_attr = getattr(plg, attr)

        # Skip methods, functions, modules, private, class-level and
        # excluded names
        if (
                ismethod(plg_attr)
                or isinstance(plg_attr, (FunctionType, ModuleType))
                or attr[0] == "_"
                or attr in class_names
                or attr in excluded
                or "Feature" in str(plg_attr)
        ):
            continue

        # If the attribute has a to_dict method (e.g. a Plugin
        # sub-class), recursively call to_dict
        if hasattr(plg_attr, "to_dict"):
            out[attr] = cls.to_dict(plg_attr, exclude_patterns, full)
            continue

        # Values without a length are saved as they are
        try:
            size = len(plg_attr)
        except TypeError:
            out[attr] = plg_attr
            continue

        # Strings and short containers (up to 10 elements) are saved as
        # they are
        if isinstance(plg_attr, str) or size <= 10:
            out[attr] = plg_attr

        # Larger containers are only described, and only if required
        elif full:
            if isinstance(plg_attr, (np.ndarray, pd.DataFrame)):
                out[attr] = "{} {}".format(
                    type(plg_attr), plg_attr.shape
                )
            elif isinstance(plg_attr, dict):
                out[attr] = "{} {} keys".format(
                    type(plg_attr), len(plg_attr.keys())
                )
            elif isinstance(plg_attr, list):
                out[attr] = "{} len({})".format(
                    type(plg_attr), len(plg_attr)
                )

    # Removing the __class__ attribute
    out.pop("__class__", None)

    return out
@classmethod
def to_yaml(cls, plg, yaml_file, full=True):
    """Write a Yaml from a loaded plugin.

    Args:
        plg (Plugin): the plugin to dump
        yaml_file (str): path to the file to write
        full (bool): not used by this method
    """
    # NOTE(review): `full` is not forwarded to to_dict, which therefore
    # runs with its own default - confirm whether this is intended
    content = cls.to_dict(plg)

    with open(yaml_file, "w") as yml_stream:
        ordered_dump(yml_stream, content)
@classmethod
def from_yaml(cls, def_file):
    """Generate a dictionary including all pyCIF parameters.

    Args:
        def_file (string): path to the definition file; handles both
            absolute and relative paths

    Returns:
        config_dict (dictionary): dictionary populated with all pyCIF
            parameters, plus the absolute path of the file as "def_file"

    Raises:
        IOError: if the file cannot be opened
        yaml.scanner.ScannerError: if the syntax of the file is incorrect
    """
    yml_file = os.path.abspath(os.path.expanduser(def_file))

    try:
        with open(yml_file, "r") as yml_stream:
            config_dict = ordered_load(yml_stream)

        config_dict["def_file"] = yml_file

        # Converting dates to datetime if necessary
        if "datei" in config_dict:
            for date_key in ("datei", "datef"):
                config_dict[date_key] = dates.date2datetime(
                    config_dict[date_key]
                )

        return config_dict

    except IOError:
        print("Couldn't find config file: {}".format(yml_file))
        print("Please check directories")
        raise

    except yaml.scanner.ScannerError:
        print("Error in the syntax of config file: {}".format(yml_file))
        raise
@classmethod
def print_default(cls, plg):
    """Print default values if available.

    Args:
        plg (Plugin): the plugin whose ``default_values`` are printed
    """
    if not hasattr(plg, "default_values"):
        identity = (plg, plg.plugin.name, plg.plugin.version, plg.plugin.type)
        print("""{} ({}, {}, {}) has no default values""".format(*identity))
        return

    identity = (plg, plg.plugin.name, plg.plugin.version, plg.plugin.type)
    print("""The default values of {} ({}, {}, {}) are:""".format(*identity))

    defaults = plg.default_values
    for value_name in defaults:
        print("- {}:\t{}".format(value_name, defaults[value_name]))
@classmethod
def childclass_factory(cls, plg_orig, child_type=None,
                       parent_plg=None,
                       overwrite=False):
    """Generate an instance of one of Plugin's child classes.

    All existing attributes of the argument plugin are transferred to the
    output child-class instance.

    Args:
        plg_orig (Plugin): the plugin to turn into a child-class instance
        child_type (str): sub-class type to generate if not available in
            plg_orig.plugin.type
        parent_plg (Plugin): plugin from which name and version are taken
            when they are None in the new plugin
        overwrite (bool): not used by this method

    Return:
        child_plg: a plugin with all the attributes from plg_orig,
            but as a child-class instance; plg_orig itself if its type
            is not an allowed plugin type

    Raises:
        PluginError: if neither the plugin nor child_type gives a type
    """
    plg_type = getattr(plg_orig.plugin, "type", None)

    # No type, or the generic "setup" type: rely on child_type
    if plg_type is None or plg_type == "setup":
        if child_type is None and plg_type is None:
            raise PluginError(
                "The Child-class factory was called on a plugin "
                "that was not correctly initialized: {} / {}".format(
                    plg_orig, plg_orig.orig_name
                )
            )
        plg_type = child_type
        plg_orig._load_plugin_type(plg_type)

    # If the type is not referenced, don't do anything
    if not cls.is_allowed(plg_type):
        return plg_orig

    # Load the subclass
    child_class = cls.get_subclass(plg_type)
    child_plg = child_class(plg_orig=plg_orig)

    # Replace name and version if empty
    if parent_plg is not None and hasattr(parent_plg, "plugin"):
        for field in ("name", "version"):
            if getattr(child_plg.plugin, field) is None:
                setattr(child_plg.plugin, field,
                        getattr(parent_plg.plugin, field))
                child_plg.plugin.from_parent = True

    return child_plg
def initiate_template(self, plg_type=None, default_functions={}):
    """Initialize a Plugin template, with methods from the corresponding
    module.

    Args:
        self: the plugin to initialize
        plg_type: the type of the plugin to initialize
        default_functions (dict[str, bool]):
            functions to load from the module and to attach to the plugin.
            Each key names the function and each value is a boolean to
            determine whether the function is attached as a method bound
            to the plugin (True) or as a plain function (False)
    """
    # First load the module
    module = self.initiate(plg_type=plg_type)

    # Attach the functions that the module actually provides
    for func_name, as_method in default_functions.items():
        if hasattr(module, func_name):
            target = getattr(module, func_name)
            if as_method:
                target = MethodType(target, self)
            setattr(self, func_name, target)

    # Add set_requirements
    if hasattr(module, "set_requirements"):
        self.set_requirements = MethodType(module.set_requirements, self)
def initiate(self, plg_type=None):
    """Initialize a Plugin, i.e., load functions from registered plugins.

    Args:
        plg_type (str): the type of plugin to load; this should
            correspond to one of the defined child-classes

    Return:
        module: a python module as registered in pyCIF; None if the
            plugin has no ``plugin`` attribute or no name, or if the
            module is not found for a plugin whose name was taken from
            its parent

    Raises:
        PluginError: if the module is not found and the plugin was not
            initialized from its parent
    """
    # If there is no attribute 'plugin', can't initialize anything as
    # python will not know the plugin type, name and version
    plugin = getattr(self, "plugin", None)
    if plugin is None:
        return None

    # Load plugin IDs
    name = getattr(plugin, "name", None)
    version = getattr(plugin, "version", None)
    plg_subtype = getattr(plugin, "subtype", "")
    if plg_type is None:
        plg_type = plugin.type

    # Nothing to load for a plugin without a name
    if name is None:
        return None

    try:
        module = self.get_registered(
            name, version, plg_type, plg_subtype)

        # Attributing all public module members to the plugin, except
        # sub-modules and the names listed in the module 'attributes'
        skipped = getattr(module, "attributes", [])
        for member_name in dir(module):
            if member_name in skipped \
                    or member_name[0] == "_" \
                    or member_name == "attributes":
                continue
            member = getattr(module, member_name)
            if not isinstance(member, ModuleType):
                setattr(self, member_name, member)

        # ini_data should be defined as a method
        if hasattr(module, "ini_data"):
            self.ini_data = MethodType(module.ini_data, self)

    except PluginError:
        # If plugin initialized from parent, ignore
        if not getattr(plugin, "from_parent", False):
            raise

        self.plugin.name = None
        self.plugin.version = None
        return None

    return module
@classmethod
def flushall(cls):
    """Reset the loaded, reference and sub-reference instances of the
    class to empty dictionaries.

    Registered plugins are left untouched, and so is ``sys.modules``.
    """
    for registry in (
            "subreference_instances",
            "loaded_instances",
            "reference_instances",
    ):
        setattr(cls, registry, {})
def set_requirements(self):
    """Update requirements depending on the Plugin properties.

    By default, this method does nothing. It can be included in any new
    Plugin depending on the developer needs.

    Args:
        self (Plugin): the Plugin to update
    """
    return None
@classmethod
def dump_incorrect(cls, file_dump):
    """Dump incorrect arguments from the Yaml into a file.

    One line is written per entry of ``unauthorized_arguments``.

    Args:
        file_dump (str): file where to dump
    """
    with open(file_dump, "w") as dump_stream:
        dump_stream.writelines(
            f"- {key}: {value}\n"
            for key, value in cls.unauthorized_arguments.items()
        )