zaku.client
TaskQ Client

class zaku.TaskQ[source]

Bases: PrefixProto

TaskQ Client

This is the client that interacts with the TaskQ server. We do not include the configs in the command line because the TaskServer is the primary entry point from the command line.

We do provide the option to load environment variables for these configurations.

Usage

# Put this into an .env file (without the exports)
export ZAKU_URI=http://localhost:9000
export ZAKU_QUEUE_NAME=ZAKU_TEST:debug-queue-1

Now you can create a queue like this:

queue = TaskQ()
Out[2]: {
    "uri": "http://localhost:9000",
    "name": "ZAKU_TEST:debug-queue-1",
    "ttl": 5.0
}

Task Queue Configurations

uri = 'http://localhost:9000'

host endpoint uri, including protocol and port.

uri: str = Proto(
    "http://localhost:9000",
    env="ZAKU_URI",
)   
name = 'jq-bc2a272a-cdea-49a3-b531-087e6c7bf72b'

This is the name of the queue. It is unique to the client. Defaults to a random uuid to avoid collision, but you usually want to supply a name so that it is easier to find the queue. Zaku also reads from environment variables to side-load the configurations.

name: str = Proto(
    f"jq-{uuid4()}",
    env="ZAKU_QUEUE_NAME",
)
ttl = 5.0

time to live in seconds. Defaults to 5.

no_init = None

Flag for skipping the queue creation.

Helper Methods

print_info()[source]

Print the current configurations of the queue.

Useful for debugging or when connection fails.

Queue Info

count()[source]

Count the number of available jobs in the queue.

num_of_jobs = queue.count()
:return None if the queue does not exist. This happens when there is no stale jobs.

0 if the queue only contains stale jobs number if the queue contains open jobs (created)

__len__()[source]

Returns the number of available jobs in the queue.

length = len(queue)

Hint

This is an alias of TaskQ.count.

:return None if the queue does not exist. This happens when there is no stale jobs.

0 if the queue only contains stale jobs number if the queue contains open jobs (created)

Task Life-cycle Methods

init_queue(name=None)[source]

Create a new collection.

Parameters:

name – (optional) The name of the queue.

add(value: Dict, *, key=None)[source]

Append a job to the queue.

Parameters:

value (Dict) –

take()[source]

Grab a job that has not been grabbed from the queue.

mark_done(job_id)[source]

Mark a job as done.

mark_reset(job_id)[source]
pop()[source]

Pop a job from the queue.

clear_queue()[source]

Remove all jobs in a queue. Useful when stale jobs degrades performance.

PubSub and RPC Methods

publish(value: Dict, *, topic=None)[source]

Append a job to the queue.

Parameters:

value (Dict) –

subscribe_one(topic: str, timeout=0.1)[source]

subscribe to wait for one publishing event

Parameters:

topic (str) –

subscribe_stream(topic: str, timeout=0.1)[source]

subscribe to collect all publishing events

Parameters:

topic (str) –

rpc(*args, _timeout=1.0, **kwargs)[source]

This function is a synchronous RPC function that is used to send rendering requests to the rendering server and collect the response. This is a blocking function that waits for the response from the worker before returning.

Args:

response_topic: The pubsub topic to return the response to args: The positional arguments kwargs: The keyword arguments

Returns:

rpc_stream(*args, _timeout=1.0, **kwargs)[source]

This function is a synchronous RPC function that is used to send rendering requests to the rendering server and collect the response. This is a blocking function that waits for the response from the worker before returning.

Args:

response_topic: The pubsub topic to return the response to args: The positional arguments kwargs: The keyword arguments

Returns:

Async Workflow

gather(jobs: list, gather_tokens=None, prefix='{self.name}.return-queue')[source]

Gather the jobs (not quite, will fix - Ge)

Usage

queue = TaskQ()

jobs = [dict(seed=i) for i in range(30)]
is_done, tokens = job_queue.gather(jobs)

# this is blocking.
if is_done():
    print("done")

Now, to add jobs in batches:

queue = TaskQ()

jobs = [dict(seed=i) for i in range(30)]
is_done, tokens = job_queue.gather(jobs)

# add the second batch
jobs = [dict(seed=i) for i in range(30, 50)]
is_done, tokens = job_queue.gather(jobs, tokens)

# now wait for done.
if is_done():
    print("done")
param self:

type self:

TaskQ

param jobs:

type jobs:

dict

param gather_tokens:

This is a set that you can add to, contains the list of tokens

type gather_tokens:

Union[None, set]

param prefix:

type prefix:

str

return:

Union[Callable, set]

rtype:

Parameters:

jobs (list) –

gather_one(job, gather_tokens=None, **kwargs)[source]

Gather the jobs (not quite, will fix - Ge)

Usage

queue = TaskQ()

tokens = None
for i in range(30):
    is_done, tokens = job_queue.gather_one(jobs, tokens)

# this is blocking.
if is_done():
    print("done")
param self:

type self:

TaskQ

param jobs:

type jobs:

dict

param gather_tokens:

this is a singleton, a set that contains just one element, unless you pass in another token set.

type gather_tokens:

Union[None, set]

param prefix:

type prefix:

str

return:

Union[Callable, set]

rtype:

uri = 'http://localhost:9000'

host endpoint uri, including protocol and port.

uri: str = Proto(
    "http://localhost:9000",
    env="ZAKU_URI",
)   
name = 'jq-bc2a272a-cdea-49a3-b531-087e6c7bf72b'

This is the name of the queue. It is unique to the client. Defaults to a random uuid to avoid collision, but you usually want to supply a name so that it is easier to find the queue. Zaku also reads from environment variables to side-load the configurations.

name: str = Proto(
    f"jq-{uuid4()}",
    env="ZAKU_QUEUE_NAME",
)
ttl = 5.0

time to live in seconds. Defaults to 5.

no_init = None

Flag for skipping the queue creation.

verbose = None
ZAKU_USER = None
ZAKU_KEY = None