diff --git a/dramatiq/brokers/stub.py b/dramatiq/brokers/stub.py index 8e2637f9..3b8b1c84 100644 --- a/dramatiq/brokers/stub.py +++ b/dramatiq/brokers/stub.py @@ -137,7 +137,9 @@ def flush_all(self) -> None: for queue_name in chain(self.queues, self.delay_queues): self.flush(queue_name) - self.dead_letters_by_queue.clear() + # NOTE: do not clear the dead_letters_by_queue to avoid orphaning the references in existing consumers. + for dlq in self.dead_letters_by_queue.values(): + dlq.clear() def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: Optional[bool] = None) -> None: """Wait for all the messages on the given queue to be diff --git a/tests/test_stub_broker.py b/tests/test_stub_broker.py index 43979dab..b5e7a9c9 100644 --- a/tests/test_stub_broker.py +++ b/tests/test_stub_broker.py @@ -91,3 +91,23 @@ def do_work(): # Then that exception should be raised in my thread with pytest.raises(CustomError): stub_broker.join(do_work.queue_name, fail_fast=True) + + +def test_stub_broker_flush_all_does_not_break_dead_letters(stub_broker, stub_worker): + class SomeError(Exception): + pass + + @dramatiq.actor(max_retries=0) + def do_work(): + raise SomeError(":(") + + do_work.send() + with pytest.raises(SomeError): + stub_broker.join(do_work.queue_name) + + # Flush queues then retry. Should have the same result + stub_broker.flush_all() + + do_work.send() + with pytest.raises(SomeError): + stub_broker.join(do_work.queue_name)