Source code for pycif.utils.classes.platforms

from types import MethodType
import os
import subprocess
from .baseclass import Plugin


[docs] class Platform(Plugin): """Plugin type for HPC job submission platforms. Handles job submission and status checking for platforms such as SLURM, PBS or a local interactive shell. Concrete plugins override :meth:`submit_job` and :meth:`check_jobs`. Concrete implementations live in ``pycif/plugins/platforms/``. """ def __init__(self, **kwargs): """Initialise a Platform plugin with default job management settings.""" super(Platform, self).__init__(**kwargs) # Default attributes self.sleep_time = 5 self.max_active_jobs = 40 self.env_variables = {}
[docs] def initiate_template(self): """Initialise the Platform plugin template. Loads the registered platform module and attaches ``submit_job`` and ``check_jobs`` as bound methods on this instance. """ super(Platform, self).initiate_template( plg_type="platform", default_functions={"submit_job": True, "check_jobs": True} )
[docs] @classmethod def register_plugin(cls, name, version, module, subtype="", **kwargs): """Register a module for a plugin and version with possibly options Args: name (str): name of the plugin version (str): version of the plugin module (types.ModuleType): module defining the interface between pyCIF and the plugin plugin_type (str): type of plugin **kwargs (dictionary): default options for module """ super(Platform, cls).register_plugin( name, version, module, plugin_type="platform", subtype=subtype )
[docs] def submit_job(self, exe, job_file): """Submit a job script to the platform. Default implementation runs ``exe`` as a subprocess in the directory containing ``job_file`` and waits for it to complete. Args: exe (str): Command (and arguments) to execute. job_file (str): Path to the job script; its parent directory is used as the working directory. """ process = subprocess.Popen( exe.split(), cwd=os.path.dirname(job_file), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate()
[docs] def check_jobs(self, list_jobs): """Check whether a list of submitted jobs have completed. Default implementation always returns True (assumes synchronous execution where jobs complete before this method is called). Args: list_jobs (list): List of job identifiers to check. Returns: bool: True if all jobs are finished. """ return True