zaku.types
Type Interfaces

Here are the basic types for events, HTML and three.js elements, and others.

class zaku.interfaces.BytesIO

Bases: _BufferedIOBase

Buffered I/O implementation using an in-memory bytes buffer.

__init__(*args, **kwargs)

__new__(**kwargs)

close()

Disable all I/O operations.

closed

True if the file is closed.
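
The effect of close() on the closed attribute can be sketched as follows. This assumes zaku.interfaces.BytesIO is the standard library io.BytesIO re-exported, which the base class and docstrings suggest but this page does not state, so the example imports io directly.

```python
import io

buf = io.BytesIO(b"data")
was_closed = buf.closed      # False before close()
buf.close()
is_closed = buf.closed       # True after close()

try:
    buf.read()
    read_failed = False
except ValueError:
    # Any I/O on a closed buffer raises ValueError.
    read_failed = True
```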

flush()

Does nothing.

getbuffer()

Get a read-write view over the contents of the BytesIO object.

getvalue()

Retrieve the entire contents of the BytesIO object.
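
A short sketch of how getbuffer() and getvalue() differ, assuming this class behaves like the standard library io.BytesIO (imported directly here): the view returned by getbuffer() writes through to the buffer, while getvalue() returns an independent bytes copy.

```python
import io

buf = io.BytesIO(b"hello")

view = buf.getbuffer()       # read-write memoryview, no copy
view[0:1] = b"j"             # mutates the underlying buffer in place
patched = buf.getvalue()     # bytes copy of the full contents
view.release()               # release the view so the buffer can be resized again
```

While a view is still exported, operations that resize the buffer raise BufferError, so releasing the view promptly is usually a good idea.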

isatty()

Always returns False.

BytesIO objects are not connected to a TTY-like device.

read(size=-1, /)

Read at most size bytes, returned as a bytes object.

If the size argument is negative, read until EOF is reached. Return an empty bytes object at EOF.

read1(size=-1, /)

Read at most size bytes, returned as a bytes object.

If the size argument is negative or omitted, read until EOF is reached. Return an empty bytes object at EOF.
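
The size handling described for read() can be illustrated with a minimal example, assuming the class matches the standard library io.BytesIO (imported directly here).

```python
import io

buf = io.BytesIO(b"abcdef")

first = buf.read(2)    # at most 2 bytes
rest = buf.read()      # size omitted: read until EOF
at_eof = buf.read()    # already at EOF: empty bytes object
```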

readable()

Returns True if the IO object can be read.

readinto(buffer, /)

Read bytes into buffer.

Returns number of bytes read (0 for EOF), or None if the object is set not to block and has no data to read.
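
As a sketch of readinto(), assuming standard library io.BytesIO semantics: the call fills a caller-supplied writable buffer and reports how many bytes were copied. The name target is only illustrative.

```python
import io

buf = io.BytesIO(b"abcdef")
target = bytearray(4)              # pre-allocated, writable destination

n_first = buf.readinto(target)     # fills all 4 bytes
snapshot = bytes(target)

n_second = buf.readinto(target)    # only 2 bytes remain in the stream
n_third = buf.readinto(target)     # EOF
```

A short read overwrites only the leading bytes of the destination, so the tail of target keeps its previous contents.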

readline(size=-1, /)

Next line from the file, as a bytes object.

Retain newline. A non-negative size argument limits the maximum number of bytes to return (an incomplete line may be returned then). Return an empty bytes object at EOF.

readlines(size=None, /)

List of bytes objects, each a line from the file.

Call readline() repeatedly and return a list of the lines so read. The optional size argument, if given, is an approximate bound on the total number of bytes in the lines returned.
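
The following sketch shows readline() with and without a size limit, followed by readlines(). It assumes standard library io.BytesIO behavior and imports io directly.

```python
import io

buf = io.BytesIO(b"one\ntwo\nthree")

first = buf.readline()       # full line, newline retained
partial = buf.readline(2)    # size limit cuts the line short
remaining = buf.readlines()  # rest of the stream, split into lines
```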

seek(pos, whence=0, /)

Change stream position.

Seek to byte offset pos relative to position indicated by whence:

  • 0: Start of stream (the default); pos should be >= 0.

  • 1: Current position; pos may be negative.

  • 2: End of stream; pos is usually negative.

Returns the new absolute position.
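
The three whence modes can be sketched like this, assuming standard library io.BytesIO semantics. The constants io.SEEK_CUR and io.SEEK_END are the named equivalents of 1 and 2.

```python
import io

buf = io.BytesIO(b"0123456789")

p_start = buf.seek(3)                 # absolute offset from the start
a = buf.read(2)                       # position is now 5

p_cur = buf.seek(2, io.SEEK_CUR)      # 2 bytes forward from the current position
b = buf.read(1)

p_end = buf.seek(-2, io.SEEK_END)     # 2 bytes before the end of the stream
c = buf.read()
```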

seekable()

Returns True if the IO object can be seeked.

tell()

Current file position, an integer.

truncate(size=None, /)

Truncate the file to at most size bytes.

Size defaults to the current file position, as returned by tell(). The current file position is unchanged. Returns the new size.
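
A small sketch of truncate(), assuming standard library io.BytesIO behavior: with no argument it cuts at the current position, and with an explicit size it leaves the position where it was.

```python
import io

buf = io.BytesIO(b"0123456789")
buf.seek(4)

size_default = buf.truncate()    # cut at the current position
size_explicit = buf.truncate(2)  # cut further, to 2 bytes
position = buf.tell()            # position is not moved by truncate()
value = buf.getvalue()
```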

writable()

Returns True if the IO object can be written.

write(b, /)

Write bytes to file.

Return the number of bytes written.

writelines(lines, /)

Write lines to the file.

Note that newlines are not added. lines can be any iterable object producing bytes-like objects. This is equivalent to calling write() for each element.
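
The difference between write() and writelines() can be sketched as follows, again assuming the class is equivalent to the standard library io.BytesIO.

```python
import io

buf = io.BytesIO()

written = buf.write(b"header\n")                   # returns the byte count
buf.writelines([b"a", b"b\n", bytearray(b"c")])    # any bytes-like items, no newlines added
value = buf.getvalue()
```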

zaku.interfaces.perf_counter() → float

Performance counter for benchmarking.
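
A typical timing pattern, assuming this function is the standard library time.perf_counter re-exported (the example imports it from time directly). Only the difference between two readings is meaningful.

```python
import time

start = time.perf_counter()
total = sum(range(100_000))          # the work being timed
elapsed = time.perf_counter() - start

print(f"summed to {total} in {elapsed:.6f} s")
```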

class zaku.interfaces.ZData[source]

Bases: object

static encode(data: torch.Tensor | ndarray)[source]

This converts arrays and tensors to z-format.

Parameters:

data (torch.Tensor | ndarray) –

static get_ztype(data: Dict) → Literal['numpy.ndarray', 'torch.Tensor', 'generic'] | None[source]

Check whether data is a z-payload.

Parameters:

data (Dict) –

Return type:

Literal['numpy.ndarray', 'torch.Tensor', 'generic'] | None

static decode(zdata)[source]

class zaku.interfaces.Payload[source]

Bases: SimpleNamespace

__init__(_greedy=None, **payload)[source]

greedy = True

Set to False to avoid greedy conversion, which is faster.

serialize()[source]

static deserialize(payload) → Dict[source]

Return type:

Dict

static deserialize_unpacked(unpacked) → Dict[source]

Used with msgpack.Unpacker in streaming mode, so that the unpacker handles end-of-message detection. Handling it ourselves is messy.

Return type:

Dict

class zaku.interfaces.Job[source]

Bases: SimpleNamespace

created_ts: float
status: Literal[None, 'in_progress', 'created'] = 'created'
grab_ts: float = None
async static create_queue(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, name, *, prefix)[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

async static remove_queue(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue, *, prefix)[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

static add(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue: str, *, prefix: str, payload: bytes = None, job_id: str = None) → Coroutine[source]
Parameters:
  • r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

  • queue (str) –

  • prefix (str) –

  • payload (bytes) –

  • job_id (str) –

Return type:

Coroutine

async static count_files(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue, *, prefix) → int[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

Return type:

int

async static take(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue, *, prefix) → Tuple[Any, Any][source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

Return type:

Tuple[Any, Any]

async static publish(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue: str, *, payload: bytes, topic_id: str, prefix: str) → Coroutine[source]

Publish a job to a key. The message is ephemeral and is not saved in the queue.

Parameters:
  • r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

  • queue (str) –

  • payload (bytes) –

  • topic_id (str) –

  • prefix (str) –

Return type:

Coroutine

async static subscribe(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue: str, *, topic_id: str, prefix: str, timeout: float = 0.1) → str[source]

Returns the first non-empty message.

Parameters:
  • r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

  • queue (str) –

  • topic_id (str) –

  • prefix (str) –

  • timeout (float) –

Return type:

str

static subscribe_stream(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue: str, *, topic_id: str, prefix: str, timeout: float = 0.1)[source]
Parameters:
  • r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

  • queue (str) –

  • topic_id (str) –

  • prefix (str) –

  • timeout (float) –

async static remove(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, job_id, queue, *, prefix)[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

static reset(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, job_id, queue, *, prefix)[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –

async static unstale_tasks(r: redis.asyncio.Redis | redis.sentinel.asyncio.Redis, queue, *, prefix, ttl=None)[source]
Parameters:

r (redis.asyncio.Redis | redis.sentinel.asyncio.Redis) –
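
The fields and method names on Job (created_ts, status, grab_ts, take, unstale_tasks with a ttl) suggest a simple lifecycle. The sketch below models one plausible reading of it with an in-memory dict instead of Redis. Everything in it is an assumption made for illustration: the helper names make_job, take and unstale are hypothetical, and the rule that a job is stale once it has been in progress for longer than ttl seconds is a guess, not zaku's documented behavior.

```python
from types import SimpleNamespace


def make_job(now):
    # Hypothetical stand-in for a Job record; field names follow the class above.
    return SimpleNamespace(created_ts=now, status="created", grab_ts=None)


def take(jobs, now):
    """Grab the first job that is still 'created' and mark it in progress."""
    for job_id, job in jobs.items():
        if job.status == "created":
            job.status = "in_progress"
            job.grab_ts = now
            return job_id, job
    return None, None


def unstale(jobs, now, ttl):
    """Reset jobs that have been in progress for longer than ttl seconds (assumed rule)."""
    reset_ids = []
    for job_id, job in jobs.items():
        if job.status == "in_progress" and now - job.grab_ts > ttl:
            job.status = "created"
            job.grab_ts = None
            reset_ids.append(job_id)
    return reset_ids


jobs = {"job-1": make_job(now=0.0)}
taken_id, taken_job = take(jobs, now=1.0)
nothing_left, _ = take(jobs, now=2.0)             # the only job is already in progress
still_fresh = unstale(jobs, now=3.0, ttl=5.0)     # grabbed 2 s ago, within ttl
went_stale = unstale(jobs, now=10.0, ttl=5.0)     # grabbed 9 s ago, past ttl
```

Passing timestamps in explicitly keeps the sketch deterministic. The real methods take a Redis client r, a queue name and a prefix, as listed in the signatures above.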