from __future__ import annotations

from functools import wraps
from multiprocessing.pool import Pool, ThreadPool
from threading import Thread
from typing import Any, Callable, Iterable, Literal, TypeVar

V = TypeVar("V")
T = TypeVar("T")

__all__ = ["thread", "multicore", "AutoThreads"]
def _use_pool(
    func: Callable[[V], T],
    kind: Literal["thread", "multicore"],
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
    """Wrap `func` so that calling the wrapper maps `func` over an iterable
    using a thread pool ("thread") or a process pool ("multicore")."""
    if kind == "thread":
        PoolClass = ThreadPool
    elif kind == "multicore":
        PoolClass = Pool
    else:
        raise ValueError("'kind' argument should be 'thread' or 'multicore'")

    @wraps(func)
    def pool_func(arg: Iterable[V], chunksize: int | None = None) -> Iterable[T]:
        if not isinstance(arg, Iterable):
            raise TypeError(
                f"function '{func.__name__}' has been decorated with the '{kind}' "
                "decorator. Its argument must be iterable."
            )
        with PoolClass(processes) as pool:
            if unpack:
                return pool.starmap(func, arg, chunksize)  # type: ignore
            else:
                return pool.map(func, arg, chunksize)

    return pool_func
def thread(
    func: Callable[[V], T] | None = None,
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
"""Decorate a function to be called in threads with multiple arguments
.. warning::
If the :bash:`unpack` argument is False (default), the decorated function must
take exactly one argument.
Args:
func (callable (V) -> T):
Function to be parallelised with threads.
processes (int, optional):
Number of threads to use, if processes is None then the number
returned by os.process_cpu_count() is used. Defaults to None.
unpack (bool, optional):
Unpack the arguments when calling th decorated function.
If unpack is False :bash:`func([a, b])` returns :bash:`[func(a), func(b)]`
If unpack is True :bash:`func([a, b])` returns :bash:`[func(*a), func(*b)]`
Defaults to False.
Returns:
callable (Iterable[V]) -> T:
Function parallelised on threads, takes a chunksize argument
.. code-block:: python
>>> @thread
... def foo(x):
... return 2*x
...
>>> foo([2, 5, 4, 8])
[4, 10, 8, 16]
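
    The wrapped function also accepts a :bash:`chunksize` argument, forwarded to
    the underlying pool. A minimal sketch, reusing :bash:`foo` from the example
    above (the chunk size only affects how the work is split, not the result):

    .. code-block:: python

        >>> foo([2, 5, 4, 8], chunksize=2)
        [4, 10, 8, 16]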
    With :bash:`unpack=True`:

    .. code-block:: python

        >>> @thread(unpack=True)
        ... def foo(a, b):
        ...     return a + b
        ...
        >>> foo([(1, 2), (3, 4), (5, 6)])
        [3, 7, 11]
    """
    if func is None:
        # `thread` was called as a decorator factory, e.g. @thread(unpack=True):
        # return a decorator that still expects the function to wrap.
        def decorator(f: Callable[[V], T]) -> Callable[[Iterable[V]], Iterable[T]]:
            return _use_pool(f, "thread", processes, unpack)

        return decorator  # type: ignore
    return _use_pool(func, "thread", processes, unpack)
def multicore(
    func: Callable[[V], T],
    processes: int | None = None,
    unpack: bool = False,
) -> Callable[[Iterable[V]], Iterable[T]]:
    """Decorate a function to be called on multiple cores over an iterable of arguments.

    .. note::
        Due to pickling limitations, this decorator cannot be applied at the
        function definition (with the :bash:`@` syntax) and must be called
        afterward.

    .. warning::
        If the :bash:`unpack` argument is False (default), the decorated function
        must take exactly one argument.

    Args:
        func (callable (V) -> T):
            Function to be parallelised on multiple cores.
        processes (int, optional):
            Number of processes to use. If processes is None, the number
            returned by os.process_cpu_count() is used. Defaults to None.
        unpack (bool, optional):
            Unpack the arguments when calling the decorated function.
            If unpack is False, :bash:`func([a, b])` returns :bash:`[func(a), func(b)]`.
            If unpack is True, :bash:`func([a, b])` returns :bash:`[func(*a), func(*b)]`.
            Defaults to False.

    Returns:
        callable (Iterable[V]) -> Iterable[T]:
            Function parallelised on multiple cores; it also takes a chunksize argument.

    .. code-block:: python

        >>> def foo(x):
        ...     return 2*x
        ...
        >>> parallel_foo = multicore(foo)
        >>> parallel_foo([2, 5, 4, 8])
        [4, 10, 8, 16]

    With :bash:`unpack=True`:

    .. code-block:: python

        >>> def foo(a, b):
        ...     return a + b
        ...
        >>> parallel_foo = multicore(foo, unpack=True)
        >>> parallel_foo([(1, 2), (3, 4), (5, 6)])
        [3, 7, 11]
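
    .. note::
        As with any code that creates processes with :bash:`multiprocessing`, on
        platforms where the spawn start method is the default (e.g. Windows and
        macOS) the call to the parallelised function should be placed under an
        :bash:`if __name__ == "__main__":` guard in a script. A minimal sketch
        (the file name is hypothetical):

    .. code-block:: python

        # my_script.py
        def foo(x):
            return 2*x

        if __name__ == "__main__":
            parallel_foo = multicore(foo)
            print(parallel_foo([2, 5, 4, 8]))  # [4, 10, 8, 16]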
"""
return _use_pool(func, "multicore", processes, unpack)
class AutoThreads:
    """A group of threads that can be used as a context manager for automatic
    joining.

    With a context manager:

    .. code-block:: python

        >>> with AutoThreads() as threads:
        ...     threads.start(foo, 1)
        ...     threads.start(bar, 2, flag=True)
        ...     # Some other code
        ...     threads.start(bar, 24, flag=False)
        ...
        >>> # Implicit join here

    Without a context manager:

    .. code-block:: python

        >>> threads = AutoThreads()
        >>> threads.start(foo, 1)
        >>> threads.start(bar, 2, flag=True)
        >>> # Some other code
        >>> threads.start(bar, 24, flag=False)
        >>> threads.join()  # Explicit join here

    Args:
        timeout (float, optional):
            Default timeout in seconds used when joining the threads.
            Defaults to None.
    """
    def __init__(self, timeout: float | None = None) -> None:
        self._threads: list[Thread] = []
        self._timeout = timeout
        self._context_manager = False
        self._terminated = False

    def __repr__(self) -> str:
        names = [thread.name for thread in self._threads]
        return f"<AutoThreads {names}>"
    def start(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Call a function in a new thread; positional and keyword arguments are
        passed on to the function.

        Args:
            func (callable):
                Function to be called in a new thread.
            args (tuple):
                Positional arguments to be passed to the function.
            kwargs (dict):
                Keyword arguments to be passed to the function.
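
        A minimal sketch, using the built-in :bash:`print` as the target function:

        .. code-block:: python

            >>> threads = AutoThreads()
            >>> threads.start(print, "hello", "world", sep=", ")  # prints "hello, world"
            >>> threads.join()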
"""
if self._context_manager and self._terminated:
raise RuntimeError("AutoThreads object already exited")
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.start()
self._threads.append(thread)
    def join(self, timeout: float | None = None) -> None:
        """Join all threads.

        Args:
            timeout (float, optional):
                Timeout in seconds applied to each thread individually; if given,
                it overrides the default timeout set at construction.
                Defaults to None.
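
        For example, to wait at most five seconds for each thread in this call
        only (a minimal sketch, assuming some threads were already started):

        .. code-block:: python

            >>> threads.join(timeout=5.0)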
"""
timeout = timeout if timeout is not None else self._timeout
for thread in self._threads:
thread.join(timeout)
    def __enter__(self) -> AutoThreads:
        if self._threads:
            raise RuntimeError("AutoThreads object already contains some threads")
        self._context_manager = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.join()
        self._terminated = True
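

# A minimal usage sketch (illustrative only, not part of the library): the
# function and values below are made up for demonstration.
if __name__ == "__main__":

    @thread
    def double(x: int) -> int:
        return 2 * x

    print(double([1, 2, 3, 4]))  # [2, 4, 6, 8]

    # AutoThreads joins every started thread when the `with` block exits.
    with AutoThreads() as auto:
        auto.start(print, "hello from a worker thread", flush=True)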