"""Job helpers for the TGCC-CCRT platform plugin (``pycif.plugins.platforms.tgcc_ccrt.jobs``)."""

import subprocess
import os
import numpy as np
from . import modules_list

from logging import info

# Building blocks of the ``ccc_msub`` job script. ``init_job`` concatenates
# "start", then either "only_cores" or "resource", then "end", and fills the
# ``{placeholders}`` with ``str.format``.
job_template = dict(
    # Environment lines, then fixed ``#MSUB`` options plus the partition
    # and project directives.
    start=(
        "{env_variables}\n"
        "\n"
        "#MSUB -m scratch,work,store\n"
        "#MSUB -r pycif\n"
        "#MSUB -Q long\n"
        "#MSUB -T 259200\n"
        "#MSUB -q {partition}\n"
        "#MSUB -A {project}\n"
    ),
    # Resource request that uses both {nodes} and {cores}.
    resource=(
        "#MSUB -n {nodes}\n"
        "#MSUB -c {cores}\n"
    ),
    # Resource request that uses {cores} only.
    only_cores="#MSUB -n {cores}\n",
    # ``#MSUB -e``/``-o`` file paths, module loads, environment clean-up,
    # the command itself and the optional venv deactivation.
    end=(
        "#MSUB -e {rootdir}/pycif_{testdir}.%I.e\n"
        "#MSUB -o {rootdir}/pycif_{testdir}.%I.o\n"
        "\n"
        "{modules}\n"
        "\n"
        "unset PYTHONUSERBASE\n"
        "\n"
        "unset LD_PRELOAD\n"
        "unset BRIDGE_MPRUN_PRELOAD\n"
        "echo $SLURM_NODELIST > machine_node_{partition}\n"
        "\n"
        "{exe}\n"
        "\n"
        "{deactivate}\n"
    ),
)


def init_job(self, exe, job_file):
    """Write an ``#MSUB`` batch script for the TGCC ``ccc_msub`` scheduler.

    The script is the concatenation of the ``job_template`` pieces
    ``"start"``, then ``"only_cores"`` (when ``self.resource ==
    "only_cores"``) or ``"resource"`` (otherwise), then ``"end"``, with
    their ``{placeholders}`` filled by ``str.format``.

    Args:
        self (Plugin): platform plugin instance. Reads ``pythonuserbase``,
            ``cores``, ``python_venv``, ``partition``, ``project``,
            ``resource``, the optional ``env_variables`` mapping (updated
            in place and stored back on ``self``) and, unless
            ``resource == "only_cores"``, ``nodes``.
        exe (str): shell command to execute inside the job.
        job_file (str): path where the job script will be written.

    Raises:
        ValueError: if ``self.pythonuserbase`` is an empty string.
    """
    if self.pythonuserbase == "":
        raise ValueError(
            "No Python userbase defined!\n"
            "Please check your configuration file."
        )

    # Merge the mandatory variables into any user-defined ones. Each entry
    # maps a name (with its separator) to ``[prefix, value]`` and is
    # rendered as "<prefix> <name><value>".
    # NOTE(review): the "#setenv"/"#export" prefixes start with "#", so these
    # lines are presumably comments in the generated script -- confirm intent.
    self.env_variables = getattr(self, "env_variables", {})
    self.env_variables.update({
        "NUMEXPR_MAX_THREADS ": ["#setenv", self.cores],
        "PYTHONUSERBASE=": ["#export", self.pythonuserbase],
    })
    env_variables = "\n".join(
        "{} {}{}".format(entry[0], name, entry[1])
        for name, entry in self.env_variables.items()
    )

    if self.python_venv != "":
        # A virtual environment replaces the module loads.
        # NOTE(review): this ``source`` line ends up before the ``#MSUB``
        # directives -- confirm ccc_msub still honours them.
        env_variables += f"\nsource {self.python_venv}"
        deactivate = "deactivate"
        modules_str = ""
    else:
        deactivate = ""
        modules_str = "\n".join(
            "module {}".format(module)
            for module in modules_list.modules[self.partition]
        )

    # Resolve the path first: for a bare file name ``dirname`` would be ""
    # and the ``#MSUB -e/-o`` paths would point at the filesystem root.
    job_dir = os.path.dirname(os.path.abspath(job_file))
    fields = dict(
        partition=self.partition,
        project=self.project,
        cores=self.cores,
        modules=modules_str,
        rootdir=job_dir,
        testdir=os.path.basename(job_dir),
        env_variables=env_variables,
        exe=exe,
        deactivate=deactivate,
    )
    if self.resource == "only_cores":
        resource_key = "only_cores"
    else:
        # Only this template uses {nodes}; reading it here means
        # "only_cores" setups need not define ``self.nodes``.
        resource_key = "resource"
        fields["nodes"] = self.nodes

    job_str = "".join(
        job_template[key] for key in ("start", resource_key, "end")
    ).format(**fields)

    with open(job_file, "w") as f:
        f.write(job_str)


def submit_job(self, exe, job_file, **kwargs):
    """Write the job script and submit it to the TGCC scheduler with ``ccc_msub``.

    Args:
        self (Plugin): platform plugin instance.
        exe (str): shell command to execute inside the job.
        job_file (str): path of the job script to write and submit.
        **kwargs: optional ``job_id``; when truthy it is passed to
            ``ccc_msub`` as ``-a <job_id>``.

    Returns:
        str: standard output of ``ccc_msub``, stripped. A non-zero exit
        status is logged as a warning, not raised.

    Raises:
        OSError: if the ``ccc_msub`` executable cannot be started.
    """
    from logging import warning

    job_id = kwargs.get("job_id", None)

    # Let CHIMERE "std" runs size nodes/cores automatically.
    if (
        self.model.plugin.name == "CHIMERE"
        and self.model.plugin.version == "std"
        and self.resource == "auto"
    ):
        auto_job_resource(self)

    # ``init_job`` receives ``self`` explicitly, as in the original call;
    # presumably it is stored on the plugin as a plain function -- verify.
    self.init_job(self, exe, job_file)

    # Submit by absolute path: the command runs inside the job's directory,
    # so a relative path would otherwise be resolved a second time (and a
    # bare file name would give an empty ``cwd``).
    job_path = os.path.abspath(job_file)
    command = ["ccc_msub"]
    if job_id:
        command += ["-a", str(job_id)]
    command.append(job_path)

    info("Submitting job on TGCC-CCRT server")
    # An argument list (no shell) keeps paths containing spaces intact.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        warning(
            "ccc_msub exited with status %s: %s",
            process.returncode,
            stderr.decode("utf-8", "replace").strip(),
        )
    return stdout.decode("utf-8").strip()


def auto_job_resource(self):
    """Size the CHIMERE job automatically (``resource = 'auto'``).

    The job needs ``nzdoms * nmdoms + 1`` processes. ``nodes`` becomes the
    smallest node count that holds them at ``max_procs_node`` processes per
    node, and ``cores`` the smallest per-node count that spreads them over
    those nodes.

    Args:
        self (Plugin): platform plugin instance; reads ``model.nzdoms``,
            ``model.nmdoms`` and ``max_procs_node``, then sets ``nodes``
            and ``cores``.
    """
    total_procs = self.model.nzdoms * self.model.nmdoms + 1
    # -(-a // b) is the integer ceiling of a / b.
    nodes_needed = -(-total_procs // self.max_procs_node)
    per_node = -(-total_procs // nodes_needed)
    self.nodes, self.cores = nodes_needed, per_node


def check_jobs(self, list_jobs):
    """Return ``True`` once none of the listed jobs is in the TGCC queue.

    The queue is read with ``ccc_mpp -H -n -u ${USER}``. A job counts as
    still active when a line of that listing contains ``$USER`` and its
    third whitespace-separated field equals the job identifier.

    Args:
        self (Plugin): platform plugin instance (not used).
        list_jobs (list[str]): job identifiers returned by ``submit_job``.

    Returns:
        bool: ``True`` if no job of *list_jobs* is listed (always ``True``
        for an empty list), ``False`` otherwise. If ``ccc_mpp`` exits with
        a non-zero status a warning is logged and its (possibly empty)
        output is still parsed.
    """
    from logging import warning

    if not list_jobs:
        return True

    process = subprocess.Popen(
        "ccc_mpp -H -n -u ${USER}",
        shell=True,  # the shell expands ${USER}
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        warning(
            "ccc_mpp exited with status %s: %s",
            process.returncode,
            stderr.decode("utf-8", "replace").strip(),
        )

    # With USER unset, "" matches every line instead of raising TypeError.
    user = os.getenv("USER", "")
    wanted = set(list_jobs)
    # splitlines() keeps the last entry even when the output has no
    # trailing newline (split("\n")[:-1] would drop it).
    for line in stdout.decode("utf-8").splitlines():
        fields = line.split()
        if user in line and len(fields) > 2 and fields[2] in wanted:
            return False
    return True