Source code for pycif.plugins.platforms.tgcc_ccrt_nvidia.jobs

import os
import subprocess
from logging import info
from pathlib import Path
from typing import List


def init_job(self, command: str, job_file) -> None:
    """Write a job shell file for the CCRT cluster.

    The generated script contains, in order: the shebang, the ``#MSUB``
    scheduler directives, the environment setup (user-provided ``job_env``
    lines or a default set of ``module`` commands), the exported environment
    variables, the optional Python virtual environment activation, the
    command itself and the optional virtual environment deactivation.

    As a side effect, ``self.env_variables`` is set to an empty dict when the
    attribute is missing.

    Args:
        self: platform object holding the job settings. ``partition`` and
            ``project`` are checked explicitly; ``nodes``, ``cores``, ``qos``,
            ``walltime`` and ``filesystem`` are always read;
            ``python_module`` is read only when ``job_env`` is absent;
            ``job_env``, ``env_variables`` and ``python_venv`` are optional.
        command (str): command to execute
        job_file (str): path to the job file

    Raises:
        ValueError: if ``partition`` or ``project`` is not set on ``self``.
        FileNotFoundError: if ``python_venv`` is set but its ``bin/activate``
            script does not exist.
    """
    # Some input arguments mandatory for submitting jobs are marked as
    # optional; we only check that they are present when a job is actually
    # submitted from within pycif.
    mandatory_arguments = ("partition", "project")
    for arg in mandatory_arguments:
        if not hasattr(self, arg):
            raise ValueError(
                f"input argument '{arg}' is mandatory for submitting jobs"
            )

    self.env_variables = getattr(self, "env_variables", {})

    job_file = Path(job_file)
    job_name = os.getenv("BRIDGE_MSUB_REQNAME", "pycif_job")
    job_name = f"{job_name}_{job_file.stem}"
    out_file = job_file.with_suffix(".%I.out")

    # Shebang and job arguments.
    # Every directive must be its own list element: a missing comma makes
    # Python concatenate adjacent string literals into one single line.
    lines = [
        "#!/usr/bin/bash",
        "",
        f"#MSUB -r {job_name}",
        f"#MSUB -o {out_file}",
        f"#MSUB -e {out_file}",
        f"#MSUB -A {self.project}",
        f"#MSUB -q {self.partition}",
        f"#MSUB -n {self.nodes}",
        f"#MSUB -c {self.cores}",
        f"#MSUB -Q {self.qos}",
        f"#MSUB -T {self.walltime}",
        f"#MSUB -m {self.filesystem}",
        "",
    ]

    # Setup environment modules
    if hasattr(self, "job_env"):
        lines.extend(self.job_env)
        lines.append("")
    else:
        lines.extend(
            [
                "module purge",
                f"module load intel/20 mpi/openmpi/4 {self.python_module}",
                "module load flavor/hdf5/parallel netcdf-fortran",
                "module load grib",
                "",
            ]
        )

    # Environment variables (lines are joined with "\n" when written, so no
    # newline is embedded here)
    lines.extend(
        f"export {key}={val}" for key, val in self.env_variables.items()
    )
    lines.append("")

    # Python virtual environment
    use_venv = hasattr(self, "python_venv")
    if use_venv:
        activate_file = os.path.join(self.python_venv, "bin/activate")
        if not os.path.isfile(activate_file):
            raise FileNotFoundError(
                "The python virtual environment at "
                f"'{self.python_venv}' was not found"
            )
        lines.append(f"source {activate_file}")
        lines.append("")

    # Command
    lines.append(command)
    lines.append("")

    # Deactivate Python virtual environment
    if use_venv:
        lines.append("deactivate")
        lines.append("")

    # Writing job file
    with open(job_file, "w") as f:
        f.write("\n".join(lines))
def submit_job(self, command: str, job_file: str) -> str:
    """Write and submit a job file to cluster scheduler with ccc_msub.

    The job file is first written through ``self.init_job``. It is then
    either submitted with ``ccc_msub`` (when ``self.submit_msub`` is true) or
    executed directly with ``bash``. In both cases the process is run from
    the directory containing the job file and this function waits for the
    launched process to finish.

    Args:
        command (str): command to execute
        job_file (str): path to the job file

    Returns:
        str: job id if ccc_msub is used, else "" (empty string)

    Raises:
        RuntimeError: if the launched process exits with a non-zero status.
    """
    # Initialize the job file to be submitted
    self.init_job(self, command, job_file)

    if self.submit_msub:
        info("Submitting job on TGCC-CCRT")
        sub_command = "ccc_msub"
    else:
        info("Running job in subprocess")
        sub_command = "bash"

    # Resolve the path once: the process runs from the job directory, so a
    # relative path would otherwise be interpreted relative to that directory
    # (and a bare file name would give an empty, invalid ``cwd``).
    job_path = os.path.abspath(job_file)

    # Submitting the job
    process = subprocess.Popen(
        [sub_command, job_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(job_path),
    )
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        message = (
            f"Error while running command '{command}' in job file "
            f"'{job_file}' with '{sub_command}' command."
        )
        # Surface the diagnostics instead of silently dropping them
        details = stderr.decode("utf-8", errors="replace").strip()
        if details:
            message += f" Exit code {process.returncode}; stderr: {details}"
        raise RuntimeError(message)

    if not self.submit_msub:
        return ""

    # NOTE(review): ccc_msub is expected to print a banner ending with the
    # job id (e.g. "Submitted Batch Session <id>") -- confirm on the cluster.
    # Keeping only the last token yields the bare id in that case and is a
    # no-op when the output already is the id alone.
    tokens = stdout.decode("utf-8").split()
    return tokens[-1] if tokens else ""
def check_jobs(self, job_id_list: List[str]) -> bool:
    """Check if the provided jobs are still listed by ``ccc_mpp``.

    Args:
        job_id_list (list of str): ids of the jobs to check

    Returns:
        bool: True if no job is running, False if at least one job is still
            running
    """
    import getpass

    # Filter empty strings (may be returned by submit_job when the
    # 'submit_msub' argument is set to False)
    pending_ids = {job_id for job_id in job_id_list if job_id}
    if not pending_ids:
        return True

    # The command is run without a shell, so a literal "${USER}" would never
    # be expanded: resolve the user name in Python instead.
    user = os.environ.get("USER") or getpass.getuser()

    # Running 'ccc_mpp -H -n -u <user>' command
    # NOTE(review): the exit status is not checked; a failing ccc_mpp yields
    # an empty listing, which is reported as "no job running".
    process = subprocess.Popen(
        ["ccc_mpp", "-H", "-n", "-u", user],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, _ = process.communicate()

    # Parsing command output: the job id is read from the third column
    for ln in stdout.decode("utf-8").splitlines():
        fields = ln.split()
        # Skip empty, whitespace-only or truncated lines
        if len(fields) < 3:
            continue
        if fields[2] in pending_ids:
            # At least one job is still running
            return False

    # No job is running
    return True