Module Reference

adv_prodcon

The top-level package for Advanced Producer-Consumer.

class adv_prodcon.Worker[source]

The abstract base class for adv_prodcon. Worker should not be used directly. Instead, user-defined classes should derive from the Producer and Consumer classes, which both inherit from it.

Worker defines the methods that are common to both Producer and Consumer. Most importantly it defines the start_new method which handles creation of a new process where the work is run.

Worker also contains two instances of multiprocessing.Pipe. The result pipe is used by Producer and Consumer to send results from their work functions back to the main process. The message pipe is made available to derived classes by passing it as an argument to the work functions. This allows the work functions to communicate directly with the main process.
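The split between the two pipes can be illustrated with a minimal sketch built only on the standard library. This is not adv_prodcon's implementation: the names work, run_work_once and demo are hypothetical, and everything runs in one process to keep the example self-contained. The work function receives the message pipe and may write to it directly, while the caller forwards the return value through the result pipe.

```python
from multiprocessing import Pipe


def work(message_pipe, value):
    """Hypothetical work function: reports progress, then returns a result."""
    # The work function talks to the main process directly via the message pipe.
    message_pipe.send(f"processing {value}")
    return value * 2


def run_work_once(result_pipe, message_pipe, value):
    """Stand-in for one work loop iteration (an assumption, not library code)."""
    result = work(message_pipe, value)
    # The loop, not the work function, forwards the result.
    result_pipe.send(result)


def demo():
    # Each Pipe() call returns two connected endpoints.
    result_parent, result_child = Pipe()
    message_parent, message_child = Pipe()
    run_work_once(result_child, message_child, 21)
    return result_parent.recv(), message_parent.recv()


if __name__ == "__main__":
    print(demo())
```

In the real package the worker end lives in a separate process, so the main process would normally read both pipes from listener threads instead of calling recv inline.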

Worker defines three variables that describe the state of a Producer or Consumer’s work loop: stopped, started, and stop_at_queue_end.

__init__()[source]

Initialize the Worker. Takes no arguments.

get_state()[source]

Get Worker state.

Returns

The current Worker state.

start_new(work_args=(), work_kwargs=None)[source]

Start a new worker process passing the abstract method work_loop as the target.

New instances of result_pipe and message_pipe are also created, and new threads are started to wait on them.

Parameters
  • work_args (tuple) – args to pass to the work function

  • work_kwargs (dict) – kwargs to pass to the work function

abstract static work_loop(*args, **kwargs)[source]

Static method passed as the target for the process in start_new. Implemented by Producer and Consumer.

Parameters
  • args

  • kwargs

abstract static work(*args, **kwargs)[source]

Static method representing the actual work function for a class derived from Producer or Consumer.

Parameters
  • args

  • kwargs

static on_start(state, message_pipe, *args, **kwargs)[source]

Static method representing a function to be called once when a work loop is started.

Parameters
  • state

  • message_pipe

  • args

  • kwargs

static on_stop(on_start_result, state, message_pipe, *args, **kwargs)[source]

Static method representing a function to be called once when a work loop is stopped.

Parameters
  • on_start_result

  • state

  • message_pipe

  • args

  • kwargs

set_stopped()[source]

Stop the work loop by setting the worker state to stopped.

wait_on_result_pipe()[source]

Loop waiting on data from the result pipe. Calls on_result_ready when data is received.

Returns when an error occurs (indicating that the paired process has ended).

wait_on_message_pipe()[source]

Loop waiting on data from the message pipe. Calls on_message_ready when data is received.

Returns when an error occurs (indicating that the paired process has ended).
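The waiting loops described above might look roughly like the following sketch. It is an illustration under stated assumptions, not the package's code: wait_on_pipe and demo are hypothetical names, and closing the sending end of the pipe stands in for the paired process ending. Connection.recv raises EOFError once the other end is closed and nothing is left to read, which is what ends the loop.

```python
import threading
from multiprocessing import Pipe


def wait_on_pipe(conn, callback):
    """Call callback for each item received; return when the pipe breaks."""
    while True:
        try:
            item = conn.recv()
        except (EOFError, OSError):
            # The other end was closed, so the paired worker has gone away.
            return
        callback(item)


def demo():
    parent_conn, child_conn = Pipe()
    received = []
    # The listener thread plays the role of wait_on_result_pipe, with
    # received.append standing in for on_result_ready.
    listener = threading.Thread(target=wait_on_pipe,
                                args=(parent_conn, received.append))
    listener.start()
    for value in (1, 2, 3):
        child_conn.send(value)
    child_conn.close()  # simulates the worker process ending
    listener.join(timeout=5)
    return received


if __name__ == "__main__":
    print(demo())
```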

on_result_ready(result)[source]

Method that can optionally be overridden to provide a callback for when a result is received from the result pipe.

Parameters

result

on_message_ready(message)[source]

Method that can optionally be overridden to provide a callback for when a message is received from the message pipe.

Parameters

message

class adv_prodcon.Producer(subscriber_queues=None, work_timeout=0)[source]

The class defining adv_prodcon’s Producer. A Producer is a worker that runs its work function in a loop and puts each result into every queue in a list of subscriber queues.

__init__(subscriber_queues=None, work_timeout=0)[source]

Initialize the Producer. subscriber_queues is a list of queues into which the results of the work function should be put. work_timeout specifies the time in seconds between work function calls. If set to 0, the work function will be called as frequently as possible.

Parameters
  • subscriber_queues

  • work_timeout

set_subscribers(subscriber_queues)[source]

Set the queues into which the results of the work function should be put. This must be called before start_new.

Parameters

subscriber_queues

static work_loop(work, on_start, on_stop, state, work_queues, work_args, work_kwargs, result_pipe, message_pipe, work_timeout, buffer_size)[source]

Runs a loop that calls work until state is set to stopped. on_start is called once before the loop begins and on_stop is called once after it ends. work is called whenever the time since the last call exceeds work_timeout. Each result is put into every subscriber queue and into the result pipe.

Parameters
  • work

  • on_start

  • on_stop

  • state

  • work_queues

  • work_args

  • work_kwargs

  • result_pipe

  • message_pipe

  • work_timeout

  • buffer_size
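The loop described above can be sketched as follows. This is a simplified, single-process illustration and not the library's work_loop: the state is a plain dict (an assumption), the pipes are omitted, and producer_loop and demo are hypothetical names. It shows the on_start / timed work / on_stop sequence and the fan-out of each result to every subscriber queue.

```python
import queue
import time


def producer_loop(work, on_start, on_stop, state, subscriber_queues,
                  work_timeout=0.0):
    """Call work repeatedly until state["stopped"] is set."""
    on_start_result = on_start(state)
    last_worked = None
    while not state["stopped"]:
        now = time.monotonic()
        # With work_timeout == 0 this is true on every pass.
        if last_worked is None or now - last_worked >= work_timeout:
            result = work(on_start_result, state)
            last_worked = now
            for q in subscriber_queues:
                q.put(result)  # every subscriber gets every result
        else:
            time.sleep(0.001)  # avoid spinning while waiting for the timeout
    on_stop(on_start_result, state)


def demo():
    def on_start(state):
        return {"count": 0}

    def work(counter, state):
        counter["count"] += 1
        if counter["count"] == 3:
            state["stopped"] = True  # ask the loop to finish
        return counter["count"]

    def on_stop(counter, state):
        counter["finished"] = True

    queues = [queue.Queue(), queue.Queue()]
    producer_loop(work, on_start, on_stop, {"stopped": False}, queues)
    return [[q.get_nowait() for _ in range(q.qsize())] for q in queues]


if __name__ == "__main__":
    print(demo())
```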

abstract static work(on_start_result, state, message_pipe, *args)[source]

Static method representing the actual work function for a class derived from Producer.

Parameters
  • on_start_result

  • state

  • message_pipe

  • args

class adv_prodcon.Consumer(work_timeout=5, max_buffer_size=1, lossy_queue=False, *args, **kwargs)[source]

The class defining adv_prodcon’s Consumer. A Consumer is a worker that runs a loop checking for new items in its queue. When the criteria below are met, Consumer runs its work function with the items from its queue as input.

The criteria for Consumer to run its work function can be: the number of items in the queue, the time elapsed since the last call to the work function, or the state being set to stop_at_queue_end.

__init__(work_timeout=5, max_buffer_size=1, lossy_queue=False, *args, **kwargs)[source]

Initialize the Consumer. work_timeout specifies the time in seconds between work function calls. max_buffer_size specifies the max number of items in the buffer before the work function is called. lossy_queue specifies whether the Consumer’s work_queue should be lossy.

Parameters
  • work_timeout

  • max_buffer_size

  • lossy_queue

  • args

  • kwargs

start_new(work_args=(), work_kwargs=None)[source]

Extends Worker.start_new by setting the Consumer’s work_queue to ready, allowing producers to put items into it.

Parameters
  • work_args

  • work_kwargs

static work_loop(work, on_start, on_stop, state, work_queues, work_args, work_kwargs, result_pipe, message_pipe, work_timeout, max_buffer_size)[source]

Runs a loop that calls work until state is set to stopped. on_start is called once before the loop begins and on_stop is called once after it ends. work is called when the time since the last call exceeds work_timeout, when len(buffer) exceeds max_buffer_size, or when state is set to stop_at_queue_end. Results are put into the result pipe.

Parameters
  • work

  • on_start

  • on_stop

  • state

  • work_queues

  • work_args

  • work_kwargs

  • result_pipe

  • message_pipe

  • work_timeout

  • max_buffer_size
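The three trigger conditions can be illustrated with a small single-process sketch. It is not the library's work_loop and makes several assumptions: the state is a plain dict, results are collected in a list instead of being sent through a pipe, work fires once the buffer reaches max_buffer_size, and stop_at_queue_end is taken to mean "process what is left, then stop once the queue is empty". consumer_loop and demo are hypothetical names.

```python
import queue
import time


def consumer_loop(work, state, work_queue, work_timeout=5.0,
                  max_buffer_size=1):
    """Buffer items from work_queue and call work when a criterion is met."""
    buffer = []
    results = []
    last_worked = time.monotonic()
    while not state["stopped"]:
        try:
            buffer.append(work_queue.get(timeout=0.01))
        except queue.Empty:
            if state["stop_at_queue_end"]:
                state["stopped"] = True  # queue drained: finish after flush
        timed_out = time.monotonic() - last_worked >= work_timeout
        buffer_full = len(buffer) >= max_buffer_size
        if buffer and (buffer_full or timed_out or state["stopped"]):
            results.append(work(buffer))
            buffer = []
            last_worked = time.monotonic()
    return results


def demo():
    work_queue = queue.Queue()
    for item in range(5):
        work_queue.put(item)
    state = {"stopped": False, "stop_at_queue_end": True}
    # list(...) copies the buffer so each batch is kept separately.
    return consumer_loop(list, state, work_queue, max_buffer_size=2)


if __name__ == "__main__":
    print(demo())
```

With five queued items and max_buffer_size=2, the work function sees two full batches, and the final single item is flushed when the queue runs dry.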

abstract static work(items, on_start_result, state, message_pipe, *args)[source]

Static method representing the actual work function for a class derived from Consumer.

Parameters
  • items

  • on_start_result

  • state

  • message_pipe

  • args

get_work_queue()[source]

Gets the Consumer’s work_queue.

set_stop_at_queue_end()[source]

Sets the Consumer’s state to stop_at_queue_end.

class adv_prodcon.ReadyQueue(*args, **kwargs)[source]

A class extending multiprocessing.queues.Queue. ReadyQueue implements an internal “ready” state which users can check before adding items to it. The queue is cleared when ready is set to False.

ReadyQueue also implements a “lossy” parameter. When set, if the queue is full, one item will be removed before a new one is placed.

__init__(*args, **kwargs)[source]

Initialize the ReadyQueue. self._ready is initialized to False. The kwarg “lossy” specifies whether the queue should be lossy. args and all other kwargs are passed on to the underlying queue.

Parameters
  • args

  • kwargs

set_ready()[source]

Sets the state to ready.

set_not_ready()[source]

Sets the state to not ready and clears the queue.

is_ready()[source]

Returns self._ready.value.

clear()[source]

Clears the queue by calling queue.get until the Empty exception is raised.
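The ready flag and the drain-until-Empty approach can be sketched with the standard library's thread-level queue.Queue. This is an illustration only: ReadySketchQueue is a hypothetical name, and the flag is a plain attribute here, whereas the real ReadyQueue is a multiprocessing queue whose flag is read through self._ready.value.

```python
import queue


class ReadySketchQueue(queue.Queue):
    """Thread-level stand-in for the ready/not-ready behaviour."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._ready = False  # not ready until set_ready() is called

    def set_ready(self):
        self._ready = True

    def set_not_ready(self):
        self._ready = False
        self.clear()  # leaving the ready state discards pending items

    def is_ready(self):
        return self._ready

    def clear(self):
        # Keep removing items until the queue reports that it is empty.
        while True:
            try:
                self.get_nowait()
            except queue.Empty:
                break


def demo():
    q = ReadySketchQueue()
    q.set_ready()
    if q.is_ready():  # callers check readiness before adding items
        q.put("a")
        q.put("b")
    size_before = q.qsize()
    q.set_not_ready()
    return size_before, q.qsize(), q.is_ready()


if __name__ == "__main__":
    print(demo())
```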

put(obj, block=True, timeout=None)[source]

Puts obj in the queue. If self.lossy is true and the queue is full, calls self.get() first to make room for the new item.

Parameters
  • obj

  • block

  • timeout
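A lossy put can be sketched on top of the standard library's queue.Queue as shown below. This is a hedged illustration, not ReadyQueue's code: LossySketchQueue is a hypothetical name, it is thread-level only, and it assumes the oldest item is the one dropped when the queue is full.

```python
import queue


class LossySketchQueue(queue.Queue):
    """Bounded queue that drops its oldest item instead of blocking."""

    def __init__(self, maxsize=0, lossy=False):
        super().__init__(maxsize)
        self.lossy = lossy

    def put(self, obj, block=True, timeout=None):
        if self.lossy and self.full():
            try:
                self.get_nowait()  # discard the oldest item to make room
            except queue.Empty:
                pass  # another consumer emptied the queue first
        super().put(obj, block, timeout)


def demo():
    q = LossySketchQueue(maxsize=2, lossy=True)
    for item in (1, 2, 3):
        q.put(item)  # the third put evicts item 1
    return [q.get_nowait(), q.get_nowait()]


if __name__ == "__main__":
    print(demo())
```

A lossy queue suits consumers that only care about recent data, such as display or monitoring workers, because a slow consumer can then never block the producer.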

adv_prodcon.put_in_queue(queue, data)[source]

An interface to put an item into a Consumer’s work queue from the main process. This is defined instead of simply using queue.put in case queue items in adv_prodcon ever need to be wrapped in a dict with the data plus a command.

Parameters
  • queue

  • data