import os
import subprocess
from logging import info
from pathlib import Path
from typing import List
[docs]
def init_job(self, command: str, job_file) -> None:
    """Write a job shell file for the CCRT cluster.

    The generated script contains, in order: the ``#MSUB`` scheduler
    directives built from the plugin attributes, the environment-module
    setup (``self.job_env`` if set, otherwise a default module list),
    ``export`` lines for ``self.env_variables``, activation of
    ``self.python_venv`` if set, ``command``, and the matching
    ``deactivate``.

    Args:
        command (str): command to execute
        job_file (str or pathlib.Path): path to the job file (overwritten)

    Raises:
        ValueError: if a mandatory submission argument (``partition``,
            ``project``) is not set on ``self``.
        FileNotFoundError: if ``self.python_venv`` is set but has no
            ``bin/activate`` script.
    """
    # Some input arguments mandatory for submitting jobs are marked as
    # optional; we only check they are present when submitting a job
    # within pycif.
    mandatory_arguments = ["partition", "project"]
    for arg in mandatory_arguments:
        if not hasattr(self, arg):
            raise ValueError(f"input argument '{arg}' is mandatory for submitting jobs")

    self.env_variables = getattr(self, 'env_variables', {})

    job_file = Path(job_file)
    job_name = os.getenv("BRIDGE_MSUB_REQNAME", "pycif_job")
    job_name = f"{job_name}_{job_file.stem}"
    # '%I' is left verbatim for ccc_msub to substitute (presumably the job
    # id) -- TODO confirm against the scheduler documentation.
    out_file = job_file.with_suffix(".%I.out")

    # Shebang and job arguments. Each directive must be its own list item:
    # a missing comma would silently merge adjacent f-strings into one line.
    lines = [
        "#!/usr/bin/bash",
        "",
        f"#MSUB -r {job_name}",
        f"#MSUB -o {out_file}",
        f"#MSUB -e {out_file}",
        f"#MSUB -A {self.project}",
        f"#MSUB -q {self.partition}",
        f"#MSUB -n {self.nodes}",
        f"#MSUB -c {self.cores}",
        f"#MSUB -Q {self.qos}",
        f"#MSUB -T {self.walltime}",
        f"#MSUB -m {self.filesystem}",
        "",
    ]

    # Setup environment modules
    if hasattr(self, "job_env"):
        lines.extend(self.job_env)
        lines.append("")
    else:
        lines.extend(
            [
                "module purge",
                f"module load intel/20 mpi/openmpi/4 {self.python_module}",
                "module load flavor/hdf5/parallel netcdf-fortran",
                "module load grib",
                "",
            ]
        )

    # Environment variables (lines are joined with '\n' below, so no
    # trailing newline here)
    if self.env_variables:
        lines.extend(f"export {key}={val}" for key, val in self.env_variables.items())
        lines.append("")

    # Python virtual environment
    if hasattr(self, 'python_venv'):
        activate_file = os.path.join(self.python_venv, "bin/activate")
        if not os.path.isfile(activate_file):
            raise FileNotFoundError(
                "The python virtual environment at "
                f"'{self.python_venv}' was not found"
            )
        lines.append(f"source {activate_file}")
        lines.append("")

    # Command
    lines.append(command)
    lines.append("")

    # Deactivate Python virtual environment
    if hasattr(self, 'python_venv'):
        lines.append("deactivate")
        lines.append("")

    # Writing job file
    with open(job_file, "w", encoding="utf-8") as f:
        f.write('\n'.join(lines))
[docs]
def submit_job(self, command: str, job_file: str) -> str:
    """Write and submit a job file to the cluster scheduler with ccc_msub.

    The job file is first written by ``self.init_job``. It is then either
    submitted with ``ccc_msub`` (when ``self.submit_msub`` is truthy) or run
    directly with ``bash``; in both cases the process runs from the
    directory containing the job file.

    Args:
        command (str): command to execute
        job_file (str): path to the job file (absolute or relative to the
            current working directory)

    Returns:
        str: job id if ccc_msub is used, else "" (empty string)

    Raises:
        RuntimeError: if the submission/run command exits with a non-zero
            status; its stderr output is appended to the message.
    """
    # Initialize the job file to be submitted. ``self`` is passed explicitly,
    # presumably because ``init_job`` is attached to the plugin as a plain
    # (unbound) function.
    self.init_job(self, command, job_file)

    if self.submit_msub:
        info("Submitting job on TGCC-CCRT")
        sub_command = "ccc_msub"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # Resolve the path once: the process runs from the job directory, so a
    # relative ``job_file`` would otherwise be resolved against that directory
    # a second time, and a bare file name would yield an empty (invalid) cwd.
    job_path = os.path.abspath(job_file)

    # Submitting the job
    process = subprocess.run(
        [sub_command, job_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    if process.returncode != 0:
        stderr = process.stderr.decode("utf-8", errors="replace").strip()
        details = f"\n{stderr}" if stderr else ""
        raise RuntimeError(
            f"Error while running command '{command}' in job file "
            f"'{job_file}' with '{sub_command}' command.{details}"
        )

    if not self.submit_msub:
        return ""

    # ccc_msub may wrap the id in a sentence (e.g. "Submitted Batch Session
    # <id>") -- TODO confirm. Keeping the last token returns the id in that
    # case and is a no-op when only the id is printed; it also matches the
    # single-token ids compared in ``check_jobs``.
    tokens = process.stdout.decode("utf-8").split()
    return tokens[-1] if tokens else ""
[docs]
def check_jobs(self, job_id_list: List[str]) -> bool:
    """Check whether the provided jobs are still running.

    Running jobs are looked up in the output of
    ``ccc_mpp -H -n -u <user>``, using the third whitespace-separated column
    of each line as the job id.

    Args:
        job_id_list (list of str): ids of the jobs to check. Empty strings
            (returned by ``submit_job`` when the ``submit_msub`` argument is
            False) are ignored.

    Returns:
        bool: True if no job is running, False if at least one job is still
        running
    """
    import getpass

    # Empty ids come from jobs run directly in a subprocess: nothing to wait for
    pending = {job_id for job_id in job_id_list if job_id}
    if not pending:
        return True

    # The argument list is not run through a shell, so a literal '${USER}'
    # would never be expanded: resolve the user name here instead.
    user = os.environ.get("USER") or getpass.getuser()
    process = subprocess.run(
        ["ccc_mpp", "-H", "-n", "-u", user],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # NOTE(review): the exit status of ccc_mpp is not checked; a failing call
    # produces no output and is reported as "no job running" -- confirm this
    # is intended.

    # Parsing command output
    for line in process.stdout.decode('utf-8').splitlines():
        fields = line.split()
        # Skip blank/whitespace-only and malformed lines (fewer than 3 columns)
        if len(fields) < 3:
            continue
        if fields[2] in pending:
            # At least one job is still running
            return False
    # No job is running
    return True