
Add backpressure queue for Celery event ingestion#1491

Open
ShubhAtWork wants to merge 1 commit into mher:master from twofourlabs:fix/event-backpressure

Conversation


@ShubhAtWork ShubhAtWork commented Mar 3, 2026

Summary

The existing on_event() handler calls io_loop.add_callback() for every Celery event. Under high throughput (thousands of events/sec from many workers), this creates unbounded pending callbacks in the IOLoop, causing:

  • Monotonically growing memory as callbacks queue up faster than they can be processed
  • IOLoop starvation — HTTP handlers become unresponsive because the event loop is saturated draining callbacks
  • No visibility into the problem (no logging when events are backed up or dropped)

Changes

Backpressure queue

Replace the direct add_callback() with a bounded queue.Queue (capacity 10,000) and a periodic drain timer that processes events in batches of 500 every 100ms. When the queue is full, events are dropped with rate-limited warning logs (at most once per 5 seconds) so operators can detect and respond to the condition.
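The core of this pattern can be sketched as follows. This is a minimal, self-contained illustration of the approach described above, not the PR's actual code; the class name `EventBuffer` and its methods are hypothetical, while the capacity, batch size, and warning interval follow the numbers in the description.

```python
import logging
import queue
import time

logger = logging.getLogger(__name__)

# Constants mirroring the PR description (assumed names).
MAX_EVENTS = 10_000    # bounded queue capacity
DRAIN_BATCH = 500      # events processed per drain tick
WARN_INTERVAL = 5.0    # seconds between "queue full" warnings

class EventBuffer:
    """Bounded buffer between the consumer thread and the IOLoop."""

    def __init__(self):
        self._queue = queue.Queue(maxsize=MAX_EVENTS)
        self._last_warn = 0.0
        self.dropped = 0

    def on_event(self, event):
        """Called from the consumer thread for every Celery event."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            # Drop the event, but warn at most once per WARN_INTERVAL
            # so operators can see the condition without log flooding.
            self.dropped += 1
            now = time.monotonic()
            if now - self._last_warn >= WARN_INTERVAL:
                self._last_warn = now
                logger.warning("Event queue full; %d events dropped",
                               self.dropped)

    def drain(self, handler):
        """Process up to DRAIN_BATCH queued events; intended to be
        invoked by a periodic timer on the IOLoop (e.g. every 100ms)."""
        for _ in range(DRAIN_BATCH):
            try:
                event = self._queue.get_nowait()
            except queue.Empty:
                break
            handler(event)
```

Because `on_event()` never blocks and `drain()` touches at most a fixed batch per tick, both memory use and per-tick IOLoop work stay bounded regardless of broker throughput.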

Capped retry backoff

The reconnection retry interval doubles on each failure but had no upper bound — after 20 consecutive failures the interval reached ~12 days, meaning Flower would appear permanently disconnected. Cap it at 60 seconds.
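The arithmetic behind the cap is easy to verify: doubling a 1-second interval 20 times gives 2^20 ≈ 1,048,576 seconds, roughly 12 days. A hedged sketch of the capped doubling (function name and starting interval are illustrative, not from the diff):

```python
# Assumed cap from the PR description.
MAX_RETRY_INTERVAL = 60.0

def next_retry_interval(current):
    """Double the reconnect interval on each failure, capped at 60s."""
    return min(current * 2, MAX_RETRY_INTERVAL)

interval = 1.0
for _ in range(20):
    interval = next_retry_interval(interval)
# Without the min(), 20 doublings of 1s would reach 2**20 s (~12 days);
# with the cap, the interval settles at 60s after six failures.
```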

Safer shutdown

Events.stop() and Flower.stop() now wrap each timer/component teardown in try/except, so a failure in one component no longer prevents the others from stopping or state from being saved. Previously, an exception in timer.stop() would skip save_state(), losing all persistent event data accumulated since the last save.
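The isolation pattern above can be sketched generically. The helper name `safe_stop` and its signature are hypothetical; the point is only that each teardown step is wrapped independently so a failing `stop()` cannot skip `save_state()`:

```python
import logging

logger = logging.getLogger(__name__)

def safe_stop(components, save_state):
    """Stop each (name, component) pair independently, then persist state.

    An exception raised by one component's stop() is logged and swallowed
    so the remaining components still stop and save_state() still runs.
    """
    for name, component in components:
        try:
            component.stop()
        except Exception:
            logger.exception("Failed to stop %s", name)
    try:
        save_state()
    except Exception:
        logger.exception("Failed to save state during shutdown")
```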

Shelve safety

shelve.open() calls now use context managers and exception handling. The old code could crash on startup if the shelve file was corrupt (no exception handling around state['events']), and save_state() could silently fail without logging.

Test plan

  • pytest tests/unit/test_events.py — 9 tests pass
  • Under high event load, verify memory usage stays bounded
  • Verify "Event queue full" warnings appear in logs when events are dropped
  • Verify reconnection after broker outage happens within ~60s, not days

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Copilot AI left a comment


Pull request overview

This PR introduces backpressure to Celery event ingestion in Flower to prevent unbounded IOLoop callback growth under high event throughput, and hardens reconnection/shutdown persistence behavior.

Changes:

  • Replace per-event IOLoop.add_callback scheduling with a bounded in-memory queue plus periodic batch draining.
  • Cap exponential reconnect backoff to a maximum interval.
  • Make shutdown/state persistence more resilient to exceptions and improve shelve load/save error handling.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

  • flower/events.py — Implements bounded event queue + periodic drain, caps retry interval, and makes shelve/timer shutdown more resilient.
  • flower/app.py — Wraps events.stop() during shutdown to avoid teardown aborting the rest of the shutdown flow.
  • tests/unit/test_events.py — Adds unit tests for queue backpressure, drain behavior, retry cap, and stop/save safety.


Comment on lines 163 to +176
def start(self):
    threading.Thread.start(self)
    if self.enable_events:
        logger.debug("Starting enable events timer...")
        self.timer.start()

    if self.state_save_timer:
        logger.debug("Starting state save timer...")
        self.state_save_timer.start()

    self._drain_timer = PeriodicCallback(self._drain_events,
                                         self._DRAIN_INTERVAL_MS)
    self._drain_timer.start()


Copilot AI Mar 8, 2026


PeriodicCallback.start()/stop() binds to the current IOLoop of the calling thread. Here the timers are started directly in Events.start(), which may not be running on self.io_loop’s thread (and Events is explicitly constructed with an io_loop). This can result in the drain/state-save/enable-events callbacks attaching to the wrong loop and calling self.io_loop.run_in_executor(...) from the wrong thread. Consider scheduling timer.start() / state_save_timer.start() / _drain_timer.start() onto self.io_loop via self.io_loop.add_callback(...) (and likewise for .stop()), so these callbacks always run on the intended IOLoop.
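The reviewer's point generalizes: work scheduled from a foreign thread must hop onto the target loop's own thread via a thread-safe call. A minimal sketch using asyncio as a stand-in for Tornado's IOLoop (Tornado's `IOLoop.add_callback` is its thread-safe equivalent of `call_soon_threadsafe`; the function names here are illustrative):

```python
import asyncio
import threading

def start_timers_on_loop(loop, started_on):
    """Called from a worker thread; hops onto `loop`'s thread before
    doing loop-affine work such as starting periodic timers."""
    def _start():
        # Runs on the loop's own thread, so any timers created here
        # bind to the intended loop, not the worker thread's.
        started_on.append(threading.get_ident())
        loop.stop()
    # The thread-safe scheduling primitive is the crux of the fix.
    loop.call_soon_threadsafe(_start)

loop = asyncio.new_event_loop()
started_on = []
worker = threading.Thread(target=start_timers_on_loop,
                          args=(loop, started_on))
worker.start()
loop.run_forever()   # processes _start on this (the loop's) thread
worker.join()
loop.close()
```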

Comment on lines +139 to +142
capp = celery.Celery()
io_loop = MagicMock()
events = Events(capp, io_loop, persistent=True, db='test_db')


Copilot AI Mar 8, 2026


This test uses a hard-coded db='test_db' with persistent=True, which will cause shelve.open() to touch the real filesystem (creating/locking files in the repo working directory) and can make unit tests flaky or fail in read-only/parallel environments. Use a temporary path (e.g., tempfile.TemporaryDirectory()/NamedTemporaryFile) or patch shelve.open so the test doesn't create persistent artifacts.
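The suggested fix can be sketched with stdlib pieces only. This helper (name and signature hypothetical) exercises a persistent shelve db under a `TemporaryDirectory`, so the test leaves no locked files in the repository working tree and runs safely in parallel or read-only environments:

```python
import os
import shelve
import tempfile

def roundtrip_with_temp_db(events):
    """Persist and reload events via a shelve db under a temp directory.

    The directory (and every db file shelve creates inside it) is
    removed automatically when the context manager exits.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        db = os.path.join(tmpdir, "test_db")
        with shelve.open(db) as state:
            state["events"] = events
        with shelve.open(db) as state:
            return state["events"]
```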

