# Source code for pycif.plugins.platforms.fmi_puhti.jobs
import subprocess
import os
import numpy as np
from logging import info
# Fragments of the Slurm batch script written by ``init_job``.  The sections
# are concatenated and then filled in with ``str.format``, so every directive
# must end with its own newline: a missing one glues two ``#SBATCH`` lines
# together and corrupts both.
#
# NOTE(review): ``init_job`` also looks up an "only_cores" section, which is
# not defined here -- confirm whether that mode is meant to be supported on
# this platform.
job_template = {
    # Shebang and the directives shared by every job.
    "start": (
        "#!/bin/bash\n"
        "#SBATCH --job-name=cif\n"
        "#SBATCH --partition=fmi\n"
        "#SBATCH --account=project_2001147\n"
        "#SBATCH -o %x.o%j\n"
    ),
    # Resource request; wall time and memory per CPU are fixed here, only the
    # node and per-node task counts are filled in.
    "resource": (
        "#SBATCH --nodes={nodes}\n"
        "#SBATCH --ntasks-per-node={cores}\n"
        "#SBATCH --time=05:00:00\n"
        "#SBATCH --mem-per-cpu=2GB\n"
    ),
    # Payload: move to the job directory, set the environment, run the command.
    "end": (
        "cd {rootdir}\n"
        "{env_variables}\n"
        "{exe}\n"
    ),
}
[docs]
def init_job(self, exe, job_file):
    """Write the batch script that runs ``exe`` to ``job_file``.

    The script is built by concatenating the ``start``, resource and ``end``
    sections of the module-level ``job_template`` and filling in the
    placeholders with ``str.format``.

    Args:
        self: Platform object. ``resource``, ``queue`` and ``cores`` are read,
            plus ``nodes`` unless ``resource == "only_cores"``.
            ``env_variables`` is created when missing and always receives
            ``NUMEXPR_MAX_THREADS = self.cores``.
        exe: Command text placed on the last line of the script.
        job_file: Path of the script to write; its directory is the ``cd``
            target inside the script.

    Raises:
        ValueError: If ``job_template`` has no section for the requested
            resource mode.
    """
    self.env_variables = getattr(self, "env_variables", {})
    self.env_variables.update({"NUMEXPR_MAX_THREADS": self.cores})

    # The generated script starts with a bash shebang and is launched with
    # bash/sbatch, so variables must be exported with bash syntax; the csh
    # ``setenv`` builtin does not exist there.
    env_variables = "\n".join(
        "export {}={}".format(name, value)
        for name, value in self.env_variables.items()
    )

    resource_key = "only_cores" if self.resource == "only_cores" else "resource"
    sections = ["start", resource_key, "end"]
    missing = [key for key in sections if key not in job_template]
    if missing:
        # Fail with an explicit message instead of the opaque TypeError that
        # joining a ``None`` section would produce.
        raise ValueError(
            "job_template has no section(s) {} required for resource "
            "mode '{}'".format(missing, self.resource)
        )

    fields = {
        "queue": self.queue,
        "cores": self.cores,
        "rootdir": os.path.dirname(job_file),
        "exe": exe,
        "env_variables": env_variables,
    }
    if resource_key == "resource":
        # Only the full resource request needs a node count.
        fields["nodes"] = self.nodes

    job_str = "".join(job_template[key] for key in sections).format(**fields)

    with open(job_file, "w") as f:
        f.write(job_str)
[docs]
def submit_job(self, exe, job_file):
    """Write the job script and launch it with ``sbatch`` or plain ``bash``.

    Args:
        self: Platform object. ``submit_qsub`` selects ``sbatch`` (truthy) or
            ``bash`` (falsy); ``resource`` and ``model.plugin`` decide whether
            the node/core counts are computed automatically first.
        exe: Command text the job script should run.
        job_file: Path of the job script to create and launch; the command is
            run from its directory.

    Returns:
        str: Stripped standard output of the launch command. With ``sbatch``
        this is whatever Slurm prints on submission (normally
        "Submitted batch job <id>" -- confirm before parsing it).
    """
    import shlex  # local import: only needed to quote the script path

    # Compute resources automatically for the standard CHIMERE set-up
    if (
        self.model.plugin.name == "CHIMERE"
        and self.model.plugin.version == "std"
        and self.resource == "auto"
    ):
        auto_job_resource(self)

    # Initialize the job file to be submitted
    init_job(self, exe, job_file)

    # Submitting the job
    info("Submitting job on Puhti server")
    sub_command = "sbatch" if self.submit_qsub else "bash"

    # Quote the path so directories containing spaces or shell metacharacters
    # do not break (or alter) the command line.
    process = subprocess.Popen(
        "{} {}".format(sub_command, shlex.quote(str(job_file))),
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_file)
    )
    stdout, stderr = process.communicate()

    # Surface failures in the log; the return value is left unchanged so
    # existing callers keep working.
    if process.returncode != 0:
        info(
            "Command '{}' exited with code {}: {}".format(
                sub_command,
                process.returncode,
                stderr.decode('utf-8', errors='replace').strip()
            )
        )

    return stdout.decode('utf-8').strip()
[docs]
def auto_job_resource(self):
    """Set ``self.nodes`` and ``self.cores`` from the model decomposition.

    The process count is ``model.nzdoms * model.nmdoms + 1``. The node count
    is the smallest number of nodes that keeps the per-node load within
    ``self.max_procs_node``; the processes are then spread as evenly as
    possible over those nodes.
    """
    total_procs = self.model.nzdoms * self.model.nmdoms + 1
    last_index = total_procs - 1

    # Floor division plus one is a ceiling division on ``total_procs``.
    node_count = last_index // self.max_procs_node + 1
    procs_per_node = last_index // node_count + 1

    self.nodes = node_count
    self.cores = procs_per_node
[docs]
def check_jobs(self, list_jobs):
    """Tell whether every job in ``list_jobs`` has left the Slurm queue.

    Args:
        self: Platform object (not used).
        list_jobs: Job identifiers to look for. Each entry may be a bare job
            id or a longer message whose last whitespace-separated token is
            the id (e.g. "Submitted batch job 12345").

    Returns:
        bool: ``True`` if ``list_jobs`` is empty or none of its jobs appears
        in the current user's ``squeue`` listing, ``False`` otherwise. When
        ``squeue`` prints nothing (including when it cannot be run) no job is
        considered active.
    """
    if len(list_jobs) == 0:
        return True

    process = subprocess.Popen(
        "squeue -u ${USER}",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, _ = process.communicate()

    # The first column of every squeue row is the job id.  The listing is
    # already restricted to the current user by ``-u``, so rows are not
    # filtered on the user name again: that check failed when USER was unset
    # or when squeue shortened the name in its fixed-width column.  The
    # header row contributes "JOBID", which never equals a real id.
    queued_ids = set()
    for row in stdout.decode('utf-8').splitlines():
        columns = row.split()
        if columns:
            queued_ids.add(columns[0])

    def _job_id(job):
        """Return the last whitespace-separated token of ``job`` ('' if none)."""
        tokens = str(job).split()
        return tokens[-1] if tokens else ""

    return not any(_job_id(job) in queued_ids for job in list_jobs)