Design¶
This package is inspired by the queue design described in this blog post [1] and presentation [2] on Redis as a Reliable Work Queue.
Assumptions¶
Torrelque strives to be a minimal asyncio wrapper around Redis commands and scripts that models a reliable job queue. It is built on these assumptions:

- A task has an upper bound on its execution time
- A task is idempotent, or otherwise safe to retry
- The package focuses on the library use case (i.e. running inside existing asyncio applications), and doesn’t provide facilities to run worker processes out of the box
- There’s no exchange, and tasks are always matched by queue name
- There’s no dead letter queue as such (tasks can be delayed into the far future), and the responsibility of reaping “dead” tasks lies with the application
- There’s no Python semantics in the data model, hence it should be possible to implement a compatible library in another language
Note

A task must have a beforehand-known upper bound on its execution time, which allows determining whether it has become stale. By default, torrelque.Torrelque.task_timeout equals 5 minutes.
Task life-cycle¶
The full Torrelque task life-cycle, i.e. which action can lead to which task status, is illustrated below:
requeue or sweep
┌───────────────────┐
│ │
│ │
enqueue ┌───▼───┐ dequeue ┌───┴───┐ release ┌─────────┐
────────────►PENDING├───────────►WORKING├─────────►COMPLETED│
└───▲───┘ └───┬─┬─┘ └─────────┘
│ │ │
sweep │ │
│ │ │
enqueue ┌───┴───┐ requeue │ │ release ┌────────┐
────────────►DELAYED◄───────────────┘ └───────────►REJECTED│
with delay └───────┘ with delay └────────┘
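The diagram can be expressed as a small lookup table. The following is a pure-Python sketch for illustration only, not part of the torrelque API; it validates an action against the current status exactly as drawn above:

```python
# Allowed transitions per the life-cycle diagram above. This lookup
# table is an illustrative model, not part of the torrelque API.
TRANSITIONS = {
    (None,      'enqueue'):            'PENDING',
    (None,      'enqueue with delay'): 'DELAYED',
    ('PENDING', 'dequeue'):            'WORKING',
    ('WORKING', 'release'):            'COMPLETED',
    ('WORKING', 'release with delay'): 'REJECTED',
    ('WORKING', 'requeue'):            'PENDING',
    ('WORKING', 'requeue with delay'): 'DELAYED',
    ('WORKING', 'sweep'):              'PENDING',
    ('DELAYED', 'sweep'):              'PENDING',
}

def next_status(status, action):
    """Return the status that *action* leads to, per the diagram."""
    try:
        return TRANSITIONS[status, action]
    except KeyError:
        raise ValueError(f'{action!r} is not valid for status {status!r}') from None
```

Any pair absent from the table, such as releasing a task that was never dequeued, is invalid.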
Data model¶
Torrelque’s Redis data model comprises 4 keys per queue plus 1 key per task. The following shows what effect torrelque.Torrelque methods have on Redis state. There are two terminals: one running ipython and another running redis-cli. The former is initialised with:
In [1]: import redis.asyncio
...: import torrelque
In [2]: client = redis.asyncio.Redis()
...: queue = torrelque.Torrelque(client, queue='dissect')
In [3]: await queue.enqueue({'test': 1})
Out[3]: '801915328eea11ea9389b82a72fbafe5'
In [4]: await queue.enqueue({'test': 2})
Out[4]: '817a099a8eea11ea9389b82a72fbafe5'
127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:801915328eea11ea9389b82a72fbafe5"
3) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
4) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:pending
list
127.0.0.1:6379> TYPE dissect:tasks
hash
127.0.0.1:6379> TYPE dissect:task:801915328eea11ea9389b82a72fbafe5
hash
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "801915328eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:tasks
1) "801915328eea11ea9389b82a72fbafe5"
2) "{\"test\": 1}"
3) "817a099a8eea11ea9389b82a72fbafe5"
4) "{\"test\": 2}"
127.0.0.1:6379> HGETALL dissect:task:801915328eea11ea9389b82a72fbafe5
1) "status"
2) "0"
3) "enqueue_time"
4) "1588694841.0842385"
5) "timeout"
6) "300"
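The effect of enqueue on these keys can be mimicked with plain Python structures. Below is a standalone sketch, an illustration of the data model rather than library code; the push direction and field names mirror the dumps above:

```python
import json
import time
import uuid

# Toy in-memory model of the Redis keys above (illustration only,
# not torrelque internals)
pending = []     # dissect:pending, a list: pushed on enqueue
tasks = {}       # dissect:tasks, a hash: task id -> serialised task
task_state = {}  # dissect:task:<id> hashes with bookkeeping fields

def enqueue(task, timeout=300):
    task_id = uuid.uuid1().hex            # the ids above look like UUID1 hex
    pending.insert(0, task_id)            # LPUSH dissect:pending
    tasks[task_id] = json.dumps(task)     # HSET dissect:tasks
    task_state[task_id] = {               # HSET dissect:task:<id>
        'status': 0,                      # 0 appears to mean "pending"
        'enqueue_time': time.time(),
        'timeout': timeout,
    }
    return task_id

task_id = enqueue({'test': 1})
```

Note that new ids land at the head of the pending list, which matches the LRANGE output above showing the second task first.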
In [5]: await queue.dequeue()
Out[5]: ('801915328eea11ea9389b82a72fbafe5', {'test': 1})
127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:801915328eea11ea9389b82a72fbafe5"
3) "dissect:working"
4) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
5) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:working
zset
127.0.0.1:6379> ZRANGE dissect:working 0 0 WITHSCORES
1) "801915328eea11ea9389b82a72fbafe5"
2) "1588695984.3189406"
127.0.0.1:6379> HGETALL dissect:task:801915328eea11ea9389b82a72fbafe5
1) "status"
2) "1"
3) "enqueue_time"
4) "1588694841.0842385"
5) "timeout"
6) "300"
7) "last_dequeue_time"
8) "1588695864.3189406"
9) "dequeue_count"
10) "1"
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"
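As the dumps show, dequeue moves the oldest task id from the pending list into the working sorted set, whose score is a future point in time after which the task counts as stale, and updates the task’s bookkeeping hash. A toy model, for illustration only; computing the score as dequeue time plus timeout is this sketch’s assumption:

```python
import time

# Toy in-memory model (illustration only, not torrelque internals)
pending = ['817a099a8eea11ea9389b82a72fbafe5', '801915328eea11ea9389b82a72fbafe5']
working = {}  # dissect:working, a zset: task id -> stale deadline score
task_state = {
    '801915328eea11ea9389b82a72fbafe5': {'status': 0, 'timeout': 300},
    '817a099a8eea11ea9389b82a72fbafe5': {'status': 0, 'timeout': 300},
}

def dequeue():
    task_id = pending.pop()  # take the oldest id from dissect:pending
    now = time.time()
    # Score the task with a stale deadline (this formula is an assumption)
    working[task_id] = now + task_state[task_id]['timeout']
    task_state[task_id].update(
        status=1,            # 1 appears to mean "working"
        last_dequeue_time=now,
        dequeue_count=task_state[task_id].get('dequeue_count', 0) + 1,
    )
    return task_id

task_id = dequeue()
```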
In [6]: await queue.release('801915328eea11ea9389b82a72fbafe5')
127.0.0.1:6379> KEYS *
1) "dissect:pending"
2) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
3) "dissect:tasks"
127.0.0.1:6379> HGETALL dissect:tasks
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "{\"test\": 2}"
In [7]: await queue.dequeue()
Out[7]: ('817a099a8eea11ea9389b82a72fbafe5', {'test': 2})
In [8]: await queue.requeue('817a099a8eea11ea9389b82a72fbafe5', delay=300)
127.0.0.1:6379> KEYS *
1) "dissect:delayed"
2) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
3) "dissect:tasks"
127.0.0.1:6379> TYPE dissect:delayed
zset
127.0.0.1:6379> ZRANGE dissect:delayed 0 0 WITHSCORES
1) "817a099a8eea11ea9389b82a72fbafe5"
2) "1588705138.1027842"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
1) "status"
2) "2"
3) "enqueue_time"
4) "1588694843.397282"
5) "timeout"
6) "300"
7) "last_dequeue_time"
8) "1588704815.370081"
9) "dequeue_count"
10) "1"
In [9]: await queue.sweep()
Out[9]: 1
127.0.0.1:6379> KEYS *
1) "dissect:tasks"
2) "dissect:pending"
3) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> LRANGE dissect:pending 0 1
1) "817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
1) "status"
2) "0"
3) "enqueue_time"
4) "1588694843.397282"
5) "timeout"
6) "300"
7) "last_dequeue_time"
8) "1588704815.370081"
9) "dequeue_count"
10) "1"
11) "last_requeue_time"
12) "1589144172.8651192"
13) "requeue_count"
14) "1"
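Sweep is what returns overdue tasks to circulation: entries in the delayed sorted set (and stale entries in the working one) whose score lies in the past go back onto the pending list, with requeue bookkeeping updated, as the dump above shows. A standalone sketch of the idea, for illustration only, not the actual implementation:

```python
import time

# Toy model of sweeping due/stale tasks back to pending (illustration only)
pending = []
delayed = {'817a099a8eea11ea9389b82a72fbafe5': time.time() - 1}  # due in the past
working = {}  # stale working tasks are treated the same way here
task_state = {'817a099a8eea11ea9389b82a72fbafe5': {'status': 2, 'requeue_count': 0}}

def sweep():
    swept = 0
    now = time.time()
    for zset in (delayed, working):
        for task_id, deadline in list(zset.items()):
            if deadline <= now:             # the score has passed
                del zset[task_id]
                pending.insert(0, task_id)  # back onto dissect:pending
                task_state[task_id].update(
                    status=0,               # pending again
                    last_requeue_time=now,
                    requeue_count=task_state[task_id]['requeue_count'] + 1,
                )
                swept += 1
    return swept

count = sweep()
```

Like the real call in In [9], the sketch returns the number of tasks it moved.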
In [10]: await queue.dequeue()
Out[10]: ('817a099a8eea11ea9389b82a72fbafe5', {'test': 2})
In [11]: await queue.release('817a099a8eea11ea9389b82a72fbafe5', result=42)
127.0.0.1:6379> KEYS *
1) "dissect:task:817a099a8eea11ea9389b82a72fbafe5"
127.0.0.1:6379> HGETALL dissect:task:817a099a8eea11ea9389b82a72fbafe5
1) "status"
2) "3"
3) "enqueue_time"
4) "1588694843.397282"
5) "timeout"
6) "120"
7) "last_dequeue_time"
8) "1589144412.2976456"
9) "dequeue_count"
10) "2"
11) "last_requeue_time"
12) "1589144172.8651192"
13) "requeue_count"
14) "1"
15) "result"
16) "42"
17) "release_time"
18) "1589144447.6812174"
127.0.0.1:6379> TTL dissect:task:817a099a8eea11ea9389b82a72fbafe5
(integer) 3595