# Source code for pycif.plugins.platforms.jean_zay.jobs

from __future__ import annotations

import os
import subprocess
from logging import info
from pathlib import Path


def init_job(self, command: str, job_file) -> None:
    """Write a job shell file for the JEAN-ZAY cluster.

    The generated script contains the ``#SBATCH`` directives built from the
    platform attributes, the module loading commands, the optional activation
    of a Python virtual environment, and finally ``command``.

    Args:
        command (str): command to execute
        job_file (str or Path): path to the job file

    Raises:
        ValueError: if one of the ``project``, ``allocation`` or ``qos``
            attributes is missing, or if ``nodes`` is not 1
        NotImplementedError: if ``allocation`` is ``"cpu"``
        FileNotFoundError: if ``python_venv`` is set but its ``bin/activate``
            file does not exist
    """
    # Some input arguments mandatory for submitting jobs are marked as
    # optional; we only check that they are present when submitting a job
    # within pycif
    for arg in ("project", "allocation", "qos"):
        if not hasattr(self, arg):
            raise ValueError(
                f"input argument '{arg}' is mandatory for submitting jobs"
            )

    # NOTE(review): 'env_variables' is given a default here but is not
    # written to the job file by this function -- confirm whether exports
    # are expected.
    self.env_variables = getattr(self, "env_variables", {})

    job_file = Path(job_file)
    prefix = os.getenv("SLURM_JOB_NAME", "pycif_job")
    job_name = f"{prefix}_{job_file.stem}"

    # Shebang, job name, output and partition
    lines = [
        "#!/usr/bin/bash",
        "",
        f"#SBATCH --job-name={job_name}",
        "#SBATCH --output=%x.%j.out",
        "#SBATCH --error=%x.%j.out",
        "",
        f"#SBATCH --account={self.project}@{self.allocation}",
    ]
    if hasattr(self, "constraint"):
        # Long option form: the short form does not accept '=' ("-C=value"
        # would hand "=value" to Slurm as the constraint).
        lines.append(f"#SBATCH --constraint={self.constraint}")
    if hasattr(self, "partition"):
        lines.append(f"#SBATCH --partition={self.partition}")
    lines.append(f"#SBATCH --qos={self.qos}")

    # Resource allocation, nodes, cores, etc.
    if self.allocation == "cpu":
        raise NotImplementedError("CPU jobs are not implemented yet for Jean-Zay")
    if self.nodes != 1:
        raise ValueError("GPU jobs can not be submitted with more than 1 node")
    lines.extend(
        [
            "",
            f"#SBATCH --nodes={self.nodes}",
            "#SBATCH --ntasks-per-node=1",
            f"#SBATCH --cpus-per-task={self.cores}",
            "#SBATCH --gres=gpu:1",
            "#SBATCH --hint=nomultithread",
            "",
        ]
    )

    # Walltime, modules
    lines.extend(
        [
            f"#SBATCH --time={self.walltime}",
            "",
            "module purge",
            "module load git/2.39.1",
            "module load gcc/14.2.0",
            f"module load {self.python_module}",
            "module load gdal/3.11.0",
            "",
        ]
    )

    # Python virtual environment
    use_venv = hasattr(self, "python_venv")
    if use_venv:
        activate_file = Path(self.python_venv, "bin", "activate")
        if not activate_file.is_file():
            raise FileNotFoundError(
                f"The python virtual environment at '{self.python_venv}' was not found"
            )
        lines.extend([f"source {activate_file}", ""])

    # Command
    lines.extend([command, ""])

    # Deactivate Python virtual environment
    if use_venv:
        lines.extend(["deactivate", ""])

    # Writing job file (explicit encoding so the result does not depend on
    # the locale of the submitting process)
    with open(job_file, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
def submit_job(self, command: str, job_file: str) -> str:
    """Write and submit a job file to the cluster scheduler with sbatch.

    The job file is first written through ``self.init_job``; it is then
    handed to ``sbatch`` when ``self.submit_sbatch`` is true, or executed
    directly with ``bash`` otherwise. The working directory of the launched
    process is the directory containing the job file.

    Args:
        command (str): command to execute
        job_file (str): path to the job file

    Returns:
        str: job id if sbatch is used, else "" (empty string)

    Raises:
        subprocess.CalledProcessError: if ``sbatch`` / ``bash`` exits with a
            non-zero status
        RuntimeError: if ``sbatch`` succeeds but prints nothing from which a
            job id can be read
    """
    # Initialize the job file to be submitted.
    # NOTE(review): the platform is passed explicitly as first argument;
    # presumably 'init_job' is attached as a plain function attribute rather
    # than a bound method -- confirm against the plugin loader.
    self.init_job(self, command, job_file)

    use_sbatch = self.submit_sbatch
    if use_sbatch:
        info("Submitting job on JEAN-ZAY")
        sub_command = "sbatch"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # Submitting the job. 'check=True' makes subprocess.run raise
    # CalledProcessError on a non-zero exit status, so no manual test of
    # 'returncode' is needed afterwards.
    process = subprocess.run(
        [sub_command, job_file],
        stdout=subprocess.PIPE,
        cwd=Path(job_file).parent,
        check=True,
    )

    if not use_sbatch:
        return ""

    # The job id is taken as the last whitespace-separated token printed by
    # sbatch on its standard output.
    tokens = process.stdout.decode("utf-8").split()
    if not tokens:
        raise RuntimeError(
            f"No job id returned by '{sub_command}' for job file '{job_file}'."
        )
    return tokens[-1]
def check_jobs(self, job_id_list: list[str]) -> bool:
    """Check if the provided jobs are still known to the scheduler queue.

    The active jobs of the current user are listed with a single ``squeue``
    call and compared, by job id, with the requested ids.

    Args:
        job_id_list (list of str): ids of the jobs to check

    Returns:
        bool: True if no job is running, False if at least one job is still
        running

    Raises:
        subprocess.CalledProcessError: if ``squeue`` exits with a non-zero
            status
    """
    import getpass

    # Filter empty strings (may be returned by submit_job when the
    # 'submit_sbatch' argument is set to False)
    requested_ids = {str(job_id).strip() for job_id in job_id_list if job_id}
    requested_ids.discard("")
    if not requested_ids:
        return True

    # No shell is involved in subprocess.run with an argument list, so a
    # literal "${USER}" would never be expanded: resolve the name here.
    user = os.getenv("USER") or getpass.getuser()

    # List the ids of the user's queued jobs. Jobs are matched on their id
    # ('%i' output field), which is what submit_job returns, and not on
    # their name.
    process = subprocess.run(
        ["squeue", "--noheader", f"--user={user}", "--format=%i"],
        stdout=subprocess.PIPE,
        check=True,
    )

    active_ids = set()
    for line in process.stdout.decode("utf-8").splitlines():
        entry = line.strip()
        if not entry:
            continue
        active_ids.add(entry)
        # Job array elements are reported as "<base_job_id>_<index>"; also
        # register the base id so that the submitted id is matched.
        active_ids.add(entry.partition("_")[0])

    # True when none of the requested jobs is still in the queue
    return requested_ids.isdisjoint(active_ids)