Skip to content

fix(runtime): avoid bounded dispatch channel stalls#5015

Draft
NathanFlurry wants to merge 1 commit into
counter-latency/sqlite-generation-fencefrom
counter-latency/unbounded-dispatch-channels
Draft

fix(runtime): avoid bounded dispatch channel stalls#5015
NathanFlurry wants to merge 1 commit into
counter-latency/sqlite-generation-fencefrom
counter-latency/unbounded-dispatch-channels

Conversation

@NathanFlurry
Copy link
Copy Markdown
Member

Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

@claude
Copy link
Copy Markdown
Contributor

claude Bot commented May 11, 2026

Code Review: PR #5015fix(runtime): avoid bounded dispatch channel stalls

Overall assessment: The fix is mechanically correct and well-scoped. Converting bounded channels to unbounded removes the deadlock risk where a slow consumer causes back-pressure that blocks a sender holding a shared resource. One real issue was introduced, and a few areas deserve attention before merge.


Issue 1 — Spurious async fn on forward_tunnel_message (introduced by this PR)

File: engine/packages/pegboard-gateway2/src/shared_state.rs

In the base branch, forward_tunnel_message was async because it called msg_tx.send(...).await. After this PR removes .await (the send is now synchronous), the async fn annotation was left in place but the body no longer contains any await point. The function generates an unnecessary Future wrapper on every call and is misleading to future readers.

// Current (PR) — misleading:
async fn forward_tunnel_message(...) -> Option<...> {
    // ... no .await anywhere in the body
    if let Err(send_err) = msg_tx.send(msg.message_kind) { ... }
}

// Should be:
fn forward_tunnel_message(...) -> Option<...> { ... }

The call sites that still use .await on this function will also need their .await suffixes removed.


Issue 2 — Unbounded channels hide back-pressure; memory growth risk in universaldb transaction drivers

Files: engine/packages/universaldb/src/driver/postgres/transaction.rs, engine/packages/universaldb/src/driver/rocksdb/transaction.rs

The transaction task channels (formerly capped at 100) now accept an unbounded number of commands. Each TransactionCommand variant carries heap-allocated Vec<u8> key/value data plus oneshot::Sender response channels. If the transaction task falls behind (e.g. under DB contention, serializable conflict retries), senders can flood the channel with unbounded pending commands.

The CLAUDE.md design constraint for actor-owned dispatch inboxes explicitly states: "Do not await bounded mpsc::Sender::send. Use try_reserve helpers and return actor.overloaded." While these are DB transaction bridges rather than actor dispatch, the underlying concern is identical. A bounded channel with try_send and an explicit transaction.busy error would give callers actionable feedback and bound memory. Alternatively, if transactions are always short-lived and the caller count is bounded, that assumption should be documented inline.


Issue 3 — Python client tokio::spawn silently swallows send errors

Files: rivetkit-python/client/src/simple/async/handle.rs, rivetkit-python/client/src/simple/sync/handle.rs

The event send now uses .ok() to suppress the SendError, but the error arm constructs a py_runtime_err! that is immediately discarded:

tx.send(event).map_err(|e| {
    py_runtime_err!("Failed to send via inner tx: {}", e)
}).ok();

This dead code in the error arm is confusing. Either log the drop or simplify:

if tx.send(event).is_err() {
    tracing::debug!("event dropped: receiver closed");
}

This also aligns with the CLAUDE.md convention of using tracing for best-effort diagnostics rather than silently dropping.


Issue 4 — _message_rx immediately dropped in test creates a broken channel (pre-existing, not introduced by this PR)

File: engine/packages/guard-core/tests/streaming_response.rs

let (message_tx, _message_rx) = mpsc::unbounded_channel::<String>();

The receiver is bound to _message_rx which is dropped immediately (the _ prefix causes immediate drop). Any message_tx.send(...) will immediately return Err because the receiver is gone. This is pre-existing behavior and works for the current test pattern, but is fragile. Consider naming it _no_receiver or adding a comment explaining the pattern.


Positive observations

  • The fix correctly identifies that bounded_sender.send(...).await in a shared-state dispatch loop is the root cause of the stall. Conversion to unbounded is the right approach for channels where the receiver is a short-lived handler task.
  • Removing .await on UnboundedSender::send is correct — the call is now synchronous.
  • The UDB migration CLI channel (tx_entries/rx_entries) is a good fit for unbounded: the reader uses recv_many with transaction-bounded batches and the writer is a sole consumer in a dedicated tokio::spawn.
  • Tunnel forwarding channels in pegboard-gateway1 and pegboard-gateway2 are reasonable candidates for unbounded because the receiver is a per-request task with a bounded lifetime and the sender no longer holds locks across the send.

Convention check

  • No rivet.gg references introduced.
  • No _ => fall-through enum match arms introduced.
  • No println!/eprintln! introduced in non-test Rust code.
  • No std::sync::Mutex or Mutex<HashMap> introduced.
  • Tracing fields use structured form correctly in existing log calls.
  • No inline imports introduced.

@MasterPtato MasterPtato force-pushed the counter-latency/unbounded-dispatch-channels branch from 9c656c0 to a499917 Compare May 14, 2026 17:26
@MasterPtato MasterPtato force-pushed the counter-latency/sqlite-generation-fence branch from 9de6238 to 71dd250 Compare May 14, 2026 17:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant