Design

This package is inspired by the queue design described in this blog post [1] and presentation [2] on Redis as a Reliable Work Queue.

Assumptions

Torrelque strives to be a minimal asyncio wrapper around Redis commands and scripts that models a reliable job queue. It is built on these assumptions:

  1. A task has an upper bound on its execution time

  2. A task is idempotent or otherwise safe to retry

  3. The package focuses on the library use case (i.e. running inside existing asyncio applications), and doesn’t provide facilities to run worker processes out of the box

  4. There’s no exchange, and tasks are always matched by queue name

  5. There’s no dead letter queue as such (tasks can be delayed far into the future), and the responsibility for reaping “dead” tasks lies with the application

  6. There’s no Python semantics in the data model, hence it should be possible to implement a compatible library in another language
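
Assumption 3 means the application owns the consumer loop. One iteration of such a loop might look like the sketch below. It assumes only the dequeue, release and requeue calls that appear in the Data model transcript; process_one, handler and retry_delay are hypothetical names chosen for illustration and are not part of the package.

```python
async def process_one(queue, handler, retry_delay=30):
    """Dequeue one task, run ``handler`` on it, then release or requeue it.

    ``queue`` is any object exposing awaitable ``dequeue``, ``release`` and
    ``requeue`` methods; ``handler`` and ``retry_delay`` are assumptions of
    this sketch.
    """
    task_id, task = await queue.dequeue()
    try:
        result = await handler(task)
    except Exception:
        # Tasks are assumed to be safe to retry (assumption 2), so a failed
        # task is simply scheduled to run again after ``retry_delay`` seconds.
        await queue.requeue(task_id, delay=retry_delay)
        return False
    await queue.release(task_id, result=result)
    return True
```

An application would typically call this coroutine repeatedly from one of its own asyncio tasks, and decide for itself when a repeatedly failing task should stop being retried (assumption 5).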

Note

A task must have an upper bound on its execution time that is known in advance, which makes it possible to determine whether the task has become stale. By default, torrelque.Torrelque.task_timeout is 5 minutes (300 seconds).
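
The staleness rule can be sketched as a pure function. This is an illustration only: it assumes a task counts as stale once its last dequeue time plus its timeout has passed. is_stale is a hypothetical helper, not part of the package; the field names follow the per-task hash shown in the Data model section.

```python
def is_stale(task_state, now):
    """Return True if a working task has outlived its timeout.

    ``task_state`` mirrors the per-task hash fields ``last_dequeue_time``
    and ``timeout`` (both in seconds, possibly stored as strings);
    ``now`` is a Unix timestamp.
    """
    deadline = float(task_state['last_dequeue_time']) + float(task_state['timeout'])
    return now >= deadline
```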

Task life-cycle

The full Torrelque task life-cycle, showing which action leads to which task status, is illustrated below:

                   requeue or sweep
                ┌───────────────────┐
                │                   │
                │                   │
 enqueue    ┌───▼───┐  dequeue  ┌───┴───┐ release ┌─────────┐
────────────►PENDING├───────────►WORKING├─────────►COMPLETED│
            └───▲───┘           └───┬─┬─┘         └─────────┘
                │                   │ │
              sweep                 │ │
                │                   │ │
 enqueue    ┌───┴───┐  requeue      │ │   release ┌────────┐
────────────►DELAYED◄───────────────┘ └───────────►REJECTED│
 with delay └───────┘  with delay                 └────────┘
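
The diagram can also be read as a transition table. The sketch below encodes it as a dictionary. The numeric values 0 to 3 match the status fields visible in the Data model transcript, while REJECTED = 4 and the action label 'release as rejected' are assumptions made here to tell the two release edges apart.

```python
import enum


class Status(enum.IntEnum):
    # 0..3 match the "status" values seen in the redis-cli transcript;
    # REJECTED = 4 is an assumption of this sketch.
    PENDING = 0
    WORKING = 1
    DELAYED = 2
    COMPLETED = 3
    REJECTED = 4


# (current status, action) -> next status, read off the diagram.
# None stands for "task not in the queue yet".
TRANSITIONS = {
    (None, 'enqueue'): Status.PENDING,
    (None, 'enqueue with delay'): Status.DELAYED,
    (Status.PENDING, 'dequeue'): Status.WORKING,
    (Status.WORKING, 'requeue'): Status.PENDING,
    (Status.WORKING, 'sweep'): Status.PENDING,
    (Status.WORKING, 'requeue with delay'): Status.DELAYED,
    (Status.WORKING, 'release'): Status.COMPLETED,
    (Status.WORKING, 'release as rejected'): Status.REJECTED,
    (Status.DELAYED, 'sweep'): Status.PENDING,
}


def next_status(current, action):
    """Return the status after ``action``; raise ValueError if the diagram has no such edge."""
    try:
        return TRANSITIONS[(current, action)]
    except KeyError:
        raise ValueError(f'no transition from {current!r} via {action!r}') from None
```

Note that COMPLETED and REJECTED have no outgoing edges: both are terminal in the diagram.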

Data model

Torrelque’s Redis data model comprises 4 keys per queue plus 1 key per task. The following session shows the effect that torrelque.Torrelque methods have on Redis’ state. It uses two terminals, one running ipython and the other redis-cli. The former is initialised with:

In [1]: import redis.asyncio
   ...: import torrelque

In [2]: client = redis.asyncio.Redis()
   ...: queue = torrelque.Torrelque(client, queue='dissect')

In [3]: await queue.enqueue({'test': 1})
Out[3]: '801915328eea11ea9389b82a72fbafe5'

In [4]: await queue.enqueue({'test': 2})
Out[4]: '817a099a8eea11ea9389b82a72fbafe5'

127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:801915328eea11ea9389b82a72fbafe5"
3) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
4) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:pending
list
127.0.0.1:6379> TYPE dissect:tasks
hash
127.0.0.1:6379> TYPE dissect:task:801915328eea11ea9389b82a72fbafe5
hash
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "801915328eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:tasks
1) "801915328eea11ea9389b82a72fbafe5"
2) "{\"test\": 1}"
3) "817a099a8eea11ea9389b82a72fbafe5"
4) "{\"test\": 2}"
127.0.0.1:6379> HGETALL dissect:task:801915328eea11ea9389b82a72fbafe5
1) "status"
2) "0"
3) "enqueue_time"
4) "1588694841.0842385"
5) "timeout"
6) "300"

In [5]: await queue.dequeue()
Out[5]: ('801915328eea11ea9389b82a72fbafe5', {'test': 1})

127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:801915328eea11ea9389b82a72fbafe5"
3) "dissect:working"
4) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
5) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:working
zset
127.0.0.1:6379> ZRANGE dissect:working 0 0 WITHSCORES
1) "801915328eea11ea9389b82a72fbafe5"
2) "1588695984.3189406"
127.0.0.1:6379> HGETALL dissect:task:801915328eea11ea9389b82a72fbafe5
 1) "status"
 2) "1"
 3) "enqueue_time"
 4) "1588694841.0842385"
 5) "timeout"
 6) "300"
 7) "last_dequeue_time"
 8) "1588695864.3189406"
 9) "dequeue_count"
10) "1"
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"

In [6]: await queue.release('801915328eea11ea9389b82a72fbafe5')

127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
3) "dissect:tasks"
127.0.0.1:6379> HGETALL dissect:tasks
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "{\"test\": 2}"

In [7]: await queue.dequeue()
Out[7]: ('817a099a8eea11ea9389b82a72fbafe5', {'test': 2})

In [8]: await queue.requeue('817a099a8eea11ea9389b82a72fbafe5', delay=300)

127.0.0.1:6379> KEYS *
1) "dissect:delayed"
2) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
3) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:delayed
zset
127.0.0.1:6379> ZRANGE dissect:delayed 0 0 WITHSCORES
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "1588705138.1027842"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
 1) "status"
 2) "2"
 3) "enqueue_time"
 4) "1588694843.397282"
 5) "timeout"
 6) "300"
 7) "last_dequeue_time"
 8) "1588704815.370081"
 9) "dequeue_count"
10) "1"

In [9]: await queue.sweep()
Out[9]: 1

127.0.0.1:6379> KEYS *
1) "dissect:tasks"
2) "dissect:pending"
3) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
 1) "status"
 2) "0"
 3) "enqueue_time"
 4) "1588694843.397282"
 5) "timeout"
 6) "300"
 7) "last_dequeue_time"
 8) "1588704815.370081"
 9) "dequeue_count"
10) "1"
11) "last_requeue_time"
12) "1589144172.8651192"
13) "requeue_count"
14) "1"

In [10]: await queue.dequeue()
Out[10]: ('817a099a8eea11ea9389b82a72fbafe5', {'test': 2})

In [11]: await queue.release('817a099a8eea11ea9389b82a72fbafe5', result=42)

127.0.0.1:6379> KEYS *
1) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
 1) "status"
 2) "3"
 3) "enqueue_time"
 4) "1588694843.397282"
 5) "timeout"
 6) "120"
 7) "last_dequeue_time"
 8) "1589144412.2976456"
 9) "dequeue_count"
10) "2"
11) "last_requeue_time"
12) "1589144172.8651192"
13) "requeue_count"
14) "1"
15) "result"
16) "42"
17) "release_time"
18) "1589144447.6812174"
127.0.0.1:6379> TTL dissect:task:817a099a8eea11ea9389b82a72fbafe5
(integer) 3595
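
The session above can be condensed into a small in-memory model. It is a sketch of the key layout only, not the package's implementation: plain Python containers stand in for the Redis keys, the clock is passed in explicitly, and QueueModel and its attribute names are hypothetical. It also assumes that the sorted-set scores are deadlines (dequeue time plus timeout, or requeue time plus delay), and it leaves out requeue counters and key expiry.

```python
import collections
import json


class QueueModel:
    """In-memory stand-in for the Redis keys of one queue (illustration only)."""

    def __init__(self, task_timeout=300):
        self.task_timeout = task_timeout
        self.pending = collections.deque()  # <queue>:pending, a list
        self.working = {}                   # <queue>:working, zset: id -> deadline
        self.delayed = {}                   # <queue>:delayed, zset: id -> ready time
        self.tasks = {}                     # <queue>:tasks, hash: id -> JSON payload
        self.state = {}                     # <queue>:task:<id>, one hash per task

    def enqueue(self, task_id, task, now):
        self.tasks[task_id] = json.dumps(task)
        self.state[task_id] = {
            'status': 0, 'enqueue_time': now, 'timeout': self.task_timeout}
        self.pending.appendleft(task_id)  # newest first, as LRANGE shows

    def dequeue(self, now):
        task_id = self.pending.pop()  # the oldest task leaves from the tail
        state = self.state[task_id]
        state.update(status=1, last_dequeue_time=now,
                     dequeue_count=state.get('dequeue_count', 0) + 1)
        self.working[task_id] = now + state['timeout']
        return task_id, json.loads(self.tasks[task_id])

    def requeue(self, task_id, delay, now):
        del self.working[task_id]
        self.delayed[task_id] = now + delay
        self.state[task_id]['status'] = 2

    def sweep(self, now):
        """Move due delayed tasks and stale working tasks back to pending."""
        moved = 0
        for zset in (self.delayed, self.working):
            for task_id in [t for t, score in zset.items() if score <= now]:
                del zset[task_id]
                self.state[task_id]['status'] = 0
                self.pending.appendleft(task_id)
                moved += 1
        return moved

    def release(self, task_id, result=None):
        del self.working[task_id]
        del self.tasks[task_id]
        if result is None:
            del self.state[task_id]  # nothing is kept without a result
        else:
            # Redis keeps this hash with a TTL; expiry is not modelled here.
            self.state[task_id].update(status=3, result=json.dumps(result))
```

Replaying the session against this model (enqueue two tasks, dequeue and release the first, dequeue and requeue the second with a delay, sweep, dequeue and release with a result) leaves only the second task's state entry behind, mirroring the final KEYS output.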

Prior art

  1. Reliable queue pattern in Redis [3]

  2. Beanstalkd, a simple and fast work queue [4]