Source code for pycif.plugins.platforms.empa_daint.jobs

import inspect
import subprocess
import os
from time import sleep
import numpy as np

from logging import debug, error, info, warning
from .utils import n_jobs_running


def get_template_job(self):
    """Return the text of the sbatch script, with ``str.format`` placeholders.

    The template contains the fields ``{jobname}``, ``{time}``, ``{project}``,
    ``{nodes}``, ``{cores}``, ``{queue}``, ``{constraint}``, ``{rootdir}``,
    ``{env_variables}`` and ``{exe}``. The ``{memory}`` field (and its
    ``--mem`` directive) is only present when ``self.memory`` is truthy.

    Args:
        self: platform object; only its ``memory`` attribute is read.

    Returns:
        str: the unformatted job script.
    """
    # Each section is joined with newlines on its own; the sections are then
    # glued together without any extra separator.
    scheduler_header = (
        "#!/bin/bash -l",
        '#SBATCH --uenv="icon-wcp/v1:rc4"',
        '#SBATCH --job-name="{jobname}"',
        "#SBATCH --time={time}",
        "#SBATCH --account={project}",
        "#SBATCH --nodes={nodes}",
        "#SBATCH --ntasks-per-node={cores}",
        "#SBATCH --ntasks-per-core=1",
        "#SBATCH --cpus-per-task=1",
        "#SBATCH --partition={queue}",
        "#SBATCH --constraint={constraint}\n",
    )
    memory_request = (
        "#SBATCH --mem={memory}GB\n",
    )
    working_directory = (
        "#SBATCH --chdir={rootdir}",
        "#SBATCH --output={rootdir}/sbatch_log",
    )
    commands = (
        "",
        "export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK",
        "",
        "echo done",
        "",
        "{env_variables}",
        "",
        "{exe}",
    )

    sections = [scheduler_header]
    if self.memory:
        # Explicit memory request only when the platform asks for one
        sections.append(memory_request)
    sections += [working_directory, commands]

    return "".join("\n".join(section) for section in sections)
def init_job(self, exe, job_file, jobname='CIF'):
    """Write the sbatch script that runs ``exe`` to ``job_file``.

    The script is built from ``get_template_job`` and filled with the
    platform settings read from ``self`` (``project``, ``queue``, ``nodes``,
    ``cores``, ``time``, ``constraint``, and optionally ``memory`` and
    ``env_variables``).

    Side effects: ``self.env_variables`` and ``self.memory`` are given
    defaults (``{}`` and ``False``) when they are missing, and ``job_file``
    is (over)written.

    Args:
        self: platform object holding the Slurm settings.
        exe: command line placed at the end of the script.
        job_file: path of the batch script to write; its directory is used
            as working directory and for the ``sbatch_log`` output file.
        jobname: name given to the Slurm job.
    """
    # Defaults for the optional settings
    self.env_variables = getattr(self, "env_variables", {})
    if not hasattr(self, "memory"):
        self.memory = False

    # The script runs under bash (see the shebang of the template), so the
    # variables must be set with `export`; csh-style `setenv` does not exist
    # in bash. Values are written verbatim so that they may reference other
    # shell variables; values containing spaces must be quoted by the user.
    # Keys are iterated and indexed (rather than using `.items()`) to stay
    # compatible with any mapping-like object used for `env_variables`.
    env_variables = "\n".join(
        "export {}={}".format(key, self.env_variables[key])
        for key in self.env_variables
    )

    # The debug queue gets tighter caps on nodes and memory
    in_debug_queue = self.queue == 'debug'
    nodes = min(self.nodes, 10) if in_debug_queue else self.nodes
    # `memory` is only inserted when the template has a {memory} field,
    # i.e. when self.memory is truthy; str.format ignores it otherwise.
    memory = min(self.memory, 10 if in_debug_queue else 120)

    job_str = get_template_job(self).format(
        jobname=jobname,
        project=self.project,
        queue=self.queue,
        nodes=nodes,
        cores=self.cores,
        time=self.time,
        rootdir=os.path.dirname(job_file),
        constraint=self.constraint,
        memory=memory,
        exe=exe,
        env_variables=env_variables,
    )

    with open(job_file, "w") as f:
        f.write(job_str)
def submit_job(self, exe, job_file, jobname='CIF'):
    """Submit a job to cluster with SBATCH.

    The batch script is first written to ``job_file`` with ``self.init_job``.
    When the target queue is ``debug`` and more than one job is already
    reported by ``n_jobs_running``, the function waits ``self.sleep_time``
    seconds and checks again, until the queue is free. The script is then
    submitted with ``sbatch`` from the directory of ``job_file``.

    Anything sbatch prints on stderr is logged and saved to ``log.stderr``
    next to ``job_file``.

    Args:
        self: platform object (reads ``queue`` and ``sleep_time``).
        exe: command line to run in the job.
        job_file: path of the batch script to write and submit.
        jobname: name given to the Slurm job.

    Returns:
        str: the last whitespace-separated word of sbatch's standard output,
        expected to be the job number.

    Raises:
        PermissionError: if sbatch printed nothing on its standard output.
    """
    # Initialize the job file to be submitted (once, so that the requested
    # jobname is kept however long we have to wait for the queue)
    self.init_job(self, exe, job_file, jobname=jobname)

    # Debug queue has a maximum of jobs it can run: poll until it is free.
    # A loop is used instead of recursion so that a long wait cannot exhaust
    # the recursion limit.
    while True:
        n_jobs_in_run = n_jobs_running(self)
        info(f'{n_jobs_in_run} jobs are in queue.')

        if not (self.queue == "debug" and n_jobs_in_run > 1):
            break

        warning(
            f"{self.queue} queue is full. "
            f"CIF will check in {self.sleep_time}s if the queue is free. "
            "Alternatively, try running the job on the 'normal' queue."
        )
        sleep(self.sleep_time)

    # Submitting the job
    info(f"Submitting job with exe = {exe} on daint from batch file job_file = {job_file}")
    job_dir = os.path.dirname(job_file)
    process = subprocess.Popen(
        "sbatch {}".format(job_file),
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=job_dir
    )
    # communicate() alone both drains the pipes and waits for the process;
    # calling wait() first may deadlock when a pipe buffer fills up.
    stdout, stderr = process.communicate()

    # Defined unconditionally: it is also needed by the error raised below
    err_filepath = "{}/log.stderr".format(job_dir)
    error_message = stderr.decode('utf-8')
    if error_message:
        error(
            f"Errors while running {exe}.\n"
            f"They are logged in {err_filepath}."
        )
        with open(err_filepath, "w") as err_file:
            err_file.write(error_message)

    # Job number is given by the last str of the sbatch output
    sbatch_words = stdout.decode('utf-8').split()
    if not sbatch_words:
        if error_message:
            hint = f"Checked the errors in {err_filepath}."
        else:
            hint = "sbatch printed neither a job number nor an error message."
        raise PermissionError(
            "The job could not be submitted with the given settings.\n"
            + hint
        )

    return sbatch_words[-1]
def check_jobs(self, list_jobs):
    """Check whether all the given jobs have left the Slurm queue.

    Runs ``squeue -j <id1>,<id2>,...`` and looks for the requested job ids
    in the first column of its output (the first output line is taken to be
    the header).

    Args:
        self: platform object (not read by this function).
        list_jobs: sequence of job ids, given as strings.

    Returns:
        ``True`` when ``list_jobs`` is empty or none of its jobs is listed by
        squeue, a false value as soon as one of them is still listed. For a
        non-empty ``list_jobs`` the value comes from ``numpy.all``.

    Raises:
        RuntimeError: if squeue keeps returning no output at all after a few
            attempts.
    """
    debug(f"Checking jobs from list_jobs={list_jobs}")
    if len(list_jobs) == 0:
        return True

    squeue_cmd = f"squeue -j {','.join(list_jobs)}"

    # Bounded retries with a pause, instead of an unbounded recursion that
    # would spawn a new process at every level until RecursionError.
    max_attempts = 5
    retry_delay = 1  # seconds between two attempts

    stdout_lines = []
    squeue_errors = ""
    for attempt in range(1, max_attempts + 1):
        process = subprocess.Popen(
            squeue_cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate()
        squeue_output = stdout.decode('utf-8')
        squeue_errors = stderr.decode('utf-8')
        debug(f"{squeue_cmd}: \n{squeue_output}")

        # splitlines() does not depend on a trailing newline being present
        stdout_lines = squeue_output.splitlines()
        if stdout_lines:
            break

        # NOTE(review): squeue is expected to answer "Invalid job id
        # specified" on stderr for a job it no longer tracks, which is
        # treated here as "job finished" -- confirm against the Slurm
        # version installed on the platform.
        if "Invalid job id" in squeue_errors:
            debug(f"squeue does not know the jobs anymore: {squeue_errors}")
            return True

        # Without even a header, the output of squeue cannot be trusted
        debug("Problem with the squeue process communication.")
        debug(squeue_errors)
        if attempt < max_attempts:
            sleep(retry_delay)
    else:
        raise RuntimeError(
            f"'{squeue_cmd}' returned no output after {max_attempts} "
            f"attempts: {squeue_errors}"
        )

    # Collect the ids listed after the header; the jobid is the first word
    # of each line. Blank lines are skipped.
    listed_ids = set()
    for ln in stdout_lines[1:]:
        words = ln.split()
        if not words:
            continue
        jobid = words[0]
        listed_ids.add(jobid)
        # Array tasks and heterogeneous job components are displayed as
        # "<jobid>_<task>" and "<jobid>+<offset>": also register the parent
        # id so that the submitted job is still seen as active.
        listed_ids.add(jobid.partition("_")[0].partition("+")[0])

    # Sorting active jobs
    active_jobs = [jobid in listed_ids for jobid in list_jobs]

    ret = np.all(~np.array(active_jobs))
    debug(f"check_jobs: -> {ret}")
    return ret