Source code for pycif.plugins.modes.response_functions.jobs
import os
import time
from logging import info, debug
from typing import Any, Generator, Iterable, List
from .base_function import BaseFunctionType, BaseFunctionSamplingBatch, dump_yaml_config
# Aliases used only in type hints: the mode and platform plugin objects are
# not given a precise type in this module, so both fall back to ``Any``
Mode = Platform = Any
[docs]
def submit_job(
    self: Mode,
    command: str,
    job_file: str
) -> str:
    """Submit a job running ``command``; in dry-run mode only prepare it.

    Args:
        self (Mode): the mode plugin
        command (str): command to execute
        job_file (str): path to the job file

    Returns:
        str: the job id returned by the platform's ``submit_job``, or an
        empty string in dry-run mode, where nothing is submitted
    """
    if not self.dryrun:
        return self.platform.submit_job(command, job_file)
    # Dry run: only call the platform's ``init_job`` (meant to create the
    # job file) without submitting anything, hence no job id to report.
    # NOTE(review): unlike ``submit_job``, ``init_job`` is given the platform
    # explicitly as its first argument — confirm this is intended.
    platform = self.platform
    platform.init_job(platform, command, job_file)
    return ""
[docs]
def wait_jobs(self: Mode, job_id_list: List[str]) -> None:
    """Block until every job of ``job_id_list`` has finished.

    Returns immediately in dry-run mode (no job was actually submitted) or
    when there is no job to wait for. Otherwise the platform is polled with
    ``check_jobs`` every ``platform.sleep_time`` seconds.

    Args:
        self (Mode): the mode plugin
        job_id_list (list of str): ids of the jobs to wait for
    """
    nothing_to_wait_for = self.dryrun or not job_id_list
    if nothing_to_wait_for:
        return

    info(f"Waiting for {len(job_id_list)} jobs to finish")
    while True:
        if self.platform.check_jobs(job_id_list):
            break
        time.sleep(self.platform.sleep_time)
[docs]
def job_batches(
    base_function_list: Iterable[BaseFunctionType],
    batch_size: int
) -> Generator[List[BaseFunctionType], None, None]:
    """Cut the base functions that still need to run into batches.

    Base functions that have already run successfully, or that are ignored,
    are skipped (with an info message). The remaining ones are grouped in
    lists of ``batch_size`` elements, in input order; the last batch may be
    shorter. An empty batch is never yielded, so nothing at all is yielded
    when there is nothing to run.

    Args:
        base_function_list (iterable of base functions): base functions
        batch_size (int): maximum number of base functions per batch

    Yields:
        list of base functions: a non-empty batch of base functions

    Raises:
        ValueError: if ``batch_size`` is smaller than one (raised when the
            generator is first advanced)
    """
    if batch_size < 1:
        raise ValueError("batch size must be at least one")
    batch = []
    for base_func in base_function_list:
        if base_func.has_run_succesfully():
            info(f"{base_func.name} has already run, skipping it")
            continue
        # Always False for sampling batches
        if base_func.is_ignored():
            info(f"{base_func} is ignored, skipping it")
            continue
        batch.append(base_func)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Only yield the remainder if there is one: an empty trailing batch would
    # make the caller submit an empty job (e.g. a pseudo-parallel job that
    # only contains ``wait``)
    if batch:
        yield batch
[docs]
def _prepare_base_function(self: Mode, base_func: BaseFunctionType) -> str:
    """Dump the inputs of a base function and build the command running it.

    The control vector and the yaml configuration of the base function are
    dumped; for a sampling batch, its content is also written to
    ``sampling_batch.txt`` in its run directory.

    Args:
        self (Mode): the mode plugin
        base_func (base function): base function about to be run

    Returns:
        str: shell command running pycif on the base function's yaml config
    """
    # Dumping input files
    base_func.dump_controlvect()
    dump_yaml_config(self, base_func)
    debug(f"Running {base_func}")
    if isinstance(base_func, BaseFunctionSamplingBatch):
        # Writing sampling batch content to a file
        path = os.path.join(base_func.rundir, "sampling_batch.txt")
        with open(path, "w") as f:
            f.write(str(base_func))
    return f"{self.platform.python} -m pycif {base_func.yaml_config_path}"


def run_jobs_in_batches(
    self: Mode,
    base_function_list: List[BaseFunctionType]
) -> None:
    """Run the base functions as jobs, one batch of jobs at a time.

    Base functions are cut into batches of ``self.job_batch_size`` elements
    (all of them in a single batch when that size is not positive). For each
    batch, the inputs of every base function are dumped and the pycif runs
    are submitted: one job per base function, or a single pseudo-parallel
    job for the whole batch when ``self.pseudo_parallel_job`` is set. All the
    jobs of a batch are waited for before the next batch starts, and the
    outputs of the batch are then flushed if ``self.autoflush`` is set and
    this is not a dry run.

    Args:
        self (Mode): the mode plugin
        base_function_list (list of base functions): base functions to run,
            i.e. individual control vector elements or sampling batches
    """
    if self.job_batch_size > 0:
        batch_size = self.job_batch_size
    else:
        # Everything in one batch; keep at least 1 so that an empty list
        # does not make job_batches raise
        batch_size = max(len(base_function_list), 1)

    for i, batch in enumerate(job_batches(base_function_list, batch_size)):
        if not batch:
            # Never submit an empty job
            continue

        job_id_list = []
        parallel_lines = []
        for base_func in batch:
            command = _prepare_base_function(self, base_func)
            if self.pseudo_parallel_job:
                # Adding job to pseudo parallel command (run in background)
                parallel_lines.append(f"{command} &\n")
            else:
                # Submitting job
                job_file = os.path.join(
                    base_func.rundir, f"job_pycif_{base_func.name}.sh")
                job_id_list.append(submit_job(self, command, job_file))

        if self.pseudo_parallel_job:
            # Submitting the whole batch as one job that waits for all of
            # its background runs
            parallel_command = "\n" + "".join(parallel_lines) + "wait\n"
            job_file = os.path.join(
                self.basedir, f"job_pycif_batch_{i:05d}.sh")
            job_id_list.append(submit_job(self, parallel_command, job_file))

        info(f"Submitted job batch {i}")
        wait_jobs(self, job_id_list)

        # Flushing 'controlvect.pickle' file and 'chain' dir (if not dryrun)
        if self.autoflush and not self.dryrun:
            for ran_base_func in batch:
                ran_base_func.flush()