import inspect
import subprocess
import os
from time import sleep
import numpy as np
from logging import debug, error, info, warning
from .utils import n_jobs_running
[docs]
def get_template_job(self):
    """Return the unformatted SLURM batch-script template.

    The returned string holds ``str.format`` placeholders: ``{jobname}``,
    ``{time}``, ``{project}``, ``{nodes}``, ``{cores}``, ``{queue}``,
    ``{constraint}``, ``{rootdir}``, ``{env_variables}`` and ``{exe}``.
    A ``{memory}`` placeholder (``--mem`` directive) is included only when
    ``self.memory`` is truthy.

    Parameters
    ----------
    self : object
        Platform object; only its ``memory`` attribute is read here.

    Returns
    -------
    str
        The template, without a trailing newline.
    """
    directives = [
        "#!/bin/bash -l",
        '#SBATCH --uenv="icon-wcp/v1:rc4"',
        '#SBATCH --job-name="{jobname}"',
        "#SBATCH --time={time}",
        "#SBATCH --account={project}",
        "#SBATCH --nodes={nodes}",
        "#SBATCH --ntasks-per-node={cores}",
        "#SBATCH --ntasks-per-core=1",
        "#SBATCH --cpus-per-task=1",
        "#SBATCH --partition={queue}",
        "#SBATCH --constraint={constraint}",
    ]
    if self.memory:
        # Only request an explicit amount of memory when one is configured.
        directives.append("#SBATCH --mem={memory}GB")
    directives.extend([
        "#SBATCH --chdir={rootdir}",
        "#SBATCH --output={rootdir}/sbatch_log",
    ])

    # Shell commands run by the job, directly after the directives.
    commands = [
        "export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK",
        "",
        "echo done",
        "",
        "{env_variables}",
        "",
        "{exe}",
    ]
    return "\n".join(directives + commands)
[docs]
def init_job(self, exe, job_file, jobname='CIF'):
    """Render the SLURM batch script for ``exe`` and write it to ``job_file``.

    Optional attributes are initialised on ``self`` when missing:
    ``env_variables`` defaults to ``{}`` and ``memory`` to ``False``.
    On the ``'debug'`` queue the node count is capped at 10 and the memory
    at 10; on other queues the memory is capped at 120 (GB, per the
    ``--mem`` directive of the template).

    Parameters
    ----------
    exe : str
        Command line placed at the end of the batch script.
    job_file : str
        Path of the batch script to write. Its directory is used as the
        job's working directory and as the location of ``sbatch_log``.
    jobname : str, optional
        SLURM job name (default ``'CIF'``).
    """
    if getattr(self, "env_variables", None) is None:
        self.env_variables = {}

    # The template runs under bash (``#!/bin/bash -l``), so variables must
    # use bash syntax: csh's ``setenv`` is not a bash command and would fail
    # with "command not found". Values are inserted verbatim (no quoting).
    env_lines = "\n".join(
        f"export {key}={value}" for key, value in self.env_variables.items()
    )

    if not hasattr(self, "memory"):
        self.memory = False

    on_debug = self.queue == 'debug'
    nodes = min(self.nodes, 10) if on_debug else self.nodes
    memory = self.memory
    if memory:
        # Only cap a configured amount; a falsy value means "no --mem line"
        # in the template and must not reach min() (None would crash).
        memory = min(memory, 10 if on_debug else 120)

    job_str = get_template_job(self).format(
        jobname=jobname,
        project=self.project,
        queue=self.queue,
        nodes=nodes,
        cores=self.cores,
        time=self.time,
        # Absolute so that --chdir/--output stay valid for relative paths.
        rootdir=os.path.dirname(os.path.abspath(job_file)),
        constraint=self.constraint,
        memory=memory,
        exe=exe,
        env_variables=env_lines,
    )
    with open(job_file, "w") as f:
        f.write(job_str)
[docs]
def submit_job(self, exe, job_file, jobname='CIF'):
    """Submit a job to cluster with SBATCH and return its SLURM job id.

    The batch script is first written by ``self.init_job``. When running on
    the ``'debug'`` queue while more than one job is already running, the
    function waits ``self.sleep_time`` seconds between checks until there
    is room.

    Parameters
    ----------
    exe : str
        Command executed by the job.
    job_file : str
        Path of the batch script to write and submit.
    jobname : str, optional
        SLURM job name (default ``'CIF'``).

    Returns
    -------
    str
        The last word of ``sbatch``'s standard output, i.e. the job id.

    Raises
    ------
    PermissionError
        If ``sbatch`` printed nothing on standard output (job not accepted).
        Any stderr output is written to ``log.stderr`` next to ``job_file``.
    """
    import shlex

    # Initialize the job file to be submitted.
    # NOTE(review): ``self`` is passed explicitly, as in the original code;
    # this assumes platform functions are attached to ``self`` as plain
    # (unbound) attributes. Confirm against the plugin loader.
    self.init_job(self, exe, job_file, jobname=jobname)

    # The debug queue accepts a limited number of jobs. Wait in a loop
    # rather than recursing: the former recursive retry dropped ``jobname``
    # and grew the call stack on every attempt.
    while True:
        n_jobs_in_run = n_jobs_running(self)
        info(f'{n_jobs_in_run} jobs are in queue.')
        if self.queue != "debug" or n_jobs_in_run <= 1:
            break
        warning(
            f"{self.queue} queue is full. "
            f"CIF will check in {self.sleep_time}s if the queue is free. "
            "Alternatively, try running the job on the 'normal' queue."
        )
        sleep(self.sleep_time)

    job_path = os.path.abspath(job_file)
    job_dir = os.path.dirname(job_path)
    # Defined up front so the error message below can always be built.
    err_filepath = "{}/log.stderr".format(job_dir)

    # Submitting the job
    info(f"Submitting job with exe = {exe} on daint from batch file job_file = {job_file}")
    process = subprocess.Popen(
        "sbatch {}".format(shlex.quote(job_path)),
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=job_dir,
    )
    # communicate() waits for the process while draining both pipes;
    # calling wait() first can deadlock if a pipe buffer fills up.
    stdout, stderr = process.communicate()

    error_message = stderr.decode('utf-8', errors='replace')
    if error_message:
        error(
            f"Errors while running {exe}.\n"
            f"They are logged in {err_filepath}."
        )
        with open(err_filepath, "w") as err_file:
            err_file.write(error_message)

    # Job number is given by the last word of the sbatch output.
    words = stdout.decode('utf-8', errors='replace').split()
    if not words:
        raise PermissionError(
            "The job could not be submitted with the given settings.\n"
            f"Checked the errors in {err_filepath}."
        )
    return words[-1]
[docs]
def check_jobs(self, list_jobs, max_retries=100, retry_delay=1):
    """Check whether all the given SLURM jobs have left the queue.

    Parameters
    ----------
    list_jobs : sequence
        Job ids (typically the strings returned by ``submit_job``).
    max_retries : int, optional
        Number of ``squeue`` calls attempted when its output is unusable.
    retry_delay : float, optional
        Seconds to wait between two such attempts.

    Returns
    -------
    bool
        True when none of ``list_jobs`` is listed by ``squeue`` (or the list
        is empty), False as soon as one of them is still listed.

    Raises
    ------
    RuntimeError
        If no usable ``squeue`` output was obtained after ``max_retries``
        attempts. This replaces the RecursionError (a RuntimeError
        subclass) that the former unbounded recursive retry ended with.
    """
    debug(f"Checking jobs from list_jobs={list_jobs}")
    if not list_jobs:
        return True

    job_ids = [str(job) for job in list_jobs]
    squeue_cmd = f"squeue -j {','.join(job_ids)}"

    for _ in range(max_retries):
        process = subprocess.Popen(
            squeue_cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate()
        output = stdout.decode('utf-8', errors='replace')
        debug(f"{squeue_cmd}: \n{output}")
        stdout_lines = output.split("\n")
        if len(stdout_lines) >= 2:
            break
        # squeue rejects a job id it no longer tracks with an
        # "Invalid job id" error and empty stdout: such a job is not active.
        if "Invalid job id" in stderr.decode('utf-8', errors='replace'):
            debug("check_jobs: -> True")
            return True
        # If the message is too short, it does not contain the output of squeue
        debug("Problem with the squeue process communication.")
        debug(stdout_lines)
        sleep(retry_delay)
    else:
        raise RuntimeError(
            f"Could not read the output of '{squeue_cmd}' "
            f"after {max_retries} attempts."
        )

    # Skip the header; the job id is the first word of every other
    # non-blank line (blank lines include the one after the final newline).
    listed_ids = {line.split()[0] for line in stdout_lines[1:] if line.strip()}
    ret = not any(job in listed_ids for job in job_ids)
    debug(f"check_jobs: -> {ret}")
    return ret