import subprocess
import os
import numpy as np
from . import modules_list
from logging import info
# Building blocks of the ``ccc_msub`` job script. ``init_job`` concatenates
# "start", then either "resource" or "only_cores", then "end", and fills the
# ``{...}`` placeholders with ``str.format``.
job_template = dict(
    # Header: extra environment lines followed by the #MSUB scheduler directives.
    # NOTE(review): "-T 259200" looks like a time limit in seconds (72 h) — confirm.
    start="\n".join([
        "{env_variables}",
        "",
        "#MSUB -m scratch,work,store",
        "#MSUB -r pycif",
        "#MSUB -Q long",
        "#MSUB -T 259200",
        "#MSUB -q {partition}",
        "#MSUB -A {project}",
        "",
    ]),
    # Resource request with both a "-n" value (filled from ``nodes``) and "-c".
    resource="\n".join([
        "#MSUB -n {nodes}",
        "#MSUB -c {cores}",
        "",
    ]),
    # Resource request where "-n" is filled from ``cores`` only.
    only_cores="\n".join([
        "#MSUB -n {cores}",
        "",
    ]),
    # Log files, environment clean-up, the command itself, optional venv teardown.
    # NOTE(review): "%I" is presumably expanded by ccc_msub to the job id — confirm.
    end="\n".join([
        "#MSUB -e {rootdir}/pycif_{testdir}.%I.e",
        "#MSUB -o {rootdir}/pycif_{testdir}.%I.o",
        "",
        "{modules}",
        "",
        "unset PYTHONUSERBASE",
        "",
        "unset LD_PRELOAD",
        "unset BRIDGE_MPRUN_PRELOAD",
        "echo $SLURM_NODELIST > machine_node_{partition}",
        "",
        "{exe}",
        "",
        "{deactivate}",
        "",
    ]),
)
[docs]
def init_job(self, exe, job_file):
    """Write an ``#MSUB`` batch script for the TGCC ``ccc_msub`` scheduler.

    The script is assembled from ``job_template`` in three parts:
    - the ``start`` section;
    - either ``only_cores`` (when ``self.resource == "only_cores"``) or
      ``resource``;
    - the ``end`` section.

    Args:
        self (Plugin): platform plugin instance. Reads ``pythonuserbase``,
            ``env_variables`` (optional), ``cores``, ``python_venv``,
            ``partition``, ``project``, ``resource``, and ``nodes`` when
            ``resource`` is not ``"only_cores"``.
        exe (str): shell command to execute inside the job.
        job_file (str): path where the job script will be written.

    Raises:
        Exception: if ``self.pythonuserbase`` is the empty string.
    """
    if self.pythonuserbase == "":
        raise Exception(
            "No Python userbase defined!\n"
            "Please check your configuration file."
        )

    # Extra environment lines: key is "<NAME><separator>", value is
    # [prefix, value]; each entry renders as "<prefix> <NAME><separator><value>".
    # NOTE(review): the "#setenv"/"#export" prefixes make these two lines shell
    # comments; presumably intentional since the template later unsets
    # PYTHONUSERBASE — confirm.
    self.env_variables = getattr(self, "env_variables", None) or {}
    self.env_variables.update(
        {"NUMEXPR_MAX_THREADS ": ["#setenv", self.cores],
         "PYTHONUSERBASE=": ["#export", self.pythonuserbase]}
    )
    env_variables = "\n".join(
        "{} {}{}".format(entry[0], key, entry[1])
        for key, entry in self.env_variables.items()
    )

    # A Python virtualenv replaces the `module` commands and is deactivated
    # at the end of the script.
    if self.python_venv != "":
        env_variables += f"\nsource {self.python_venv}"
        deactivate = "deactivate"
        modules_str = ""
    else:
        deactivate = ""
        modules_str = "\n".join(
            "module {}".format(m) for m in modules_list.modules[self.partition]
        )

    format_kwargs = dict(
        partition=self.partition,
        project=self.project,
        cores=self.cores,
        modules=modules_str,
        rootdir=os.path.dirname(job_file),
        testdir=os.path.basename(os.path.dirname(job_file)),
        env_variables=env_variables,
        exe=exe,
        deactivate=deactivate,
    )
    if self.resource == "only_cores":
        resource_section = "only_cores"
    else:
        resource_section = "resource"
        # ``nodes`` is only needed (and only read) for the "resource" section.
        format_kwargs["nodes"] = self.nodes

    job_temp = "".join(
        job_template[key] for key in ("start", resource_section, "end")
    )
    job_str = job_temp.format(**format_kwargs)

    with open(job_file, "w") as f:
        f.write(job_str)
[docs]
def submit_job(self, exe, job_file, **kwargs):
    """Write the job script and submit it to the TGCC-CCRT scheduler with ``ccc_msub``.

    Args:
        self (Plugin): platform plugin instance.
        exe (str): shell command to execute inside the job.
        job_file (str): path of the job script to write and submit. A relative
            path is resolved against the current working directory.
        **kwargs: optional ``job_id``. When truthy, it is passed to
            ``ccc_msub`` as ``-a <job_id>`` to declare a job dependency.

    Returns:
        str: the stripped standard output of ``ccc_msub``.
    """
    import shlex
    from logging import warning

    # ccc_msub runs from the script's directory. Resolve the path first so a
    # relative job_file is not interpreted a second time relative to that cwd
    # (and so a bare filename does not yield cwd="").
    job_path = os.path.abspath(job_file)
    job_dir = os.path.dirname(job_path)

    # Prepare optional job dependency
    job_id = kwargs.get("job_id", None)
    job_dependency = "-a {}".format(job_id) if job_id else ""

    # Compute resources automatically for standard CHIMERE runs
    if (
        self.model.plugin.name == "CHIMERE"
        and self.model.plugin.version == "std"
        and self.resource == "auto"
    ):
        auto_job_resource(self)

    # Initialize the job file to be submitted
    self.init_job(self, exe, job_path)

    # Submitting the job. The shell still tokenizes job_dependency as before;
    # only the script path is quoted, so spaces or shell metacharacters in it
    # reach ccc_msub as a single argument.
    info("Submitting job on TGCC-CCRT server")
    process = subprocess.Popen(
        "ccc_msub {} {}".format(job_dependency, shlex.quote(job_path)),
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=job_dir,
    )
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        # Surface submission failures instead of silently returning stdout.
        warning(
            "ccc_msub exited with status %s: %s",
            process.returncode,
            stderr.decode("utf-8", errors="replace").strip(),
        )
    return stdout.decode("utf-8").strip()
[docs]
def auto_job_resource(self):
    """Choose ``self.nodes`` and ``self.cores`` for CHIMERE when ``resource = 'auto'``.

    The task count is ``nzdoms * nmdoms + 1``. The node count is the smallest
    number of nodes holding that many tasks with at most ``max_procs_node``
    per node. The per-node count then spreads the tasks as evenly as possible
    over those nodes. Both are ceiling divisions, written as
    ``(a - 1) // b + 1``.

    Args:
        self (Plugin): platform plugin instance. Reads ``model.nzdoms``,
            ``model.nmdoms`` and ``max_procs_node``; sets ``nodes`` and ``cores``.
    """
    n_tasks = self.model.nzdoms * self.model.nmdoms + 1

    n_nodes = (n_tasks - 1) // self.max_procs_node + 1
    per_node = (n_tasks - 1) // n_nodes + 1

    # Both values are computed before either attribute is written.
    self.nodes, self.cores = n_nodes, per_node
[docs]
def check_jobs(self, list_jobs):
    """Return ``True`` when none of the listed jobs appear in the TGCC queue listing.

    A job counts as still active when its identifier equals the third
    whitespace-separated field of a ``ccc_mpp -H -n -u $USER`` output line
    that contains the current user name.

    Args:
        self (Plugin): platform plugin instance (unused).
        list_jobs (list[str]): job identifiers returned by ``submit_job``.

    Returns:
        bool: ``True`` if no job of *list_jobs* is listed (also for an empty
        list), ``False`` as soon as one is found.
    """
    if len(list_jobs) == 0:
        return True

    # shell=True is needed so that ${USER} is expanded by the shell.
    process = subprocess.Popen(
        "ccc_mpp -H -n -u ${USER}",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = process.communicate()

    # An unset USER must not crash the `in` test; an empty name matches every
    # line, so all listed lines are then considered.
    user = os.getenv("USER") or ""
    wanted = set(list_jobs)
    # splitlines() keeps the last line even without a trailing newline.
    for line in stdout.decode("utf-8").splitlines():
        if user not in line:
            continue
        fields = line.split()
        if len(fields) > 2 and fields[2] in wanted:
            return False
    return True