Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions flower/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import logging
import time

from concurrent.futures import ThreadPoolExecutor

Expand All @@ -8,6 +9,7 @@

from tornado import ioloop
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback
from tornado.web import url

from .urls import handlers as default_handlers
Expand Down Expand Up @@ -64,6 +66,7 @@ def __init__(self, options=None, capp=None, events=None,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
self.started = False
self._purge_timer = None

def start(self):
self.events.start()
Expand All @@ -80,11 +83,20 @@ def start(self):

self.started = True
self.update_workers()

if self.options.purge_offline_workers is not None:
interval_ms = max(self.options.purge_offline_workers * 1000, 10000)
self._purge_timer = PeriodicCallback(self._purge_offline_workers,
interval_ms)
self._purge_timer.start()

self.io_loop.start()

def stop(self):
if self.started:
self.events.stop()
if self._purge_timer:
self._purge_timer.stop()
logging.debug("Stopping executors...")
self.executor.shutdown(wait=False)
logging.debug("Stopping event loop...")
Expand All @@ -101,3 +113,46 @@ def workers(self):

def update_workers(self, workername=None):
return self.inspector.inspect(workername)

def _purge_offline_workers(self):
"""Purge workers that have been offline beyond the threshold.

Handles two cases:
- Workers present in state.workers: check alive status + heartbeat age
- Orphaned entries (in counter/inspector but not state.workers): always purge
"""
threshold = self.options.purge_offline_workers
if threshold is None:
return

now = time.time()
state = self.events.state

# Collect all known worker names from state.counter and inspector.workers
all_worker_names = set(state.counter.keys()) | set(self.inspector.workers.keys())

for worker_name in all_worker_names:
worker = state.workers.get(worker_name)
if worker is not None:
# Skip workers that are still alive
if worker.alive:
continue

# Check if the worker has been offline beyond the threshold
heartbeats = getattr(worker, 'heartbeats', [])
if heartbeats:
last_heartbeat = max(heartbeats)
if now - last_heartbeat <= threshold:
continue
# else: worker not in state.workers — orphaned entry, always purge

# Purge from state.counter
state.counter.pop(worker_name, None)

# Purge Prometheus metrics for this worker
state.metrics.remove_worker_metrics(worker_name)

# Purge from inspector
self.inspector.purge_worker(worker_name)

logger.debug("Purged offline worker: %s", worker_name)
24 changes: 24 additions & 0 deletions flower/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ def __init__(self):
['worker']
)

def remove_worker_metrics(self, worker_name):
"""Remove all Prometheus metric label series for a given worker."""
metrics = [
self.events, self.runtime, self.prefetch_time,
self.number_of_prefetched_tasks, self.worker_online,
self.worker_number_of_currently_executing_tasks,
]
for metric in metrics:
# _metrics is the internal dict of label-value tuples -> child metrics.
# Guard access since it's a private attr that may vary across versions.
storage = getattr(metric, '_metrics', None)
if storage is None:
continue
try:
keys_to_remove = [
key for key in storage
if key and key[0] == worker_name
]
for key in keys_to_remove:
metric.remove(*key)
except Exception:
logger.debug("Failed to remove metrics for worker %s from %s",
worker_name, metric, exc_info=True)


class EventsState(State):
# EventsState object is created and accessed only from ioloop thread
Expand Down
4 changes: 4 additions & 0 deletions flower/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def __init__(self, io_loop, capp, timeout):
self.timeout = timeout
self.workers = collections.defaultdict(dict)

def purge_worker(self, worker_name):
"""Remove a worker from the inspector's cached data."""
self.workers.pop(worker_name, None)

def inspect(self, workername=None):
feutures = []
for method in self.methods:
Expand Down
114 changes: 114 additions & 0 deletions tests/unit/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock

import celery
from celery.events import Event
from celery.events.state import Worker
from tornado.ioloop import IOLoop
from tornado.options import options

from flower import command # noqa: F401 side effect - define options
from flower.app import Flower
from flower.events import Events, EventsState, get_prometheus_metrics
from flower.urls import handlers, settings


class TestPurgeOfflineWorkers(unittest.TestCase):
def setUp(self):
capp = celery.Celery()
events = Events(capp, IOLoop.current())
self.app = Flower(capp=capp, events=events,
options=options, handlers=handlers, **settings)
self._orig_purge = options.purge_offline_workers

def tearDown(self):
options.purge_offline_workers = self._orig_purge

def test_purge_removes_offline_workers(self):
state = EventsState()
w, _ = state.get_or_create_worker('w1')
state.counter['w1']['worker-online'] = 1
w.heartbeats = [time.time() - 3600]
self.app.events.state = state

self.app.options.purge_offline_workers = 60
with patch.object(Worker, 'alive', new_callable=PropertyMock, return_value=False):
self.app._purge_offline_workers()

self.assertNotIn('w1', state.counter)

def test_purge_keeps_alive_workers(self):
state = EventsState()
w, _ = state.get_or_create_worker('w1')
state.counter['w1']['worker-online'] = 1
w.heartbeats = [time.time()]
self.app.events.state = state

self.app.options.purge_offline_workers = 60
with patch.object(Worker, 'alive', new_callable=PropertyMock, return_value=True):
self.app._purge_offline_workers()

self.assertIn('w1', state.counter)

def test_purge_keeps_recently_offline_workers(self):
state = EventsState()
w, _ = state.get_or_create_worker('w1')
state.counter['w1']['worker-online'] = 1
w.heartbeats = [time.time() - 10] # 10 seconds ago
self.app.events.state = state

self.app.options.purge_offline_workers = 60 # threshold 60s
with patch.object(Worker, 'alive', new_callable=PropertyMock, return_value=False):
self.app._purge_offline_workers()

self.assertIn('w1', state.counter)

def test_purge_removes_orphaned_counter_entries(self):
state = EventsState()
state.counter['orphan_worker']['worker-online'] = 1
self.app.events.state = state

self.app.options.purge_offline_workers = 60
self.app._purge_offline_workers()

self.assertNotIn('orphan_worker', state.counter)

def test_purge_removes_orphaned_inspector_entries(self):
state = EventsState()
self.app.events.state = state
self.app.inspector.workers['orphan_worker'] = {'stats': {}}

self.app.options.purge_offline_workers = 60
self.app._purge_offline_workers()

self.assertNotIn('orphan_worker', self.app.inspector.workers)

def test_purge_noop_when_threshold_is_none(self):
state = EventsState()
state.counter['w1']['worker-online'] = 1
self.app.events.state = state

self.app.options.purge_offline_workers = None
self.app._purge_offline_workers()

self.assertIn('w1', state.counter)

def test_purge_cleans_prometheus_metrics(self):
state = EventsState()
w, _ = state.get_or_create_worker('test_purge_prom_w1')
state.counter['test_purge_prom_w1']['worker-online'] = 1
w.heartbeats = [time.time() - 3600]
metrics = get_prometheus_metrics()
metrics.worker_online.labels('test_purge_prom_w1').set(1)
self.app.events.state = state

self.app.options.purge_offline_workers = 60
with patch.object(Worker, 'alive', new_callable=PropertyMock, return_value=False):
self.app._purge_offline_workers()

self.assertNotIn(('test_purge_prom_w1',), metrics.worker_online._metrics)


if __name__ == '__main__':
unittest.main()
53 changes: 53 additions & 0 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest

from flower.events import get_prometheus_metrics


class TestPrometheusMetricsRemoval(unittest.TestCase):
"""Test remove_worker_metrics using the global singleton to avoid
duplicate registry errors from prometheus_client."""

def test_remove_worker_metrics_clears_labels(self):
metrics = get_prometheus_metrics()
metrics.worker_online.labels('test_remove_w1').set(1)
metrics.worker_online.labels('test_remove_w2').set(1)

self.assertIn(('test_remove_w1',), metrics.worker_online._metrics)

metrics.remove_worker_metrics('test_remove_w1')

self.assertNotIn(('test_remove_w1',), metrics.worker_online._metrics)
self.assertIn(('test_remove_w2',), metrics.worker_online._metrics)

def test_remove_nonexistent_worker_is_noop(self):
metrics = get_prometheus_metrics()
# Should not raise
metrics.remove_worker_metrics('test_remove_nonexistent_worker_xyz')

def test_remove_multi_label_metrics(self):
metrics = get_prometheus_metrics()
metrics.runtime.labels('test_remove_mw1', 'task1').observe(1.0)
metrics.runtime.labels('test_remove_mw1', 'task2').observe(2.0)
metrics.runtime.labels('test_remove_mw2', 'task1').observe(3.0)

metrics.remove_worker_metrics('test_remove_mw1')

remaining_keys = list(metrics.runtime._metrics.keys())
for key in remaining_keys:
self.assertNotEqual(key[0], 'test_remove_mw1')
self.assertIn(('test_remove_mw2', 'task1'), metrics.runtime._metrics)

def test_remove_handles_missing_private_attr(self):
metrics = get_prometheus_metrics()
# Temporarily remove _metrics to simulate missing attr
original = metrics.worker_online._metrics
try:
del metrics.worker_online._metrics
# Should not raise — getattr guard should catch it
metrics.remove_worker_metrics('w1')
finally:
metrics.worker_online._metrics = original


if __name__ == '__main__':
unittest.main()
37 changes: 37 additions & 0 deletions tests/unit/test_inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest
from unittest.mock import MagicMock

from flower.inspector import Inspector


class TestInspectorPurge(unittest.TestCase):
def test_purge_worker_removes_entry(self):
inspector = Inspector(MagicMock(), MagicMock(), 1.0)
inspector.workers['w1'] = {'stats': {}}
inspector.workers['w2'] = {'stats': {}}

inspector.purge_worker('w1')

self.assertNotIn('w1', inspector.workers)
self.assertIn('w2', inspector.workers)

def test_purge_worker_noop_for_unknown(self):
inspector = Inspector(MagicMock(), MagicMock(), 1.0)
inspector.workers['w1'] = {'stats': {}}

# Should not raise
inspector.purge_worker('nonexistent')

self.assertIn('w1', inspector.workers)

def test_purge_worker_empty_workers(self):
inspector = Inspector(MagicMock(), MagicMock(), 1.0)

# Should not raise on empty defaultdict
inspector.purge_worker('w1')

self.assertEqual(len(inspector.workers), 0)


if __name__ == '__main__':
unittest.main()