API

The main API of the package follows.

Torrelque(client, *[, queue, serialiser])

Reliable work queue.

Torrelque.enqueue(task, *[, task_timeout, ...])

Put a task on the queue optionally providing its timeout and delay.

Torrelque.dequeue([timeout, max_tasks])

Get a task or a task list from the queue with optional timeout.

Torrelque.requeue(task_id[, delay, ...])

Return a failed task to the queue with an optional delay.

Torrelque.release(task_id, *[, result, ...])

Remove a finished task from the queue.

Torrelque.watch(task_id, *[, timeout])

Watch task status change until it's released from the queue.

Torrelque.schedule_sweep([interval])

Schedule the sweep in a background coroutine.

Torrelque.get_task_state(task_id)

Get task state.

TorrelqueQueueStats(tasks, pending, working, ...)

Queue counters.

TorrelqueTaskStatus(value)

Task status.

TorrelqueTaskState(status, timeout, result, ...)

Task state.

TorrelqueError

Generic Torrelque error.

TorrelqueTimeoutError

Torrelque timeout error.

TorrelqueLookupError

Torrelque lookup error.

TorrelqueTaskSerialiser(dumps, loads)

Task serialisation delegate.

class torrelque.Torrelque(client, *, queue='trq', serialiser=TorrelqueTaskSerialiser(lambda obj: ..., json.loads))

Reliable work queue.

Parameters:
  • client (Redis) – Redis client instance.

  • queue (str) – Name of the queue. Must match across producers and consumers.

  • serialiser – An object with dumps and loads that (de)serialises task bodies.

Raises:

TorrelqueError – If client is not a redis.asyncio.Redis instance.

task_timeout = 300

Default timeout for a task in the “working” set to be considered stale.

sweep_interval = 30

Default interval between sweep calls, when sweep is scheduled.

result_ttl = 3600

Default time-to-live of a task result (when applicable).

keys = {'delayed': 'delayed', 'dequeueing': 'dequeueing', 'pending': 'pending', 'task': 'task', 'tasks': 'tasks', 'undequeued': 'undequeued', 'working': 'working'}

Queue Redis key name mapping.

On initialisation the values are prefixed with the queue name, and the new dictionary is rebound to the instance.

Redis key description

pending

a list containing enqueued task ids

dequeueing

a short-lived list of task ids into which dequeue() RPOPLPUSH-es task ids from the pending list

undequeued

a set where sweep() stores potentially stale, “undequeued” task ids (staleness is evaluated on the next sweep)

working

a sorted set with successfully dequeued task ids where the score is the Unix timestamp when the task becomes stale according to its timeout

delayed

a sorted set with delayed task ids where the score is the Unix timestamp when the task becomes due

tasks

a hash mapping a task id to its serialised representation

task

a string prefix, followed by task id, for hashes storing individual task stats
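
The prefixing of key names on initialisation can be pictured with a short sketch. The colon separator and the helper name prefix_keys are assumptions made for illustration; the reference only states that the values are prefixed with the queue name and that a new dictionary is bound to the instance.

```python
# Class-level key names as documented above.
KEYS = {
    'pending': 'pending',
    'dequeueing': 'dequeueing',
    'undequeued': 'undequeued',
    'working': 'working',
    'delayed': 'delayed',
    'tasks': 'tasks',
    'task': 'task',
}


def prefix_keys(queue_name, keys=KEYS, separator=':'):
    """Return a new mapping whose values carry the queue name as a prefix.

    The separator is an assumption; the library may join the parts differently.
    """
    return {name: f'{queue_name}{separator}{value}' for name, value in keys.items()}


# The class-level mapping is left untouched; the instance gets its own copy.
instance_keys = prefix_keys('trq')
print(instance_keys['pending'])
```

Building a new dictionary rather than mutating the shared one keeps two queues with different names from overwriting each other's key names.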

async enqueue(task, *, task_timeout=None, delay=None, pipeline=None)

Put a task on the queue optionally providing its timeout and delay.

Parameters:
  • task – Arbitrary serialisable task payload.

  • task_timeout (float | None) – Time since the task’s processing start after which it is considered stale.

  • delay (float | None) – Number of seconds to delay processing of the task, i.e. putting it into the “pending” list. Note that the sweep must be scheduled for the delayed tasks to return, and the delay only has effect after the sweep execution.

  • pipeline (Pipeline | None) – External Redis pipeline that allows for bulk enqueue.

Returns:

Task identifier.

Return type:

str
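
A minimal producer sketch, assuming queue is an already constructed Torrelque instance. The helper name enqueue_report, the payload shape and the 60-second timeout are hypothetical; only the enqueue() call and its keyword arguments come from the signature above.

```python
async def enqueue_report(queue, report_id, *, delay=None):
    """Enqueue a hypothetical report-building task and return its id.

    ``queue`` is expected to be a Torrelque instance.
    """
    payload = {'kind': 'build_report', 'report_id': report_id}
    # task_timeout: seconds after processing start before the task is stale.
    # delay: seconds to hold the task back; this needs a scheduled sweep.
    return await queue.enqueue(payload, task_timeout=60, delay=delay)
```

The returned string is the task identifier that the other methods in this reference accept as task_id.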

async dequeue(timeout=None, *, max_tasks=1)

Get a task or a task list from the queue with optional timeout.

Parameters:
  • timeout (int | None) – Time to wait until a task is available. The timeout applies only to fetching a single task. Note that Redis only supports an integer timeout.

  • max_tasks (int) – If greater than 1, the method optimistically tries to dequeue up to that many tasks. Only 1 task is guaranteed.

Raises:
  • TorrelqueTimeoutError – If timeout was provided and there was no result within it.

  • TorrelqueLookupError – Indicates that the task id became stale while this method was running. This is not expected under normal circumstances. It can happen if this method is paused, say on a debugger breakpoint, for the duration of 2 sweeps.

Returns:

Tuple of the task identifier and the deserialised task payload. If max_tasks is greater than 1, the return value is a list of such 2-tuples, regardless of how many tasks were actually dequeued.

Return type:

Tuple[str, dict] | List[Tuple[str, dict]]
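
A consumer typically pairs dequeue() with release() on success and requeue() on failure, both documented in this reference. The following is a minimal sketch under stated assumptions: queue is a Torrelque instance, and process_one, handler and retry_delay are hypothetical names introduced for illustration.

```python
async def process_one(queue, handler, *, retry_delay=10):
    """Dequeue one task, run ``handler`` on it, then release or requeue it.

    ``handler`` is a hypothetical coroutine function that takes the
    deserialised payload. Returns the task id and a success flag.
    """
    # No timeout is passed, so TorrelqueTimeoutError is not expected here.
    task_id, payload = await queue.dequeue()
    try:
        result = await handler(payload)
    except Exception:
        # Hand the failed task back; the delay needs a scheduled sweep.
        await queue.requeue(task_id, delay=retry_delay)
        return task_id, False
    # A non-None result keeps the task state available after release.
    await queue.release(task_id, result=result)
    return task_id, True
```

With max_tasks left at its default of 1 the return value is a single 2-tuple, which is why it can be unpacked directly.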

async requeue(task_id, delay=None, *, task_timeout=None, pipeline=None)

Return a failed task to the queue with an optional delay.

Parameters:
  • task_id (str) – Task identifier.

  • delay (float | None) – Number of seconds to delay putting the task into the “pending” list. Note that the sweep must be scheduled in order for tasks from the “delayed” set to return to the “pending” list.

  • task_timeout (float | None) – Redefine task timeout, which is the time since the task’s processing start after which it is considered stale.

  • pipeline (Pipeline | None) – External Redis pipeline that allows for bulk requeue.

async release(task_id, *, result=None, result_ttl=None, status=TorrelqueTaskStatus.COMPLETED, pipeline=None)

Remove a finished task from the queue.

Unless result is specified, all task information is removed from the queue immediately.

Since there’s no dead letter queue, tasks that have exceeded the allowed number of retries should also be released, possibly with the TorrelqueTaskStatus.REJECTED status if the producer is interested in the status.

Parameters:
  • task_id (str) – Task identifier.

  • result – Arbitrary serialisable task result. If result is None, the task state key is removed immediately on release.

  • result_ttl (int | None) – Number of seconds to keep task state key after release. Override of default result TTL.

  • status (TorrelqueTaskStatus) – Task status to set on release. It only applies when result is not None.

  • pipeline (Pipeline | None) – External Redis pipeline that allows for bulk release.

Raises:

TorrelqueError – If the status is not final.
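
The advice about retry limits can be sketched as a small failure handler. This is an illustration under stated assumptions, not the library's own retry policy: fail_task, max_attempts and retry_delay are hypothetical, and rejected_status is expected to be TorrelqueTaskStatus.REJECTED, passed in by the caller. A result is supplied on rejection because the status only applies when result is not None.

```python
async def fail_task(queue, task_id, error, *, rejected_status,
                    max_attempts=3, retry_delay=30):
    """Requeue a failed task, or release it as rejected after too many tries.

    ``queue`` is expected to be a Torrelque instance. Returns 'rejected' or
    'requeued' so the caller can log the decision.
    """
    state = await queue.get_task_state(task_id)
    if state.dequeue_count >= max_attempts:
        # The error text doubles as the result so the status is recorded.
        await queue.release(
            task_id, result={'error': str(error)}, status=rejected_status
        )
        return 'rejected'
    await queue.requeue(task_id, delay=retry_delay)
    return 'requeued'
```

Counting attempts through dequeue_count avoids keeping a separate retry counter in the task payload.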

async watch(task_id, *, timeout=None)

Watch task status change until it’s released from the queue.

Note

This method relies on notify-keyspace-events introduced in Redis 2.8. The configuration must have generic and hash commands enabled. That is, the configuration must include either KA or Kgh.

Parameters:
  • task_id (str) – Task identifier.

  • timeout (float | None) – Timeout for watching.

Raises:
  • TorrelqueError – If notify-keyspace-events is not configured properly.

  • TorrelqueTimeoutError – If watch has taken longer than timeout.

  • TorrelqueLookupError – If the task state key is not found.

Returns:

Asynchronous generator that yields task states as returned by get_task_state(). The generator stops when the task is released. If the task is released without a result, the generator won’t yield a state with the final status.

Return type:

AsyncIterable[TorrelqueTaskState]
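
A producer that wants the outcome of a task can consume the generator with async for. The sketch below assumes queue is a Torrelque instance and that watch() can be iterated directly, as the return description suggests; wait_for_result is a hypothetical helper name.

```python
async def wait_for_result(queue, task_id, *, timeout=None):
    """Follow a task until watch() stops and return the last state seen.

    Returns None when the generator yields nothing.
    """
    last_state = None
    async for state in queue.watch(task_id, timeout=timeout):
        last_state = state
    return last_state
```

Because a task released without a result produces no final state, callers should not assume that the returned state has a final status.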

async sweep()

Execute the task sweep.

Returns:

3-tuple with counts of:

  • stale tasks from “working” set returned into “pending” list

  • due delayed tasks from “delayed” set returned into “pending” list

  • stale dequeueing task ids returned into “pending” list

Return type:

Tuple[int, int, int]
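
The order of the three counts is easy to mix up, so a caller may prefer to label them. A minimal sketch, assuming queue is a Torrelque instance; sweep_once and the dictionary keys are names chosen for this illustration.

```python
async def sweep_once(queue):
    """Run one sweep and label the three counts it returns."""
    # Order as documented: working set, delayed set, dequeueing list.
    stale_working, due_delayed, stale_dequeueing = await queue.sweep()
    return {
        'stale_working': stale_working,
        'due_delayed': due_delayed,
        'stale_dequeueing': stale_dequeueing,
    }
```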

schedule_sweep(interval=None)

Schedule the sweep in a background coroutine.

Parameters:
  • interval (float | None) – Override of default sweep interval.

unschedule_sweep()

Unschedule the sweep in a background coroutine.
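
Scheduling and unscheduling pair naturally, so they can be wrapped in a context manager. This is a sketch, assuming queue is a Torrelque instance; scheduled_sweep is a hypothetical helper, and the note about needing a running event loop is an assumption based on the sweep running in a background coroutine.

```python
import contextlib


@contextlib.contextmanager
def scheduled_sweep(queue, interval=None):
    """Keep the background sweep scheduled for the duration of a block.

    Intended for use inside a running coroutine, on the assumption that
    scheduling needs an active event loop.
    """
    queue.schedule_sweep(interval)
    try:
        yield queue
    finally:
        # Runs even if the block raises, so the sweep does not outlive it.
        queue.unschedule_sweep()
```

Inside a worker coroutine this would read: with scheduled_sweep(queue): followed by the dequeue loop.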

async get_queue_stats()

Get queue counters.

Return type:

TorrelqueQueueStats

async get_task_state(task_id)

Get task state.

Parameters:
  • task_id (str) – Task identifier.

Raises:

TorrelqueLookupError – If the task state key is not found.

Return type:

TorrelqueTaskState

class torrelque.TorrelqueQueueStats(tasks, pending, working, delayed)

Queue counters.

Parameters:
  • tasks (int) –

  • pending (int) –

  • working (int) –

  • delayed (int) –

tasks: int

Total number of tasks in the queue.

pending: int

Number of pending tasks in the queue.

working: int

Number of working tasks in the queue.

delayed: int

Number of delayed tasks in the queue.
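
The counters are convenient for logging and monitoring. A minimal sketch: describe_stats is a hypothetical helper that accepts the value returned by get_queue_stats(), or any object with the same four integer attributes.

```python
def describe_stats(stats):
    """Format queue counters as one log-friendly line."""
    return (
        f'tasks={stats.tasks} pending={stats.pending} '
        f'working={stats.working} delayed={stats.delayed}'
    )
```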

class torrelque.TorrelqueTaskStatus(value)

Task status.

PENDING = 0

Task is enqueued.

WORKING = 1

Task is dequeued.

DELAYED = 2

Task is delayed.

COMPLETED = 3

Task is released, as the result of successful completion.

REJECTED = 4

Task is released, as the result of (multiple) failed attempts.

isfinal()

Tells whether the status is final.
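
The following stand-in mirrors the documented values to show how isfinal() might be used. It is not the library's class: the name TaskStatus, the IntEnum base and the choice of COMPLETED and REJECTED as the only final statuses are assumptions inferred from the descriptions above.

```python
import enum


class TaskStatus(enum.IntEnum):
    """Stand-in mirroring the documented TorrelqueTaskStatus values."""

    PENDING = 0
    WORKING = 1
    DELAYED = 2
    COMPLETED = 3
    REJECTED = 4

    def isfinal(self):
        # Assumption: only the two "released" statuses count as final.
        return self in (TaskStatus.COMPLETED, TaskStatus.REJECTED)


final_names = [status.name for status in TaskStatus if status.isfinal()]
print(final_names)
```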

class torrelque.TorrelqueTaskState(status, timeout, result, dequeue_count, requeue_count, enqueue_time, last_dequeue_time, last_requeue_time, release_time)

Task state.

Parameters:
  • status (TorrelqueTaskStatus) –

  • timeout (float) –

  • result (Any) –

  • dequeue_count (int) –

  • requeue_count (int) –

  • enqueue_time (float) –

  • last_dequeue_time (float | None) –

  • last_requeue_time (float | None) –

  • release_time (float | None) –

status: TorrelqueTaskStatus

Status of the task.

timeout: float

Execution timeout of the task after which it’s considered stale.

result: Any

Optional result of the task in a final state.

dequeue_count: int

Number of times the task was dequeued from the queue.

requeue_count: int

Number of times the task was requeued into the queue.

enqueue_time: float

Unix timestamp of the enqueue time of the task.

last_dequeue_time: float | None

Optional Unix timestamp of the last dequeue time of the task.

last_requeue_time: float | None

Optional Unix timestamp of the last requeue time of the task.

release_time: float | None

Optional Unix timestamp of the release time of the task in a final status.
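
Since all timestamps are Unix times in seconds, durations fall out by subtraction. A small sketch: lifetime_seconds is a hypothetical helper that takes a TorrelqueTaskState, or any object with the same attributes.

```python
def lifetime_seconds(state):
    """Seconds between enqueue and release, or None if not yet released."""
    if state.release_time is None:
        return None
    return state.release_time - state.enqueue_time
```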

class torrelque.TorrelqueTaskSerialiser(dumps, loads)

Task serialisation delegate.

Parameters:
  • dumps (Callable[[Any], bytes]) –

  • loads (Callable[[bytes], Any]) –

dumps: Callable[[Any], bytes]

Serialise task data.

loads: Callable[[bytes], Any]

Deserialise task data.
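
A pair of callables matching the documented signatures might look like the sketch below. Compact UTF-8 JSON is one possible choice made for illustration; the function bodies are not taken from the library.

```python
import json


def dumps(obj):
    """Serialise to compact UTF-8 JSON bytes: Callable[[Any], bytes]."""
    return json.dumps(obj, separators=(',', ':')).encode('utf-8')


def loads(data):
    """Deserialise bytes produced by dumps(): Callable[[bytes], Any]."""
    return json.loads(data)


payload = {'kind': 'build_report', 'report_id': 7}
assert loads(dumps(payload)) == payload
# With the package installed, the pair would be wrapped as
# TorrelqueTaskSerialiser(dumps, loads) and passed as the serialiser
# argument of the Torrelque constructor.
```

Producers and consumers of the same queue need the same pair, otherwise dequeued payloads will fail to deserialise.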