PyPI - License Pipeline status Test code coverage PyPI RTFD

Torrelque

Torrelque is a Python package that provides minimal asynchronous reliable distributed Redis-backed (or a protocol-compatible alternative) work queues. It is built:

  1. Lock-free. It relies on Redis transactions and its single-threaded execution model.

  2. Poll-free. Waiting subset of the Python API relies either on blocking Redis commands or notifications.

  3. Bulk-friendly. Tasks can be produced and consumed in bulk.

  4. Introspectable. Task stats, task status transition watching API, and the data model comprehensible directly in Redis.

Supported Redis server implementations: Redis, KeyDB.

Install

pip install Torrelque

Quickstart

Producer:

import redis.asyncio
import torrelque

client = redis.asyncio.Redis()
queue = torrelque.Torrelque(client, queue='email')
queue.schedule_sweep()  # to make due requeued tasks available again

task_data = {'addr': 'joe@doe.com', 'subj': 'hello', 'body': '...'}
task_id = await queue.enqueue(task_data)
print('Email task enqueued', task_id)

Consumer:

import redis.asyncio
import torrelque

client = redis.asyncio.Redis()
queue = torrelque.Torrelque(client, queue='email')

while True:
    task_id, task_data = await queue.dequeue()
    try:
        await some_email_client.send(**task_data)
    except Exception:
        print('Email sending error, retrying in 30s', task_id)
        await queue.requeue(task_id, delay=30)
    else:
        print('Email sent', task_id)
        await queue.release(task_id)

Example list

  • Producer-consumer. Infinite producing and consuming loops.

  • Batch processing. Finite number of tasks, consumers stop with a poison pill, bulk enqueue. This example can be used as a synthetic benchmark. Because there’s no IO-bound workload, it’ll be CPU-bound which isn’t normal mode of operation for an asynchronous application. But it can be used to compare between CPython, PyPy and concurrency parameters.

  • Web application background task. This tornado application allows to start a task and push server-sent events (SSE) to UI about its status. UI starts a task and waits for it to complete. When a task fails it’s re-queued with exponential back-off.