[None][fix] replace busy-poll sleep in get_async_noblock with zmq async poller #12189

Open
edenfunf wants to merge 1 commit into NVIDIA:main from edenfunf:fix/ipc-async-noblock-busy-poll

Conversation

@edenfunf edenfunf commented Mar 13, 2026

Background

ZeroMqQueue.get_async_noblock is used by GenerationExecutor.aget_stats
and the RPC server/client paths to receive messages with a timeout. When the
socket has no data ready, the method enters a polling loop that catches
zmq.Again and calls asyncio.sleep(0.01) before retrying. This fixed
10 ms sleep means every iteration that finds no data incurs at least a 10 ms
delay, causing aget_stats to show >10 ms random latency spikes even when
stats are available moments later. The regression was first observed between
1.2.0rc4 and 1.2.0rc5 when zmq.NOBLOCK was introduced (issue #10955).

Summary

  • Replace asyncio.sleep(0.01) in the zmq.Again handler with a
    zmq.asyncio.Poller that blocks until the socket becomes readable or the
    remaining timeout elapses.
  • The poller integrates with asyncio's event loop: the coroutine yields to the
    scheduler and is resumed as soon as data arrives, with no artificial minimum
    latency.
  • The zmq.NOBLOCK flag on the actual recv calls is preserved. The original
    motivation for using NOBLOCK was to avoid asyncio.wait_for cancelling a
    blocking recv (which can drop messages). This fix only replaces the wait
    between attempts; the receive itself is still non-blocking, so message-drop
    safety is maintained.
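
The control flow described in the bullets above can be sketched without pyzmq. This is a minimal, stdlib-only illustration under stated assumptions, not the code in ipc.py: `FakeSocket`, `WouldBlock`, `recv_noblock`, `poll`, and `get_noblock` are hypothetical names. `WouldBlock` stands in for zmq.Again, and `FakeSocket.poll` stands in for the wait performed by the async poller.

```python
import asyncio


class WouldBlock(Exception):
    """Stand-in for zmq.Again: raised when no message is ready."""


class FakeSocket:
    """Hypothetical in-memory stand-in for a ZeroMQ socket (not pyzmq)."""

    def __init__(self):
        self._items = []
        self._readable = asyncio.Event()

    def send(self, item):
        self._items.append(item)
        self._readable.set()

    def recv_noblock(self):
        # Plays the role of a recv with the NOBLOCK flag: it never waits.
        if not self._items:
            raise WouldBlock()
        item = self._items.pop(0)
        if not self._items:
            self._readable.clear()
        return item

    async def poll(self, timeout_s):
        # Plays the role of the async poller: wait until the socket is
        # readable or the timeout elapses. Returns True if readable.
        try:
            await asyncio.wait_for(self._readable.wait(), timeout_s)
            return True
        except asyncio.TimeoutError:
            return False


async def get_noblock(sock, timeout):
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            # The receive itself stays non-blocking.
            return sock.recv_noblock()
        except WouldBlock:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError() from None
            # Wait for readability instead of sleeping a fixed 10 ms.
            if not await sock.poll(remaining):
                raise asyncio.TimeoutError() from None
```

Only the readiness wait is subject to a timeout in this sketch; the receive is retried after the wait reports that data is available, which is the property the last bullet relies on.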

Performance Impact

Before: each empty poll adds a fixed 10 ms sleep, so the wait before data is
seen is about N × 10 ms (N = number of empty polls).
After: the coroutine wakes as soon as the socket becomes readable; there is no
fixed-sleep overhead.
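
The "N × 10 ms" figure can be made concrete with a small idealized model. This is a sketch that ignores event-loop and scheduling overhead; `polled_latency_ms` is a hypothetical helper, not part of the PR.

```python
def polled_latency_ms(arrival_ms: int, interval_ms: int = 10) -> int:
    """Idealized time at which a fixed-interval polling loop sees a message.

    The loop checks at t = 0, interval, 2 * interval, ...; a message that
    arrives at arrival_ms is noticed at the first check at or after that time.
    """
    empty_polls = -(-arrival_ms // interval_ms)  # ceiling division
    return empty_polls * interval_ms


if __name__ == "__main__":
    for arrival in (1, 50, 53):
        print(arrival, "->", polled_latency_ms(arrival))
```

In this model a message that arrives 53 ms after the call is seen at 60 ms (six empty polls), while a readiness-based wait has no such quantization.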

Verification

Manually tested with a standalone script that:

  1. Confirms TimeoutError is raised after approximately the requested timeout.
  2. Confirms data sent after a 50 ms delay is received in ≈ 53 ms end-to-end
    (with the fixed sleep, the wait would have been rounded up to the next
    10 ms boundary).

Existing unit tests in tests/unittest/executor/test_ipc.py
(test_get_async_noblock_timeout, test_get_async_noblock_success,
test_get_async_noblock_router) cover the corrected code path.

Fixes #10955

Summary by CodeRabbit

  • Refactor
    • Enhanced async data reception with improved timeout management and more efficient handling of incoming data.

@edenfunf edenfunf requested a review from a team as a code owner March 13, 2026 06:39
@edenfunf edenfunf requested a review from Superjomn March 13, 2026 06:39
coderabbitai bot commented Mar 13, 2026

📝 Walkthrough

Optimizes the asynchronous non-blocking receive path in get_async_noblock by replacing fixed sleep intervals with an asynchronous ZeroMQ Poller. Introduces deadline-based timeout handling that raises asyncio.TimeoutError when the time window elapses, while maintaining existing behavior for ROUTER sockets and HMAC validation.
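
The deadline-derived timeout mentioned above reduces to a small piece of arithmetic. The sketch below mirrors the `int((deadline - now) * 1000)` expression from the diff; the helper names `remaining_ms` and `should_time_out` are hypothetical and chosen for illustration only.

```python
def remaining_ms(deadline_s: float, now_s: float) -> int:
    """Whole milliseconds left until the deadline; int() truncates toward zero."""
    return int((deadline_s - now_s) * 1000)


def should_time_out(deadline_s: float, now_s: float) -> bool:
    # Mirrors the `remaining_ms <= 0` check: less than 1 ms left counts as
    # expired, so the poller is never asked to wait with a timeout of 0.
    return remaining_ms(deadline_s, now_s) <= 0
```

Treating a sub-millisecond remainder as expired is a reasonable choice here, since a poll with a timeout of 0 would return immediately and the loop would spin until the deadline passed.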

Changes

Cohort / File(s) | Summary
Async Receive Optimization (tensorrt_llm/executor/ipc.py) | Modified get_async_noblock to use an async ZeroMQ Poller with a deadline-derived timeout window instead of a fixed sleep, improving responsiveness and eliminating the 10 ms delays caused by the fixed sleep.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

Check name | Status | Explanation
Title check | ✅ Passed | The title clearly summarizes the main change: replacing busy-poll sleep with zmq async poller in get_async_noblock method.
Description check | ✅ Passed | The PR description is comprehensive, covering background, summary, performance impact, and verification. It exceeds template requirements by providing detailed context about the issue and solution.
Linked Issues check | ✅ Passed | The PR directly addresses issue #10955 by replacing the fixed 10ms asyncio.sleep with zmq.asyncio.Poller, eliminating artificial latency spikes while preserving message-drop safety as required.
Out of Scope Changes check | ✅ Passed | All changes are scoped to fixing the get_async_noblock method in tensorrt_llm/executor/ipc.py, directly addressing the performance regression reported in issue #10955. No unrelated changes detected.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tensorrt_llm/executor/ipc.py`:
- Around line 318-333: The except zmq.Again block that converts socket timeouts
into asyncio timeouts should suppress exception chaining so the original
zmq.Again isn't shown as context; replace both occurrences of raising
asyncio.TimeoutError() in that block with "raise asyncio.TimeoutError() from
None" (the block around the async poller handling using self.socket and the
surrounding except zmq.Again) so tracebacks reflect an intentional timeout
rather than showing the zmq.Again context.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b82dce65-b34a-4a1c-8e06-fffda427512d

📥 Commits

Reviewing files that changed from the base of the PR and between 60091ff and 4314950.

📒 Files selected for processing (1)
  • tensorrt_llm/executor/ipc.py

Comment on lines 318 to 333
            except zmq.Again:
                # No message available yet
                remaining_ms = int(
                    (deadline - asyncio.get_event_loop().time()) * 1000)
                if remaining_ms <= 0:
                    raise asyncio.TimeoutError()
                # Use async poller to wait for data without busy-polling on a
                # fixed interval, which avoids latency spikes from sleep(0.01)
                async_poller = zmq.asyncio.Poller()
                async_poller.register(self.socket, zmq.POLLIN)
                try:
                    events = await async_poller.poll(timeout=remaining_ms)
                finally:
                    async_poller.unregister(self.socket)
                if not events:
                    raise asyncio.TimeoutError()

⚠️ Potential issue | 🟡 Minor

Use raise ... from None to suppress exception context in the except block.

Both raise asyncio.TimeoutError() statements (lines 323 and 333) are within the except zmq.Again block. Without explicit exception chaining, Python will attach the zmq.Again exception as context, which can be misleading in tracebacks since the timeout is an intentional conversion, not an error in exception handling.
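
The difference the reviewer describes can be observed with the standard library alone. In this sketch `Again` is a hypothetical stand-in for zmq.Again, and `describe` is an illustrative helper that reports what the raised exception carries.

```python
import asyncio


class Again(Exception):
    """Stand-in for zmq.Again in this sketch."""


def timeout_with_context():
    try:
        raise Again()
    except Again:
        # Implicit chaining: the traceback also shows Again, introduced by
        # "During handling of the above exception, another exception occurred".
        raise asyncio.TimeoutError()


def timeout_suppressed():
    try:
        raise Again()
    except Again:
        # `from None` marks the context as suppressed, so only the timeout
        # appears in the traceback.
        raise asyncio.TimeoutError() from None


def describe(fn):
    try:
        fn()
    except asyncio.TimeoutError as exc:
        return type(exc.__context__).__name__, exc.__suppress_context__
```

Both variants still record the original exception in `__context__`; `from None` only sets `__suppress_context__`, which controls whether the traceback prints it.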

Proposed fix
                 remaining_ms = int(
                     (deadline - asyncio.get_event_loop().time()) * 1000)
                 if remaining_ms <= 0:
-                    raise asyncio.TimeoutError()
+                    raise asyncio.TimeoutError() from None
                 # Use async poller to wait for data without busy-polling on a
                 # fixed interval, which avoids latency spikes from sleep(0.01)
                 async_poller = zmq.asyncio.Poller()
                 async_poller.register(self.socket, zmq.POLLIN)
                 try:
                     events = await async_poller.poll(timeout=remaining_ms)
                 finally:
                     async_poller.unregister(self.socket)
                 if not events:
-                    raise asyncio.TimeoutError()
+                    raise asyncio.TimeoutError() from None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-            except zmq.Again:
-                # No message available yet
-                if asyncio.get_event_loop().time() >= deadline:
-                remaining_ms = int(
-                    (deadline - asyncio.get_event_loop().time()) * 1000)
-                if remaining_ms <= 0:
-                    raise asyncio.TimeoutError()
-                # Use async poller to wait for data without busy-polling on a
-                # fixed interval, which avoids latency spikes from sleep(0.01)
-                async_poller = zmq.asyncio.Poller()
-                async_poller.register(self.socket, zmq.POLLIN)
-                try:
-                    events = await async_poller.poll(timeout=remaining_ms)
-                finally:
-                    async_poller.unregister(self.socket)
-                if not events:
-                    raise asyncio.TimeoutError()
+            except zmq.Again:
+                # No message available yet
+                remaining_ms = int(
+                    (deadline - asyncio.get_event_loop().time()) * 1000)
+                if remaining_ms <= 0:
+                    raise asyncio.TimeoutError() from None
+                # Use async poller to wait for data without busy-polling on a
+                # fixed interval, which avoids latency spikes from sleep(0.01)
+                async_poller = zmq.asyncio.Poller()
+                async_poller.register(self.socket, zmq.POLLIN)
+                try:
+                    events = await async_poller.poll(timeout=remaining_ms)
+                finally:
+                    async_poller.unregister(self.socket)
+                if not events:
+                    raise asyncio.TimeoutError() from None
🧰 Tools
🪛 Ruff (0.15.5)

[warning] 323-323: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


[warning] 333-333: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

…nc poller

`get_async_noblock` used `asyncio.sleep(0.01)` as a fallback when
`zmq.NOBLOCK` raised `zmq.Again`, introducing fixed 10ms delays on
every receive attempt that found no data. This caused `aget_stats` and
other callers to accumulate latency spikes proportional to the number
of polling iterations required.

Replace the fixed-interval sleep with a `zmq.asyncio.Poller` that
blocks efficiently until the socket becomes readable or the remaining
timeout expires. The poller integrates with asyncio's event loop, so
the coroutine yields control without busy-waiting and wakes as soon as
data arrives—eliminating the artificial 10ms granularity.

The existing `zmq.NOBLOCK` receive path is preserved to avoid
cancellation-induced message drops (the original motivation for the
non-blocking approach); the poller only governs the wait period between
attempts.

Fixes NVIDIA#10955

Signed-off-by: Eden <aa9736195201@gmail.com>
@edenfunf edenfunf force-pushed the fix/ipc-async-noblock-busy-poll branch from 4314950 to f32a7d5 Compare March 13, 2026 06:51
@svc-trtllm-gh-bot svc-trtllm-gh-bot added the "Community want to contribute" label (PRs initiated from Community) Mar 13, 2026
Labels

Community want to contribute (PRs initiated from Community)

Development

Successfully merging this pull request may close these issues.

[Performance]: Calling GenerationExecutor.aget_stats can introduce >10ms delays

2 participants