# Add backpressure queue for Celery event ingestion #1491
## Conversation
The existing `on_event()` handler calls `io_loop.add_callback()` for every Celery event. Under high throughput (thousands of events/sec from many workers), this creates unbounded pending callbacks in the IOLoop, causing:

- Monotonically growing memory as callbacks queue up faster than they can be processed
- IOLoop starvation — HTTP handlers become unresponsive because the event loop is saturated draining callbacks
- No visibility into the problem (no logging when events are being dropped or backed up)

Replace the direct `add_callback()` with a bounded `queue.Queue` (capacity 10,000) and a periodic drain timer that processes events in batches of 500 every 100 ms. When the queue is full, events are dropped with rate-limited warning logs (at most once per 5 seconds) so operators can detect and respond to the condition.

Additional fixes in this change:

1. **Capped retry backoff**: The reconnection retry interval doubles on each failure but had no upper bound — after 20 consecutive failures, the interval reached ~12 days, meaning Flower would appear permanently disconnected. Cap it at 60 seconds.
2. **Safer shutdown**: `Events.stop()` and `Flower.stop()` now wrap each timer/component teardown in try/except, ensuring that a crash in one doesn't prevent others from stopping or state from being saved. Previously, an exception in `timer.stop()` would skip `save_state()`, losing all persistent event data accumulated since the last save.
3. **Shelve safety**: `shelve.open()` calls now use context managers and exception handling. The old code could crash on startup if the shelve file was corrupt (no exception handling around `state['events']`), and `save_state()` could silently fail without logging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
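A minimal standalone sketch of the bounded-queue pattern described above. The class and attribute names here are illustrative, not Flower's actual code; the 10,000-slot capacity, 500-event batches, and 5-second warning throttle follow the numbers in this description:

```python
import logging
import queue
import time

logger = logging.getLogger(__name__)


class EventBuffer:
    """Bounded buffer between the Celery event-receiver thread and the IOLoop."""

    MAX_QUEUE = 10_000    # queue capacity before events are dropped
    BATCH_SIZE = 500      # events processed per drain tick
    WARN_INTERVAL = 5.0   # seconds between "queue full" warnings

    def __init__(self):
        self._queue = queue.Queue(maxsize=self.MAX_QUEUE)
        self._last_warn = 0.0
        self.dropped = 0

    def put(self, event):
        """Called from the event-receiver thread for every Celery event."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            self.dropped += 1
            now = time.monotonic()
            # Rate-limit the warning so a sustained overload doesn't
            # flood the logs.
            if now - self._last_warn >= self.WARN_INTERVAL:
                self._last_warn = now
                logger.warning("event queue full, dropping events "
                               "(%d dropped so far)", self.dropped)

    def drain(self, handle):
        """Called periodically on the IOLoop; processes at most one batch."""
        for _ in range(self.BATCH_SIZE):
            try:
                event = self._queue.get_nowait()
            except queue.Empty:
                break
            handle(event)
```

In Flower itself the drain would be driven by a `tornado.ioloop.PeriodicCallback` firing every 100 ms; here `drain()` is exposed directly so the batching and drop behavior can be exercised without an IOLoop.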
Pull request overview
This PR introduces backpressure to Celery event ingestion in Flower to prevent unbounded IOLoop callback growth under high event throughput, and hardens reconnection/shutdown persistence behavior.
Changes:
- Replace per-event `IOLoop.add_callback` scheduling with a bounded in-memory queue plus periodic batch draining.
- Cap exponential reconnect backoff to a maximum interval.
- Make shutdown/state persistence more resilient to exceptions and improve shelve load/save error handling.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `flower/events.py` | Implements bounded event queue + periodic drain, caps retry interval, and makes shelve/timer shutdown more resilient. |
| `flower/app.py` | Wraps `events.stop()` during shutdown to avoid teardown aborting the rest of the shutdown flow. |
| `tests/unit/test_events.py` | Adds unit tests for queue backpressure, drain behavior, retry cap, and stop/save safety. |
```python
def start(self):
    threading.Thread.start(self)
    if self.enable_events:
        logger.debug("Starting enable events timer...")
        self.timer.start()

    if self.state_save_timer:
        logger.debug("Starting state save timer...")
        self.state_save_timer.start()

    self._drain_timer = PeriodicCallback(self._drain_events,
                                         self._DRAIN_INTERVAL_MS)
    self._drain_timer.start()
```
PeriodicCallback.start()/stop() binds to the current IOLoop of the calling thread. Here the timers are started directly in Events.start(), which may not be running on self.io_loop’s thread (and Events is explicitly constructed with an io_loop). This can result in the drain/state-save/enable-events callbacks attaching to the wrong loop and calling self.io_loop.run_in_executor(...) from the wrong thread. Consider scheduling timer.start() / state_save_timer.start() / _drain_timer.start() onto self.io_loop via self.io_loop.add_callback(...) (and likewise for .stop()), so these callbacks always run on the intended IOLoop.
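One way to implement the reviewer's suggestion. This is an illustrative skeleton, not Flower's actual `Events` class; only the marshaling of `timer.start()`/`timer.stop()` onto `self.io_loop` via `add_callback` is the point, and the test uses mocks to show the timers are never started inline on the calling thread:

```python
from unittest.mock import MagicMock


class Events:
    """Hypothetical skeleton showing the thread-affinity fix.

    The real class in flower/events.py has many more responsibilities;
    here `timers` stands in for the drain / state-save / enable-events
    PeriodicCallback instances.
    """

    def __init__(self, io_loop, timers):
        self.io_loop = io_loop
        self._timers = timers

    def start(self):
        # Don't call timer.start() inline: PeriodicCallback binds to the
        # IOLoop current on the *calling* thread. Scheduling the start via
        # add_callback ensures each timer attaches to the intended loop.
        for timer in self._timers:
            self.io_loop.add_callback(timer.start)

    def stop(self):
        # Likewise, stop the timers on the loop they are attached to.
        for timer in self._timers:
            self.io_loop.add_callback(timer.stop)
```

`IOLoop.add_callback` is documented as safe to call from any thread, which is what makes this marshaling pattern work.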
```python
capp = celery.Celery()
io_loop = MagicMock()
events = Events(capp, io_loop, persistent=True, db='test_db')
```
This test uses a hard-coded db='test_db' with persistent=True, which will cause shelve.open() to touch the real filesystem (creating/locking files in the repo working directory) and can make unit tests flaky or fail in read-only/parallel environments. Use a temporary path (e.g., tempfile.TemporaryDirectory()/NamedTemporaryFile) or patch shelve.open so the test doesn't create persistent artifacts.
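A sketch of the temporary-path approach the reviewer suggests. The test name and the direct `shelve.open` calls are illustrative stand-ins; in the real test the temporary path would be passed as the `db=` argument to `Events`:

```python
import shelve
import tempfile
from pathlib import Path


def test_persistent_events_use_temp_db():
    # Write the shelve database under a temporary directory so the test
    # leaves no artifacts in the repo working tree and cannot collide
    # with parallel test runs.
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = str(Path(tmpdir) / 'test_db')
        # Stand-in for save_state(): persist some state to the shelve.
        with shelve.open(db_path) as state:
            state['events'] = []
        # Stand-in for the restore-on-startup path: reopen and read back.
        with shelve.open(db_path) as state:
            assert state['events'] == []
    # TemporaryDirectory removes the db file(s), whatever suffix the
    # dbm backend added, when the with-block exits.
```

Patching `shelve.open` with `unittest.mock.patch` is the other option mentioned in the review; the temporary-directory variant has the advantage of still exercising the real serialization path.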
Test plan

`pytest tests/unit/test_events.py` — 9 tests pass

🤖 Generated with Claude Code