⚙️ FEATURE-#290: Decouple Task status side effects via EventBus#299
⚙️ FEATURE-#290: Decouple Task status side effects via EventBus#299FernandoCelmer wants to merge 6 commits into
Conversation
FernandoCelmer
left a comment
There was a problem hiding this comment.
Code review — 2 issues found.
|
|
||
| task: Task | ||
| old: TypeStatus | ||
| new: TypeStatus |
There was a problem hiding this comment.
[Suggestion]
Problem — The old field is typed as TypeStatus, but the very first status transition on every Task emits StatusChanged(old=None, ...) because Task._status is initialised to None in TaskInstance.__init__ and the status setter captures old = self._status before any value has been set.
The type annotation creates a false contract: anyone writing a custom subscriber who reads event.old and treats it as a guaranteed TypeStatus will get an AttributeError or silent None-comparison failure on the very first event.
Failure scenario
class AuditSubscriber:
def __call__(self, event):
if isinstance(event, StatusChanged):
# Crashes with AttributeError on first task creation:
# 'NoneType' object has no attribute 'value'
print(f"Transition: {event.old.value} → {event.new.value}")Every Task(...) call immediately triggers this path (self.status = TypeStatus.NOT_STARTED in __init__), so the crash happens on construction, not during execution.
Fix — Narrow the annotation to reflect reality:
@dataclass
class StatusChanged:
task: Task
old: TypeStatus | None # None on the very first transition
new: TypeStatusAlternatively, initialise Task._status to TypeStatus.NOT_STARTED in TaskInstance.__init__ so the field is never None, which would let the annotation stay as TypeStatus and also remove the if not self._status guard in the status getter.
References — PEP 484 — Type Hints
| self._subs.append(handler) | ||
|
|
||
| def emit(self, event: Any) -> None: | ||
| for handler in self._subs: |
There was a problem hiding this comment.
[Suggestion]
Problem — emit passes event with a live reference to the Task object inside StatusChanged. Nothing prevents a subscriber from mutating task.status — which calls emit again — producing unbounded recursion that exhausts the call stack.
The three built-in subscribers (LogSubscriber, NotifySubscriber, MetricsSubscriber) are safe today, but the pattern is invisible to users implementing custom subscribers. The docstring says "Subscribers run in registration order" but says nothing about re-entrancy.
Failure scenario
class AutoRetrySubscriber:
def __call__(self, event):
if isinstance(event, StatusChanged):
if event.new == TypeStatus.FAILED:
# Attempting a retry — sets status → triggers emit → calls this again
event.task.status = TypeStatus.RETRY # RecursionErrorFix — Add a simple re-entrancy guard:
class EventBus:
def __init__(self) -> None:
self._subs: list[Callable[[Any], None]] = []
self._emitting: bool = False
def emit(self, event: Any) -> None:
if self._emitting:
return # or raise, or queue for later — depends on desired semantics
self._emitting = True
try:
for handler in self._subs:
try:
handler(event)
except Exception:
logger.exception("event handler failed")
finally:
self._emitting = FalseDocument the constraint clearly in the class docstring so that subscriber authors know status mutations inside a handler are not allowed (or are deferred).
References — Observer pattern — Re-entrancy considerations
Description
dotflow/core/events.py(new):EventBusclass andStatusChangeddataclass for in-process pub/subdotflow/core/subscribers.py(new):LogSubscriber,NotifySubscriber,MetricsSubscriber— each routesStatusChangedevents to the corresponding providerdotflow/core/config.py:Config.__init__creates anEventBusinstance and wires the three default subscribers after provider validationdotflow/core/task.py:Task.statussetter now captures the previous status and emitsStatusChangedinstead of calling providers directlytests/core/test_events.py(new): 4 tests coveringEventBusemit, subscriber isolation, and ordering guaranteestests/core/test_subscribers.py(new): 10 tests covering all status branches for each subscriberMotivation and Context
Task status side effects (logging, notification, metrics) were hardcoded directly in the
Task.statussetter, coupling the task model to concrete provider calls. This change decouples the model from its observers via an event bus, making it straightforward to add or replace subscribers without modifyingTaskorConfig.Closes #290
Types of changes
Checklist