[None][fix] replace busy-poll sleep in get_async_noblock with zmq async poller#12189
[None][fix] replace busy-poll sleep in get_async_noblock with zmq async poller#12189edenfunf wants to merge 1 commit intoNVIDIA:mainfrom
Conversation
📝 WalkthroughWalkthroughOptimizes the asynchronous non-blocking receive path in Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Comment Tip CodeRabbit can generate a title for your PR based on the changes with custom instructions.Set the |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tensorrt_llm/executor/ipc.py`:
- Around line 318-333: The except zmq.Again block that converts socket timeouts
into asyncio timeouts should suppress exception chaining so the original
zmq.Again isn't shown as context; replace both occurrences of raising
asyncio.TimeoutError() in that block with "raise asyncio.TimeoutError() from
None" (the block around the async poller handling using self.socket and the
surrounding except zmq.Again) so tracebacks reflect an intentional timeout
rather than showing the zmq.Again context.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b82dce65-b34a-4a1c-8e06-fffda427512d
📒 Files selected for processing (1)
tensorrt_llm/executor/ipc.py
| except zmq.Again: | ||
| # No message available yet | ||
| if asyncio.get_event_loop().time() >= deadline: | ||
| remaining_ms = int( | ||
| (deadline - asyncio.get_event_loop().time()) * 1000) | ||
| if remaining_ms <= 0: | ||
| raise asyncio.TimeoutError() | ||
| # Use async poller to wait for data without busy-polling on a | ||
| # fixed interval, which avoids latency spikes from sleep(0.01) | ||
| async_poller = zmq.asyncio.Poller() | ||
| async_poller.register(self.socket, zmq.POLLIN) | ||
| try: | ||
| events = await async_poller.poll(timeout=remaining_ms) | ||
| finally: | ||
| async_poller.unregister(self.socket) | ||
| if not events: | ||
| raise asyncio.TimeoutError() |
There was a problem hiding this comment.
Use raise ... from None to suppress exception context in the except block.
Both raise asyncio.TimeoutError() statements (lines 323 and 333) are within the except zmq.Again block. Without explicit exception chaining, Python will attach the zmq.Again exception as context, which can be misleading in tracebacks since the timeout is an intentional conversion, not an error in exception handling.
Proposed fix
remaining_ms = int(
(deadline - asyncio.get_event_loop().time()) * 1000)
if remaining_ms <= 0:
- raise asyncio.TimeoutError()
+ raise asyncio.TimeoutError() from None
# Use async poller to wait for data without busy-polling on a
# fixed interval, which avoids latency spikes from sleep(0.01)
async_poller = zmq.asyncio.Poller()
async_poller.register(self.socket, zmq.POLLIN)
try:
events = await async_poller.poll(timeout=remaining_ms)
finally:
async_poller.unregister(self.socket)
if not events:
- raise asyncio.TimeoutError()
+ raise asyncio.TimeoutError() from None📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except zmq.Again: | |
| # No message available yet | |
| if asyncio.get_event_loop().time() >= deadline: | |
| remaining_ms = int( | |
| (deadline - asyncio.get_event_loop().time()) * 1000) | |
| if remaining_ms <= 0: | |
| raise asyncio.TimeoutError() | |
| # Use async poller to wait for data without busy-polling on a | |
| # fixed interval, which avoids latency spikes from sleep(0.01) | |
| async_poller = zmq.asyncio.Poller() | |
| async_poller.register(self.socket, zmq.POLLIN) | |
| try: | |
| events = await async_poller.poll(timeout=remaining_ms) | |
| finally: | |
| async_poller.unregister(self.socket) | |
| if not events: | |
| raise asyncio.TimeoutError() | |
| except zmq.Again: | |
| # No message available yet | |
| remaining_ms = int( | |
| (deadline - asyncio.get_event_loop().time()) * 1000) | |
| if remaining_ms <= 0: | |
| raise asyncio.TimeoutError() from None | |
| # Use async poller to wait for data without busy-polling on a | |
| # fixed interval, which avoids latency spikes from sleep(0.01) | |
| async_poller = zmq.asyncio.Poller() | |
| async_poller.register(self.socket, zmq.POLLIN) | |
| try: | |
| events = await async_poller.poll(timeout=remaining_ms) | |
| finally: | |
| async_poller.unregister(self.socket) | |
| if not events: | |
| raise asyncio.TimeoutError() from None |
🧰 Tools
🪛 Ruff (0.15.5)
[warning] 323-323: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
[warning] 333-333: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tensorrt_llm/executor/ipc.py` around lines 318 - 333, The except zmq.Again
block that converts socket timeouts into asyncio timeouts should suppress
exception chaining so the original zmq.Again isn't shown as context; replace
both occurrences of raising asyncio.TimeoutError() in that block with "raise
asyncio.TimeoutError() from None" (the block around the async poller handling
using self.socket and the surrounding except zmq.Again) so tracebacks reflect an
intentional timeout rather than showing the zmq.Again context.
…nc poller `get_async_noblock` used `asyncio.sleep(0.01)` as a fallback when `zmq.NOBLOCK` raised `zmq.Again`, introducing fixed 10ms delays on every receive attempt that found no data. This caused `aget_stats` and other callers to accumulate latency spikes proportional to the number of polling iterations required. Replace the fixed-interval sleep with a `zmq.asyncio.Poller` that blocks efficiently until the socket becomes readable or the remaining timeout expires. The poller integrates with asyncio's event loop, so the coroutine yields control without busy-waiting and wakes as soon as data arrives—eliminating the artificial 10ms granularity. The existing `zmq.NOBLOCK` receive path is preserved to avoid cancellation-induced message drops (the original motivation for the non-blocking approach); the poller only governs the wait period between attempts. Fixes NVIDIA#10955 Signed-off-by: Eden <aa9736195201@gmail.com>
4314950 to
f32a7d5
Compare
Background
ZeroMqQueue.get_async_noblockis used byGenerationExecutor.aget_statsand the RPC server/client paths to receive messages with a timeout. When the
socket has no data ready, the method enters a polling loop that catches
zmq.Againand callsasyncio.sleep(0.01)before retrying. This fixed10 ms sleep means every iteration that finds no data incurs at least a 10 ms
delay, causing
aget_statsto show >10 ms random latency spikes even whenstats are available moments later. The regression was first observed between
1.2.0rc4and1.2.0rc5whenzmq.NOBLOCKwas introduced (issue #10955).Summary
asyncio.sleep(0.01)in thezmq.Againhandler with azmq.asyncio.Pollerthat blocks until the socket becomes readable or theremaining timeout elapses.
scheduler and is resumed as soon as data arrives, with no artificial minimum
latency.
zmq.NOBLOCKflag on the actualrecvcalls is preserved. The originalmotivation for using NOBLOCK was to avoid
asyncio.wait_forcancelling ablocking recv (which can drop messages). This fix only replaces the wait
between attempts—the receive itself is still non-blocking—so message-drop
safety is maintained.
Performance Impact
Before: minimum observable latency = N × 10 ms (N = number of empty polls).
After: coroutine wakes within microseconds of data availability; no fixed-sleep
overhead.
Verification
Manually tested with a standalone script that:
TimeoutErroris raised in ≈ the requested timeout duration.(previously would have required rounding up to the next 10 ms boundary).
Existing unit tests in
tests/unittest/executor/test_ipc.py(
test_get_async_noblock_timeout,test_get_async_noblock_success,test_get_async_noblock_router) cover the corrected code path.Fixes #10955
Summary by CodeRabbit