# Source code for pycif.plugins.modes.response_functions.jobs

import os
import time
from logging import info, debug
from typing import Any, Generator, Iterable, List

from .base_function import BaseFunctionType, BaseFunctionSamplingBatch, dump_yaml_config

# Aliases for type hints
Mode = Any
Platform = Any


def submit_job(self: Mode, command: str, job_file: str) -> str:
    """Submit ``command`` as a job described by ``job_file``.

    In dry-run mode the job file is only initialised through the platform
    plugin (``init_job``) and nothing is actually submitted.

    Args:
        self (Mode): the mode plugin; its ``dryrun`` flag and its
            ``platform`` plugin are used
        command (str): command the job has to execute
        job_file (str): path to the job file

    Returns:
        str: the value returned by ``platform.submit_job`` (the job id), or
        an empty string in dry-run mode
    """
    if not self.dryrun:
        return self.platform.submit_job(command, job_file)

    # NOTE: init_job is handed the platform plugin explicitly as its first
    # argument (unlike submit_job above); this mirrors the expected call.
    platform = self.platform
    platform.init_job(platform, command, job_file)
    return ""
def wait_jobs(self: Mode, job_id_list: List[str]) -> None:
    """Block until every job of ``job_id_list`` has finished.

    Nothing is done in dry-run mode or when the list is empty. Otherwise the
    platform plugin is polled with ``check_jobs`` and the function sleeps
    ``platform.sleep_time`` seconds between unsuccessful polls.

    Args:
        self (Mode): the mode plugin
        job_id_list (list of str): ids of the jobs to wait for
    """
    nothing_to_wait_for = self.dryrun or not job_id_list
    if nothing_to_wait_for:
        return

    info(f"Waiting for {len(job_id_list)} jobs to finish")
    platform = self.platform
    finished = platform.check_jobs(job_id_list)
    while not finished:
        time.sleep(platform.sleep_time)
        finished = platform.check_jobs(job_id_list)
def job_batches(
    base_function_list: Iterable[BaseFunctionType], batch_size: int
) -> Generator[List[BaseFunctionType], None, None]:
    """Cut the base functions that still have to run into batches.

    Base functions that already ran successfully, or that are ignored, are
    skipped (an info message is logged for each). The remaining ones are
    grouped, in order, into lists of at most ``batch_size`` elements. No
    empty batch is ever yielded: if nothing is left to run, nothing is
    yielded.

    Args:
        base_function_list (iterable of base functions): base functions
        batch_size (int): maximum number of base functions per batch

    Yields:
        list of base functions: a non-empty batch of base functions

    Raises:
        ValueError: if ``batch_size`` is lower than one. As this is a
            generator, the error is raised when it is first advanced.
    """
    if batch_size < 1:
        raise ValueError("batch size must be at least one")

    batch = []
    for base_func in base_function_list:
        if base_func.has_run_succesfully():
            info(f"{base_func.name} has already run, skipping it")
            continue
        # Always False for sampling batches
        if base_func.is_ignored():
            info(f"{base_func} is ignored, skipping it")
            continue
        batch.append(base_func)
        if len(batch) == batch_size:
            yield batch
            batch = []

    # Only emit the remainder when there is one, so that callers never get
    # (and submit) an empty batch when the number of runnable base functions
    # is a multiple of batch_size, or zero.
    if batch:
        yield batch
def run_jobs_in_batches(
    self: Mode, base_function_list: List[BaseFunctionType]
) -> None:
    """Run the base functions as jobs, one batch of jobs at a time.

    The base functions (control vector elements taken individually, or
    sampling batches) that still have to run are cut into batches of
    ``self.job_batch_size`` elements; a non-positive ``job_batch_size``
    puts all of them in a single batch. For each batch:

    * the input files of every base function are dumped (control vector,
      yaml configuration and, for sampling batches, ``sampling_batch.txt``);
    * the jobs are submitted: one job per base function, or a single
      pseudo-parallel job running every command in the background when
      ``self.pseudo_parallel_job`` is set;
    * the function waits for the whole batch to finish;
    * if ``self.autoflush`` is set and this is not a dry run, every base
      function of the batch is flushed.

    Args:
        self (Mode): the mode plugin
        base_function_list (list of base functions): base functions to run
    """
    # Clamp to 1 so that an empty list with a non-positive job_batch_size
    # does not produce batch_size == 0, which job_batches rejects.
    if self.job_batch_size > 0:
        batch_size = self.job_batch_size
    else:
        batch_size = max(len(base_function_list), 1)

    flush_after_batch = self.autoflush and not self.dryrun

    for i, batch in enumerate(job_batches(base_function_list, batch_size)):
        if not batch:
            # Nothing to run: do not submit an empty job script
            continue

        job_id_list = []
        # Commands gathered for the pseudo-parallel job script
        batch_commands = []

        for base_func in batch:
            # Dumping input files
            base_func.dump_controlvect()
            dump_yaml_config(self, base_func)

            debug(f"Running {base_func}")

            if isinstance(base_func, BaseFunctionSamplingBatch):
                # Writing sampling batch content to a file
                path = os.path.join(base_func.rundir, "sampling_batch.txt")
                with open(path, "w") as f:
                    f.write(str(base_func))

            command = (
                f"{self.platform.python} -m pycif {base_func.yaml_config_path}"
            )

            if self.pseudo_parallel_job:
                batch_commands.append(command)
            else:
                # Submitting one job per base function
                job_file = os.path.join(
                    base_func.rundir, f"job_pycif_{base_func.name}.sh")
                job_id_list.append(submit_job(self, command, job_file))

        if self.pseudo_parallel_job:
            # Submitting the whole batch as one job: every command is run in
            # the background and the script waits for all of them
            parallel_command = (
                "\n"
                + "".join(f"{command} &\n" for command in batch_commands)
                + "wait\n"
            )
            job_file = os.path.join(
                self.basedir, f"job_pycif_batch_{i:05d}.sh")
            job_id_list.append(submit_job(self, parallel_command, job_file))

        info(f"Submitted job batch {i}")
        wait_jobs(self, job_id_list)

        # Flushing 'controlvect.pickle' file and 'chain' dir (if not dryrun)
        if flush_after_batch:
            for ran_base_func in batch:
                ran_base_func.flush()