ori.poolchain module

Module for the poolchain.

The “poolchain” is a high-level abstraction over Python’s concurrent.futures module. A poolchain is used to run a single iterable (e.g. a list) through a chain of functions and collect an iterable of results at the end.

For example, here is one way to use a poolchain.

from ori.poolchain import PoolChain

results = (
    PoolChain()
    .add_threadpool(lambda num: num * 10)
    .add_processpool(str, chunksize=2, max_workers=3)
    .add_threadpool(lambda s: s + " is a number", timeout=5)
).execute_lazy([1, 2, 3, 4, 5])

for row in results:
    print(row)
# `results` is an iterator, so we go through it with a for loop.
# we could have run `.execute_eager()` to automatically
# turn the iterator into a list.

The general workflow is to create a PoolChain object and then make a series of chained calls that add functions to execute over every item in the iterable in sequence.

After the chain of functions has been specified, call one of the .execute_*() methods: PoolChain.execute_lazy() returns an iterator over the final return values, while PoolChain.execute_eager() builds the entire list upfront.
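
By contrast with the lazy example above, here is a minimal eager sketch. The helper function double is made up for illustration; only the PoolChain methods documented on this page are assumed.

from ori.poolchain import PoolChain

def double(num):
    return num * 2

# execute_eager() drains the whole chain and hands back a plain list.
results = PoolChain().add_threadpool(double).execute_eager([1, 2, 3])
print(results)  # [2, 4, 6]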

class ori.poolchain.PoolChain

Bases: object

Creates a poolchain.

add(function, *, executor_class, max_workers=None, timeout=None, chunksize=1)

Add a new element to the poolchain.

Note that if your executor_class is concurrent.futures.ProcessPoolExecutor, you will not be able to pass lambda functions as function. You will need to use named functions defined with the def keyword.

Parameters
  • function (Callable) – This is the function that you want to run at the end of the current chain.

  • executor_class (type) – the concurrent.futures.Executor subclass type to use to execute this part of the poolchain.

  • max_workers (Optional[int]) – The maximum number of workers that the executor can start. If this is None, the executor chooses a default based on the number of processors on your machine. This is usually a sensible default.

  • timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before throwing an error.

  • chunksize (int) – Sometimes the executor runs faster when providing chunks of the iterable to workers rather than just one iterable element at a time (the default). Try setting chunksize to a value greater than one to more efficiently use interprocess communication.

Returns

The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.

Raises

OriValidationError – Raised if you don’t pass a real function, pass something that is not a real Executor subclass, pass a non-integer for one of the integer fields, and so on.
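
As a sketch of the generic method (assuming only the keyword-only signature shown above), the code below passes concurrent.futures.ThreadPoolExecutor as the executor_class explicitly; the helper function describe is made up for illustration. The add_threadpool() and add_processpool() methods below cover the two standard executors without this boilerplate.

from concurrent.futures import ThreadPoolExecutor

from ori.poolchain import PoolChain

def describe(num):
    return f"{num} squared is {num * num}"

chain = PoolChain().add(
    describe,
    executor_class=ThreadPoolExecutor,  # any concurrent.futures.Executor subclass
    max_workers=4,
    timeout=10,
)
print(chain.execute_eager([1, 2, 3]))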

add_processpool(function, *, max_workers=None, timeout=None, chunksize=1)

Add a function to the poolchain that runs in a process pool.

You cannot pass a lambda function to a process pool. Any function that runs in a process pool must be a named, module-level function defined with the def keyword, so that it can be pickled and sent to the worker processes; lambdas remain fine for thread pool stages. See the sketch after this method’s parameter list.

Parameters
  • function (Callable) – This is the function that you want to run at the end of the current chain.

  • max_workers (Optional[int]) – The maximum number of workers that the concurrent.futures.ProcessPoolExecutor can start. If this is None, then the executor will create the same number of workers as you have processors on your machine. This is usually a sensible default.

  • timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before throwing an error.

  • chunksize (int) – Sometimes the executor runs faster when providing chunks of the iterable to workers rather than just one iterable element at a time (the default). Try setting chunksize to a value greater than one to more efficiently use interprocess communication.

Returns

The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.

Raises

OriValidationError – Raised if you don’t pass a real function, pass a non-integer for one of the integer fields, and so on.
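
A minimal sketch with a made-up module-level function square. The __main__ guard is ordinary multiprocessing hygiene rather than anything specific to this library.

from ori.poolchain import PoolChain

def square(num):
    # Must be a named, module-level function so the process pool can pickle it.
    return num * num

if __name__ == "__main__":
    results = (
        PoolChain()
        .add_processpool(square, max_workers=2, chunksize=2)
        .execute_eager([1, 2, 3, 4])
    )
    print(results)  # [1, 4, 9, 16]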

add_threadpool(function, *, max_workers=None, timeout=None)

Add a function to the poolchain that runs in a thread pool.

Parameters
  • function (Callable) – This is the function that you want to run at the end of the current chain.

  • max_workers (Optional[int]) – The maximum number of workers that the concurrent.futures.ThreadPoolExecutor can start. If this is None, the executor chooses its own default based on the number of processors on your machine. This is usually a sensible default.

  • timeout (Optional[int]) – The amount of time (in seconds) to wait for a concurrent.futures.Future before throwing an error.

Returns

The PoolChain object itself, allowing you to chain subsequent add_*() calls and at the end an execute_*() call.

Raises

OriValidationError – Raised if you don’t pass a real function, pass a non-integer for one of the integer fields, and so on.
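
Thread pools do not require picklable callables, so lambdas work here. A small sketch with made-up string-cleaning steps:

from ori.poolchain import PoolChain

results = (
    PoolChain()
    .add_threadpool(lambda name: name.strip().lower(), max_workers=8)
    .add_threadpool(lambda name: "hello, " + name, timeout=5)
    .execute_eager(["  Alice ", "BOB"])
)
print(results)  # ['hello, alice', 'hello, bob']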

execute_eager(iterable)

Run the given iterable through the poolchain, eagerly returning a list.

This method returns the results upfront in a list, which lets you look at the entire output without having to iterate through it yourself.

Parameters

iterable (Iterable[Any]) – This is an iterable where every element in the iterable is processed through the chain.

Returns

This function returns a complete Python list containing the chain’s output for every element of the input iterable.
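
A minimal sketch, using the built-in str.upper purely for illustration:

from ori.poolchain import PoolChain

chain = PoolChain().add_threadpool(str.upper)

# The entire list exists before the next statement runs.
results = chain.execute_eager(["a", "b", "c"])
assert results == ["A", "B", "C"]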

execute_eager_single_threaded(iterable)

Execute the chain in the current thread, returning a Python list.

This method is typically used when you want to test your chain without creating threads or processes. When running in one thread, PoolChain would never be faster than just running the functions yourself in sequence.

This method also returns the results upfront in a list, which lets you look at the entire output without having to iterate through it yourself.

Parameters

iterable (Iterable[Any]) – This is an iterable where every element in the iterable is processed through the chain.

Returns

This function returns a complete Python list containing the chain’s output for every element of the input iterable.
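
A sketch of how this might be used in a unit test; add_one is made up, and the assertion assumes the single-threaded path produces the same output as the pooled ones.

from ori.poolchain import PoolChain

def add_one(num):
    return num + 1

chain = PoolChain().add_threadpool(add_one).add_threadpool(str)

# Runs entirely in the current thread; no executors are created.
assert chain.execute_eager_single_threaded([1, 2, 3]) == ["2", "3", "4"]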

execute_lazy(iterable)

Process the given iterable through the poolchain.

When you call this function, the poolchain begins executing immediately.

Parameters

iterable (Iterable[Any]) – This is an iterable where every element in the iterable is processed through the chain.

Returns

This function returns an iterator that lazily executes the poolchain.
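
A sketch of consuming the iterator with a for loop; the helper triple is made up for illustration.

from ori.poolchain import PoolChain

def triple(num):
    return num * 3

for value in PoolChain().add_threadpool(triple).execute_lazy(range(4)):
    print(value)  # prints 0, 3, 6, 9 -- one result per loop iteration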

execute_lazy_single_threaded(iterable)

Execute the chain in the current thread, returning a generator.

This function is typically used when you want to test your chain without creating threads or processes. When running in one thread, PoolChain would never be faster than just running the functions yourself in sequence.

This function returns a generator that you have to iterate through yourself for results.

Parameters

iterable (Iterable[Any]) – This is an iterable where every element in the iterable is processed through the chain.

Returns

This function returns an iterator that lazily executes the poolchain in the current thread. Every time the next output is fetched, the corresponding input is sent through every function in the chain.
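
A sketch of stepping through the generator one element at a time; the helper negate is made up for illustration.

from ori.poolchain import PoolChain

def negate(num):
    return -num

gen = PoolChain().add_threadpool(negate).execute_lazy_single_threaded([1, 2, 3])

print(next(gen))  # -1: the first input has just been pushed through the whole chain
print(list(gen))  # [-2, -3]: the remaining inputs, processed on demand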