Source code for pycif.utils.parallel

from __future__ import annotations

from functools import wraps
from multiprocessing.pool import Pool, ThreadPool
from threading import Thread
from typing import Any, Callable, Iterable, Literal, TypeVar

V = TypeVar("V")
T = TypeVar("T")

__all__ = ["thread", "multicore", "AutoThreads"]

def _use_pool(
    func: Callable[[V], T],
    kind: Literal["thread", "multicore"],
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
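    """Wrap ``func`` so it maps over an iterable using a thread or process pool."""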
    if kind == "thread":
        PoolClass = ThreadPool
    elif kind == "multicore":
        PoolClass = Pool
    else:
        raise ValueError("'kind' argument should be 'thread' or 'multicore'")

    @wraps(func)
    def pool_func(arg: Iterable[V], chunksize: int | None = None) -> Iterable[T]:
        if not isinstance(arg, Iterable):
            raise TypeError(
                f"function '{func.__name__}' has been decorated with the '{kind}' "
                "decorator. Its argument must be iterable."
            )
        with PoolClass(processes) as pool:
            if unpack:
                return pool.starmap(func, arg, chunksize)  # type: ignore
            else:
                return pool.map(func, arg, chunksize)

    return pool_func

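# A minimal usage sketch of the private helper above (illustration only, not
# executed): ``_use_pool`` turns a one-argument function into one that maps
# over an iterable inside a pool.
#
#     double = _use_pool(lambda x: 2 * x, "thread", processes=4)
#     double([1, 2, 3])               # -> [2, 4, 6], run in a thread pool
#     double([1, 2, 3], chunksize=2)  # optional chunksize forwarded to map()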

def thread(
    func: Callable[[V], T],
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
    """Decorate a function to be called in threads with multiple arguments.

    .. warning::
        If the :bash:`unpack` argument is False (default), the decorated
        function must take exactly one argument.

    Args:
        func (callable (V) -> T): Function to be parallelised with threads.
        processes (int, optional): Number of threads to use. If processes is
            None, the number returned by os.process_cpu_count() is used.
            Defaults to None.
        unpack (bool, optional): Unpack the arguments when calling the
            decorated function.
            If unpack is False, :bash:`func([a, b])` returns
            :bash:`[func(a), func(b)]`.
            If unpack is True, :bash:`func([a, b])` returns
            :bash:`[func(*a), func(*b)]`.
            Defaults to False.

    Returns:
        callable (Iterable[V]) -> Iterable[T]: Function parallelised on
        threads, takes a chunksize argument.

    .. code-block:: python

        >>> @thread
        ... def foo(x):
        ...     return 2*x
        ...
        >>> foo([2, 5, 4, 8])
        [4, 10, 8, 16]

    With :bash:`unpack=True`

    .. code-block:: python

        >>> def foo(a, b):
        ...     return a + b
        ...
        >>> parallel_foo = thread(foo, unpack=True)
        >>> parallel_foo([(1, 2), (3, 4), (5, 6)])
        [3, 7, 11]
    """
    return _use_pool(func, "thread", processes, unpack)
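
# To keep the ``@`` syntax while passing options to ``thread``,
# ``functools.partial`` can be used (a usage sketch, not part of the module):
#
#     from functools import partial
#
#     @partial(thread, processes=4, unpack=True)
#     def add(a, b):
#         return a + b
#
#     add([(1, 2), (3, 4)])  # -> [3, 7]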

def multicore(
    func: Callable[[V], T],
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
    """Decorate a function to be called on multiple cores with multiple arguments.

    .. note::
        Due to pickling limitations, this decorator cannot be applied at the
        decorated function's definition (with the ``@`` syntax) and must be
        called afterward.

    .. warning::
        If the :bash:`unpack` argument is False (default), the decorated
        function must take exactly one argument.

    Args:
        func (callable (V) -> T): Function to be parallelised on multiple
            cores.
        processes (int, optional): Number of processes to use. If processes is
            None, the number returned by os.process_cpu_count() is used.
            Defaults to None.
        unpack (bool, optional): Unpack the arguments when calling the
            decorated function.
            If unpack is False, :bash:`func([a, b])` returns
            :bash:`[func(a), func(b)]`.
            If unpack is True, :bash:`func([a, b])` returns
            :bash:`[func(*a), func(*b)]`.
            Defaults to False.

    Returns:
        callable (Iterable[V]) -> Iterable[T]: Function parallelised on
        multiple cores, takes a chunksize argument.

    .. code-block:: python

        >>> def foo(x):
        ...     return 2*x
        ...
        >>> parallel_foo = multicore(foo)
        >>> parallel_foo([2, 5, 4, 8])
        [4, 10, 8, 16]

    With :bash:`unpack=True`

    .. code-block:: python

        >>> def foo(a, b):
        ...     return a + b
        ...
        >>> parallel_foo = multicore(foo, unpack=True)
        >>> parallel_foo([(1, 2), (3, 4), (5, 6)])
        [3, 7, 11]
    """
    return _use_pool(func, "multicore", processes, unpack)
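
# Sketch of the pickling limitation noted in the docstring above (standard
# pickle-by-reference behaviour, shown here only as an illustration):
#
#     @multicore                     # rebinds the module-level name ``foo`` to
#     def foo(x):                    # the wrapper, so the workers can no longer
#         return 2 * x               # look the original function up by name and
#                                    # pickling it fails
#
#     parallel_foo = multicore(foo)  # binding the wrapper to a *new* name keeps
#                                    # ``foo`` importable, so this works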

class AutoThreads:
    """A group of threads that can be used as a context manager for automatic
    joining.

    With a context manager:

    .. code-block:: python

        >>> with AutoThreads() as threads:
        ...     threads.start(foo, 1)
        ...     threads.start(bar, 2, flag=True)
        ...     # Some other code
        ...     threads.start(bar, 24, flag=False)
        ...
        >>> # Implicit join here

    Without a context manager:

    .. code-block:: python

        >>> threads = AutoThreads()
        >>> threads.start(foo, 1)
        >>> threads.start(bar, 2, flag=True)
        >>> # Some other code
        >>> threads.start(bar, 24, flag=False)
        >>> threads.join()  # Explicit join here

    Args:
        timeout (float, optional): Default timeout in seconds used by
            :meth:`join`. Defaults to None.
    """

    def __init__(self, timeout: float | None = None) -> None:
        self._threads: list[Thread] = []
        self._timeout = timeout
        self._context_manager = False
        self._terminated = False

    def __repr__(self) -> str:
        names = [thread.name for thread in self._threads]
        return f"<AutoThreads {names}>"

    def start(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Call a function in a new thread; positional and keyword arguments
        are passed through to the function.

        Args:
            func (callable): Function to be called in a new thread.
            args (tuple): Positional arguments to be passed to the function.
            kwargs (dict): Keyword arguments to be passed to the function.
        """
        if self._context_manager and self._terminated:
            raise RuntimeError("AutoThreads object already exited")
        thread = Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        self._threads.append(thread)

    def join(self, timeout: float | None = None) -> None:
        """Join all threads.

        Args:
            timeout (float, optional): Timeout in seconds; overrides the
                default timeout. Defaults to None.
        """
        timeout = timeout if timeout is not None else self._timeout
        for thread in self._threads:
            thread.join(timeout)

    def __enter__(self) -> AutoThreads:
        if self._threads:
            raise RuntimeError("AutoThreads object already contains some threads")
        self._context_manager = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.join()
        self._terminated = True
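

# A minimal end-to-end sketch of the public API (hypothetical demo, not part of
# the original module). ``multicore`` is left out because, with the 'spawn'
# start method used by default on some platforms, functions defined under this
# guard are not importable in the worker processes.
if __name__ == "__main__":
    import time

    @thread
    def slow_double(x: int) -> int:
        # Simulate an I/O-bound task that benefits from threads.
        time.sleep(0.1)
        return 2 * x

    print(slow_double([1, 2, 3, 4]))  # [2, 4, 6, 8], computed concurrently

    with AutoThreads() as threads:
        threads.start(print, "hello from a background thread")
        threads.start(time.sleep, 0.1)
    # All threads are joined when the ``with`` block exits.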