zaku.client
TaskQ Client

class zaku.TaskQ

Bases: PrefixProto

TaskQ Client

This client interacts with the TaskQ server. Its configuration is not exposed as command-line options because the TaskServer is the primary command-line entry point.

Instead, these configurations can be loaded from environment variables.

Usage

# Put this into a .env file (without the `export` keywords)
export ZAKU_URI=http://localhost:9000
export ZAKU_QUEUE_NAME=ZAKU_TEST:debug-queue-1

Now you can create a queue like this:

queue = TaskQ()
Out[2]: {
    "uri": "http://localhost:9000",
    "name": "ZAKU_TEST:debug-queue-1",
    "ttl": 5.0
}
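The note about dropping the exports matters because .env loaders generally expect bare KEY=VALUE lines. As a minimal sketch of what such a loader does, the helper below parses either form using only the standard library. `parse_env_text` is a hypothetical name for illustration and is not part of zaku; in practice a dedicated .env loader would normally do this work.

```python
import os


def parse_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, tolerating a leading `export ` and `#` comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not an assignment; skip it
        values[key.strip()] = value.strip().strip("'\"")
    return values


env_text = """
# Put this into a .env file
export ZAKU_URI=http://localhost:9000
ZAKU_QUEUE_NAME=ZAKU_TEST:debug-queue-1
"""
config = parse_env_text(env_text)

# Only set variables that are not already defined in the process environment.
for key, value in config.items():
    os.environ.setdefault(key, value)
```

Using `setdefault` here is a design choice of this sketch: values already exported in the shell take priority over the file.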

Task Queue Configurations

uri = 'http://localhost:9000'

Host endpoint URI, including protocol and port.

uri: str = Proto(
    "http://localhost:9000",
    env="ZAKU_URI",
)

name = 'jq-29fcc195-9501-424c-8b66-fce6ef75b534'

The name of the queue. It defaults to a random UUID to avoid collisions, so the default is unique to each client, but you usually want to supply a name so that the queue is easier to find. Zaku also reads this value from the ZAKU_QUEUE_NAME environment variable to side-load the configuration.

name: str = Proto(
    f"jq-{uuid4()}",
    env="ZAKU_QUEUE_NAME",
)

ttl = 5.0

Time to live, in seconds. Defaults to 5.

no_init = None

Flag for skipping the queue creation.
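
The attributes above each carry a default, and `uri` and `name` also name an environment variable. A minimal sketch of how such values might be resolved is given below. It assumes the precedence is explicit argument, then environment variable, then default, which the text above does not state outright. `resolve_queue_config` is a hypothetical helper written with the standard library only; it does not use or reproduce `Proto`.

```python
import os
from uuid import uuid4


def resolve_queue_config(environ=None, **overrides):
    """Resolve each setting as: explicit override, else env var, else default."""
    environ = os.environ if environ is None else environ
    config = {
        "uri": environ.get("ZAKU_URI", "http://localhost:9000"),
        # A fresh random name per call, mirroring the f"jq-{uuid4()}" default.
        "name": environ.get("ZAKU_QUEUE_NAME", f"jq-{uuid4()}"),
        # No environment variable is documented for ttl, so only the default applies.
        "ttl": 5.0,
    }
    config.update(overrides)
    return config


defaults = resolve_queue_config(environ={})
from_env = resolve_queue_config(environ={"ZAKU_QUEUE_NAME": "ZAKU_TEST:debug-queue-1"})
explicit = resolve_queue_config(environ={}, name="my-queue", ttl=10.0)
```

Passing `environ` as a plain dict keeps the sketch easy to test without touching the real process environment.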

Helper Methods

print_info()

Print the current configurations of the queue.

Useful for debugging or when connection fails.

Task Life-cycle Methods

init_queue(name=None)

Create a new collection.

Parameters:

name – (optional) The name of the queue.

add(value: Dict, *, key=None)

Append a job to the queue.

Parameters:

value (Dict) – The job to append to the queue.

take()

Grab a job that has not been grabbed from the queue.

mark_done(job_id)

Mark a job as done.

mark_reset(job_id)

pop()

Pop a job from the queue.

clear_queue()

Remove all jobs in a queue. Useful when stale jobs degrade performance.
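
The life-cycle methods above can be combined into a simple producer and worker step, sketched below. The sketch rests on assumptions the reference does not confirm: that `take()` returns a `(job_id, job)` pair, that an empty queue yields `None` or `(None, None)`, and that `mark_reset` makes a job available to be taken again. `enqueue_all` and `process_next` are hypothetical helpers; they accept any object exposing `add`, `take`, `mark_done`, and `mark_reset`.

```python
def enqueue_all(queue, jobs):
    """Append each job dict to the queue with `add`."""
    for job in jobs:
        queue.add(job)


def process_next(queue, handler):
    """Take one job and run `handler` on it.

    Returns True if a job was processed, False if the queue was empty.
    """
    taken = queue.take()
    # Assumption: an empty queue yields None or a (None, None) pair.
    job_id, job = taken if taken else (None, None)
    if job_id is None:
        return False
    try:
        handler(job)
    except Exception:
        # Assumption: mark_reset makes the job available to be taken again.
        queue.mark_reset(job_id)
        raise
    queue.mark_done(job_id)
    return True
```

A worker would typically call `process_next` in a loop and sleep briefly whenever it returns False. Resetting on failure, rather than marking the job done, is a choice of this sketch so that a crashed handler does not silently drop work.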

PubSub and RPC Methods

publish(value: Dict, *, topic=None)

Publish a message to a topic.

Parameters:

value (Dict) – The message to publish.

subscribe_one(topic: str, timeout=0.1)

Subscribe to a topic and wait for one published event.

Parameters:

topic (str) – The topic to subscribe to.

subscribe_stream(topic: str, timeout=0.1)

Subscribe to a topic and collect all published events.

Parameters:

topic (str) – The topic to subscribe to.
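
Because `subscribe_one` takes a short default timeout, a caller that needs to wait longer may want to poll it. A minimal sketch follows, assuming that `subscribe_one` returns None when its own timeout expires, which the reference does not state. `wait_for_message` is a hypothetical helper and works with any object exposing `subscribe_one(topic, timeout=...)`.

```python
import time


def wait_for_message(queue, topic, *, total_timeout=2.0, poll_timeout=0.1):
    """Call `subscribe_one` repeatedly until a message arrives or time runs out.

    Assumption: `subscribe_one` returns None when its own timeout expires.
    """
    deadline = time.monotonic() + total_timeout
    while time.monotonic() < deadline:
        message = queue.subscribe_one(topic, timeout=poll_timeout)
        if message is not None:
            return message
    return None
```

Depending on how the server delivers events, a message published between two polls could be missed, so this pattern suits cases where the publisher starts after the subscriber is already waiting.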

rpc(*args, _timeout=1.0, **kwargs)

A synchronous, blocking RPC call: it sends a rendering request to the rendering server and waits for the worker's response before returning.

Args:

response_topic: The pubsub topic to return the response to.
args: The positional arguments.
kwargs: The keyword arguments.

Returns:

rpc_stream(*args, _timeout=1.0, **kwargs)

A synchronous, blocking RPC call: it sends a rendering request to the rendering server and waits for the worker's response before returning.

Args:

response_topic: The pubsub topic to return the response to.
args: The positional arguments.
kwargs: The keyword arguments.

Returns:
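
Both RPC entries above describe a request sent to a worker and a response returned on a pubsub topic. The general pattern can be sketched in memory as below. Everything in this sketch is hypothetical: `InMemoryBroker`, `rpc_call`, `serve_one`, the `_response_topic` key, and the `rpc-` topic prefix are illustrative names, and zaku's actual wire format is not described here.

```python
import queue
import threading
import uuid


class InMemoryBroker:
    """Hypothetical stand-in for a server: a job queue plus per-topic mailboxes."""

    def __init__(self):
        self.jobs = queue.Queue()
        self._topics = {}
        self._lock = threading.Lock()

    def _mailbox(self, topic):
        with self._lock:
            return self._topics.setdefault(topic, queue.Queue())

    def publish(self, value, *, topic):
        self._mailbox(topic).put(value)

    def subscribe_one(self, topic, timeout=0.1):
        try:
            return self._mailbox(topic).get(timeout=timeout)
        except queue.Empty:
            return None


def rpc_call(broker, _timeout=1.0, **kwargs):
    """Send a request and block until the reply arrives or the timeout expires."""
    response_topic = f"rpc-{uuid.uuid4()}"
    broker.jobs.put({"_response_topic": response_topic, **kwargs})
    return broker.subscribe_one(response_topic, timeout=_timeout)


def serve_one(broker):
    """Worker side: take one request and publish the result to its response topic."""
    request = broker.jobs.get()
    response_topic = request.pop("_response_topic")
    broker.publish({"sum": request["a"] + request["b"]}, topic=response_topic)


broker = InMemoryBroker()
threading.Thread(target=serve_one, args=(broker,), daemon=True).start()
reply = rpc_call(broker, a=1, b=2)
```

A unique response topic per call keeps concurrent callers from receiving each other's replies. Unlike this in-memory mailbox, a real pubsub channel may not buffer messages published before the caller subscribes.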

verbose = None
ZAKU_USER = None
ZAKU_KEY = None