ori.poolchain module
Module for the poolchain.
The “poolchain” is a high-level abstraction over Python’s
concurrent.futures concurrency module. A poolchain is used
to run a single iterable (e.g. a list) through a chain of functions
and collect an iterable of results at the end.
For example, here is one way to use a poolchain.
from ori.poolchain import PoolChain

results = (
    PoolChain()
    .add_threadpool(lambda num: num * 10)
    .add_processpool(str, chunksize=2, max_workers=3)
    .add_threadpool(lambda s: s + " is a number", timeout=5)
).execute_lazy([1, 2, 3, 4, 5])

# `results` is an iterator, so we go through it with a for loop.
# We could have run `.execute_eager()` instead to automatically
# turn the iterator into a list.
for row in results:
    print(row)
The general workflow is to create a PoolChain object and
then make a series of chained calls that add functions to execute
over every item in the iterable in sequence.
After the chain of functions has been specified, call one of the
.execute_*() methods: PoolChain.execute_lazy()
to get an iterator over the final return values, or
PoolChain.execute_eager()
to get an entire list upfront.
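To make the abstraction more concrete, here is a rough sketch of a similar three-step chain written directly against concurrent.futures. It is not PoolChain's implementation: the helper name run_stages is hypothetical, and every stage uses a thread pool so that the lambdas work without any pickling concerns.

```python
from concurrent.futures import ThreadPoolExecutor


def run_stages(stages, iterable):
    """Feed the output of each stage's Executor.map() into the next stage.

    Hypothetical helper for illustration only; not part of ori.
    """
    current = iterable
    executors = []
    try:
        for function in stages:
            executor = ThreadPoolExecutor()
            executors.append(executor)
            # Executor.map() submits one task per input element and
            # returns an iterator that yields results in input order.
            current = executor.map(function, current)
        # Collecting into a list mirrors the "eager" style of execution.
        return list(current)
    finally:
        for executor in executors:
            executor.shutdown()


results = run_stages(
    [lambda num: num * 10, str, lambda s: s + " is a number"],
    [1, 2, 3, 4, 5],
)
for row in results:
    print(row)
```

Even in this small form, the executor bookkeeping is noticeably more verbose than the chained add_*() calls.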
class ori.poolchain.PoolChain
Bases: object
Creates a poolchain.
add(function, *, executor_class, max_workers=None, timeout=None, chunksize=1)
Add a new element to the poolchain.
Note that if your executor_class is a concurrent.futures.ProcessPoolExecutor, you will not be able to pass lambda functions as function. You will need to use named functions defined with the def keyword.
- Parameters
  function (Callable) – The function that you want to run at the end of the current chain.
  executor_class (type) – The concurrent.futures.Executor subclass type to use to execute this part of the poolchain.
  max_workers (Optional[int]) – The maximum number of workers that the executor can start. If this is None, then the executor will create the same number of workers as you have processors on your machine. This is usually a sensible default.
  timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before raising an error.
  chunksize (int) – Sometimes the executor runs faster when providing chunks of the iterable to workers rather than just one iterable element at a time (the default). Try setting chunksize to a value greater than one to more efficiently use interprocess communication.
- Returns
  The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.
- Raises
  OriValidationError – Raised if you pass something that is not a real function or a real Executor, a non-integer for one of the integer fields, etc.
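The chunksize idea can be sketched with a small helper that groups an iterable into batches, which is broadly what the standard library's ProcessPoolExecutor.map() does when its own chunksize argument is greater than one. The helper name chunked is hypothetical, and how PoolChain forwards chunksize internally is an assumption rather than something these docs confirm.

```python
from itertools import islice


def chunked(iterable, chunksize):
    """Yield successive tuples of at most `chunksize` items.

    Hypothetical helper for illustration only; not part of ori.
    """
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk


# With chunksize=2, five inputs travel to workers as three batches
# instead of five separate tasks.
batches = list(chunked([1, 2, 3, 4, 5], 2))
print(batches)
```

Fewer, larger batches mean fewer round trips between processes, at the cost of coarser load balancing across workers.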
add_processpool(function, *, max_workers=None, timeout=None, chunksize=1)
Add a function to run with a “process pool” to the poolchain.
You cannot pass a lambda function to a process pool. You need to make up your poolchain entirely with named functions defined with the def keyword.
- Parameters
  function (Callable) – The function that you want to run at the end of the current chain.
  max_workers (Optional[int]) – The maximum number of workers that the concurrent.futures.ProcessPoolExecutor can start. If this is None, then the executor will create the same number of workers as you have processors on your machine. This is usually a sensible default.
  timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before raising an error.
  chunksize (int) – Sometimes the executor runs faster when providing chunks of the iterable to workers rather than just one iterable element at a time (the default). Try setting chunksize to a value greater than one to more efficiently use interprocess communication.
- Returns
  The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.
- Raises
  OriValidationError – Raised if you pass something that is not a real function or a real Executor, a non-integer for one of the integer fields, etc.
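The lambda restriction most likely comes from pickling: the standard library's ProcessPoolExecutor sends callables to worker processes with pickle, and pickle cannot serialize lambdas. The sketch below checks this without starting any processes. The helper is_picklable and the function times_ten are hypothetical names used only for illustration.

```python
import pickle


def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps(), False otherwise.

    Hypothetical helper for illustration only; not part of ori.
    """
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def times_ten(num):
    """A named, module-level function, the kind a process pool needs."""
    return num * 10


# Built-ins such as `str` are pickled by reference, so they are fine.
print("str:", is_picklable(str))
# Named functions are also pickled by reference, provided they live at
# the top level of an importable module.
print("times_ten:", is_picklable(times_ten))
# Lambdas have no importable name, so pickle rejects them.
print("lambda:", is_picklable(lambda num: num * 10))
```

Running a check like this on each function before adding it to a process pool step can surface the problem earlier than a failure inside a worker.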
add_threadpool(function, *, max_workers=None, timeout=None)
Add a function to run with a “thread pool” to the poolchain.
- Parameters
  function (Callable) – The function that you want to run at the end of the current chain.
  max_workers (Optional[int]) – The maximum number of workers that the concurrent.futures.ThreadPoolExecutor can start. If this is None, then the executor will create the same number of workers as you have processors on your machine. This is usually a sensible default.
  timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before raising an error.
- Returns
  The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.
- Raises
  OriValidationError – Raised if you pass something that is not a real function or a real Executor, a non-integer for one of the integer fields, etc.
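For a sense of what a timeout means at the concurrent.futures level, the sketch below uses the timeout argument of the standard library's Executor.map(). Whether PoolChain forwards its timeout in exactly this way is an assumption. The names slow_identity and map_with_timeout are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError


def slow_identity(value):
    """Simulate slow work by sleeping before returning the input."""
    time.sleep(0.3)
    return value


def map_with_timeout(function, iterable, timeout):
    """Return the mapped results as a list, or None if the timeout hits.

    Hypothetical helper for illustration only; not part of ori.
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        try:
            # The timeout is measured from the call to map(); fetching a
            # result that is not ready in time raises TimeoutError.
            return list(executor.map(function, iterable, timeout=timeout))
        except FuturesTimeoutError:
            return None
        # Leaving the `with` block still waits for running tasks to end.


too_short = map_with_timeout(slow_identity, [1, 2], timeout=0.05)
long_enough = map_with_timeout(slow_identity, [1, 2], timeout=5)
print(too_short, long_enough)
```

Note that a timeout does not cancel work that is already running in a thread; it only stops the caller from waiting for the result.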
execute_eager(iterable)
Run the given iterable through the poolchain, eagerly returning a list.
This method returns the results upfront in a list, which lets you look at the entire output without having to iterate through it yourself.
execute_eager_single_threaded(iterable)
Execute the chain in the current thread, returning a Python list.
This method is typically used when you want to test your chain without creating threads or processes. When running in one thread, PoolChain would never be faster than just running the functions yourself in sequence.
This method also returns the results upfront in a list, which lets you look at the entire output without having to iterate through it yourself.
execute_lazy(iterable)
Process the given iterable through the poolchain.
When you call this function, the poolchain begins executing immediately.
- Returns
  This function returns an iterator that lazily executes the poolchain.
execute_lazy_single_threaded(iterable)
Execute the chain in the current thread, returning a generator.
This function is typically used when you want to test your chain without creating threads or processes. When running in one thread, PoolChain would never be faster than just running the functions yourself in sequence.
This function returns a generator that you have to iterate through yourself for results.
- Parameters
  iterable (Iterable[Any]) – This is an iterable where every element in the iterable is processed through the chain.
- Returns
  This function returns an iterator that lazily executes the poolchain in the current thread. Every time the next output is fetched, the corresponding input is sent through every function in the chain.
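The lazy, single-threaded behavior described above can be approximated with a plain generator. This is a minimal sketch, not PoolChain's actual code. The names run_chain_lazily and record_and_double are hypothetical.

```python
def run_chain_lazily(functions, iterable):
    """Yield each input after passing it through every function in order.

    Hypothetical helper for illustration only; not part of ori.
    """
    for item in iterable:
        for function in functions:
            item = function(item)
        yield item


calls = []


def record_and_double(num):
    """Record which inputs have been processed, then double the input."""
    calls.append(num)
    return num * 2


lazy_results = run_chain_lazily([record_and_double, str], [1, 2, 3])

# Nothing runs until the first output is requested.
calls_before = list(calls)
first = next(lazy_results)
calls_after_first = list(calls)

# Draining the generator into a list gives the "eager" equivalent.
rest = list(lazy_results)
print(first, rest, calls)
```

Only the inputs whose outputs have been requested are processed, which is why the lazy variants suit large or unbounded iterables.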