Zaku Remote Procedural Call Example¶
This example shows you how to use a remote worker to execute a function, and collect its result. This is useful for offloading work to a separate process, or a separate machine and getting the results back.
from zaku import TaskQ
queue = TaskQ()
Simple Remote Procedure Call (single return)¶
We can call a remote function using the task_queue.rpc method.
queue_name = "ZAKU_TEST:debug-rpc-queue"
rpc_queue = TaskQ(name=queue_name, uri="http://localhost:9000")
result = rpc_queue.rpc(seed=100, _timeout=5)
assert result["seed"] == 100, "the seed should be correct"
How do we implement the worker process? We can use the following code snippet:
Run this either as a stand along script, or in a separate process.
from multiprocessing import Process
def worker_process(queue_name):
from time import sleep
queue = TaskQ(name=queue_name, uri="http://localhost:9000")
job = None
while not job:
with queue.pop() as job:
if job is None:
pass
# _request_id is the topic to respond to. We pop it out.
topic = job.pop("_request_id")
# we simulate a long-running job.
sleep(1.0)
# we return the result to the response topic.
queue.publish(
{"result": "good", **job},
topic=topic,
)
p = Process(target=worker_process, args=("ZAKU_TEST:debug-rpc-queue",))
p.start()
Streaming Remote Procedure Call¶
Sometimes we want to recieve a stream of messages from a topic. We can use the
task_queue.rpc_stream method to do this. This function returns a generator
that we can iterate over.
queue_name = "ZAKU_TEST:debug-rpc-queue"
rpc_queue = TaskQ(name=queue_name, uri="http://localhost:9000")
# remember to run the worker process after calling this.
stream = rpc_queue.rpc_stream(start=5, end=10, _timeout=5)
for i, result in enumerate(stream):
print(">>>", result)
assert result["value"] == i + 5, "the value should be correct"
We can implement the streamer worker via a small modification of the worker process from above: just publish multiple results to the same topic.
from multiprocessing import Process
def streamer_process(queue_name):
from time import sleep
queue = TaskQ(name=queue_name, uri="http://localhost:9000")
job = None
while not job:
with queue.pop() as job:
if job is None:
pass
topic = job.pop("_request_id")
args = job.pop("_args")
# we simulate a long-running job.
sleep(1.0)
# we return a sequence of results
for i in range(job["start"], job["end"]):
sleep(0.1)
queue.publish({"value": i}, topic=topic)
p = Process(target=streamer_process, args=(queue_name,))
p.start()