Advanced Producer-Consumer

https://www.travis-ci.com/acreegan/adv_prodcon.svg?branch=main Documentation Status Updates

Advanced Producer-Consumer is a python package implementing a full featured producer-consumer pattern for concurrent workers. This is useful for developing data acquisition programs or programs that involve real-time data processing while maintatining a responsive UI. It is compatible with PyQt5, which allows it to be used to develop graphical data acquisition and visualisation applications.

Features

  • Producer and Consumer background workers are defined as metaclasses that can be extended by the user. They implement work functions that run in separate processes.

  • Consumers are buffered and can be configured to run based on a timeout, a max buffer size, or both.

  • The queues that connect Producers to Consumers can be either non-lossy or lossy.

  • Producers and Consumers are connected by a subscription model. Producers can have multiple consumers subscribed to them. Consumers can be subscribed to multiple Producers.

  • Producers and Consumers have on_start and on_stop functions that can be defined to run code for setup and teardown.

  • Results from Consumers (and Producers) can be accessed in the main process through a user-defined callback.

  • User defined functions can be defined to communicate between the main process and the work functions.

  • Compatible with PyQt5, which allows development of graphical data acquisition and visualisation applications.

Installation

To install Advanced Producer Consumer, run this command in your terminal:

$ pip install adv_prodcon

Quick Start

The following is a quick example of how to use the adv_prodcon package

import adv_prodcon
import time
from itertools import count

Imports.

class ExampleProducer(adv_prodcon.Producer):

    @staticmethod
    def on_start(state, message_pipe, *args, **kwargs):
        return {"count": count()}

    @staticmethod
    def work(on_start_result, state, message_pipe, *args):
        return next(on_start_result["count"])

Define a Producer class. Here we are using the on_start method to establish a itertools.count iterator. This is made available in the work function through the on_start_result argument. The work function will return the next count each time it is run.

class ExampleConsumer(adv_prodcon.Consumer):

    @staticmethod
    def work(items, on_start_result, state, message_pipe, *args):
        return f"Got :{items} from producer"

    def on_result_ready(self, result):
        print(result)

Define a Consumer Class. This Consumer will just be used as a buffer, returning a string with the items received from the Producer. The on_result_ready function is called when the main process receives the result of the work function. Here we are just printing out the result.

if __name__ == "__main__":
    example_producer = ExampleProducer(work_timeout=1)
    example_consumer = ExampleConsumer(work_timeout=2,
                                       max_buffer_size=1000)

    example_producer.set_subscribers([example_consumer.get_work_queue()])
    example_producer.start_new()
    example_consumer.start_new()

    time.sleep(10)

In the main code block, we create an instance of both our ExampleProducer and our ExampleConsumer. We set the work_timeout of the ExampleProducer to 1 so that it runs once per second. We set the work_timeout of the ExampleConsumer to 2 so that every 2 seconds it performs work on all items in its queue. The max_buffer_size is set high so that the ExampleConsumer is controlled by its work_timeout.

The output of this code is shown below:

Got :[0, 1] from producer
Got :[2, 3] from producer
Got :[4, 5] from producer
Got :[6, 7] from producer

Process finished with exit code 0

Note that the output may be slightly different depending on the time taken to start the worker processes.

Credits