feat: redis-stream task queue #21343
Conversation
…ainst multiple server replicas
…mplementation owns configuration in initializer
…streams do not support blocking fetches from multiple streams while receiving only one item based on priority; we would have to loop through the available streams doing non-blocking XREAD until a message is retrieved, and only then fall back to a blocking XREADGROUP across all keys expecting a single entry; Lua would be required. The production systems I've researched basically all use Redis lists for exactly this reason
Closing #21219 in favor of this PR. See that PR for design discussion.
@desertaxle closed #21219 in favor of this PR, as I already had all of my stream-related changes in this base. The base logic is there. I'm taking a final pass / self-review in the next day or two and will then mark this as ready for review.
@desertaxle Ready for review whenever y'all get a chance. I'll review comments and make any necessary changes toward the end of the week. There's one failing test that I don't think is related to these changes.
Thanks @gpickney-tsp! Sorry for the delay in review. I'll make sure to give this a proper review either today or tomorrow!
desertaxle
left a comment
I like the direction here, but I don’t think the abstraction is quite in the right place yet. There are a couple of correctness issues to sort out first, especially around retry() semantics: right now it’s being used for both interrupted delivery/redelivery and orchestration-driven AwaitingRetry, and those don’t seem interchangeable across backends.
Separately, I think the interface is trying to abstract both publishing and consumption, but the code here already requires consumer-specific behavior such as multi-key subscription, per-connection consumer identity, ack/redelivery lifecycle, and disconnect cleanup. If consumption is part of the contract now, I’d rather see that modeled explicitly with a publisher + consumer/subscription split than keep pushing fan-in and delivery orchestration into the API layer. I think that would give us a much stronger long-term seam for additional backends.
We’ve also talked about whether pydocket could be a better fit for the consumer side of this, since we already use it server-side. If that ends up simplifying the design here, I’m open to that direction as well.
Thanks for sticking with this, and sorry for the long wait for review. Let me know if you have any questions about anything that I posted!
```diff
  if validated_state.name == "AwaitingRetry":
-     await queue.retry(task_run)
+     await backend.retry(task_run)
```
I think this breaks actual task retries for the Redis backend. By the time a deferred task fails and gets moved to AwaitingRetry, the original delivery has already been ACKed on the subscription side, so there’s nothing left in the PEL to recover. Since prefect_redis.task_queue.TaskQueueBackend.retry() is a no-op, this looks like it leaves the task in AwaitingRetry in the DB without ever re-enqueueing it for execution.
```python
async def retry(self, task_run: schemas.core.TaskRun) -> None:
    # No-op for Redis: entry stays in PEL. XAUTOCLAIM recovers it after
    # inflight_visibility_timeout (default 30s). This means disconnect
    # recovery is not immediate — there is a delay until another consumer
    # claims the stale entry.
    pass
```
This no-op makes sense for disconnect recovery, but I don’t think it works for orchestration-driven retries. The worker ACKs as soon as it receives the task, before execution starts, so a later transition to AwaitingRetry needs to create a new queue entry somewhere. Otherwise, I think Redis-backed deferred tasks can get stuck after their first failure.
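One way to close that gap, sketched with an in-memory stand-in (class and method names are hypothetical, not from the PR): treat an orchestration-driven retry as a fresh enqueue rather than a PEL recovery, since the original delivery was already ACKed and has left the PEL.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeStreamBackend:
    """In-memory stand-in for a stream-backed queue (illustrative only).

    The point: an orchestration-driven retry must create a *new* queue
    entry, because the original delivery was already ACKed and removed.
    """

    streams: dict = field(default_factory=dict)

    async def enqueue(self, key: str, task_run: str) -> None:
        # XADD equivalent: append a new entry to the key's stream
        self.streams.setdefault(key, []).append(task_run)

    async def retry(self, key: str, task_run: str) -> None:
        # NOT a no-op: re-enqueue as a fresh entry so tasks moved to
        # AwaitingRetry actually become deliverable again.
        await self.enqueue(key, task_run)


async def demo() -> list:
    backend = FakeStreamBackend()
    await backend.enqueue("my-key", "run-1")
    delivered = backend.streams["my-key"].pop(0)  # dequeue + immediate ACK
    await backend.retry("my-key", delivered)      # orchestration retry
    return backend.streams["my-key"]


result = asyncio.run(demo())  # ["run-1"] is back in the queue
```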
```python
async def retry(self, task_run: schemas.core.TaskRun) -> None:
    kq = self._get_or_create_queue(task_run.task_key)
    try:
        await asyncio.wait_for(kq.retry_sem.acquire(), timeout=5)
```
This changes the in-memory behavior from backpressure to dropping retries after 5 seconds. Previously, we’d block on put() until capacity opened up; now we log and return, which means the retry is just lost. Was that intentional? It feels like a pretty meaningful behavioral regression for both disconnect redelivery and AwaitingRetry.
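The difference can be seen in a small asyncio sketch (names illustrative): the old path blocks until capacity opens, applying backpressure; the new `wait_for(..., timeout=...)` path gives up after the timeout and the retry is lost.

```python
import asyncio


async def demo() -> tuple:
    sem = asyncio.Semaphore(0)  # simulate a queue already at capacity

    # New behavior: wait briefly, then give up and drop the retry.
    dropped = False
    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.05)
    except asyncio.TimeoutError:
        dropped = True  # the retry is logged and lost

    # Old behavior: block until capacity opens (backpressure).
    async def old_put() -> str:
        await sem.acquire()  # waits indefinitely for capacity
        return "retried"

    task = asyncio.create_task(old_put())
    await asyncio.sleep(0.05)
    sem.release()  # capacity opens later; the blocked put still succeeds
    survived = await task
    return dropped, survived


dropped, survived = asyncio.run(demo())
```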
```python
await asyncio.shield(backend.retry(queue.get_nowait().task_run))
except Exception:
    logger.warning("Failed to retry task during cleanup", exc_info=True)
await models.task_workers.forget_worker(client_id)
```
This cleanup looks risky when multiple websocket connections reuse the same client_id (which is supported here, since we explicitly test deduping in that case). Closing one socket would remove the tracker entry for the other still-active socket.
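One possible fix, as a hedged sketch with hypothetical names: refcount the open connections per `client_id` and only call `forget_worker()` when the last one closes.

```python
from collections import defaultdict


class WorkerConnectionTracker:
    """Hypothetical refcount sketch: forget a client_id only when its last
    websocket connection closes, not when any one of them does."""

    def __init__(self) -> None:
        self.connections: dict = defaultdict(int)

    def connect(self, client_id: str) -> None:
        self.connections[client_id] += 1

    def disconnect(self, client_id: str) -> bool:
        """Return True if the worker should actually be forgotten."""
        self.connections[client_id] -= 1
        if self.connections[client_id] <= 0:
            del self.connections[client_id]
            return True  # last connection gone: safe to forget_worker()
        return False  # another socket with this client_id is still live


tracker = WorkerConnectionTracker()
tracker.connect("worker-a")
tracker.connect("worker-a")  # second socket reusing the same client_id
first = tracker.disconnect("worker-a")   # False: other socket still open
second = tracker.disconnect("worker-a")  # True: now forget the worker
```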
```python
    """Add a task run to the queue for its task_key."""
    ...
```

```python
async def retry(self, task_run: schemas.core.TaskRun) -> None:
```
I think retry() is doing two different jobs right now: requeueing an interrupted delivery and scheduling a brand new orchestration-driven retry. Those have pretty different semantics across backends. I’d be tempted to separate those concepts in the interface before other implementations start depending on the current shape.
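One possible shape for that split, as a sketch (the method names here are invented for illustration, not the PR's API):

```python
from typing import Protocol

TaskRun = str  # stand-in for schemas.core.TaskRun in this sketch


class TaskQueueBackendSketch(Protocol):
    """Hypothetical split of today's single retry() method."""

    async def requeue_undelivered(self, task_run: TaskRun) -> None:
        """Delivery-level redelivery: the consumer disconnected before
        ACKing. Redis can no-op here (PEL + XAUTOCLAIM covers it)."""
        ...

    async def schedule_retry(self, task_run: TaskRun) -> None:
        """Orchestration-driven retry: the task failed after ACK and was
        moved to AwaitingRetry. Every backend must create a new entry."""
        ...
```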
```python
async def dequeue(
    self,
    key: str,
    timeout: float = 1,
) -> DeliveredTaskRun:
```
dequeue(key) feels pretty tailored to the current server loop. For a pluggable backend, I wonder whether we want the backend to own multi-key subscription/fairness itself, rather than forcing the API layer to spin a separate dequeue loop per key. That seems like it would fit Redis, RabbitMQ, Kafka, etc., a lot better long term.
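As an illustration of backend-owned fairness, here is a minimal round-robin sketch over several in-memory key queues (the helper name is hypothetical): rotating across keys keeps one hot key from starving the others.

```python
from collections import deque


def round_robin_dequeue(queues: dict) -> list:
    """Drain all key queues in round-robin order, so delivery
    interleaves across keys instead of exhausting one key first."""
    order = deque(sorted(queues))
    out = []
    while any(queues.values()):
        key = order[0]
        order.rotate(-1)  # next iteration starts at the following key
        if queues[key]:
            out.append(queues[key].popleft())
    return out


queues = {"a": deque(["a1", "a2", "a3"]), "b": deque(["b1"])}
result = round_robin_dequeue(queues)  # ["a1", "b1", "a2", "a3"]
```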
```python
TaskQueueBackend: type["TaskQueueBackend"]
```

```python
class TaskQueueBackend(Protocol):
```
I think we’ve crossed the threshold where consumption is part of the abstraction too, and if that’s the case, I’d rather model it explicitly now rather than hide it behind the backend object.
This PR already needs consumer-specific behavior like subscription to a set of keys, per-connection consumer identity, delivery lifecycle, ack/redelivery semantics, and cleanup on disconnect. Those feel more like responsibilities of a consumer/subscription object than a global task queue backend. The current dequeue(key) shape seems to leak that mismatch by pushing fan-in and delivery orchestration up into the API layer.
I think this would be a stronger abstraction if the backend exposed a publisher plus a real consumer/subscription interface, and kept orchestration-driven retries separate from consumer redelivery.
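A sketch of what that split could look like (all names hypothetical, for discussion only): a stateless publisher, plus a per-connection consumer that owns identity, fan-in, and the ack/cleanup lifecycle.

```python
from typing import AsyncIterator, Protocol

TaskRun = str  # stand-in for schemas.core.TaskRun in this sketch


class DeliveredTaskRun:
    """Delivery wrapper carrying an opaque ack token."""

    def __init__(self, task_run: TaskRun, ack_token: object) -> None:
        self.task_run = task_run
        self.ack_token = ack_token


class TaskPublisher(Protocol):
    async def enqueue(self, key: str, task_run: TaskRun) -> None:
        """Publish a task run; visible to consumers on any replica."""
        ...


class TaskConsumer(Protocol):
    """One consumer per websocket connection: owns consumer identity,
    multi-key subscription/fan-in, and the ack/redelivery lifecycle."""

    def deliveries(self) -> AsyncIterator[DeliveredTaskRun]:
        """Yield deliveries fanned in across all subscribed keys."""
        ...

    async def ack(self, delivery: DeliveredTaskRun) -> None: ...

    async def close(self) -> None:
        """Disconnect cleanup: release claims, deregister the consumer."""
        ...
```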
Add Redis-backed task queue for multi-replica server deployments
Summary
closes #21218
requires PrefectHQ/prefect-helm#608
The default in-memory task queue uses `asyncio.Queue`, which is process-local. When running multiple Prefect server replicas, a task enqueued on Replica A is invisible to TaskWorkers connected to Replica B — tasks silently get lost or pile up on one replica while workers on others sit idle.

This PR adds a pluggable task queue backend protocol so deployments can swap in a Redis Streams implementation that shares queues across all replicas. No client-side changes needed.
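Mechanically, a pluggable backend mostly comes down to importing a configured module and validating its surface. A minimal sketch (the function and attribute names are illustrative, not the PR's exact API):

```python
import importlib


def load_task_queue_backend(module_path: str):
    """Import the configured backend module and check that it exposes
    the expected class (hypothetical loader, for illustration)."""
    module = importlib.import_module(module_path)
    backend_cls = getattr(module, "TaskQueueBackend", None)
    if backend_cls is None:
        raise ValueError(
            f"{module_path!r} does not export a TaskQueueBackend"
        )
    return backend_cls
```

A deployment then points a setting at any module implementing the protocol, and the server instantiates whatever that module provides.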
Design
Single stream per task key — each key gets one Redis Stream (`prefect:tqs:{key}`) plus a dead-letter stream (`prefect:tqs:{key}:dlq`). Consumer groups provide atomic inflight tracking via the PEL (pending entries list). There is no separate retry stream; the PEL is the retry mechanism. XAUTOCLAIM recovers stale entries from crashed consumers, and entries exceeding `max_retries` are atomically moved to the DLQ via a Lua script.

Per-key dequeue loops — the websocket handler spawns one `asyncio.Task` per subscribed key, each doing a blocking XREADGROUP on its own stream. Results fan into a shared `asyncio.Queue`. A single sender coroutine drains the queue and talks to the websocket, preserving the existing serial send-ack-send protocol unchanged.

Connection management — each blocking XREADGROUP holds a Redis connection for the block duration. Two settings control this: `dequeue_block_ms` (default 1000ms) sets the block duration, and `dequeue_max_concurrency` (default uncapped) limits concurrent blocking calls via semaphore to prevent the task queue subsystem from exhausting the shared connection pool.

Changes
Protocol + pluggable backend (`src/prefect/server/task_queue/`):

The old `src/prefect/server/task_queue.py` single-file module (containing `TaskQueue` and `MultiQueue` classes with direct `asyncio.Queue` usage) is replaced by a package:

- `__init__.py` — `TaskQueueBackend` protocol with `enqueue`, `retry`, `ack`, `dequeue_from_keys`; `DeliveredTaskRun` wrapper carrying an opaque `ack_token`; `get_task_queue_backend()` factory that loads the configured backend module
- `memory.py` — in-memory backend (default, preserves existing behavior). Replaces the old `TaskQueue`/`MultiQueue` classes with a single `TaskQueueBackend` class that conforms to the new protocol. Uses `asyncio.PriorityQueue` per key so `retry()` re-enqueues at priority 0 (ahead of scheduled items at priority 1). Adds round-robin key rotation to prevent starvation across keys. `ack()` is a no-op (no inflight tracking needed in-memory). `dequeue_from_keys()` blocks with `asyncio.Condition` + timeout instead of the old `sleep(0.01)` polling loop.

Settings (`src/prefect/settings/models/server/tasks.py`):

- `backend` — module path for the task queue backend (`PREFECT_TASK_SCHEDULING_BACKEND`), defaults to `prefect.server.task_queue.memory`
- `inflight_visibility_timeout` — seconds before stale task recovery (Redis only)
- `stream_max_retries` — max redelivery attempts before DLQ (Redis only)
- `dequeue_block_ms` — XREADGROUP block duration in milliseconds (Redis only)
- `dequeue_max_concurrency` — concurrent blocking dequeue cap via semaphore (Redis only)

Redis integration (`prefect-redis`):

- `src/integrations/prefect-redis/prefect_redis/task_queue.py` — Redis Streams `TaskQueueBackend`: XADD for enqueue, XREADGROUP for blocking dequeue, atomic XACK+XDEL via Lua for ack, XAUTOCLAIM for stale recovery, DLQ after max retries, `retry()` is a no-op (the PEL handles it), `@throttle` decorator for consumer cleanup, semaphore-based concurrency cap

Server (`src/prefect/server/api/task_runs.py`):

- Per-key dequeue loops fan into a shared `asyncio.Queue`. Retry on disconnect, cleanup on cancellation

Tests
Added — `src/integrations/prefect-redis/tests/test_task_queue.py` (new file, 12 tests):

- `TestStreamBasics` — enqueue/dequeue, timeout on empty, FIFO ordering, blocking delivery
- `TestStaleRecovery` — XAUTOCLAIM recovery of stale PEL entries, orphan entry skipping
- `TestAck` — atomic XACK+XDEL removes from PEL and stream, no-op with None token
- `TestRetry` — `retry()` is a no-op for Redis
- `TestDLQ` — entry moved to DLQ after max retries, redelivery under threshold
- `TestConsumerCleanup` — idle consumers removed from consumer group

Added — `tests/test_background_tasks.py` (3 new tests):

- `test_fixed_order_multiqueue_starves_later_keys` — validates round-robin prevents starvation
- `test_multiqueue_retry_priority_per_key` — validates retry items dequeue before scheduled
- `test_get_task_queue_backend_rejects_invalid_module` — validates backend loader error handling

Added — `tests/test_settings.py` (new entries in SUPPORTED_SETTINGS): `PREFECT_SERVER_TASKS_SCHEDULING_*` env vars and their legacy aliases

Modified — `tests/test_background_tasks.py` (existing tests updated):

- `test_scheduled_tasks_are_enqueued_server_side` — uses `get_task_queue_backend().dequeue_from_keys()` instead of `TaskQueue.for_key().get()`
- `test_tasks_are_not_enqueued_server_side_when_executed_directly` — uses `dequeue_from_keys()` with timeout instead of `get_nowait()`
- `clear_scheduled_task_queues` fixture — calls `MemoryTaskQueueBackend().reset()` instead of `TaskQueue.reset()`

Modified — `tests/server/orchestration/api/test_task_run_subscriptions.py`: `test_task_worker_basic_tracking` (parametrized) → split into `test_single_connection_tracked_while_connected` and `test_multiple_connections_deduplicated`. Assertions now run inside open websocket connections (no timing races), then verify cleanup after disconnect via `asyncio.sleep(2)`.

Usage
Set `PREFECT_TASK_SCHEDULING_BACKEND=prefect_redis.task_queue` on the server. The Redis connection is configured via the existing `PREFECT_REDIS_MESSAGING_*` environment variables. Optionally tune `PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_BLOCK_MS` and `PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_MAX_CONCURRENCY` to manage connection pool pressure. No client-side changes needed.
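A server environment putting the settings above together might look like this (host, port, and tuning values are illustrative, not defaults):

```shell
# Select the Redis Streams task queue backend
export PREFECT_TASK_SCHEDULING_BACKEND=prefect_redis.task_queue

# Redis connection comes from the existing messaging settings
export PREFECT_REDIS_MESSAGING_HOST=redis.internal
export PREFECT_REDIS_MESSAGING_PORT=6379

# Optional tuning for connection-pool pressure
export PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_BLOCK_MS=1000
export PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_MAX_CONCURRENCY=50
```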