Skip to content

⚙️ FEATURE-#290: Decouple Task status side effects via EventBus#299

Open
FernandoCelmer wants to merge 6 commits into
developfrom
feature/290
Open

⚙️ FEATURE-#290: Decouple Task status side effects via EventBus#299
FernandoCelmer wants to merge 6 commits into
developfrom
feature/290

Conversation

@FernandoCelmer
Copy link
Copy Markdown
Member

@FernandoCelmer FernandoCelmer commented May 3, 2026

Description

  • dotflow/core/events.py (new): EventBus class and StatusChanged dataclass for in-process pub/sub
  • dotflow/core/subscribers.py (new): LogSubscriber, NotifySubscriber, MetricsSubscriber — each routes StatusChanged events to the corresponding provider
  • dotflow/core/config.py: Config.__init__ creates an EventBus instance and wires the three default subscribers after provider validation
  • dotflow/core/task.py: Task.status setter now captures the previous status and emits StatusChanged instead of calling providers directly
  • tests/core/test_events.py (new): 4 tests covering EventBus emit, subscriber isolation, and ordering guarantees
  • tests/core/test_subscribers.py (new): 10 tests covering all status branches for each subscriber

Motivation and Context

Task status side effects (logging, notification, metrics) were hardcoded directly in the Task.status setter, coupling the task model to concrete provider calls. This change decouples the model from its observers via an event bus, making it straightforward to add or replace subscribers without modifying Task or Config.

Closes #290

Types of changes

  • Bug fix (change that fixes an issue)
  • New feature (change which adds functionality)
  • Documentation

Checklist

  • I have performed a self-review of my own code
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the CHANGELOG
  • I have updated the documentation accordingly

Copy link
Copy Markdown
Member Author

@FernandoCelmer FernandoCelmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review — 2 issues found.

Comment thread dotflow/core/events.py

task: Task
old: TypeStatus
new: TypeStatus
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion]

Problem — The old field is typed as TypeStatus, but the very first status transition on every Task emits StatusChanged(old=None, ...) because Task._status is initialised to None in TaskInstance.__init__ and the status setter captures old = self._status before any value has been set.

The type annotation creates a false contract: anyone writing a custom subscriber who reads event.old and treats it as a guaranteed TypeStatus will get an AttributeError or silent None-comparison failure on the very first event.

Failure scenario

class AuditSubscriber:
    def __call__(self, event):
        if isinstance(event, StatusChanged):
            # Crashes with AttributeError on first task creation:
            # 'NoneType' object has no attribute 'value'
            print(f"Transition: {event.old.value}{event.new.value}")

Every Task(...) call immediately triggers this path (self.status = TypeStatus.NOT_STARTED in __init__), so the crash happens on construction, not during execution.

Fix — Narrow the annotation to reflect reality:

@dataclass
class StatusChanged:
    task: Task
    old: TypeStatus | None   # None on the very first transition
    new: TypeStatus

Alternatively, initialise Task._status to TypeStatus.NOT_STARTED in TaskInstance.__init__ so the field is never None, which would let the annotation stay as TypeStatus and also remove the if not self._status guard in the status getter.

ReferencesPEP 484 — Type Hints

Comment thread dotflow/core/events.py
self._subs.append(handler)

def emit(self, event: Any) -> None:
for handler in self._subs:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion]

Problememit passes event with a live reference to the Task object inside StatusChanged. Nothing prevents a subscriber from mutating task.status — which calls emit again — producing unbounded recursion that exhausts the call stack.

The three built-in subscribers (LogSubscriber, NotifySubscriber, MetricsSubscriber) are safe today, but the pattern is invisible to users implementing custom subscribers. The docstring says "Subscribers run in registration order" but says nothing about re-entrancy.

Failure scenario

class AutoRetrySubscriber:
    def __call__(self, event):
        if isinstance(event, StatusChanged):
            if event.new == TypeStatus.FAILED:
                # Attempting a retry — sets status → triggers emit → calls this again
                event.task.status = TypeStatus.RETRY  # RecursionError

Fix — Add a simple re-entrancy guard:

class EventBus:
    def __init__(self) -> None:
        self._subs: list[Callable[[Any], None]] = []
        self._emitting: bool = False

    def emit(self, event: Any) -> None:
        if self._emitting:
            return   # or raise, or queue for later — depends on desired semantics
        self._emitting = True
        try:
            for handler in self._subs:
                try:
                    handler(event)
                except Exception:
                    logger.exception("event handler failed")
        finally:
            self._emitting = False

Document the constraint clearly in the class docstring so that subscriber authors know status mutations inside a handler are not allowed (or are deferred).

ReferencesObserver pattern — Re-entrancy considerations

@FernandoCelmer FernandoCelmer added the enhancement New feature or request label May 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant