Module Reference
adv_prodcon
The top-level package for Advanced Producer-Consumer.
- class adv_prodcon.Worker
The abstract base class for adv_prodcon. Worker should not be used directly. Instead, user-defined classes should derive from Producer or Consumer, which both derive from Worker.
Worker defines the methods that are common to both Producer and Consumer. Most importantly, it defines the start_new method, which creates the new process in which the work is run.
Worker also holds two pipes created with multiprocessing.Pipe. The result pipe is used by Producer and Consumer to send results from their work functions back to the main process. The message pipe is passed as an argument to the work functions of derived classes, which lets those functions communicate directly with the main process.
Worker defines three variables that describe the state of a Producer or Consumer’s work loop: stopped, started, and stop_at_queue_end.
- start_new(work_args=(), work_kwargs=None)
Start a new worker process, passing the abstract method work_loop as the target.
New instances of result_pipe and message_pipe are also created, and new threads are started to wait on them.
- Parameters
work_args (tuple) – args to pass to the work function
work_kwargs (dict) – kwargs to pass to the work function
- abstract static work_loop(*args, **kwargs)
Static method passed as the target for the process in start_new. Implemented by Producer and Consumer.
- Parameters
args –
kwargs –
- abstract static work(*args, **kwargs)
Static method representing the actual work function for a class derived from Producer or Consumer.
- Parameters
args –
kwargs –
- static on_start(state, message_pipe, *args, **kwargs)
Static method representing a function to be called once when a work loop is started.
- Parameters
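state – the state of the work loop (stopped, started, or stop_at_queue_end)
message_pipe – the message pipe used to communicate with the main process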
args –
kwargs –
- static on_stop(on_start_result, state, message_pipe, *args, **kwargs)
Static method representing a function to be called once when a work loop is stopped.
- Parameters
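on_start_result – the value returned by on_start
state – the state of the work loop (stopped, started, or stop_at_queue_end)
message_pipe – the message pipe used to communicate with the main process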
args –
kwargs –
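The relationship between on_start, the work function, and on_stop can be sketched in plain Python. This is a minimal illustration, not the library's implementation: run_with_hooks is a hypothetical helper, the state is modelled as a plain dict, and the argument order given to work here is an assumption, since this reference does not document it. Only the on_start and on_stop signatures are taken from the entries above.

```python
def run_with_hooks(work, on_start, on_stop, state, message_pipe, *args, **kwargs):
    """Call on_start once, loop over work, then call on_stop once."""
    on_start_result = on_start(state, message_pipe, *args, **kwargs)
    try:
        while not state["stopped"]:
            # Assumed work signature; the reference does not specify it.
            work(on_start_result, state, message_pipe, *args, **kwargs)
    finally:
        # on_stop receives whatever on_start returned, e.g. to release it.
        on_stop(on_start_result, state, message_pipe, *args, **kwargs)


def hooks_demo():
    events = []
    state = {"stopped": False}

    def on_start(state, message_pipe, *args, **kwargs):
        events.append("start")
        return {"calls": 0}  # stands in for a resource such as an open file

    def work(resource, state, message_pipe, *args, **kwargs):
        resource["calls"] += 1
        events.append(f"work {resource['calls']}")
        if resource["calls"] == 2:
            state["stopped"] = True

    def on_stop(on_start_result, state, message_pipe, *args, **kwargs):
        events.append(f"stop after {on_start_result['calls']}")

    run_with_hooks(work, on_start, on_stop, state, message_pipe=None)
    return events
```

Calling hooks_demo() returns the list of events in the order the hooks fired, which shows that on_stop sees the same object that on_start produced.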
- wait_on_result_pipe()
Loop waiting on data from the result pipe. Calls on_result_ready when data is received.
Returns when an error occurs (indicating that the paired process has ended).
- wait_on_message_pipe()
Loop waiting on data from the message pipe. Calls on_message_ready when data is received.
Returns when an error occurs (indicating that the paired process has ended).
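The waiting pattern described for both pipes can be sketched with the standard library alone. This is an illustration under stated assumptions, not adv_prodcon's code: wait_on_pipe and pipe_demo are hypothetical names, and a plain thread closing its end of the pipe stands in for the worker process ending.

```python
import threading
from multiprocessing import Pipe


def wait_on_pipe(conn, on_ready):
    """Call on_ready for each received item until the other end closes."""
    while True:
        try:
            item = conn.recv()
        except (EOFError, OSError):
            # Nothing left to receive and the sending end is closed,
            # which is how the end of the paired worker shows up here.
            return
        on_ready(item)


def pipe_demo():
    parent_end, worker_end = Pipe()
    received = []
    listener = threading.Thread(
        target=wait_on_pipe, args=(parent_end, received.append)
    )
    listener.start()
    for value in (1, 2, 3):
        worker_end.send(value)
    worker_end.close()  # simulates the worker process ending
    listener.join(timeout=5)
    return received
```

The listener thread needs no explicit stop signal in this sketch: closing the sending end is enough to make recv raise and the loop return.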
- class adv_prodcon.Producer(subscriber_queues=None, work_timeout=0)
The class defining adv_prodcon’s Producer. A Producer is a worker that runs its work function in a loop and puts the results into each of a list of subscriber queues.
- __init__(subscriber_queues=None, work_timeout=0)
Initialize the Producer. subscriber_queues is a list of queues into which the results of the work function should be put. work_timeout specifies the time in seconds between work function calls. If set to 0, the work function will be called as frequently as possible.
- Parameters
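subscriber_queues (list) – queues into which the results of the work function are put
work_timeout – time in seconds between work function calls; 0 calls the work function as frequently as possible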
- set_subscribers(subscriber_queues)
Set the queues into which the results of the work function should be put. This must be called before start_new.
- Parameters
subscriber_queues (list) – queues into which the results of the work function are put
- static work_loop(work, on_start, on_stop, state, work_queues, work_args, work_kwargs, result_pipe, message_pipe, work_timeout, buffer_size)
Runs a loop calling work until state is set to stopped. on_start is called once before the loop starts and on_stop is called once after it ends. work is called whenever the time since the last call exceeds work_timeout. Each result is put into every subscriber queue and into the result pipe.
- Parameters
work –
on_start –
on_stop –
state –
work_queues –
work_args –
work_kwargs –
result_pipe –
message_pipe –
work_timeout –
buffer_size –
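The timing and fan-out behaviour described for the Producer loop can be sketched as follows. This is a simplified, single-process illustration rather than the library's implementation: producer_loop is a hypothetical function, the state is a plain dict with a "running" flag, queue.Queue replaces the multiprocessing queues, and the result pipe and the on_start/on_stop hooks are left out.

```python
import queue
import time


def producer_loop(work, state, subscriber_queues, work_timeout=0):
    """Call work every work_timeout seconds and fan each result out."""
    # Back-date the timestamp so the first call happens immediately.
    last_worked = time.monotonic() - work_timeout
    while state["running"]:
        if time.monotonic() - last_worked >= work_timeout:
            last_worked = time.monotonic()
            result = work(state)
            for subscriber in subscriber_queues:
                subscriber.put(result)
        else:
            time.sleep(0.001)  # avoid spinning while waiting for the timeout


def producer_demo():
    state = {"running": True, "count": 0}

    def work(state):
        state["count"] += 1
        if state["count"] == 3:
            state["running"] = False  # stop after the third result
        return state["count"]

    first, second = queue.Queue(), queue.Queue()
    producer_loop(work, state, [first, second])
    return (
        [first.get_nowait() for _ in range(3)],
        [second.get_nowait() for _ in range(3)],
    )
```

Every subscriber receives its own copy of each result, so two consumers attached to the same producer see the same sequence.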
- class adv_prodcon.Consumer(work_timeout=5, max_buffer_size=1, lossy_queue=False, *args, **kwargs)
The class defining adv_prodcon’s Consumer. A Consumer is a worker that runs a loop checking for new items in its queue. When the correct criteria are met, Consumer runs its work function with the items from its queue as input.
The criteria for Consumer to run its work function can be: number of items in the queue, time passed since last work, or state stop_at_queue_end set.
- __init__(work_timeout=5, max_buffer_size=1, lossy_queue=False, *args, **kwargs)
Initialize the Consumer. work_timeout specifies the time in seconds between work function calls. max_buffer_size specifies the max number of items in the buffer before the work function is called. lossy_queue specifies whether the Consumer’s work_queue should be lossy.
- Parameters
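work_timeout – time in seconds between work function calls
max_buffer_size – maximum number of items in the buffer before the work function is called
lossy_queue – whether the Consumer’s work_queue should be lossy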
args –
kwargs –
- start_new(work_args=(), work_kwargs=None)
Extends Worker.start_new by setting the Consumer’s work_queue to ready, allowing producers to put items into it.
- Parameters
work_args –
work_kwargs –
- static work_loop(work, on_start, on_stop, state, work_queues, work_args, work_kwargs, result_pipe, message_pipe, work_timeout, max_buffer_size)
Runs a loop calling work until state is set to stopped. on_start is called once before the loop starts and on_stop is called once after it ends. work is called when the time since the last call exceeds work_timeout, when len(buffer) exceeds max_buffer_size, or when state is set to stop_at_queue_end. Results are put into the result pipe.
- Parameters
work –
on_start –
on_stop –
state –
work_queues –
work_args –
work_kwargs –
result_pipe –
message_pipe –
work_timeout –
max_buffer_size –
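The three trigger criteria for the Consumer loop can be sketched in a single process. This is an illustration, not the library's code: consumer_loop is a hypothetical function, the state is a plain dict, results are returned in a list instead of being sent through the result pipe, and the hooks are omitted. It also assumes that work fires once the buffer reaches max_buffer_size and that stop_at_queue_end flushes the remaining items and then stops; the actual comparisons in adv_prodcon may differ.

```python
import queue
import time


def consumer_loop(work, state, work_queue, work_timeout=5, max_buffer_size=1):
    """Buffer queue items and call work when any trigger criterion is met."""
    buffer = []
    results = []
    last_worked = time.monotonic()
    while not state["stopped"]:
        try:
            buffer.append(work_queue.get(timeout=0.01))
        except queue.Empty:
            pass
        timed_out = time.monotonic() - last_worked >= work_timeout
        buffer_full = len(buffer) >= max_buffer_size  # assumed comparison
        draining = state["stop_at_queue_end"] and work_queue.empty()
        if buffer and (timed_out or buffer_full or draining):
            results.append(work(list(buffer)))
            buffer.clear()
            last_worked = time.monotonic()
        if draining and not buffer:
            state["stopped"] = True  # queue exhausted, nothing left to flush
    return results


def consumer_demo():
    work_queue = queue.Queue()
    for item in range(5):
        work_queue.put(item)
    state = {"stopped": False, "stop_at_queue_end": True}
    # A long timeout keeps the time-based trigger out of this demo.
    return consumer_loop(
        lambda items: items, state, work_queue,
        work_timeout=60, max_buffer_size=2,
    )
```

In consumer_demo the first two batches are triggered by the buffer size and the final, shorter batch by stop_at_queue_end.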
- class adv_prodcon.ReadyQueue(*args, **kwargs)
A class extending multiprocessing.queues.Queue. ReadyQueue implements an internal “ready” state that users can check before adding items to it. The queue is cleared when ready is set to false.
ReadyQueue also implements a “lossy” parameter. When set, if the queue is full, one item will be removed before a new one is placed.
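The ready flag and the lossy behaviour can be sketched with a small wrapper around queue.Queue. This is not ReadyQueue itself: LossyReadyQueue and its method names are hypothetical, it is built on the thread-level queue module rather than multiprocessing, and it assumes that the item dropped from a full queue is the oldest one, which the description above does not state.

```python
import queue


class LossyReadyQueue:
    """Thread-level stand-in showing a ready flag and a lossy put."""

    def __init__(self, maxsize=0, lossy=False):
        self._queue = queue.Queue(maxsize)
        self.lossy = lossy
        self._ready = False

    def is_ready(self):
        return self._ready

    def set_ready(self, ready):
        self._ready = ready
        if not ready:
            self._clear()  # a queue that is not ready holds no items

    def _clear(self):
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                return

    def put(self, item):
        if self.lossy and self._queue.full():
            try:
                self._queue.get_nowait()  # drop the oldest item (assumption)
            except queue.Empty:
                pass  # another consumer emptied the queue in the meantime
        self._queue.put(item)

    def get_nowait(self):
        return self._queue.get_nowait()

    def qsize(self):
        return self._queue.qsize()


def lossy_demo():
    q = LossyReadyQueue(maxsize=2, lossy=True)
    q.set_ready(True)
    for item in (1, 2, 3):
        if q.is_ready():
            q.put(item)  # the third put evicts item 1
    kept = [q.get_nowait(), q.get_nowait()]
    q.put(9)
    q.set_ready(False)  # clears the remaining item
    return kept, q.qsize()
```

A lossy queue suits consumers that only care about recent data, since a slow consumer then never blocks the producer.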
- adv_prodcon.put_in_queue(queue, data)
An interface to put an item into a Consumer’s work queue from the main process. This is defined instead of simply using queue.put in case queue items in adv_prodcon ever need to be wrapped in a dict with the data plus a command.
- Parameters
queue –
data –