Source code for pycif.plugins.platforms.lsce_obelix_nvidia.jobs

import getpass
import os
import subprocess
from logging import info
from typing import List


def init_job(self, command: str, job_file: str) -> None:
    """Write a PBS job script for the Obelix cluster.

    The generated script contains, in order: a shebang, the ``#PBS -q``
    queue directive, a ``#PBS -l nodes=...`` resource request (omitted on
    the ``'gpus'`` queue), the ``module`` commands, one ``export`` line per
    entry of ``self.env_variables`` and finally *command*.

    Parameters
    ----------
    command : str
        command to execute
    job_file : str
        path of the job file to write (overwritten if it exists)

    Raises
    ------
    ValueError
        if ``self.cores`` is defined while ``self.queue`` is ``'gpus'``
    """
    # Make sure the instance owns an env_variables dict; it is mutated below
    self.env_variables = getattr(self, 'env_variables', {})
    on_gpu_queue = self.queue == 'gpus'

    # A core count cannot be requested on the GPU queue
    if on_gpu_queue and hasattr(self, 'cores'):
        raise ValueError(
            "Cannot allocate cores with 'gpus' queue. "
            "Remove the 'cores' argument or change the 'queue' argument"
        )

    if on_gpu_queue:
        # No explicit resource request; an extra module is loaded instead
        resources = []
        extra_modules = ["module load nvenv\n"]
    else:
        n_cores = getattr(self, 'cores', 1)
        # Jobs outside the 'gpus' queue run with HDF5 file locking disabled
        self.env_variables.update({'HDF5_USE_FILE_LOCKING': "FALSE"})
        if hasattr(self, 'nodes'):
            resources = [f"#PBS -l nodes={self.nodes}:ppn={n_cores}\n"]
        else:
            resources = [f"#PBS -l nodes={n_cores}\n"]
        extra_modules = []

    exports = [
        f"export {name}={value}\n"
        for name, value in self.env_variables.items()
    ]

    script = (
        ["#!/usr/bin/sh\n", "\n", f"#PBS -q {self.queue}\n"]
        + resources
        + ["\n", "module purge\n", "module load python/3\n"]
        + extra_modules
        + ["\n"]
        + exports
        + ["\n", f"{command}\n", "\n"]
    )

    with open(job_file, "w") as handle:
        handle.writelines(script)


def submit_job(self, command: str, job_file: str) -> str:
    """Write a job file and submit it to the cluster scheduler with qsub.

    When ``self.submit_qsub`` is falsy, the job file is executed locally
    with ``bash`` instead of being submitted to the scheduler.

    Parameters
    ----------
    command : str
        command to execute
    job_file : str
        path to the job file (absolute or relative to the current directory)

    Returns
    -------
    str
        stripped stdout of qsub (the job id) if qsub is used,
        else "" (empty string)

    Raises
    ------
    RuntimeError
        if the qsub/bash command exits with a non-zero status; the message
        includes the command's stderr when there is any
    """
    # Initialize the job file to be submitted
    init_job(self, command, job_file)

    if self.submit_qsub:
        info("Submitting job on Obelix server")
        sub_command = "qsub"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # The child process runs inside the job file's directory. Resolve the path
    # first: otherwise a relative path with a directory component
    # ("run/job.sh") would be looked up again from inside that directory
    # ("run/run/job.sh"), and a bare file name would give an empty cwd.
    job_path = os.path.abspath(job_file)

    # Submitting the job
    process = subprocess.Popen(
        [sub_command, job_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        message = (
            f"Error while running command '{command}' in job file "
            f"'{job_file}' with '{sub_command}' command."
        )
        # Surface the tool's own diagnostics instead of discarding them
        details = stderr.decode('utf-8', errors='replace').strip()
        if details:
            message += f"\n{details}"
        raise RuntimeError(message)

    # qsub prints the id of the submitted job on stdout
    return stdout.decode('utf-8').strip() if self.submit_qsub else ""


def check_jobs(self, job_id_list: List[str]) -> bool:
    """Check whether any of the provided jobs is still listed by qstat.

    Parameters
    ----------
    job_id_list : list of str
        ids of the jobs to check; empty strings (returned by ``submit_job``
        when the 'submit_qsub' argument is set to False) are ignored

    Returns
    -------
    bool
        True if none of the jobs appears in the output of
        ``qstat -u <current user>``, False if at least one job is still listed

    Raises
    ------
    RuntimeError
        if the qstat command exits with a non-zero status
    """
    # Filter empty strings; a set gives constant-time membership tests below
    pending_ids = {job_id for job_id in job_id_list if job_id}
    if not pending_ids:
        return True

    # Pass the arguments directly, without a shell: combining shell=True with
    # a list would make 'qstat' the only command and hand '-u' and the user
    # name to the shell itself instead of to qstat.
    user = getpass.getuser()
    process = subprocess.Popen(
        ["qstat", "-u", user],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = process.communicate()

    # An empty output caused by a failing qstat must not be mistaken for
    # "no job running"
    if process.returncode != 0:
        details = stderr.decode('utf-8', errors='replace').strip()
        raise RuntimeError(
            f"Error while running command 'qstat -u {user}' "
            f"(exit code {process.returncode}). {details}"
        )

    # The job id is the first column of each job line
    for line in stdout.decode('utf-8').splitlines():
        fields = line.split()
        # Blank and whitespace-only lines have no fields
        if fields and fields[0] in pending_ids:
            # At least one job is still running
            return False

    # No job is running
    return True