zaku.client
TaskQ Client
- class zaku.TaskQ
Bases: PrefixProto
This is the client that interacts with the TaskQ server. Its configuration is not exposed as command-line options, because the TaskServer is the primary command-line entry point.
Instead, these settings can be loaded from environment variables.
Usage
    # Put this into an .env file (without the exports)
    export ZAKU_URI=http://localhost:9000
    export ZAKU_QUEUE_NAME=ZAKU_TEST:debug-queue-1
Now you can create a queue like this:
    queue = TaskQ()
    Out[2]: {
        "uri": "http://localhost:9000",
        "name": "ZAKU_TEST:debug-queue-1",
        "ttl": 5.0
    }
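The `.env` step above can be sketched with the standard library alone. This is a minimal illustration, not part of zaku: `parse_env_text` and `ENV_TEXT` are hypothetical names, and the sketch assumes the variables need to be in the process environment before the client reads them.

```python
import os


def parse_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blanks and comments.

    A leading 'export ' is tolerated so the shell form shown above also works.
    """
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a KEY=VALUE line
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


# Hypothetical .env contents, mirroring the values shown above.
ENV_TEXT = """
# zaku client settings
ZAKU_URI=http://localhost:9000
ZAKU_QUEUE_NAME=ZAKU_TEST:debug-queue-1
"""

config = parse_env_text(ENV_TEXT)
os.environ.update(config)

# With zaku installed, the client would be created afterwards:
#   from zaku import TaskQ
#   queue = TaskQ()
```

Exactly when the library reads these variables is not stated here, so setting them before importing zaku is the safer order.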
Task Queue Configurations
- uri = 'http://localhost:9000'
Host endpoint URI, including protocol and port.
    uri: str = Proto(
        "http://localhost:9000",
        env="ZAKU_URI",
    )
- name = 'jq-bc2a272a-cdea-49a3-b531-087e6c7bf72b'
The name of the queue, unique to the client. It defaults to a random UUID to avoid collisions, but you usually want to supply a name so that the queue is easier to find. Zaku also reads environment variables to side-load the configuration.
    name: str = Proto(
        f"jq-{uuid4()}",
        env="ZAKU_QUEUE_NAME",
    )
- ttl = 5.0
Time to live, in seconds. Defaults to 5.
- no_init = None
Flag for skipping the queue creation.
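The fallback order described for these fields (explicit value, then environment variable, then default) can be illustrated with a small stand-in. `QueueConfig` below is hypothetical and uses a plain dataclass; it is not how `Proto` or `TaskQ` is implemented, only a sketch of the same resolution order.

```python
import os
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class QueueConfig:
    """Hypothetical stand-in for illustration; not zaku's TaskQ."""

    # Each default is resolved at construction time: environment first, then fallback.
    uri: str = field(
        default_factory=lambda: os.environ.get("ZAKU_URI", "http://localhost:9000")
    )
    name: str = field(
        default_factory=lambda: os.environ.get("ZAKU_QUEUE_NAME", f"jq-{uuid4()}")
    )
    ttl: float = 5.0


# No arguments: values come from the environment, or from the defaults.
default_config = QueueConfig()

# An explicit name takes precedence over both the environment and the default.
named_config = QueueConfig(name="ZAKU_TEST:debug-queue-1")
```

Supplying a stable name, as the description of `name` suggests, makes the queue easy to find again; the random `jq-...` default is mainly useful for throwaway queues.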
Helper Methods
Queue Info
Task Life-cycle Methods
PubSub and RPC Methods
- subscribe_one(topic: str, timeout=0.1)
Subscribe to a topic and wait for one published event.
- Parameters:
topic (str) –
- subscribe_stream(topic: str, timeout=0.1)
Subscribe to a topic and collect all published events.
- Parameters:
topic (str) –
- rpc(*args, _timeout=1.0, **kwargs)
Synchronous RPC call that sends a rendering request to the rendering server and collects the response. It blocks until the response from the worker arrives, then returns.
- Args:
response_topic: The pubsub topic to return the response to
args: The positional arguments
kwargs: The keyword arguments
- Returns:
- rpc_stream(*args, _timeout=1.0, **kwargs)
Synchronous RPC call that sends a rendering request to the rendering server and collects the response. It blocks until the response from the worker arrives, then returns.
- Args:
response_topic: The pubsub topic to return the response to
args: The positional arguments
kwargs: The keyword arguments
- Returns:
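The request/response pattern that `rpc` describes (send a request that names a response topic, then block until the worker answers or the timeout expires) can be sketched without a server. Everything below is an assumption made for illustration: `InMemoryBroker`, the `_response_topic` field, and the free functions `rpc` and `worker` are hypothetical and do not reflect zaku's wire format or internals.

```python
import queue
import threading
from collections import defaultdict
from uuid import uuid4


class InMemoryBroker:
    """Hypothetical stand-in for a pubsub server.

    Unlike a real pubsub channel, each topic here buffers messages, so a
    message published before the subscriber arrives is not lost.
    """

    def __init__(self):
        self._topics = defaultdict(queue.Queue)
        self._lock = threading.Lock()

    def _topic(self, name):
        with self._lock:
            return self._topics[name]

    def publish(self, topic, message):
        self._topic(topic).put(message)

    def subscribe_one(self, topic, timeout=0.1):
        """Wait for one event; return None if nothing arrives in time."""
        try:
            return self._topic(topic).get(timeout=timeout)
        except queue.Empty:
            return None


def rpc(broker, request_topic, *args, _timeout=1.0, **kwargs):
    """Send a request, then block until the response arrives or the timeout expires."""
    response_topic = f"rpc-{uuid4()}"
    request = {"_response_topic": response_topic, "args": args, "kwargs": kwargs}
    broker.publish(request_topic, request)
    return broker.subscribe_one(response_topic, timeout=_timeout)


def worker(broker, request_topic):
    """Handle a single request and publish the result to its response topic."""
    request = broker.subscribe_one(request_topic, timeout=1.0)
    if request is None:
        return
    result = sum(request["args"]) * request["kwargs"].get("scale", 1)
    broker.publish(request["_response_topic"], {"result": result})


broker = InMemoryBroker()
thread = threading.Thread(target=worker, args=(broker, "requests"))
thread.start()
response = rpc(broker, "requests", 1, 2, scale=10, _timeout=1.0)
thread.join()
print(response)
```

Using a fresh response topic per call keeps concurrent callers from receiving each other's replies, which is one plausible reason for the `response_topic` argument listed above.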
Async Workflow
- gather(jobs: list, gather_tokens=None, prefix='{self.name}.return-queue')
Gather the jobs (not quite, will fix - Ge)
Usage
    queue = TaskQ()
    jobs = [dict(seed=i) for i in range(30)]
    is_done, tokens = queue.gather(jobs)
    # this is blocking.
    if is_done():
        print("done")
Now, to add jobs in batches:
    queue = TaskQ()
    jobs = [dict(seed=i) for i in range(30)]
    is_done, tokens = queue.gather(jobs)
    # add the second batch
    jobs = [dict(seed=i) for i in range(30, 50)]
    is_done, tokens = queue.gather(jobs, tokens)
    # now wait for done.
    if is_done():
        print("done")
- Parameters:
self (TaskQ) –
jobs (list) –
gather_tokens (Union[None, set]) – A set that you can add to; it contains the tokens.
prefix (str) –
- Returns:
Union[Callable, set]
- gather_one(job, gather_tokens=None, **kwargs)
Gather the jobs (not quite, will fix - Ge)
Usage
    queue = TaskQ()
    tokens = None
    for i in range(30):
        job = dict(seed=i)
        is_done, tokens = queue.gather_one(job, tokens)
    # this is blocking.
    if is_done():
        print("done")
- Parameters:
self (TaskQ) –
job (dict) –
gather_tokens (Union[None, set]) – By default a singleton: a set that contains just one element, unless you pass in another token set.
prefix (str) –
- Returns:
Union[Callable, set]
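The `is_done, tokens` pattern used by `gather` and `gather_one` can be sketched with an in-memory stand-in. `FakeQueue`, the `_gather_token` field, and `work_all` are hypothetical names chosen for illustration; the real client talks to a server, and the usage comments above describe its `is_done()` as blocking, whereas the check here returns immediately.

```python
from uuid import uuid4


class FakeQueue:
    """Hypothetical in-memory stand-in for TaskQ, for illustration only."""

    def __init__(self):
        self.pending = []      # jobs waiting for a worker
        self.returned = set()  # tokens of jobs that have completed

    def gather(self, jobs, gather_tokens=None):
        # Reuse the caller's token set so later batches extend the same group.
        tokens = set() if gather_tokens is None else gather_tokens
        for job in jobs:
            token = str(uuid4())
            self.pending.append({**job, "_gather_token": token})
            tokens.add(token)

        def is_done():
            # Non-blocking check: has every token in the group been returned?
            return tokens <= self.returned

        return is_done, tokens

    def gather_one(self, job, gather_tokens=None):
        return self.gather([job], gather_tokens)

    def work_all(self):
        """Simulate workers finishing every pending job."""
        while self.pending:
            job = self.pending.pop(0)
            self.returned.add(job["_gather_token"])


queue = FakeQueue()
is_done, tokens = queue.gather([dict(seed=i) for i in range(30)])
# Passing the tokens back in adds the second batch to the same group.
is_done, tokens = queue.gather([dict(seed=i) for i in range(30, 50)], tokens)

done_before = is_done()
queue.work_all()
done_after = is_done()
```

Because the token set is shared and mutated in place, an `is_done` obtained from any batch reports on the whole group, which is what makes adding jobs in batches work in this sketch.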
- verbose = None
- ZAKU_USER = None
- ZAKU_KEY = None