
feat: redis-stream task queue#21343

Open
gpickney-tsp wants to merge 28 commits into PrefectHQ:main from gpickney-tsp:redis-stream-spike

Conversation


@gpickney-tsp gpickney-tsp commented Mar 29, 2026

Add Redis-backed task queue for multi-replica server deployments

Summary

closes #21218
requires PrefectHQ/prefect-helm#608

The default in-memory task queue uses asyncio.Queue, which is process-local. When running multiple Prefect server replicas, a task enqueued on Replica A is invisible to TaskWorkers connected to Replica B — tasks silently get lost or pile up on one replica while workers on others sit idle.

This PR adds a pluggable task queue backend protocol so deployments can swap in a Redis Streams implementation that shares queues across all replicas. No client-side changes needed.

Design

Single stream per task key — each key gets one Redis Stream (prefect:tqs:{key}) plus a dead-letter stream (prefect:tqs:{key}:dlq). Consumer groups provide atomic inflight tracking via the PEL (pending entries list). No separate retry stream; the PEL is the retry mechanism. XAUTOCLAIM recovers stale entries from crashed consumers, and entries exceeding max_retries are atomically moved to the DLQ via Lua script.
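The key layout and the atomic DLQ move can be sketched roughly as follows; the key prefix matches the description above, but the helper names and the Lua body are illustrative reconstructions, not the PR's actual code:

```python
# Hypothetical sketch of the per-key stream layout and the atomic DLQ move;
# helper names and the Lua body are illustrative, not the PR's actual code.

def stream_key(task_key: str) -> str:
    """Main stream for a task key."""
    return f"prefect:tqs:{task_key}"

def dlq_key(task_key: str) -> str:
    """Dead-letter stream for entries that exceeded max_retries."""
    return f"prefect:tqs:{task_key}:dlq"

# Move one pending entry to the DLQ in a single atomic step: copy its fields
# to the DLQ stream, then ACK it out of the PEL and delete it from the source
# stream. KEYS[1]=stream, KEYS[2]=dlq, ARGV[1]=group, ARGV[2]=entry id.
MOVE_TO_DLQ = """
local entries = redis.call('XRANGE', KEYS[1], ARGV[2], ARGV[2])
if #entries == 0 then return 0 end
redis.call('XADD', KEYS[2], '*', unpack(entries[1][2]))
redis.call('XACK', KEYS[1], ARGV[1], ARGV[2])
redis.call('XDEL', KEYS[1], ARGV[2])
return 1
"""
```

Running the copy, ack, and delete as one script means a crash mid-move cannot leave an entry in both streams or in neither.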

Per-key dequeue loops — the websocket handler spawns one asyncio.Task per subscribed key, each doing a blocking XREADGROUP on its own stream. Results fan into a shared asyncio.Queue. A single sender coroutine drains the queue and talks to the websocket, preserving the existing serial send-ack-send protocol unchanged.

Server-side (per websocket connection):
├── dequeue_loop(key_A)  ──→  asyncio.Queue
├── dequeue_loop(key_B)  ──→  asyncio.Queue
├── ...                       ↓
├── dequeue_loop(key_N)  ──→  asyncio.Queue
└── sender()  ←── asyncio.Queue ──→ websocket (send, wait ack, backend.ack)
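The fan-in above can be sketched in a few lines of asyncio, with a stub standing in for the real TaskQueueBackend; function and parameter names here are illustrative:

```python
import asyncio
from typing import Any, Awaitable, Callable

# Minimal sketch of the per-key dequeue loops fanning into one queue, drained
# by a single sender; a stub stands in for the real TaskQueueBackend.

async def dequeue_loop(backend: Any, key: str, out: "asyncio.Queue[Any]") -> None:
    # One loop per subscribed key: block on the backend, fan into `out`.
    while True:
        delivered = await backend.dequeue(key)
        await out.put(delivered)

async def sender(
    out: "asyncio.Queue[Any]",
    send: Callable[[Any], Awaitable[None]],
    backend: Any,
) -> None:
    # A single consumer preserves the serial send -> wait-ack -> ack protocol.
    while True:
        delivered = await out.get()
        await send(delivered)         # send over the websocket, await client ack
        await backend.ack(delivered)  # only then acknowledge to the backend
```

Because only `sender()` touches the websocket, the client-facing protocol stays serial no matter how many keys are subscribed.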

Connection management — each blocking XREADGROUP holds a Redis connection for the block duration. Two settings control this: dequeue_block_ms (default 1000ms) sets the block duration, and dequeue_max_concurrency (default uncapped) limits concurrent blocking calls via semaphore to prevent the task queue subsystem from exhausting the shared connection pool.
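The cap can be sketched as below, assuming it is a plain asyncio.Semaphore sized by dequeue_max_concurrency; `blocking_read` stands in for the real XREADGROUP call, which holds a Redis connection for up to dequeue_block_ms:

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

# Sketch of the connection-pressure cap: at most `dequeue_max_concurrency`
# dequeue loops may sit in a blocking read at once; the rest wait on the
# semaphore rather than holding a pooled Redis connection.

async def capped_dequeue(
    blocking_read: Callable[[], Awaitable[T]],
    sem: Optional[asyncio.Semaphore],
) -> T:
    if sem is None:        # dequeue_max_concurrency unset: no cap
        return await blocking_read()
    async with sem:        # hold one slot for the whole block duration
        return await blocking_read()
```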

Changes

Protocol + pluggable backend (src/prefect/server/task_queue/):

The old src/prefect/server/task_queue.py single-file module (containing TaskQueue and MultiQueue classes with direct asyncio.Queue usage) is replaced by a package:

  • __init__.py — TaskQueueBackend protocol with enqueue, retry, ack, dequeue_from_keys; DeliveredTaskRun wrapper carrying an opaque ack_token; get_task_queue_backend() factory that loads the configured backend module
  • memory.py — In-memory backend (default, preserves existing behavior). Replaces the old TaskQueue/MultiQueue classes with a single TaskQueueBackend class that conforms to the new protocol. Uses asyncio.PriorityQueue per key so retry() re-enqueues at priority 0 (ahead of scheduled items at priority 1). Adds round-robin key rotation to prevent starvation across keys. ack() is a no-op (no inflight tracking needed in-memory). dequeue_from_keys() blocks with asyncio.Condition + timeout instead of the old sleep(0.01) polling loop.
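The per-key priority scheme in the memory backend can be illustrated with a small sketch; the class and method names here are hypothetical, but the ordering (retries at priority 0 ahead of scheduled items at priority 1) follows the description above:

```python
import asyncio
from itertools import count

# Illustrative sketch of the per-key priority scheme: retried runs enter at
# priority 0 and dequeue ahead of newly scheduled runs at priority 1. The
# monotonic counter breaks ties FIFO and keeps the heap from ever comparing
# task run objects directly.

class KeyQueue:
    def __init__(self) -> None:
        self._q: "asyncio.PriorityQueue[tuple[int, int, object]]" = asyncio.PriorityQueue()
        self._seq = count()

    async def enqueue(self, task_run: object) -> None:
        await self._q.put((1, next(self._seq), task_run))  # scheduled work

    async def retry(self, task_run: object) -> None:
        await self._q.put((0, next(self._seq), task_run))  # jumps the line

    async def dequeue(self) -> object:
        _, _, task_run = await self._q.get()
        return task_run
```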

Settings (src/prefect/settings/models/server/tasks.py):

  • backend — module path for the task queue backend (PREFECT_TASK_SCHEDULING_BACKEND), defaults to prefect.server.task_queue.memory
  • inflight_visibility_timeout — seconds before stale task recovery (Redis only)
  • stream_max_retries — max redelivery attempts before DLQ (Redis only)
  • dequeue_block_ms — XREADGROUP block duration in milliseconds (Redis only)
  • dequeue_max_concurrency — concurrent blocking dequeue cap via semaphore (Redis only)

Redis integration (prefect-redis):

  • src/integrations/prefect-redis/prefect_redis/task_queue.py — Redis Streams TaskQueueBackend: XADD for enqueue, XREADGROUP for blocking dequeue, atomic XACK+XDEL via Lua for ack, XAUTOCLAIM for stale recovery, DLQ after max retries, retry() is a no-op (PEL handles it), @throttle decorator for consumer cleanup, semaphore-based concurrency cap
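The atomic ack can be sketched as below; the Lua body is illustrative, and the ack_token encoding is an assumption for illustration (the protocol only says the token is opaque):

```python
# Hedged sketch of the atomic ack: XACK removes the entry from the consumer
# group's PEL and XDEL removes it from the stream, wrapped in one Lua script
# so a crash between the two cannot leave a half-acked entry.
# KEYS[1]=stream, ARGV[1]=group, ARGV[2]=entry id.
ACK_AND_DELETE = """
redis.call('XACK', KEYS[1], ARGV[1], ARGV[2])
return redis.call('XDEL', KEYS[1], ARGV[2])
"""

def make_ack_token(group: str, entry_id: str) -> str:
    # Opaque token carried on DeliveredTaskRun; only this backend decodes it.
    # This encoding is an assumption, not the PR's actual format.
    return f"{group}:{entry_id}"

def parse_ack_token(token: str) -> tuple[str, str]:
    group, _, entry_id = token.partition(":")
    return group, entry_id
```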

Server (src/prefect/server/api/task_runs.py):

  • Websocket handler rewritten with per-key dequeue loops + sender coroutine + shared asyncio.Queue. Retry on disconnect, cleanup on cancellation

Tests

Added — src/integrations/prefect-redis/tests/test_task_queue.py (new file, 12 tests):

  • TestStreamBasics — enqueue/dequeue, timeout on empty, FIFO ordering, blocking delivery
  • TestStaleRecovery — XAUTOCLAIM recovery of stale PEL entries, orphan entry skipping
  • TestAck — atomic XACK+XDEL removes from PEL and stream, no-op with None token
  • TestRetry — retry() is a no-op for Redis
  • TestDLQ — entry moved to DLQ after max retries, redelivery under threshold
  • TestConsumerCleanup — idle consumers removed from consumer group

Added — tests/test_background_tasks.py (3 new tests):

  • test_fixed_order_multiqueue_starves_later_keys — validates round-robin prevents starvation
  • test_multiqueue_retry_priority_per_key — validates retry items dequeue before scheduled
  • test_get_task_queue_backend_rejects_invalid_module — validates backend loader error handling

Added — tests/test_settings.py (new entries in SUPPORTED_SETTINGS):

  • All new PREFECT_SERVER_TASKS_SCHEDULING_* env vars and their legacy aliases

Modified — tests/test_background_tasks.py (existing tests updated):

  • test_scheduled_tasks_are_enqueued_server_side — uses get_task_queue_backend().dequeue_from_keys() instead of TaskQueue.for_key().get()
  • test_tasks_are_not_enqueued_server_side_when_executed_directly — uses dequeue_from_keys() with timeout instead of get_nowait()
  • clear_scheduled_task_queues fixture — calls MemoryTaskQueueBackend().reset() instead of TaskQueue.reset()

Modified — tests/server/orchestration/api/test_task_run_subscriptions.py:

  • test_task_worker_basic_tracking (parametrized) → split into test_single_connection_tracked_while_connected and test_multiple_connections_deduplicated. Assertions now run inside open websocket connections (no timing races), then verify cleanup after disconnect via asyncio.sleep(2).

Usage

Set PREFECT_TASK_SCHEDULING_BACKEND=prefect_redis.task_queue on the server. The Redis connection is configured via the existing PREFECT_REDIS_MESSAGING_* environment variables. Optionally tune PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_BLOCK_MS and PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_MAX_CONCURRENCY for connection pool pressure. No client-side changes needed.
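For reference, a server environment might look like this; hostnames and tuning values are illustrative:

```shell
# Illustrative server environment; hostnames and tuning values are examples.
export PREFECT_TASK_SCHEDULING_BACKEND=prefect_redis.task_queue
export PREFECT_REDIS_MESSAGING_HOST=redis.internal   # existing prefect-redis settings
export PREFECT_REDIS_MESSAGING_PORT=6379
export PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_BLOCK_MS=1000
export PREFECT_SERVER_TASKS_SCHEDULING_DEQUEUE_MAX_CONCURRENCY=25
```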

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • If this pull request removes docs files, it includes redirect settings in mint.json.
  • If this pull request adds functions or classes, it includes helpful docstrings.

…mplementation owns configuration in initializer
…Streams does not well support fetching from multiple streams with a single blocking call that returns one item by priority; we would have to loop through the available streams doing a non-blocking XREAD until a message is retrieved, and only then fall back to a blocking XREADGROUP across all keys expecting a single entry; Lua required. Production systems that I've researched basically all use Redis lists for just this reason.

codspeed-hq bot commented Mar 29, 2026

Merging this PR will not alter performance

✅ 2 untouched benchmarks


Comparing gpickney-tsp:redis-stream-spike (c44e44e) with main (6506a37)

Open in CodSpeed

@gpickney-tsp (Author)

Closing #21219 in favor of this PR. See that PR for design discussion.

@gpickney-tsp gpickney-tsp changed the title from "info: redis-stream task queue poc" to "feat: redis-stream task queue" Apr 1, 2026
@github-actions github-actions bot added the enhancement An improvement of an existing feature label Apr 1, 2026
@gpickney-tsp (Author)

@desertaxle closed #21219 in favor of this PR as I already had all of my stream-related changes in this base.

Base logic is there. Taking a final pass / self review in the next day or two and then will mark as ready for review.

@gpickney-tsp gpickney-tsp marked this pull request as ready for review April 8, 2026 13:35
@gpickney-tsp (Author)

@desertaxle Ready for review whenever y'all get a chance. I'll review comments and make any necessary changes towards the end of the week. There's one failing test that I don't think is related to these changes.

@desertaxle (Member)

Thanks @gpickney-tsp! Sorry for the delay in review. I'll make sure to give this a proper review either today or tomorrow!


@desertaxle desertaxle left a comment


I like the direction here, but I don’t think the abstraction is quite in the right place yet. There are a couple of correctness issues to sort out first, especially around retry() semantics: right now it’s being used for both interrupted delivery/redelivery and orchestration-driven AwaitingRetry, and those don’t seem interchangeable across backends.

Separately, I think the interface is trying to abstract both publishing and consumption, but the code here already requires consumer-specific behavior such as multi-key subscription, per-connection consumer identity, ack/redelivery lifecycle, and disconnect cleanup. If consumption is part of the contract now, I’d rather see that modeled explicitly with a publisher + consumer/subscription split than keep pushing fan-in and delivery orchestration into the API layer. I think that would give us a much stronger long-term seam for additional backends.

We’ve also talked about whether pydocket could be a better fit for the consumer side of this, since we already use it server-side. If that ends up simplifying the design here, I’m open to that direction as well.

Thanks for sticking with this, and sorry for the long wait for review. Let me know if you have any questions about anything that I posted!


if validated_state.name == "AwaitingRetry":
-    await queue.retry(task_run)
+    await backend.retry(task_run)

I think this breaks actual task retries for the Redis backend. By the time a deferred task fails and gets moved to AwaitingRetry, the original delivery has already been ACKed on the subscription side, so there’s nothing left in the PEL to recover. Since prefect_redis.task_queue.TaskQueueBackend.retry() is a no-op, this looks like it leaves the task in AwaitingRetry in the DB without ever re-enqueueing it for execution.

Comment on lines +242 to +247
async def retry(self, task_run: schemas.core.TaskRun) -> None:
    # No-op for Redis: entry stays in PEL. XAUTOCLAIM recovers it after
    # inflight_visibility_timeout (default 30s). This means disconnect
    # recovery is not immediate — there is a delay until another consumer
    # claims the stale entry.
    pass

This no-op makes sense for disconnect recovery, but I don’t think it works for orchestration-driven retries. The worker ACKs as soon as it receives the task, before execution starts, so a later transition to AwaitingRetry needs to create a new queue entry somewhere. Otherwise, I think Redis-backed deferred tasks can get stuck after their first failure.

async def retry(self, task_run: schemas.core.TaskRun) -> None:
    kq = self._get_or_create_queue(task_run.task_key)
    try:
        await asyncio.wait_for(kq.retry_sem.acquire(), timeout=5)

This changes the in-memory behavior from backpressure to dropping retries after 5 seconds. Previously, we’d block on put() until capacity opened up; now we log and return, which means the retry is just lost. Was that intentional? It feels like a pretty meaningful behavioral regression for both disconnect redelivery and AwaitingRetry.

        await asyncio.shield(backend.retry(queue.get_nowait().task_run))
except Exception:
    logger.warning("Failed to retry task during cleanup", exc_info=True)
await models.task_workers.forget_worker(client_id)

This cleanup looks risky when multiple websocket connections reuse the same client_id (which is supported here, since we explicitly test deduping in that case). Closing one socket would remove the tracker entry for the other still-active socket.

"""Add a task run to the queue for its task_key."""
...

async def retry(self, task_run: schemas.core.TaskRun) -> None:

I think retry() is doing two different jobs right now: requeueing an interrupted delivery and scheduling a brand new orchestration-driven retry. Those have pretty different semantics across backends. I’d be tempted to separate those concepts in the interface before other implementations start depending on the current shape.

Comment on lines +68 to +72
async def dequeue(
    self,
    key: str,
    timeout: float = 1,
) -> DeliveredTaskRun:

dequeue(key) feels pretty tailored to the current server loop. For a pluggable backend, I wonder whether we want the backend to own multi-key subscription/fairness itself, rather than forcing the API layer to spin a separate dequeue loop per key. That seems like it would fit Redis, RabbitMQ, Kafka, etc., a lot better long term.

TaskQueueBackend: type["TaskQueueBackend"]


class TaskQueueBackend(Protocol):

I think we’ve crossed the threshold where consumption is part of the abstraction too, and if that’s the case, I’d rather model it explicitly now rather than hide it behind the backend object.

This PR already needs consumer-specific behavior like subscription to a set of keys, per-connection consumer identity, delivery lifecycle, ack/redelivery semantics, and cleanup on disconnect. Those feel more like responsibilities of a consumer/subscription object than a global task queue backend. The current dequeue(key) shape seems to leak that mismatch by pushing fan-in and delivery orchestration up into the API layer.

I think this would be a stronger abstraction if the backend exposed a publisher plus a real consumer/subscription interface, and kept orchestration-driven retries separate from consumer redelivery.
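One possible shape of that publisher/consumer split, sketched here purely for illustration (all names and signatures are hypothetical, not part of the PR):

```python
from typing import Any, AsyncIterator, Protocol, runtime_checkable

# Hypothetical sketch of a publisher + consumer/subscription split; none of
# these names or signatures come from the PR itself.

@runtime_checkable
class TaskPublisher(Protocol):
    async def enqueue(self, task_run: Any) -> None: ...

@runtime_checkable
class TaskSubscription(Protocol):
    # Owns multi-key fairness, per-connection consumer identity, and
    # redelivery; the API layer just iterates deliveries and acks them.
    def __aiter__(self) -> AsyncIterator[Any]: ...
    async def ack(self, delivered: Any) -> None: ...
    async def close(self) -> None: ...  # cleanup on disconnect
```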


Labels

enhancement An improvement of an existing feature


Development

Successfully merging this pull request may close these issues.

Background task queue is per-process — .delay() tasks stuck in SCHEDULED with multiple server replicas

2 participants