Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dramatiq/brokers/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def flush_all(self) -> None:
for queue_name in chain(self.queues, self.delay_queues):
self.flush(queue_name)

self.dead_letters_by_queue.clear()
# NOTE: do not clear the dead_letters_by_queue to avoid orphaning the references in existing consumers.
for dlq in self.dead_letters_by_queue.values():
dlq.clear()

def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: Optional[bool] = None) -> None:
"""Wait for all the messages on the given queue to be
Expand Down
20 changes: 20 additions & 0 deletions tests/test_stub_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,23 @@ def do_work():
# Then that exception should be raised in my thread
with pytest.raises(CustomError):
stub_broker.join(do_work.queue_name, fail_fast=True)


def test_stub_broker_flush_all_does_not_break_dead_letters(stub_broker, stub_worker):
class SomeError(Exception):
pass

@dramatiq.actor(max_retries=0)
def do_work():
raise SomeError(":(")

do_work.send()
with pytest.raises(SomeError):
stub_broker.join(do_work.queue_name)

# Flush queues then retry. Should have the same result
stub_broker.flush_all()

do_work.send()
with pytest.raises(SomeError):
stub_broker.join(do_work.queue_name)