API¶
Main API of the package follows.
|
Reliable work queue. |
|
Put a task on the queue optionally providing its timeout and delay. |
|
Get a task or a task list from the queue with optional timeout. |
|
Return failed task into the queue with optional delay. |
|
Remove finished task from the queue. |
|
Watch task status change until it's released from the queue. |
|
Schedule the sweep in a background coroutine. |
|
Get task state. |
|
Queue counters. |
|
Task status. |
|
Task state. |
|
Generic Torrelque error. |
|
Torrelque timeout error. |
|
Torrelque lookup error. |
|
Task serialisation delegate. |
- class torrelque.Torrelque(client, *, queue='trq', serialiser=TorrelqueTaskSerialiser(lambda obj: ..., json.loads))¶
Reliable work queue.
- Parameters:
client (Redis) – Redis client instance.
queue (str) – Name of the queue. Must match across producers and consumers.
serialiser – An object with
dumps
andloads
that (de)serialises task bodies.client –
queue –
- Raises:
TorrelqueError – Not a
redis.asyncio.Redis
instance passed.
- task_timeout = 300¶
Default timeout for a task in the “working” set to be considered stale.
- sweep_interval = 30¶
Default interval between sweep calls, when sweep is scheduled.
- result_ttl = 3600¶
Default time-to-live of a task result (when applicable).
- keys = {'delayed': 'delayed', 'dequeueing': 'dequeueing', 'pending': 'pending', 'task': 'task', 'tasks': 'tasks', 'undequeued': 'undequeued', 'working': 'working'}¶
Queue Redis key name mapping.
On initialisation the values are prefixed with the queue name, and the new dictionary is rebound to the instance.
¶ pending
a list containing enqueued task ids
dequeueing
a short-living task id list where
dequeue()
RPOPLPUSH
-es task ids frompending
listundequeued
a set where
sweep()
stored potentially (evaluated on the next sweep) stale, “undequeued”, task idsworking
a sorted set with successfully dequeued task ids where the score is the Unix timestamp when the task becomes stale according to its timeout
delayed
a sorted set with delayed task ids where the score is the Unix timestamp when the task becomes due
tasks
a hash mapping a task id to its serialised representation
task
a string prefix, followed by task id, for hashes storing individual task stats
- async enqueue(task, *, task_timeout=None, delay=None, pipeline=None)¶
Put a task on the queue optionally providing its timeout and delay.
- Parameters:
task – Arbitrary serialisable task payload.
task_timeout (float | None) – Time since the task’s processing start after which it is considered stale.
delay (float | None) – Number of seconds to delay processing of the task, i.e. putting it into the “pending” list. Note that the sweep must be scheduled for the delayed tasks to return, and the delay only has effect after the sweep execution.
pipeline (Pipeline | None) – External Redis pipeline that allows for bulk enqueue.
task_timeout –
delay –
pipeline –
- Returns:
Task identifier.
- Return type:
str
- async dequeue(timeout=None, *, max_tasks=1)¶
Get a task or a task list from the queue with optional timeout.
- Parameters:
timeout (int | None) – Time to wait until a task is available. Timeout applies only to fetching single task. Note that Redis only supports an integer timeout.
max_tasks (int) – If greater than 1 the method will try to optimistically dequeue as many tasks. That means that only 1 task is guaranteed.
timeout –
max_tasks –
- Raises:
TorrelqueTimeoutError – If timeout was provided and there was no result within it.
TorrelqueLookupError – Indicates that the task id has become staling during the runtime of this method. This is not expected under normal circumstances. It can happen if this method is paused, say on a debugger breakpoint, for a duration of 2 sweeps.
- Returns:
Tuple of the task identifier and the deserialised task payload. If
max_tasks
is greater than 1, no matter if it dequeues more than 1 task, the return value is a list of said 2-tuples.- Return type:
Tuple[str, dict] | List[Tuple[str, dict]]
- async requeue(task_id, delay=None, *, task_timeout=None, pipeline=None)¶
Return failed task into the queue with optional delay.
- Parameters:
task_id (str) – Task identifier.
delay (float | None) – Number of seconds to delay putting the task into “pending” list. Note that the sweep must be scheduled in order for tasks from “delayed” to return to “pending” list.
task_timeout (float | None) – Redefine task timeout, which is the time since the task’s processing start after which it is considered stale.
pipeline (Pipeline | None) – External Redis pipeline that allows for bulk requeue.
task_id –
delay –
task_timeout –
pipeline –
- async release(task_id, *, result=None, result_ttl=None, status=TorrelqueTaskStatus.COMPLETED, pipeline=None)¶
Remove finished task from the queue.
Unless
result
is specified, all task information is removed from the queue immediately.Since there’s no dead letter queue, tasks that have exceeded allowed number of retries should also be released, possibly with
TorrelqueTaskStatus.REJECTED
status if producer is interested in the status.- Parameters:
task_id (str) – Task identifier.
result – Arbitrary serialisable task result. If
result
isNone
task state key is removed immediately on release.result_ttl (int | None) – Number of seconds to keep task state key after release. Override of default result TTL.
status (TorrelqueTaskStatus) – Task status to set on release. It only apples when result is not
None
.pipeline (Pipeline | None) – External Redis pipeline that allows for bulk release.
task_id –
result_ttl –
status –
pipeline –
- Raises:
TorrelqueError – If the status is not final.
- async watch(task_id, *, timeout=None)¶
Watch task status change until it’s released from the queue.
Note
This method relies on
notify-keyspace-events
introduced in Redis 2.8. The configuration must have generic and hash commands enabled. That is, the configuration must include eitherKA
orKgh
.- Parameters:
task_id (str) – Task identifier.
timeout (float | None) – Timeout for watching.
task_id –
timeout –
- Raises:
TorrelqueError – If
notify-keyspace-events
is not configured properly.TorrelqueTimeoutError – If
watch
has taken longer thantimeout
.TorrelqueLookupError – If the task state key is not found.
- Returns:
Asynchronous generator that yields task state dictionaries as returned by
get_task_state()
. Generator stops when the task is released. If the task is released without result, generator won’t yielddict
with final status.- Return type:
AsyncIterable[TorrelqueTaskState]
- async sweep()¶
Execute the task sweep.
- Returns:
3-tuple with counts of:
stale tasks from “working” set returned into “pending” list
due delayed tasks from “delayed” set returned into “pending” list
stale dequeueing task ids returned into “pending” list
- Return type:
Tuple[int, int, int]
- schedule_sweep(interval=None)¶
Schedule the sweep in a background coroutine.
- Parameters:
interval (float | None) – Override of default sweep interval.
interval –
- unschedule_sweep()¶
Unschedule the sweep in a background coroutine.
- async get_queue_stats()¶
Get queue counters.
- Return type:
- async get_task_state(task_id)¶
Get task state.
- Parameters:
task_id (str) – Task identifier.
task_id –
- Raises:
TorrelqueLookupError – If the task state key is not found.
- Return type:
- class torrelque.TorrelqueQueueStats(tasks, pending, working, delayed)¶
Queue counters.
- Parameters:
tasks (int) –
pending (int) –
working (int) –
delayed (int) –
- tasks: int¶
Total number of tasks in the queue.
- pending: int¶
Number of pending tasks in the queue.
- working: int¶
Number of working tasks in the queue.
- delayed: int¶
Number of delayed tasks in the queue.
- class torrelque.TorrelqueTaskStatus(value)¶
Task status.
- PENDING = 0¶
Task is enqueued.
- WORKING = 1¶
Task is dequeued.
- DELAYED = 2¶
Task is delayed.
- COMPLETED = 3¶
Task is released, as the result of successful completion.
- REJECTED = 4¶
Task is released, as the result of (multiple) failed attempts.
- isfinal()¶
Tells whether the status is final.
- class torrelque.TorrelqueTaskState(status, timeout, result, dequeue_count, requeue_count, enqueue_time, last_dequeue_time, last_requeue_time, release_time)¶
Task state.
- Parameters:
status (TorrelqueTaskStatus) –
timeout (float) –
result (Any) –
dequeue_count (int) –
requeue_count (int) –
enqueue_time (float) –
last_dequeue_time (float | None) –
last_requeue_time (float | None) –
release_time (float | None) –
- status: TorrelqueTaskStatus¶
Status of the task.
- timeout: float¶
Execution timeout of the task after while it’s considered stale.
- result: Any¶
Optional result of the task with in a final state.
- dequeue_count: int¶
Number of times the task was dequeued from the queue.
- requeue_count: int¶
Number of times the task was requeued from the queue.
- enqueue_time: float¶
Unix timestamp of the enqueue time of the task.
- last_dequeue_time: float | None¶
Optional Unix timestamp of the last dequeue time of the task.
- last_requeue_time: float | None¶
Optional Unix timestamp of the last requeue time of the task.
- release_time: float | None¶
Optional Unix timestamp of the release time of the task in a final status.