Source code for pycif.plugins.platforms.lsce_obelix_nvidia.jobs
import os
import subprocess
from logging import info
from typing import List
[docs]
def init_job(self, command: str, job_file: str) -> None:
    """Create the PBS job script used on the Obelix cluster.

    The generated script contains, in order: the shebang and ``#PBS``
    directives, the ``module`` commands setting up Python, one ``export``
    line per entry of ``self.env_variables`` and finally ``command``.

    Parameters
    ----------
    command : str
        Shell command written as the payload of the job.
    job_file : str
        Path of the script to (over)write.

    Raises
    ------
    ValueError
        If ``self.queue`` is ``'gpus'`` while a ``cores`` attribute is set.
    """
    # Ensure the attribute exists; it may be extended just below
    self.env_variables = getattr(self, 'env_variables', {})
    on_gpu_queue = self.queue == 'gpus'

    if on_gpu_queue and hasattr(self, 'cores'):
        raise ValueError(
            "Cannot allocate cores with 'gpus' queue. "
            "Remove the 'cores' argument or change the 'queue' argument"
        )

    header = ["#!/usr/bin/sh\n", "\n", f"#PBS -q {self.queue}\n"]

    # Resource directives are only written for non-GPU queues
    resources = []
    if not on_gpu_queue:
        ncores = getattr(self, 'cores', 1)
        # NOTE(review): presumably disables HDF5 locking because of the
        # cluster's shared filesystem -- confirm
        self.env_variables['HDF5_USE_FILE_LOCKING'] = "FALSE"
        if hasattr(self, 'nodes'):
            resources.append(f"#PBS -l nodes={self.nodes}:ppn={ncores}\n")
        else:
            # Without an explicit node count the core count goes to nodes=
            resources.append(f"#PBS -l nodes={ncores}\n")

    modules = ["module purge\n", "module load python/3\n"]
    if on_gpu_queue:
        modules.append("module load nvenv\n")

    exports = [
        f"export {name}={value}\n"
        for name, value in self.env_variables.items()
    ]

    body = ["\n", f"{command}\n", "\n"]
    script = "".join(header + resources + ["\n"] + modules + ["\n"]
                     + exports + body)

    with open(job_file, "w") as handle:
        handle.write(script)
[docs]
def submit_job(self, command: str, job_file: str) -> str:
    """Write a job file, then submit it with ``qsub`` or run it with ``bash``.

    The job script is generated by ``init_job``. When ``self.submit_qsub``
    is true the script is handed to ``qsub``; otherwise it is executed with
    ``bash`` and this call waits for it to finish. In both cases the child
    process is started in the directory containing the job file.

    Parameters
    ----------
    command : str
        Command to execute inside the job.
    job_file : str
        Path to the job file (absolute, or relative to the current working
        directory).

    Returns
    -------
    str
        The stripped standard output of ``qsub`` (the job id) when ``qsub``
        is used, else ``""`` (empty string).

    Raises
    ------
    RuntimeError
        If ``qsub``/``bash`` exits with a non-zero status. The message
        includes the standard error of the command.
    """
    # Initialize the job file to be submitted
    init_job(self, command, job_file)

    if self.submit_qsub:
        info("Submitting job on Obelix server")
        sub_command = "qsub"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # The child runs in the job file's directory, so give it an absolute
    # path: a relative path would otherwise be re-resolved against that
    # directory, and a bare file name would yield an invalid cwd of "".
    job_path = os.path.abspath(job_file)
    process = subprocess.run(
        [sub_command, job_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    if process.returncode != 0:
        # Surface the command's own diagnostics instead of discarding them
        stderr = process.stderr.decode('utf-8', errors='replace').strip()
        raise RuntimeError(
            f"Error while running command '{command}' in job file "
            f"'{job_file}' with '{sub_command}' command.\n{stderr}"
        )

    if self.submit_qsub:
        return process.stdout.decode('utf-8').strip()
    return ""
[docs]
def check_jobs(self, job_id_list: List[str]) -> bool:
    """Check whether any of the provided jobs is still listed by ``qstat``.

    Parameters
    ----------
    job_id_list : list of str
        Ids of the jobs to check, as returned by ``submit_job``. Empty
        strings (returned by ``submit_job`` when the ``submit_qsub``
        argument is False) are ignored.

    Returns
    -------
    bool
        True if no job is running (none of the ids appears in the output of
        ``qstat -u <current user>``), False if at least one job is still
        listed.

    Raises
    ------
    RuntimeError
        If ``qstat`` exits with a non-zero status.
    FileNotFoundError
        If the ``qstat`` executable cannot be found.
    """
    import getpass

    # qstat may truncate long job ids in its table (e.g. "123.obelix.ls"
    # for "123.obelix.lsce.ipsl.fr"), so compare on the part before the
    # first '.'; an exact id match is still a match.
    wanted = {job_id.split(".", 1)[0] for job_id in job_id_list if job_id}
    if not wanted:
        return True

    # No shell is involved, so the user name must be passed explicitly
    # ("${USER}" would not be expanded).
    process = subprocess.run(
        ["qstat", "-u", getpass.getuser()],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if process.returncode != 0:
        # Do not report "no job running" when the scheduler query failed
        stderr = process.stderr.decode('utf-8', errors='replace').strip()
        raise RuntimeError(
            f"Error while running 'qstat' to check jobs status.\n{stderr}"
        )

    for line in process.stdout.decode('utf-8').splitlines():
        fields = line.split()
        # Skip empty and whitespace-only lines
        if not fields:
            continue
        if fields[0].split(".", 1)[0] in wanted:
            # At least one job is still running
            return False
    # No job is running
    return True