# Source code for pycif.plugins.platforms.fmi_puhti.jobs

import subprocess
import os
import numpy as np

from logging import info

# Fragments of the Slurm batch script written by ``init_job``: the script is
# "start" + one resource section ("resource" or "only_cores") + "end",
# concatenated and then filled in with ``str.format``.  Every fragment ends
# with a newline so that the concatenated directives stay on separate lines.
job_template = {
    # Shebang and job-wide directives.  ``-o %x.o%j`` is an sbatch filename
    # pattern (%x = job name, %j = job id); it contains no braces, so
    # ``str.format`` leaves it untouched.
    "start": (
        "#!/bin/bash\n"
        "#SBATCH --job-name=cif\n"
        "#SBATCH --partition=fmi\n"
        "#SBATCH --account=project_2001147\n"
        "#SBATCH -o %x.o%j\n"
    ),
    # Default resource section: {nodes} nodes with {cores} tasks on each.
    "resource": (
        "#SBATCH --nodes={nodes}\n"
        "#SBATCH --ntasks-per-node={cores}\n"
        "#SBATCH --time=05:00:00\n"
        "#SBATCH --mem-per-cpu=2GB\n"
    ),
    # Resource section selected when ``resource == "only_cores"``: only the
    # total task count {cores} is requested (no node count), because that
    # branch of ``init_job`` formats the template without ``nodes``.
    "only_cores": (
        "#SBATCH --ntasks={cores}\n"
        "#SBATCH --time=05:00:00\n"
        "#SBATCH --mem-per-cpu=2GB\n"
    ),
    # Move to the job directory, set the environment variables, run the
    # executable.
    "end": (
        "cd {rootdir}\n"
        "{env_variables}\n"
        "{exe}\n"
    ),
}


def init_job(self, exe, job_file):
    """Write a Slurm batch script that runs ``exe`` to ``job_file``.

    The script is assembled from ``job_template``: the "start" header, a
    resource section ("only_cores" when ``self.resource == "only_cores"``,
    otherwise "resource" with ``self.nodes`` and ``self.cores``) and the
    "end" footer, which changes into the job file's directory, exports the
    environment variables and runs ``exe``.

    Side effect: ``self.env_variables`` is created if missing and
    ``NUMEXPR_MAX_THREADS`` is set in it to ``self.cores``.

    Args:
        exe: command line written verbatim as the last line of the script.
        job_file: path of the script file to (over)write.
    """
    self.env_variables = getattr(self, "env_variables", {})
    self.env_variables.update({"NUMEXPR_MAX_THREADS": self.cores})

    # The script is run by bash (see the shebang in job_template), so use
    # bash ``export``; the csh ``setenv`` command does not exist in bash.
    env_variables = "\n".join(
        "export {}={}".format(key, value)
        for key, value in self.env_variables.items()
    )

    # Absolute directory so the script's ``cd`` works no matter which
    # directory the script is launched from.
    rootdir = os.path.dirname(os.path.abspath(job_file))

    fields = {
        "cores": self.cores,
        "rootdir": rootdir,
        "exe": exe,
        "env_variables": env_variables,
    }
    if self.resource == "only_cores":
        resource_key = "only_cores"
    else:
        resource_key = "resource"
        # Only the multi-node section needs a node count.
        fields["nodes"] = self.nodes

    job_str = "".join(
        job_template[key] for key in ("start", resource_key, "end")
    ).format(**fields)

    with open(job_file, "w") as f:
        f.write(job_str)


def submit_job(self, exe, job_file):
    """Write the job script for ``exe`` and submit it with sbatch (or run it with bash).

    For the CHIMERE "std" model with ``self.resource == "auto"``, nodes and
    cores are computed first with ``auto_job_resource``.  The script is then
    written by ``init_job``.  It is submitted with ``sbatch`` when
    ``self.submit_qsub`` is truthy; otherwise it is executed directly with
    ``bash``, which blocks until the script finishes.

    A non-zero exit status of the command is logged as a warning.

    Returns:
        str: the command's stdout, decoded as UTF-8 and stripped.
        NOTE(review): for sbatch this is the submission message rather than a
        bare job id; callers presumably extract the id before polling with
        ``check_jobs`` -- confirm.

    Raises:
        OSError: if the submit command (``sbatch``/``bash``) cannot be executed.
    """
    from logging import warning

    # Compute resources automatically when requested
    if (
        self.model.plugin.name == "CHIMERE"
        and self.model.plugin.version == "std"
        and self.resource == "auto"
    ):
        auto_job_resource(self)

    # Initialize the job file to be submitted
    init_job(self, exe, job_file)

    info("Submitting job on Puhti server")
    sub_command = "sbatch" if self.submit_qsub else "bash"

    # Absolute path: the command runs with cwd set to the job directory, so
    # a relative path would otherwise be resolved a second time from there.
    job_path = os.path.abspath(job_file)
    # Argument list (no shell) so paths containing spaces or shell
    # metacharacters are passed through unchanged.
    process = subprocess.run(
        [sub_command, job_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    if process.returncode != 0:
        warning(
            "'{} {}' exited with code {}: {}".format(
                sub_command,
                job_path,
                process.returncode,
                process.stderr.decode("utf-8", "replace").strip(),
            )
        )
    return process.stdout.decode("utf-8").strip()


def auto_job_resource(self):
    """Set ``self.nodes`` and ``self.cores`` from the model decomposition.

    The job needs ``nzdoms * nmdoms + 1`` processes.  ``self.nodes`` becomes
    the fewest nodes that hold them at ``self.max_procs_node`` processes per
    node, and ``self.cores`` the fewest processes per node that spread them
    over that many nodes.
    """
    model = self.model
    # Total process count minus one.
    n_extra = model.nzdoms * model.nmdoms

    # Ceiling division written as floor((total - 1) / d) + 1, which equals
    # ceil(total / d) for non-negative integer n_extra.
    node_count = n_extra // self.max_procs_node + 1
    per_node = n_extra // node_count + 1

    self.nodes = node_count
    self.cores = per_node


def check_jobs(self, list_jobs):
    """Return True when none of ``list_jobs`` is still listed by ``squeue``.

    Args:
        list_jobs: job identifiers (strings), compared with the first
            whitespace-separated field of each ``squeue -u <user>`` line that
            contains the user name.  NOTE(review): presumably the JOBID column
            of squeue's default format -- confirm.

    Returns:
        bool: True if ``list_jobs`` is empty or no listed line matches any of
        its ids.  False if a job still appears, and also False (with a
        warning) when ``squeue`` exits with an error.  This keeps a transient
        Slurm failure, which yields empty output, from being mistaken for
        "all jobs finished".

    Raises:
        OSError: if ``squeue`` cannot be executed at all.
    """
    import getpass
    from logging import warning

    pending = set(list_jobs)
    if not pending:
        return True

    # Fall back to the login name when $USER is unset.
    user = os.getenv("USER") or getpass.getuser()
    process = subprocess.run(
        ["squeue", "-u", user],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if process.returncode != 0:
        warning(
            "squeue exited with code {}: {}".format(
                process.returncode,
                process.stderr.decode("utf-8", "replace").strip(),
            )
        )
        return False

    for line in process.stdout.decode("utf-8").splitlines():
        fields = line.split()
        if user in line and fields and fields[0] in pending:
            return False
    return True