Source code for pycif.utils.classes.platforms
from types import MethodType
import os
import subprocess
from .baseclass import Plugin
[docs]
class Platform(Plugin):
    """Plugin type for HPC job submission platforms.

    Handles job submission and status checking for platforms such as SLURM,
    PBS or a local interactive shell. Concrete plugins override
    :meth:`submit_job` and :meth:`check_jobs`.

    Concrete implementations live in ``pycif/plugins/platforms/``.
    """

    def __init__(self, **kwargs):
        """Initialise a Platform plugin with default job management settings.

        Args:
            **kwargs: forwarded unchanged to the parent ``Plugin``
                constructor.
        """
        super(Platform, self).__init__(**kwargs)

        # Default attributes. None of them is read inside this class; they
        # are defaults for whatever code drives the platform.
        # NOTE(review): presumably ``sleep_time`` is a polling interval in
        # seconds and ``max_active_jobs`` a cap on concurrent jobs -- confirm
        # against the callers.
        self.sleep_time = 5
        self.max_active_jobs = 40
        self.env_variables = {}

    def initiate_template(self):
        """Initialise the Platform plugin template.

        Delegates to the parent ``initiate_template`` with the plugin type
        set to ``"platform"`` and with ``submit_job`` and ``check_jobs``
        declared as default functions.

        NOTE(review): the parent presumably loads the registered platform
        module and attaches these two functions to the instance -- confirm
        in ``Plugin.initiate_template``.
        """
        super(Platform, self).initiate_template(
            plg_type="platform",
            default_functions={"submit_job": True, "check_jobs": True},
        )

    @classmethod
    def register_plugin(cls, name, version, module, subtype="", **kwargs):
        """Register a module as a platform plugin for a name and version.

        The plugin type passed to the parent registration is always
        ``"platform"``.

        Args:
            name (str): name of the plugin
            version (str): version of the plugin
            module (types.ModuleType): module defining the interface
                between pyCIF and the plugin
            subtype (str): optional plugin sub-type, forwarded as is
            **kwargs (dictionary): accepted so that callers may pass extra
                options, but currently NOT forwarded to the parent
                registration call.
        """
        super(Platform, cls).register_plugin(
            name, version, module, plugin_type="platform", subtype=subtype
        )

    def submit_job(self, exe, job_file):
        """Submit a job script to the platform.

        Default implementation runs ``exe`` as a subprocess in the directory
        containing ``job_file`` and waits for it to complete. The captured
        standard output/error and the exit status are discarded, and nothing
        is returned.

        Args:
            exe (str): Command (and arguments) to execute. It is split on
                whitespace, so quoted arguments containing spaces are not
                supported.
            job_file (str): Path to the job script; its parent directory is
                used as the working directory. When the path has no
                directory component, the current working directory is used.
        """
        # ``dirname`` yields "" for a bare file name, and ``Popen(cwd="")``
        # fails with FileNotFoundError; ``None`` keeps the current directory.
        work_dir = os.path.dirname(job_file) or None

        process = subprocess.Popen(
            exe.split(),
            cwd=work_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        # Drain both pipes and block until the child exits; reading through
        # communicate() avoids a deadlock if the child fills a pipe buffer.
        process.communicate()

    def check_jobs(self, list_jobs):
        """Check whether a list of submitted jobs have completed.

        Default implementation always returns True (assumes synchronous
        execution where jobs complete before this method is called);
        ``list_jobs`` is not inspected.

        Args:
            list_jobs (list): List of job identifiers to check.

        Returns:
            bool: True if all jobs are finished.
        """
        return True