Skip to content

fix(broker): replace blocking stdout writer task with tokio::io#841

Merged
khaliqgant merged 2 commits into
mainfrom
fix/broker-async-stdout-writer
May 11, 2026
Merged

fix(broker): replace blocking stdout writer task with tokio::io#841
khaliqgant merged 2 commits into
mainfrom
fix/broker-async-stdout-writer

Conversation

@khaliqgant
Copy link
Copy Markdown
Member

Summary

Two writer tasks in the broker spawn-loop body used std::io::stdout().lock().write_all() from inside tokio::spawn:

  • `src/pty_worker.rs:182-193` (PTY worker → SDK frame stream)
  • `src/main.rs::run_headless_worker` ~ `:3817-3828` (headless worker → SDK frame stream)

Synchronous I/O from an async task blocks the OS worker thread when the parent reader stalls. On macOS the kernel pipe buffer is 64KB; once it fills, the `write_all()` syscall blocks, taking the tokio thread with it. On resumption, the `tokio::select!` wakeup that drives `pty_rx` is lost — the broker stays parked in `_pthread_cond_wait` and downstream PTY workers freeze.

This is the upstream root-cause fix corresponding to relay#838 (SDK-side drain — necessary but not sufficient) and ricky#98 (Node-side spawn monkeypatch — backstop). After both of those landed, multi-agent workflows still wedged at fanout because the broker writer task itself remained the trap.

Reproducer

Any workflow that fans out to ≥9 PTY workers with sustained `worker_stream` traffic. We reproduced via `proactive-runtime-m1` four times across two days — every wedge had the same shape:

Wedge bundle broker-init main thread broker-pty thread
92b45d3e (pre-drain) `write()` blocked (2 frames) + 4 mutex-wait frames `write()` blocked
c9600674 (pre-drain) `_pthread_mutex_firstfit_lock_wait` (4 frames) `write()` blocked
75e5064d (post-SDK-6.0.15 + post-ricky#98 loader unblocker) All 9 tokio workers in `_pthread_cond_wait`; zero `write`; zero mutex-wait n/a

The 75e5064d shape is the same blocking-writer design in cancel-unsafe steady state: workers had drained their kernel pipes by the time we sampled, but the wakeup chain into `tokio::select!` had been lost when one of them temporarily stalled in `write_all` earlier. All 9 workers' logs froze within 4 seconds of each other.

What changed

  • `src/pty_worker.rs` — writer task uses `tokio::io::stdout()` + `AsyncWriteExt::write_all().await`. Imports `AsyncWriteExt` inside the task to scope the trait. Parent-closes-pipe now breaks the loop instead of silently dropping writes.
  • `src/main.rs::run_headless_worker` — same change to the headless writer task. Uses the already-imported `AsyncWriteExt` from line 37.

Backpressure now surfaces as `Pending`. The OS thread never blocks. No wakeups are lost.

Verification

  • `cargo build --release` — clean
  • `cargo test --release --lib` — 232/232 pass (including all `pty::tests`)
  • `cargo test --release --test '*'` — 8/8 pass

Suggested follow-on tests (next PR, not in scope here):

  • `tokio` test that spawns `agent-relay-broker pty` in a child, never reads its stdout, feeds 1MB of stdin frames, and asserts the broker stays responsive for ≥60s. Pre-fix wedges within 1-2s on macOS (64KB pipe). Post-fix should run indefinitely.
  • Vitest harness with 9 mock workers emitting 200KB/4s + a deliberately-slow WS subscriber.
  • CI lint: `grep "std::io::stdout" src/` returns 0 under any `tokio::spawn` block.

Related

  • relay#838 — SDK 6.0.15 stdout drain
  • ricky#94 — Node-side loader unblocker (introduced)
  • ricky#98 — Node-side loader unblocker fix (use `data` event, not `pause`)

Two writer tasks in the broker spawn-loop body used
`std::io::stdout().lock().write_all()` from inside `tokio::spawn`:

- `src/pty_worker.rs` (PTY worker → SDK frame stream)
- `src/main.rs::run_headless_worker` (headless worker → SDK frame stream)

Synchronous I/O from an async task blocks the OS worker thread when the
parent reader stalls. On macOS the kernel pipe buffer is 64KB; once it
fills, the `write_all()` syscall blocks, taking the tokio thread with
it. On resumption, the `tokio::select!` wakeup that drives `pty_rx` is
lost — the broker stays parked in `_pthread_cond_wait` and downstream
PTY workers freeze.

Reproduces in proactive-runtime-m1 (9 PTY worker fanout). All workers
froze within 4 seconds of each other across three independent runs;
broker-init's tokio threads were either parked in `write()` (pre-drain
fix) or in `_pthread_cond_wait` (post-drain fix — same blocking-writer
design, now lost-wakeup mode).

Switch both writer tasks to `tokio::io::stdout()` +
`AsyncWriteExt::write_all().await`. Pipe backpressure now surfaces as a
normal `Pending` future; the OS thread never blocks; the parent closing
the pipe becomes a clean shutdown signal (break out of the loop).

Verification
- cargo build --release — clean
- cargo test --release --lib — 232/232 pass
- cargo test --release --test '*' — 8/8 pass

The ricky #98 loader-level unblocker (which monkeypatches
`child_process.spawn` on the Node SDK side) was a backstop; this is the
upstream root-cause fix on the Rust broker side.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@khaliqgant khaliqgant requested a review from willwashburn as a code owner May 11, 2026 13:22
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 11, 2026

Review Change Stack
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: fc78e5ca-9dd0-415e-9270-a5dcb450b342

📥 Commits

Reviewing files that changed from the base of the PR and between e999199 and 0fe1a55.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml
  • src/main.rs
  • src/pty_worker.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/pty_worker.rs

📝 Walkthrough

Walkthrough

Headless and PTY worker stdout loops now use Tokio async writes and stop on write failures; tracing initialization adds a non-blocking stderr appender and stores its WorkerGuard in a global OnceLock for process lifetime.

Changes

Async stdout writes & tracing guard

Layer / File(s) Summary
Tracing appender & global guard
Cargo.toml, src/main.rs
Add tracing-appender = "0.2", create TRACING_GUARD: OnceLock, and store the non-blocking stderr WorkerGuard when installing the subscriber.
Headless worker async stdout writes
src/main.rs
Headless run_headless_worker uses a persistent tokio::io::stdout() handle with write_all(...).await/flush().await per newline-delimited frame; drops out_tx and awaits writer task completion.
PTY worker async stdout writes
src/pty_worker.rs
PTY writer_task now uses Tokio async writing (write_all(...).await) and breaks on write errors (e.g., parent pipe closed), replacing blocking std::io locking/flush.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested reviewers

  • willwashburn
  • barryollama

Poem

🐰 Soft bytes hop on silent streams,

I nibble logs and stitch the beams.
Tokio sings, no threads confined,
Pipes unwind and traces bind.
Hooray — async carrots for the mind!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: replacing blocking stdout writer tasks with async tokio::io, which is the primary fix addressing the broker freezing issue.
Description check ✅ Passed The description is comprehensive and complete, covering the problem, root cause, reproducer, implementation details, and verification steps. It exceeds the template requirements with thorough technical context.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/broker-async-stdout-writer

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 2 additional findings.

Open in Devin Review

@khaliqgant khaliqgant merged commit 3a967a5 into main May 11, 2026
46 checks passed
@khaliqgant khaliqgant deleted the fix/broker-async-stdout-writer branch May 11, 2026 19:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant