from __future__ import annotations
import os
import subprocess
from logging import info
from pathlib import Path
[docs]
def init_job(self, command: str, job_file) -> None:
    """Write a SLURM job shell file for the JEAN-ZAY cluster.

    The script is made of the ``#SBATCH`` header built from the attributes of
    ``self``, the ``module`` commands, the optional activation of a Python
    virtual environment, and finally ``command``.

    Args:
        command (str): command to execute
        job_file (str or Path): path to the job file to write

    Raises:
        ValueError: if one of the ``project``, ``allocation`` or ``qos``
            attributes is missing, or if ``self.nodes`` is not 1
        NotImplementedError: if ``self.allocation`` is ``"cpu"``
        FileNotFoundError: if ``self.python_venv`` is set but holds no
            ``bin/activate`` file
    """
    # Some input arguments mandatory for submitting jobs are marked as optional;
    # we only check that they are present when submitting a job within pycif
    mandatory_arguments = ["project", "allocation", "qos"]
    for arg in mandatory_arguments:
        if not hasattr(self, arg):
            raise ValueError(f"input argument '{arg}' is mandatory for submitting jobs")

    self.env_variables = getattr(self, "env_variables", {})

    job_file = Path(job_file)
    prefix = os.getenv("SLURM_JOB_NAME", "pycif_job")
    job_name = f"{prefix}_{job_file.stem}"

    # Shebang, job name, output and account
    lines = [
        "#!/usr/bin/bash",
        "",
        f"#SBATCH --job-name={job_name}",
        "#SBATCH --output=%x.%j.out",
        "#SBATCH --error=%x.%j.out",
        "",
        f"#SBATCH --account={self.project}@{self.allocation}",
    ]

    if hasattr(self, "constraint"):
        # Long option form: with the short form, "-C=<value>" would pass
        # "=<value>" (leading '=' included) as the constraint
        lines.append(f"#SBATCH --constraint={self.constraint}")
    if hasattr(self, "partition"):
        lines.append(f"#SBATCH --partition={self.partition}")
    lines.append(f"#SBATCH --qos={self.qos}")

    # Resource allocation, nodes, cores, etc.
    if self.allocation == "cpu":
        raise NotImplementedError("CPU jobs are not implemented yet for Jean-Zay")
    if self.nodes != 1:
        raise ValueError("GPU jobs can not be submitted with more than 1 node")
    lines.extend(
        [
            "",
            f"#SBATCH --nodes={self.nodes}",
            "#SBATCH --ntasks-per-node=1",
            f"#SBATCH --cpus-per-task={self.cores}",
            "#SBATCH --gres=gpu:1",
            "#SBATCH --hint=nomultithread",
            "",
        ]
    )

    # Walltime, modules
    lines.extend(
        [
            f"#SBATCH --time={self.walltime}",
            "",
            "module purge",
            "module load git/2.39.1",
            "module load gcc/14.2.0",
            f"module load {self.python_module}",
            "module load gdal/3.11.0",
            "",
        ]
    )

    # Python virtual environment
    use_venv = hasattr(self, "python_venv")
    if use_venv:
        activate_file = Path(self.python_venv, "bin", "activate")
        if not activate_file.is_file():
            raise FileNotFoundError(
                f"The python virtual environment at '{self.python_venv}' was not found"
            )
        lines.append(f"source {activate_file}")
        lines.append("")

    # Command
    lines.append(command)
    lines.append("")

    # Deactivate Python virtual environment
    if use_venv:
        lines.append("deactivate")
        lines.append("")

    # Writing job file (the trailing empty item makes the file end with "\n")
    with open(job_file, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
[docs]
def submit_job(self, command: str, job_file: str) -> str:
    """Write a job file and submit it with sbatch, or run it with bash.

    The job file is first written through ``self.init_job``. It is then handed
    to ``sbatch`` when ``self.submit_sbatch`` is true, and executed with
    ``bash`` otherwise. In both cases the working directory is the directory
    holding the job file.

    Args:
        command (str): command to execute
        job_file (str): path to the job file

    Returns:
        str: job id if sbatch is used, else "" (empty string)

    Raises:
        subprocess.CalledProcessError: if the sbatch/bash command exits with a
            non-zero status
        RuntimeError: if sbatch succeeds but prints nothing to parse a job id
            from
    """
    from logging import error

    # Initialize the job file to be submitted
    self.init_job(self, command, job_file)

    if self.submit_sbatch:
        info("Submitting job on JEAN-ZAY")
        sub_command = "sbatch"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # The subprocess runs from the job file directory: make the path absolute
    # so that a relative 'job_file' is not resolved a second time against it
    job_path = Path(os.path.abspath(job_file))

    # Submitting the job
    try:
        process = subprocess.run(
            [sub_command, str(job_path)],
            stdout=subprocess.PIPE,
            cwd=job_path.parent,
            check=True,
        )
    except subprocess.CalledProcessError:
        # 'check=True' already raises on failure; log the context and let the
        # original exception propagate unchanged
        error(
            f"Error while running command '{command}' in job file "
            f"'{job_file}' with '{sub_command}' command."
        )
        raise

    if not self.submit_sbatch:
        return ""

    # The job id is taken as the last word printed by sbatch
    words = process.stdout.decode("utf-8").split()
    if not words:
        raise RuntimeError(
            f"Could not read a job id from the '{sub_command}' output "
            f"for job file '{job_file}'."
        )
    return words[-1]
[docs]
def check_jobs(self, job_id_list: list[str]) -> bool:
    """Check if the provided jobs are still listed by squeue.

    Args:
        job_id_list (list of str): ids of the jobs to check

    Returns:
        bool: True if no job is running, False if at least one job is still
            running

    Raises:
        subprocess.CalledProcessError: if the squeue command exits with a
            non-zero status
    """
    import getpass

    # Filter empty strings (may be returned by submit_job when the
    # 'submit_sbatch' argument is set to False)
    wanted_ids = {job_id for job_id in job_id_list if job_id}
    if not wanted_ids:
        return True

    # List the ids of all the jobs of the current user in a single call and
    # compare in Python:
    # - '--name' filters on the job name, not on the id returned by sbatch
    # - no shell is involved, so the user name has to be given literally
    #   ("${USER}" would not be expanded)
    process = subprocess.run(
        ["squeue", "--noheader", "--format=%i", f"--user={getpass.getuser()}"],
        stdout=subprocess.PIPE,
        check=True,
    )
    listed_ids = set(process.stdout.decode("utf-8").split())

    # True only when none of the requested ids is still listed
    return listed_ids.isdisjoint(wanted_ids)