import importlib
import os
from inspect import ismethod
from types import MethodType, FunctionType, ModuleType
import sys
import numpy as np
import pandas as pd
import yaml
from ...utils.check.errclass import PluginError
from ...utils.yml import ordered_load, ordered_dump
from ...utils import dates
[docs]
class Plugin(object):
    """Base class for all pyCIF plugin objects.

    Stores plugin identity (type, name, version, subtype), manages the global
    registry of available plugins, and provides the machinery for loading,
    initialising and wiring plugin instances from YAML configuration.

    All the containers below are class attributes, hence shared by every
    instance and sub-class that does not rebind them.

    Attributes:
        plugin_types (dict): Mapping of recognised plugin-type names to a
            two-item list ``[relative module path, class name]``.
        plugin_subtypes (dict): Mapping of plugin types to their allowed
            subtypes; each subtype maps to a relative module path. Types
            without real subtypes only accept the empty subtype ``""``.
        registered (dict): Registry of all registered plugin modules, keyed by
            (name, version, type, subtype); values are module names.
        loaded_instances (dict): Cache of all instantiated plugins, keyed by
            (name, version, type, subtype).
        reference_instances (dict): Level-0 plugin instances from the setup
            file, keyed by plugin type.
        subreference_instances (dict): All sub-level plugin instances, keyed
            by (type, subtype), each value mapping a YAML tree path to the
            corresponding attribute.
        unauthorized_arguments (dict): Arguments found in the YAML that are not
            declared in any plugin's ``input_arguments``.
        ref_config: Reference configuration; ``None`` by default.
            NOTE(review): it is not assigned anywhere in the visible part of
            this class, confirm where it is set.
    """

    # Authorized Plugins: type name -> [module path relative to this package,
    # name of the class implementing the type]
    plugin_types = {
        "chemistry": [".chemistries", "Chemistry"],
        "controlvect": [".controlvects", "ControlVect"],
        "datavect": [".datavects", "DataVect"],
        "datastream": [".datastreams", "DataStream"],
        "domain": [".domains", "Domain"],
        "measurements": [".measurements", "Measurement"],
        "minimizer": [".minimizers", "Minimizer"],
        "mode": [".modes", "Mode"],
        "model": [".models", "Model"],
        "obsoperator": [".obsoperators", "ObsOperator"],
        "obsparser": [".obsparsers", "ObsParser"],
        "obsvect": [".obsvects", "ObsVect"],
        "platform": [".platforms", "Platform"],
        "simulator": [".simulators", "Simulator"],
        "transform": [".transforms", "Transform"],
        "setup": [".setup", "Setup"],
    }

    # By default every type only accepts the empty subtype; the two types
    # below override that default with their own list of subtypes
    plugin_subtypes = {t: {"": ""} for t in plugin_types}
    plugin_subtypes["datastream"] = {
        "meteo": ".meteos",
        "flux": ".fluxes",
        "background": ".backgrounds",
        "field": ".fields"
    }
    plugin_subtypes["transform"] = {
        "basic": ".basic", "complex": ".complex", "system": ".system"
    }

    # Registered and already loaded plugins
    registered = {}
    loaded_instances = {}
    reference_instances = {}
    subreference_instances = {}

    # Unauthorized arguments
    unauthorized_arguments = {}

    # Reference Setup
    ref_config = None

    # Maximum recursive level
    # NOTE(review): not read anywhere in the visible part of this class
    __maxrecursive__ = 50
def __init__(self, plg_orig=None,
orig_name="", **kwargs):
"""Initialise a Plugin, optionally copying attributes from another plugin.
Args:
plg_orig (Plugin, optional): If provided, all public attributes of
``plg_orig`` are copied to the new instance.
orig_name (str): Name of the plugin as declared in the YAML config.
Defaults to ``""``.
**kwargs: Additional key/value pairs set as attributes on the new
instance.
"""
# Set the orig_name
self.orig_name = orig_name
# Update the plugin with attributes from plg_orig
attributes = self._get_attribute_list(plg_orig)
for attr in attributes:
if not callable(getattr(self, attr, None)):
setattr(self, attr, getattr(plg_orig, attr))
# Initialize the list of attributes if not already done
if not hasattr(self, "attributes"):
self.attributes = []
# Update attributes according to args
if len(kwargs) > 0:
for kw in kwargs:
setattr(self, kw, kwargs[kw])
# Setting default requirements
self.default_requirements = {
key: {"any": True, "empty": True}
for key in [
"datei", "datef", "workdir",
"logfile", "verbose", "monitor_memory"
]
}
# Initiating requirements if not specified
if not hasattr(self, "requirements"):
self.requirements = {}
def __repr__(self):
"""Return a human-readable string representation of the plugin.
Returns:
str: ``<Plugin type/subtype/name/version at 0x…>`` when plugin
metadata is available, otherwise the default Python object repr.
"""
plg = getattr(self, 'plugin', self)
try:
if plg.subtype is not None and plg.subtype:
name_list = [plg.type, plg.subtype, plg.name, plg.version]
else:
name_list = [plg.type, plg.name, plg.version]
name_list = [str(name) for name in name_list]
return f"<Plugin {'/'.join(name_list)} at {hex(id(self))}>"
except AttributeError:
default_repr = (
f"<{self.__class__.__module__}.{self.__class__.__name__} "
f"object at {hex(id(self))}>"
)
return default_repr
@classmethod
def _get_attribute_list(cls, plg):
"""Return public attributes of a plugin, excluding dunder names.
Args:
plg (Plugin): Plugin instance or class to inspect.
Returns:
list[str]: List of attribute names not starting with ``_``.
"""
return [a for a in dir(plg) if not a.startswith("_")]
[docs]
@staticmethod
def plugin_key(name, version, plugin_type, subtype):
"""Creates the key for a name, version and plugin type
Args:
name (str): name of the plugin
version (str): version of the plugin
plugin_type (str): type of plugin
subtype (str): sub-type of plugin
Returns:
set: dict key for a plugin and version
"""
return name, version, plugin_type, subtype
[docs]
@classmethod
def print_registered(
cls, print_requirement=False, print_rst=False,
types=[], names=[], versions=[], stream=None
):
"""Print in a user-friendly format the list of available plugins
Args:
print_requirement (bool): for each registered plugin, print its requirements
print_rst (bool): print in rst format for automatic use in the documentations
types (list): list of types of Plugins to print
names (list): list of names of Plugins to print
version (list): list of versions of Plugins to print
stream: stream to which send the display. By default, stdout is used. A stream to a given file can be used instead.
"""
if stream is None:
stream = sys.stdout
keys = [
k
for k in list(cls.registered.keys())
if (k[0] in names or names == [])
and (k[1] in versions or versions == [])
and (k[2] in types or types == [])
]
names, versions, types, subtypes = list(zip(*keys))
modules = [
cls.get_registered(n, v, t, st)
for n, v, t, st in zip(names, versions, types, subtypes)
]
print(
"List of all available plugins and requirements "
"for each class: \n\n", file=stream
)
all_types = sorted(list(set(types)))
for tt in all_types:
rst_type = cls.plugin_types[tt][0][1:]
if not print_rst:
print("\t", tt, file=stream)
else:
print("\t", ":doc:`{}</documentation/plugins/{}/index>`"
.format(tt, rst_type), file=stream)
plg_type_list = sorted([
(n, v, t, st, mod)
for n, v, t, st, mod in zip(names, versions, types, subtypes, modules)
if t == tt])
sub_types = sorted(list(set([t[3] for t in plg_type_list])))
for stt in sub_types:
rst_subtype = cls.plugin_subtypes[tt][stt][1:]
indent = "\t\t"
if len(sub_types) > 1:
indent = "\t\t\t"
if not print_rst:
print("\t\t - ", stt, file=stream)
else:
print("\t\t - ", ":doc:`{}</documentation/plugins/{}/{}/index>`"
.format(stt, rst_type, rst_subtype), file=stream)
plg_subtype_list = sorted([
(n, v, t, st, mod)
for n, v, t, st, mod in plg_type_list
if st == stt])
for n, v, t, st, mod in plg_subtype_list:
if not print_rst:
print("{}- {}, {}".format(indent, n, v), file=stream)
else:
print("{}- :doc:`{}, {}</documentation/plugins/{}/{}/{}>`"
.format(indent, n, v, rst_type, rst_subtype,
cls.registered[n, v, t, st].split(".")[-1]),
file=stream)
# Print requirements
if print_requirement and hasattr(mod, "requirements"):
print("{}\tRequires:".format(indent), file=stream)
for req in mod.requirements:
mod_req = mod.requirements[req]
name = mod_req.get("name", None)
version = mod_req.get("version", None)
any = mod_req.get("any", False)
empty = mod_req.get("empty", "")
req_type = mod_req.get("type", req)
req_subtype = mod_req.get("subtype", "")
if not print_rst:
print("{}\t\t- {}:".format(indent, req),
file=stream)
else:
plg_req = cls.from_dict({
"plugin": {
"name": name,
"version": version,
"type": req_type,
"subtype": req_subtype
}
})
plg_req._load_plugin_type(req)
req_rst_type = \
cls.plugin_types[plg_req.plugin.type][0][1:]
req_rst_subtype = \
cls.plugin_subtypes[
plg_req.plugin.type][
plg_req.plugin.subtype][1:]
print("{}\t\t"
"- :doc:`{}</documentation/plugins/{}/{}/index>`:"
.format(indent, req,
req_rst_type, req_subtype), file=stream)
print(
"{}\t\t\t- name: {}".format(indent, name), file=stream)
print("{}\t\t\t- version: {}".format(indent,
version), file=stream)
print(
"{}\t\t\t- any: {}".format(indent, any), file=stream)
print(
"{}\t\t\t- empty: {}".format(indent, empty), file=stream)
print("\n", file=stream)
[docs]
@classmethod
def is_registered(cls, name, version, plugin_type, subtype):
"""Check if a plugin is registered
Args:
name (str): name of the plugin
version (str): version of the plugin
plugin_type (str): type of plugin
subtype (str): sub-type of plugin
Returns:
bool: True if a parser is registered
key: the key of the registered Plugin if registered
"""
if cls.plugin_key(name, version, plugin_type, subtype) in cls.registered:
return True, cls.plugin_key(name, version, plugin_type, subtype)
elif subtype == "":
# If given a sub-type, loop on all types and check if valid sub-type
if not cls.is_allowed(plugin_type):
matching = [k for k in cls.registered.keys()
if (k[0], k[1], k[3]) == (name, version, plugin_type)]
# Otherwise loop on sub-types
else:
matching = [k for k in cls.registered.keys()
if (k[0], k[1], k[2]) == (name, version, plugin_type)]
if len(matching) == 1:
return True, matching[0]
elif len(matching) > 1:
raise PluginError("Multiple plugins matching required definition for "
"{}/{}/{}: \n{}".format(
name, version, plugin_type, matching))
return False, cls.plugin_key(name, version, plugin_type, subtype)
[docs]
@classmethod
def is_allowed(cls, plugin_type):
"""Check whether a plugin type is allowed in pyCIF or not
Args:
plugin_type (str): type of plugin
Returns:
bool: True if allowed plugin
"""
return plugin_type in cls.plugin_types
[docs]
@classmethod
def is_allowed_as_subtype(cls, plugin_type):
"""Check whether a plugin type is allowed as the sub-type
of a main type in pyCIF or not
Args:
plugin_type (str): type of plugin
Returns:
bool: True if allowed plugin
"""
matching = [
t for t in cls.plugin_subtypes
if plugin_type in cls.plugin_subtypes[t]]
if len(matching) == 1:
return True, matching[0]
return False, matching
[docs]
@classmethod
def is_loaded(cls, name, version, plugin_type, subtype=""):
"""Check whether a plugin is loaded
Args:
name (str): name of the plugin
version (str): version of the plugin
plugin_type (str): type of plugin
subtype (str): sub-type of plugin
Returns:
bool
"""
return (
cls.plugin_key(name, version, plugin_type, subtype)
in cls.loaded_instances
)
[docs]
@classmethod
def register_plugin(cls, name, version, module,
plugin_type="", subtype="", **kwargs):
"""Register a module for a plugin and version with possibly options
Args:
name (str): name of the plugin
version (str): version of the plugin
plugin_type (str): type of plugin
subtype (str): sub-type of plugin
module (types.ModuleType): module defining the interface
between pyCIF and the plugin
**kwargs (dictionary): default options for module
"""
registered = cls.is_registered(name, version, plugin_type, subtype)
if registered[0]:
raise ValueError(
"Already created a Module "
"for plugin {} (version {}) and type {}".format(
name, version, plugin_type
) + ("and sub-type {}".format(subtype) if subtype != "" else "")
)
cls.registered[registered[1]] = module.__name__
[docs]
@classmethod
def get_registered(cls, name, version, plugin_type, subtype=""):
    """Import and return the module registered for a plugin

    The ``input_arguments`` of the returned module are completed with the
    ``input_arguments`` of the module implementing the plugin type, the
    plugin's own values taking precedence. Any attribute of the module
    named like one of these input arguments is then removed from the
    module.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plugin_type (str): type of plugin
        subtype (str): sub-type of plugin

    Returns:
        module: plugin module for plugin version

    Raises:
        PluginError: if no matching plugin is registered
    """
    found, key = cls.is_registered(name, version, plugin_type, subtype)
    if not found:
        raise PluginError(
            "No {}/{} module found for plugin {} and Version {}".format(
                plugin_type, subtype, name, version
            )
        )

    module = importlib.import_module(cls.registered[key])

    # Import the module of the main type (key[2]) to fetch its generic
    # input arguments
    root_package = __package__.split(".")[0]
    type_module_path = cls.plugin_types[key[2]][0]
    class_module = importlib.import_module(
        "{}.plugins{}".format(root_package, type_module_path))

    own_arguments = getattr(module, "input_arguments", {})
    module.input_arguments = own_arguments
    if hasattr(class_module, "input_arguments"):
        # Plugin-specific arguments override the ones of the type
        module.input_arguments = dict(
            class_module.input_arguments,
            **own_arguments)

    # Clean sub-modules that are also in input argument
    for arg_name in module.input_arguments:
        if hasattr(module, arg_name):
            delattr(module, arg_name)

    return module
[docs]
@classmethod
def get_loaded(cls, name, version, plugin_type, subtype=""):
"""Get the correct loaded plugin, given its name, version and type
(and optionally subtype)
Args:
name (str): name of the plugin
version (str): version of the plugin
plugin_type (str): type of plugin
subtype (str): sub-type of plugin
Returns:
Plugin: plugin module for plugin version
"""
if not cls.is_loaded(name, version, plugin_type, subtype=""):
raise PluginError(
"No {} module loaded for plugin {} and Version {}".format(
plugin_type, name, version
)
)
return cls.loaded_instances[cls.plugin_key(name, version, plugin_type, subtype)]
[docs]
@classmethod
def get_subclass(cls, plg_type, plg_subtype=""):
    """Get the plugin class template from a given type

    The class is looked up, in this order, from ``plg_type`` as a main
    type, from ``plg_subtype`` as a main type, then from the single main
    type declaring ``plg_type`` as one of its sub-types.

    Args:
        plg_type (str): the plugin type to load
        plg_subtype (str): the plugin sub-type to load

    Returns:
        type: the class implementing the plugin type (not an instance)

    Raises:
        PluginError: if no main type can be deduced from the arguments
    """
    if plg_type in cls.plugin_types:
        main_type = plg_type
    elif plg_subtype in cls.plugin_types:
        main_type = plg_subtype
    else:
        # Main types for which plg_type is a valid sub-type
        parents = [
            t for t, subtypes in cls.plugin_subtypes.items()
            if plg_type in subtypes
        ]
        if len(parents) != 1:
            raise PluginError("Plugin type {}/{} is not recognized by pyCIF"
                              .format(plg_type, plg_subtype))
        main_type = parents[0]

    subclass = cls.plugin_types[main_type]
    type_module = importlib.import_module(subclass[0], __package__)
    return getattr(type_module, subclass[1])
[docs]
@classmethod
def load_registered(cls, name, version, plg_type,
                    plg_subtype="", plg_orig=None):
    """Get a sub-class instance of a registered plugin.

    This can be used to get default required plugins.

    Args:
        name (str): name of the plugin
        version (str): version of the plugin
        plg_type (str): type of plugin
        plg_subtype (str): sub-type of plugin
        plg_orig (Plugin): Original plugin from which to copy attributes
            to the new plugin

    Returns:
        Plugin: a new plugin of correct type
    """
    registered = cls.is_registered(name, version, plg_type, plg_subtype)

    # Registered module used as the template; raises if not registered
    template = cls.get_registered(name, version, plg_type, plg_subtype)

    if plg_orig is not None:
        # Transfer the YAML-defined attributes of the original plugin
        for attr in plg_orig.attributes:
            setattr(template, attr, getattr(plg_orig, attr))
        template.attributes = plg_orig.attributes[:]

    # Adding plugin attribute, from the key actually found in the registry
    reg_name, reg_version, reg_type, reg_subtype = registered[1]
    template.plugin = cls.from_dict(
        {"name": reg_name, "version": reg_version, "type": reg_type,
         "subtype": reg_subtype}
    )

    # Creating a sub-class instance, then initializing it
    instance = cls.childclass_factory(plg_orig=template)
    instance.initiate_template()

    return instance
def _load_plugin_type(self, key, parent_plg_type=None):
    """Load plugin type and add it to the plugin if not already specified.

    Resolves the plugin type from ``key``: if ``key`` is a recognised
    top-level type it is used directly; if it is the sub-type of exactly
    one type, that parent type is inferred; otherwise ``parent_plg_type``
    is used as a fallback. Creates ``self.plugin`` when missing, and
    otherwise fills ``self.plugin.type`` and ``self.plugin.subtype`` in
    place when they are not already given.

    Args:
        key (str): The YAML attribute name used as a candidate plugin type.
        parent_plg_type (str, optional): The last recognised plugin type
            inherited from a parent plugin. Used as fallback when ``key``
            is not a known type or sub-type.

    Returns:
        str: A plugin type name.
        NOTE(review): the returned value is not the same quantity in all
        branches: the first return gives the type deduced from ``key``,
        the others give the value of ``self.plugin.type`` read before or
        after it was corrected. Confirm what callers rely on.

    Raises:
        Exception: If the type/subtype combination cannot be matched to any
            registered or allowed plugin.
    """
    # Choosing the correct type according to the plugin name, or its last
    # known authorized parent plugin
    plg_subtype = ""
    if self.is_allowed(key):
        plg_type = key
    elif self.is_allowed_as_subtype(key)[0]:
        plg_type = self.is_allowed_as_subtype(key)[1]
        plg_subtype = key
    else:
        plg_type = parent_plg_type

    # Initializing the attribute plugin to store type, name and version
    # Update the type if not already given
    plg = getattr(self, "plugin", None)
    if plg is None:
        self.plugin = self.from_dict(
            {"name": None, "version": None,
             "type": plg_type, "subtype": plg_subtype}
        )
    else:
        # Values explicitly given in the Yaml are never overwritten here
        if getattr(self.plugin, "type", None) is None:
            self.plugin.type = plg_type
        if getattr(self.plugin, "subtype", "") == "":
            self.plugin.subtype = plg_subtype

    # Check that the definition from the Yaml is consistent
    if self.is_allowed(self.plugin.type) \
            and self.plugin.subtype in self.plugin_subtypes[self.plugin.type]:
        return plg_type

    # From here on, the type/sub-type pair is not directly valid: try to
    # fix it from the registered plugins with the same name and version
    name = self.plugin.name
    version = self.plugin.version
    plg_type = self.plugin.type
    plg_subtype = self.plugin.subtype
    if self.is_allowed(plg_type):
        # Valid main type with an unknown sub-type: take the sub-type of
        # the only registered plugin with this name/version/type
        matching = [k[3] for k in self.registered.keys()
                    if (k[0], k[1], k[2]) == (name, version, plg_type)]
        if len(matching) == 1:
            self.plugin.subtype = matching[0]
            return plg_type
    else:
        # The given type is not a main type: look for it as the sub-type
        # of the only registered plugin with this name/version
        matching = [k for k in self.registered.keys()
                    if (k[0], k[1], k[3]) == (name, version, plg_type)]
        if len(matching) == 1:
            self.plugin.subtype = matching[0][3]
            self.plugin.type = matching[0][2]
            return plg_type

    # If plugin is only defined as empty with no name,
    # just check that it is allowed
    if name is None and version is None:
        if self.is_allowed(plg_type):
            return plg_type
        else:
            # The given type is actually a sub-type: swap it into the
            # sub-type slot and use its single parent type
            matching = [t for t in self.plugin_subtypes
                        if plg_type in self.plugin_subtypes[t]]
            if len(matching) == 1:
                self.plugin.subtype = self.plugin.type
                self.plugin.type = matching[0]
                return self.plugin.type

    raise Exception(
        "There is some error in the definition of your Yaml or, if you "
        "are a developer, in the new plugin you are designing: \n"
        "Trying to load the following plugin: \n"
        "{}:\n"
        "   plugin:\n"
        "      name: {}\n"
        "      version: {}\n"
        "      type: {}\n"
        "      subtype: {}\n\n"
        "Available types and subtypes are: \n{}\n"
        "Please check spelling in your definition\n\n"
        "If you are initializing an empty plugin, please be sure "
        "to define it with name = None AND version = None.".format(
            key, name, version, plg_type, plg_subtype,
            "\n".join(["- {}/{}".format(t, st)
                       for t in self.plugin_subtypes
                       for st in self.plugin_subtypes[t]])
        )
    )
[docs]
@classmethod
def save_loaded(cls, plg):
"""Saves all loaded plugins to the class
Args:
plg (Plugin): plugin to save
"""
name = plg.plugin.name
version = plg.plugin.version
plugin_type = plg.plugin.type
plugin_subtype = plg.plugin.subtype
cls.loaded_instances[
cls.plugin_key(name, version, plugin_type, plugin_subtype)] = plg
@classmethod
def _save_refplugins(cls, plg):
"""Save the level-0 plugin attributes of a setup as reference instances.
Iterates over every attribute of ``plg`` that is itself a Plugin,
resolves its type, and stores it in ``Plugin.reference_instances`` so
it can be retrieved later as a default requirement. Should be called
only once at initialisation.
Args:
plg (Plugin): The root setup plugin to scan.
"""
for attr in plg.attributes:
plg_attr = getattr(plg, attr)
if issubclass(type(plg_attr), Plugin):
plg_attr._load_plugin_type(attr)
name = plg_attr.plugin.name
version = plg_attr.plugin.version
plugin_type = plg_attr.plugin.type
plugin_subtype = plg_attr.plugin.subtype
setattr(plg_attr, "isreference", True)
else:
name = version = plugin_type = plugin_subtype = attr
cls.reference_instances[plugin_type] = plg_attr
cls.reference_instances["reference_setup"] = plg
@classmethod
def _save_subrefplugins(cls, plg, parent_plg_type="setup", tree=""):
"""Recursively save all sub-level plugin attributes as sub-reference instances.
Walks the full attribute tree of ``plg`` and registers every
Plugin-typed attribute in ``Plugin.subreference_instances`` keyed by
its slash-separated YAML path. Should be called only once at
initialisation.
Args:
plg (Plugin): Plugin whose attribute tree is to be scanned.
parent_plg_type (str): Plugin type of the nearest recognised
ancestor. Defaults to ``"setup"``.
tree (str): Slash-separated YAML path accumulated during recursion.
Defaults to ``""``.
"""
if not hasattr(plg, "attributes"):
return
for attr in plg.attributes:
plg_attr = getattr(plg, attr)
if issubclass(type(plg_attr), Plugin):
plg_attr._load_plugin_type(attr, parent_plg_type)
name = plg_attr.plugin.name
version = plg_attr.plugin.version
plugin_type = plg_attr.plugin.type
plugin_subtype = plg_attr.plugin.subtype
else:
name = version = plugin_type = plugin_subtype = attr
plg_tree = "{}/{}".format(tree, attr)
cls._save_subrefplugins(plg_attr, parent_plg_type, tree=plg_tree)
if (plugin_type, plugin_subtype) in cls.subreference_instances:
cls.subreference_instances[
(plugin_type, plugin_subtype)][plg_tree] = plg_attr
else:
cls.subreference_instances[
(plugin_type, plugin_subtype)] = {plg_tree: plg_attr}
[docs]
@classmethod
def from_dict(cls, def_dict, orig_name="", convert_none=False, **kwargs):
"""Load a recursive dictionary structure into a Plugin.
Each key of ``def_dict`` becomes an attribute of the returned Plugin.
Nested dicts are loaded recursively into child Plugin instances.
Args:
def_dict (dict): The definition dictionary to load.
orig_name (str): Name label assigned to the returned plugin (stored
as ``plg.orig_name``). Defaults to ``""``.
convert_none (bool): If True, YAML keys whose value is ``None``
are initialised as empty ``Plugin()`` objects instead of
``None``. Defaults to False.
**kwargs: Forwarded to recursive calls; not used directly.
Returns:
Plugin: Plugin instance populated with the dictionary contents.
"""
# Loop over keys to be loaded
# Recursively initialize from dictionary
plg = cls()
for key in def_dict:
if isinstance(def_dict[key], dict):
setattr(
plg,
key,
cls.from_dict(
def_dict[key],
orig_name=key,
convert_none=convert_none,
**kwargs
),
)
# Initializes empty keys as Plugins
elif def_dict[key] is None and convert_none:
setattr(plg, key, cls())
else:
setattr(plg, key, def_dict[key])
# Saves the definition keys of the original dictionary into the
# output plugin
plg.attributes = list(def_dict.keys())
# Saves the name of the plugin as specified in the configuration file
plg.orig_name = orig_name
return plg
[docs]
@classmethod
def to_dict(
cls,
plg,
exclude_patterns=None,
full_output=False,
large_to_strings=False,
exclude_default=True):
"""Convert a Plugin instance to a plain nested dictionary.
Args:
plg (Plugin): Plugin instance to serialise.
exclude_patterns (list[str], optional): Additional attribute names
to exclude. The default exclusion list (``logfile``, ``datei``,
``datef``, ``workdir``, ``verbose``) is always appended unless
``exclude_default`` is False.
full_output (bool): If True, include large array-like attributes in
full rather than summarising them as type/shape strings.
Defaults to False.
large_to_strings (bool): If True, replace arrays/dicts/lists with
more than 10 elements by a short descriptive string.
Defaults to False.
exclude_default (bool): If True (default), always exclude the
standard set of runtime attributes (dates, workdir, …).
Returns:
dict: Nested dictionary representation of the plugin.
"""
default_exclude = []
if exclude_default:
default_exclude = [
"logfile", "datei", "datef", "workdir", "verbose"
]
if exclude_patterns is None:
exclude_patterns = default_exclude
else:
exclude_patterns.extend(default_exclude)
# If the input has no method 'attributes', just return it
if not hasattr(plg, "attributes") or isinstance(plg, list):
return plg
out = {}
for attr in plg.attributes:
plg_attr = getattr(plg, attr)
# Otherwise, save the attribute value
if (
not ismethod(plg_attr)
and not isinstance(plg_attr, FunctionType)
and not isinstance(plg_attr, ModuleType)
and not attr[0] == "_"
and attr not in dir(cls)
and attr
not in [
"absolute_import",
"loaded_class",
"loaded_data",
"loaded_attributes",
"loaded_requirements",
"loaded_template",
"orig_name",
"requirements",
"attributes",
"isreference",
"Method_type",
"MethodType",
"default_requirements",
"mapper",
]
+ exclude_patterns
and "Feature" not in str(plg_attr)
):
# If the attribute is a Plugin sub-class,
# recursively call to_dict
if hasattr(plg_attr, "to_dict"):
out[attr] = cls.to_dict(
plg_attr, exclude_patterns,
full_output=full_output,
large_to_strings=large_to_strings
)
continue
# Deal with lists
try:
if type(plg_attr) == str:
out[attr] = plg_attr
elif len(plg_attr) > 10 and large_to_strings:
if type(plg_attr) \
in [np.ndarray, pd.core.frame.DataFrame]:
out[attr] = "{} {}".format(
type(plg_attr), plg_attr.shape
)
elif type(plg_attr) == dict:
out[attr] = "{} {} keys".format(
type(plg_attr), len(plg_attr.keys())
)
elif type(plg_attr) == list:
out[attr] = "{} len({})".format(
type(plg_attr), len(plg_attr)
)
elif len(plg_attr) < 10 or full_output:
out[attr] = plg_attr
except TypeError:
out[attr] = plg_attr
# Removing the __class__ attribute
if "__class__" in list(out.keys()):
del out["__class__"]
return out
[docs]
@classmethod
def to_yaml(cls, plg, yaml_file, full=True):
    """Write a Plugin to a YAML file.

    The plugin is first converted with :meth:`to_dict` (default options),
    then dumped with ``ordered_dump``. The destination file is overwritten.

    Args:
        plg (Plugin): Plugin instance to write.
        yaml_file (str): Destination file path.
        full (bool): Currently unused. Defaults to True.
    """
    serialised = cls.to_dict(plg)

    with open(yaml_file, "w") as stream:
        ordered_dump(stream, serialised)
[docs]
@classmethod
def from_yaml(cls, def_file):
    """Read a YAML definition file into a dictionary of pyCIF parameters

    The absolute path of the file is stored under the ``"def_file"`` key.
    When a ``"datei"`` key is present, both ``"datei"`` and ``"datef"``
    are passed through ``dates.date2datetime``.

    Args:
        def_file (string) : Path to the definition file
            Handles both absolute and relative paths, and ``~``

    Returns:
        config_dict (dictionary): Dictionary populated with all pyCIF
        parameters

    Raises:
        IOError: if the file cannot be opened (a message is printed first)
        yaml.scanner.ScannerError: if the YAML syntax is invalid (a
            message is printed first)
    """
    yml_file = os.path.abspath(os.path.expanduser(def_file))

    try:
        with open(yml_file, "r") as stream:
            config_dict = ordered_load(stream)

            config_dict["def_file"] = yml_file

            if "datei" in config_dict:
                # Converting dates to datetime if necessary
                for date_key in ("datei", "datef"):
                    config_dict[date_key] = dates.date2datetime(
                        config_dict[date_key]
                    )
            return config_dict

    except IOError as err:
        print("Couldn't find config file: {}".format(yml_file))
        print("Please check directories")
        raise err

    except yaml.scanner.ScannerError as err:
        print("Error in the syntax of config file: {}".format(yml_file))
        raise err
[docs]
@classmethod
def print_default(cls, plg):
"""Print default parameter values for a plugin, if available.
Args:
plg (Plugin): Plugin instance whose ``default_values`` to display.
"""
if not hasattr(plg, "default_values"):
print(
"""{} ({}, {}, {}) has no default values""".format(
plg, plg.plugin.name, plg.plugin.version, plg.plugin.type
)
)
return
print(
"""The default values of {} ({}, {}, {}) are:""".format(
plg, plg.plugin.name, plg.plugin.version, plg.plugin.type
)
)
for k in plg.default_values:
print("- {}:\t{}".format(k, plg.default_values[k]))
[docs]
@classmethod
def childclass_factory(cls, plg_orig, child_type=None,
parent_plg=None,
overwrite=False):
"""Generates an instance of one of Plugin's child classes. Transfers
all existing attributes in the argument plugin to the output
child-class instance
Args:
plg_orig (Plugin): the plugin to turn into a child-class instance
child_type (str): sub-class type to generate if not available in
plg_orig.plugin.type
overwrite (bool): overwrite the class type of the origin plugin
Return:
child_plg: a plugin with all the attributes from plg_orig,
but as a child-class instance
"""
plg_type = getattr(plg_orig.plugin, "type", None)
if plg_type is None or plg_type == "setup":
if child_type is None and plg_type is None:
raise PluginError(
"The Child-class factory was called on a plugin "
"that was not correctly initialized: {} / {}".format(
plg_orig, plg_orig.orig_name
)
)
else:
plg_type = child_type
plg_orig._load_plugin_type(plg_type)
else:
plg_type = plg_orig.plugin.type
# If the type is not referenced, don't do anything
if not cls.is_allowed(plg_type):
return plg_orig
# Load the subclass
child_plg = cls.get_subclass(plg_type)(plg_orig=plg_orig)
# Replace name and version if empty
if parent_plg is not None and hasattr(parent_plg, "plugin"):
if child_plg.plugin.name is None:
child_plg.plugin.name = parent_plg.plugin.name
child_plg.plugin.from_parent = True
if child_plg.plugin.version is None:
child_plg.plugin.version = parent_plg.plugin.version
child_plg.plugin.from_parent = True
return child_plg
[docs]
def initiate_template(self, plg_type=None, default_functions={}):
"""Initialise a Plugin template by loading functions from the registered module.
Calls :meth:`initiate` to fetch the module, then attaches the functions
listed in ``default_functions`` to ``self``, either as bound methods or
as plain function references. Also attaches ``set_requirements`` if
defined in the module.
Args:
plg_type (str, optional): Plugin type passed to :meth:`initiate`.
Defaults to None (inferred from ``self.plugin.type``).
default_functions (dict[str, bool]): Mapping of function name →
``True`` if the function should be bound as a method (receives
``self`` as first argument), ``False`` for a static attachment.
"""
# First load the module
module = self.initiate(plg_type=plg_type)
# Attach the list of functions
for f in default_functions:
if not hasattr(module, f):
continue
if default_functions[f]:
setattr(self, f, MethodType(getattr(module, f), self))
else:
setattr(self, f, getattr(module, f))
# Add set_requirements
if hasattr(module, "set_requirements"):
self.set_requirements = MethodType(module.set_requirements, self)
[docs]
def initiate(self, plg_type=None):
"""Initializes a Plugin, i.e., loads functions from registered
plugins
Args:
plg_type (str): the type of plugin to load; this should
correspond to one of the defined child-classes
Return:
module: a python module as registered in pyCIF
"""
# It there is no attribute 'plugin', can't initialize anything as
# python will not know the plugin type, name and version
plugin = getattr(self, "plugin", None)
if plugin is None:
return
# Load plugin IDs
name = getattr(plugin, "name", None)
version = getattr(plugin, "version", None)
plg_subtype = getattr(plugin, "subtype", "")
if plg_type is None:
plg_type = plugin.type
# Load registered module if the Plugin's name and version are not
# default empty strings
if name is not None:
try:
module = self.get_registered(
name, version, plg_type, plg_subtype)
# Attributing all module functions to the plugin
functions = [
f
for f in dir(module)
if f not in getattr(module, "attributes", [])
and f[0] != "_"
and f != "attributes"
]
for attr in functions:
f = getattr(module, attr)
if not isinstance(f, ModuleType):
setattr(self, attr, f)
# Ini_data should be define as a Method Type
if hasattr(module, "ini_data"):
self.ini_data = MethodType(module.ini_data, self)
except PluginError as e:
# If plugin initialized from parent, ignore
if not getattr(plugin, "from_parent", False):
raise e
else:
self.plugin.name = None
self.plugin.version = None
return
return module
[docs]
@classmethod
def flushall(cls):
"""Reset all class-level plugin caches.
Clears ``loaded_instances``, ``reference_instances`` and
``subreference_instances``. Useful between independent simulation runs
or in test suites to avoid cross-contamination between setups.
"""
cls.subreference_instances = {}
cls.loaded_instances = {}
cls.reference_instances = {}
# import sys
# mod2flush = [mod for mod in sys.modules if "pycif" in mod]
# for mod in mod2flush:
# del sys.modules[mod]
[docs]
def set_requirements(self):
"""Update requirements depending on the Plugin properties
By default, this method does nothing. It can be included in any new
Plugin depending on the developer needs.
Args:
self (Plugin): the Plugin to update
"""
return
[docs]
@classmethod
def dump_incorrect(cls, file_dump):
"""Dump undeclared YAML arguments to a text file for inspection.
Writes one line per plugin tree path listing the argument names that
were found in the YAML but are not declared in the plugin's
``input_arguments``.
Args:
file_dump (str): Path to the output file.
"""
with open(file_dump, "w") as f:
for k in cls.unauthorized_arguments:
f.write(f"- {k}: {cls.unauthorized_arguments[k]}\n")