Zaku PubSub Example

This example shows you how to subscribe and publish to topics, that can be used for broadcasting signals or data to multiple subscribers.

Published messages are ephemeral and are not stored in the queue. If you want to store messages, you should use the TaskQ.

This API is used by the RPC api for getting results back from the workers.

from zaku import TaskQ

task_queue = TaskQ()
{'uri': 'http://localhost:9000', 'name': 'jq-57276774-71c2-4925-8508-1f416c300b19', 'ttl': 5.0, 'no_init': None, 'verbose': None, 'ZAKU_USER': None, 'ZAKU_KEY': None}

Example of a Publisher

To publish to a channel, simply call the task_queue.publish method.

Run the following code snippet in a process.

from multiprocessing import Process

def publish(topic_id="example-topic"):
    """A publisher function. We run this in a separate process to publish
    messages to the channel."""
    from time import sleep

    sleep(0.1)

    for i in range(5):
        n = task_queue.publish({"step": i, "param_2": f"key-{i}"}, topic=topic_id)
        sleep(0.1)
        print("published to ", n, "subscribers.")

p = Process(target=publish)
p.start()

Simple Subscription

We can subscribe to a topic using the task_queue.subscribe_one method. This takes the first message from the topic and returns it.

result = task_queue.subscribe_one("example-topic", timeout=5)

print(">>>", result)
assert result["step"] == 0, "the step should be correct"

Streaming Subscription

Sometimes we want to recieve a stream of messages from a topic. We can use the task_queue.subscribe_stream method to do this. This function returns a generator that we can iterate over.

Both APIs are synchronous.

# remember to run the publisher after subscription!
stream = task_queue.subscribe_stream("example-topic", timeout=5)

for i, result in enumerate(stream):
    print(">>>", result)
    assert result["step"] == i, "the step should be correct"

assert i == 4, "there are 5 in total."