
Pinned memory staging WIP #136

Open
noa-neria wants to merge 6 commits into master from pinned-memory-staging

Conversation

noa-neria (Collaborator) commented Mar 19, 2026

Direct GPU Streaming via Pinned Staging Buffer

Overview

This branch adds NVIDIA CUDA direct streaming: instead of reading file data into a
CPU buffer and then copying it to the GPU, the C++ worker threads read each block
into a thread-local pinned (page-locked) host staging buffer and DMA it straight
to a pre-allocated PyTorch CUDA tensor via cuMemcpyHtoDAsync. The tensor lives in
device memory for its entire lifetime and is yielded directly to the caller — no
extra host-to-device copy, no intermediate NumPy array.

Each yielded GPU tensor has an aligned address in device memory, matching the alignment of a natively allocated PyTorch tensor.

This path is NVIDIA CUDA only. AMD ROCm and all other device types fall back to the
CPU path (read to CPU, then copy to device). Remote backends (S3, GCS, Azure) also
always use the CPU path regardless of device.

For distributed streaming the feature is transparent: no changes are required in
the caller (e.g. vLLM). The distributed streamer detects the device and switches
paths internally, and each rank receives GPU tensors directly without any API change.
For TP=1 (non-distributed) usage, integration changes in the caller are required
to pass a CUDA device string and consume the yielded GPU tensors directly.

Direct streaming to the GPU also reduces CPU memory requirements. The minimum additional device memory needed is twice the size of the largest tensor.

Benchmarks

Cluster: NVIDIA H200, 8 GPUs per node
Storage: Nebius shared filesystem

Qwen1.5-110B-Chat — TP=8 (25.91 GiB per GPU)

Run                  Load Time   Speedup
tp8 (baseline)        80.7s       1.0x
tp8_runai             58.6s       1.4x
tp8_runai_dist        25.5s       3.2x

Qwen1.5-110B-Chat — TP=4 (51.8 GiB per GPU)

Run                  Load Time   Speedup
tp4 (baseline)        73.9s       1.0x
tp4_runai             34.5s       2.1x
tp4_runai_dist        23.6s       3.1x

Qwen3-235B-A22B — TP=8 (54.92 GiB per GPU)

Run                  Load Time   Speedup
tp8 (baseline)       320.8s       1.0x
tp8_runai            129.3s       2.5x
tp8_runai_dist        47.3s       6.8x

TP=1 (non-distributed) streaming

FilesRequestsIteratorWithBuffer accepts a device argument. When the device is an
NVIDIA CUDA device (device.startswith("cuda") and torch.version.hip is None):

  • The iterator's buffer is allocated with torch.empty(..., device=device) instead of
    np.empty(...). Slices of this buffer become the yielded tensors.
  • Buffer size is capped at 95% of free GPU memory so allocation cannot OOM (see the
    sketch after this list).
  • runai_request is called with cuda=True, routing C++ worker threads into
    Batch::read_cuda().
  • next_request() calls torch.cuda.synchronize() before reusing the buffer for the
    next batch, ensuring any async operations still holding a reference to the previous
    slice (e.g. dtype casts, user application ops) have completed.
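
A minimal sketch of this allocation-and-reuse pattern (illustrative only; the function and variable names below are not the actual FilesRequestsIteratorWithBuffer code):

```python
import torch

def allocate_request_buffer(requested_bytes: int, device: str) -> torch.Tensor:
    # Cap the allocation at 95% of currently free device memory so torch.empty cannot OOM.
    free_bytes, _total_bytes = torch.cuda.mem_get_info(device)
    size = min(requested_bytes, int(free_bytes * 0.95))
    return torch.empty(size, dtype=torch.uint8, device=device)

buffer = allocate_request_buffer(1 << 30, "cuda:0")  # hypothetical 1 GiB request
chunk_view = buffer[:4096]                           # yielded tensors are views of this buffer
torch.cuda.synchronize()                             # drained before the buffer is reused
```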

In C++, each worker thread owns a thread_local CudaStagingBuffer: a pinned host
buffer (cuMemAllocHost) and a private CUDA stream (cuStreamCreate).
read_cuda reads the file in fs_block_bytesize chunks, copies each to the device
pointer via cuMemcpyHtoDAsync, and synchronizes the stream after every chunk so the
staging buffer can be immediately reused for the next read.
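
For illustration, the same per-chunk loop can be sketched in Python with ctypes against the CUDA driver API (the production path is the C++ CudaStagingBuffer / read_cuda code; the block size, the dev_ptr argument, and the omission of error checking are assumptions of this sketch, and the _v2 driver symbols are used because they take 64-bit device pointers):

```python
import ctypes

cuda = ctypes.CDLL("libcuda.so.1")
cuda.cuInit(0)

# Retain the primary context and make it current, as the C++ CudaDriver loader does.
dev = ctypes.c_int()
cuda.cuDeviceGet(ctypes.byref(dev), 0)
ctx = ctypes.c_void_p()
cuda.cuDevicePrimaryCtxRetain(ctypes.byref(ctx), dev)
cuda.cuCtxSetCurrent(ctx)

stream = ctypes.c_void_p()
cuda.cuStreamCreate(ctypes.byref(stream), 0)

BLOCK = 2 * 1024 * 1024                    # hypothetical fs_block_bytesize
host_buf = ctypes.c_void_p()               # pinned (page-locked) staging buffer
cuda.cuMemAllocHost_v2(ctypes.byref(host_buf), ctypes.c_size_t(BLOCK))

def copy_file_to_device(path: str, dev_ptr: int) -> None:
    # dev_ptr is a raw CUDA device address, e.g. a torch tensor's data_ptr().
    staging = (ctypes.c_char * BLOCK).from_address(host_buf.value)
    offset = 0
    with open(path, "rb") as f:
        while True:
            n = f.readinto(staging)        # read one block straight into pinned memory
            if n == 0:
                break
            cuda.cuMemcpyHtoDAsync_v2(ctypes.c_uint64(dev_ptr + offset), host_buf,
                                      ctypes.c_size_t(n), stream)
            cuda.cuStreamSynchronize(stream)  # staging buffer is immediately reusable
            offset += n
```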


Distributed streaming

When _use_cuda_direct() is true (NVIDIA CUDA, local paths, no ROCm), the distributed
streamer takes a new code path with no API changes required in the caller:

  • stream_files() passes the real CUDA device string to file_streamer.stream_files()
    instead of "cpu", so tensors land in GPU memory from the start.
  • get_chunks() delegates to the new _get_chunks_cuda() method instead of the
    existing prefill/broadcast loop.

_get_chunks_cuda() broadcasts one tensor at a time across the distribution group:

  1. All ranks iterate over broadcasting ranks in round-robin order, keeping all ranks
    in lockstep.
  2. The broadcasting rank pulls the next tensor from file_streamer.get_chunks()
    (already on the GPU), fills a two-row metadata tensor
    [count | orig_req_idx, orig_chunk_idx, size, 0], and does two dist.broadcast
    calls: first the metadata, then the GPU tensor itself.
  3. Receiving ranks participate in the same metadata broadcast, read count and size,
    call torch.cuda.synchronize() to ensure the previous receive buffer is no longer
    in use, then broadcast into a pre-allocated receive_buffer[:chunk_size] view and
    yield it.

The receive buffer is sized to hold one tensor at a time (max_chunk bytes), so GPU
memory usage on receiving ranks is proportional to the largest single tensor rather
than to the full batch buffer used by the CPU path.
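
A minimal sketch of this per-tensor broadcast protocol (hand-written for illustration; the metadata layout follows the description above, and the helper name and exact shapes are assumptions, not the actual _get_chunks_cuda() code):

```python
import torch
import torch.distributed as dist

def broadcast_one_chunk(src_rank: int, gpu_tensor=None, receive_buffer=None):
    # Two-row metadata: row 0 = [count, ...], row 1 = [orig_req_idx, orig_chunk_idx, size, 0].
    meta = torch.zeros((2, 4), dtype=torch.int64, device="cuda")
    if dist.get_rank() == src_rank:
        meta[0, 0] = 1
        meta[1, 2] = gpu_tensor.numel()
    dist.broadcast(meta, src=src_rank)              # 1) metadata first

    if dist.get_rank() == src_rank:
        dist.broadcast(gpu_tensor, src=src_rank)    # 2) then the GPU tensor itself
        return gpu_tensor

    chunk_size = int(meta[1, 2].item())
    torch.cuda.synchronize()                        # previous receive view no longer in use
    view = receive_buffer[:chunk_size]              # receive_buffer pre-sized to max_chunk bytes
    dist.broadcast(view, src=src_rank)              # receive directly into the GPU view
    return view
```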


Fallback conditions

The CUDA direct path is skipped and the original CPU path is used when any of the following holds (a sketch of this gate follows the list):

  • device is None or does not start with "cuda"
  • ROCm/AMD GPU (torch.version.hip is not None)
  • CUDA not available (not torch.cuda.is_available())
  • Any file path is a remote URI (s3://, gs://, az://)
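
A minimal sketch of this gate, assuming _use_cuda_direct() takes the device string and the request paths (the actual signature may differ):

```python
import torch

_REMOTE_PREFIXES = ("s3://", "gs://", "az://")

def _use_cuda_direct(device, paths) -> bool:
    if device is None or not device.startswith("cuda"):
        return False                      # CPU or non-CUDA device
    if torch.version.hip is not None:
        return False                      # ROCm/AMD builds also report "cuda" devices
    if not torch.cuda.is_available():
        return False
    if any(path.startswith(_REMOTE_PREFIXES) for path in paths):
        return False                      # remote backends always use the CPU path
    return True
```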

Summary by CodeRabbit

  • New Features

    • GPU-accelerated file streaming with an optional CUDA-direct fast path and per-request GPU destination support
    • Pinned host staging buffers and CUDA-aware buffer alignment for efficient host↔device transfers
    • Python bindings and runtime plumbing to request CUDA-mode transfers from user code
  • Tests

    • Added CUDA mock libs and extensive CUDA-focused unit/integration tests validating end-to-end correctness and alignment
  • Chores

    • Build environment updated to include CUDA headers for compile-time support

greptile-apps Bot commented Mar 19, 2026

Greptile Summary

This PR introduces an NVIDIA CUDA direct-streaming path for both single-rank and distributed model loading. Instead of reading file data into a pageable CPU buffer and then calling .to(device), C++ worker threads use a thread-local pinned (cuMemAllocHost) staging buffer and cuMemcpyHtoDAsync to DMA each block straight into a pre-allocated PyTorch CUDA tensor. The distributed path adds a new _get_chunks_cuda() method that broadcasts GPU tensors one-at-a-time in round-robin order using NCCL, avoiding the intermediate CPU prefill + scatter used by the CPU path. AMD ROCm and remote backends always fall back to the CPU path.

Key issues found:

  • P1 — Incorrect GPU destination for multi-file CUDA requests with exactly one tensor per file (cpp/streamer/impl/streamer/streamer.cc): is_per_tensor_cuda is gated on dsts.size() > paths.size(), which is false when every file contributes exactly one tensor. In that case file_cuda_dsts is never populated, and build_tasks falls back to the Assigner's linearly-advanced current_request_destination, which ignores padding between tensors and silently writes each file's tensor to the wrong GPU address.
  • P1 — test_1_success_cuda_direct_verifies_mock will crash on non-GPU machines (dist_test_distributed_streamer.py): torch.empty is patched to redirect CUDA allocations to CPU, but torch.zeros (used for batch_metadata_tensor / received_metadata_tensor inside _get_chunks_cuda) calls the C++ allocator directly and bypasses the patch, raising RuntimeError: No CUDA GPUs are available even when the mock library is loaded.
  • P2 — get_cuda_alignment() missing upper-bound validation (requests_iterator.py): The removed get_dist_buffer_alignment() enforced a 1 MiB ceiling and raised ValueError on out-of-range values. The new helper only enforces a lower bound of 1, allowing arbitrarily large alignment values that could cause significant GPU over-allocation.
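
For reference, a bounded helper along the lines the reviewer describes could look like this (the environment variable name is hypothetical; only the 512-byte default, the 1 MiB ceiling, and the ValueError behavior come from this review):

```python
import os

DEFAULT_CUDA_ALIGNMENT = 512      # default mentioned later in this review
MAX_CUDA_ALIGNMENT = 1 << 20      # 1 MiB ceiling that the removed helper enforced

def get_cuda_alignment(env_var: str = "RUNAI_STREAMER_CUDA_ALIGNMENT") -> int:
    value = int(os.environ.get(env_var, DEFAULT_CUDA_ALIGNMENT))
    if not (1 <= value <= MAX_CUDA_ALIGNMENT):
        raise ValueError(f"CUDA alignment {value} out of range [1, {MAX_CUDA_ALIGNMENT}]")
    return value
```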

Confidence Score: 2/5

  • Not safe to merge as-is — the per-tensor destination bug silently corrupts GPU weights for multi-file CUDA requests with one tensor per file, and the mock-path test is broken on CPU-only CI.
  • The CUDA streaming architecture is well-designed and the previous review comments about null returns and partial writes have been addressed. However, a logic error in the is_per_tensor_cuda condition in streamer.cc causes the wrong GPU destination to be used whenever all files in a request have exactly one tensor (a real-world edge case in split model files). Combined with a broken test that won't catch regressions on CI without a GPU, the PR carries meaningful correctness risk until the corner case is fixed.
  • cpp/streamer/impl/streamer/streamer.cc (multi-file 1-tensor-per-file destination bug) and py/.../distributed_streamer/tests/dist_test_distributed_streamer.py (torch.zeros not patched in mock test)

Important Files Changed

Filename Overview
cpp/streamer/impl/streamer/streamer.cc Routes CUDA requests with a per-tensor dsts array, but is_per_tensor_cuda = cuda && (dsts.size() > paths.size()) silently misses the case where every file has exactly one tensor, causing file 1-N GPU destinations to be computed from wrong (unpadded) offsets.
cpp/streamer/impl/batch/batch.cc Adds read_cuda() that uses a thread-local pinned staging buffer and cuMemcpyHtoDAsync to stream directly to GPU. CUDA API return values are now checked (addressing previous review comments). The partial-write on stopped case is also properly handled by checking remaining == 0 before calling finished_until.
cpp/streamer/impl/batches/batches.cc Threads CUDA flag and per-tensor GPU pointer list through to Batch. Destination selection logic in build_tasks is correct for the multi-tensor-per-file case but has the off-by-padding bug when _cuda_tensor_dsts is empty (1-tensor-per-file edge case).
cpp/streamer/impl/cuda/cuda_loader.cc Cleanly implements lazy dlopen loading of the CUDA driver API with proper symbol resolution fallbacks, primary-context management, and singleton pattern. Well-guarded against missing symbols and driver unavailability.
py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py Adds _get_chunks_cuda() for tensor-by-tensor broadcast of GPU tensors. The round-robin locking protocol and synchronize-before-reuse pattern are sound. The shape mismatch between broadcasting rank's (1, N) tensor and receiving rank's (N,) slice is flagged in a previous thread. The self.rank vs self.original_group_rank logging inconsistency is harmless since self.rank is actually defined.
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py Adds padded buffer-stride support and CUDA tensor pointer computation. Core layout logic is correct; get_cuda_alignment() is missing the upper-bound guard that the previous get_dist_buffer_alignment() had.
py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py Splits get_chunks into _get_chunks_cuda and _get_chunks_cpu paths. NVIDIA detection, synchronize-before-buffer-reuse, and runai_request wiring are all correct. Clean refactor with no new logic issues introduced.
py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py New CUDA streaming tests are comprehensive. test_1_success_cuda_direct_verifies_mock patches torch.empty to redirect CUDA to CPU, but torch.zeros (used for metadata tensors inside _get_chunks_cuda) is not patched and will throw on any non-GPU machine where the mock is loaded.
cpp/cuda/cuda_mock/libcuda_mock.cc Well-implemented mock: cuMemcpyHtoDAsync performs a plain memcpy to enable data-correctness testing without GPU hardware. All streams/contexts return non-null sentinels. Call counter is atomic and thread-safe.
cpp/streamer/impl/batch/batch_cuda_test.cc Good coverage: random-size multi-tensor and aligned-offset tests. Skips automatically on real GPU machines to avoid mock/driver conflicts. Data-correctness verified byte-by-byte.

Sequence Diagram

sequenceDiagram
    participant PY as Python (SafetensorsStreamer)
    participant FS as FileStreamer
    participant RI as RequestsIterator
    participant CPP as C++ Streamer (runai_request)
    participant BAT as Batch::read_cuda
    participant GPU as GPU Memory

    PY->>RI: FileChunks(chunks, buffer_strides=padded)
    PY->>FS: stream_files(file_chunks, device="cuda:0")
    FS->>RI: with_memory_mode(device="cuda:0")
    RI->>GPU: torch.empty(buffer_size, dtype=uint8, device="cuda:0")
    RI-->>FS: cuda_tensor_ptrs (aligned per-tensor GPU ptrs)

    FS->>CPP: runai_request(paths, offsets, sizes, cuda_tensor_ptrs, cuda=1)
    CPP->>CPP: CudaDriver::get() — init CUDA context on calling thread

    loop Worker threads (per Batch)
        BAT->>BAT: g_cuda_staging.ensure() → cuMemAllocHost (pinned host buf)
        BAT->>BAT: reader->read(block) → staging_buf
        BAT->>GPU: cuMemcpyHtoDAsync(aligned_gpu_ptr, staging_buf, block_size)
        BAT->>BAT: cuStreamSynchronize → reuse staging_buf
    end

    FS->>FS: get_chunks() → _get_chunks_cuda()
    FS-->>PY: yield (file_path, chunk_idx, gpu_tensor.view(1,-1))

    Note over PY,GPU: Distributed path (CUDA direct)
    participant DS as DistributedStreamer
    participant R1 as Rank 0 (broadcaster)
    participant R2 as Rank 1 (receiver)

    DS->>R1: _get_chunks_cuda() — next tensor from file_streamer (already on GPU)
    R1->>R1: dist.broadcast(metadata, src=rank0)
    R2->>R2: dist.broadcast(metadata, src=rank0) — receive count+size
    R2->>R2: torch.cuda.synchronize() — ensure prev receive_buffer free
    R1->>R1: dist.broadcast(gpu_tensor, src=rank0)
    R2->>R2: dist.broadcast(receive_buffer[:chunk_size], src=rank0)
    R1-->>DS: yield orig_req_idx, orig_chunk_idx, gpu_tensor
    R2-->>DS: yield orig_req_idx, orig_chunk_idx, received_view

Last reviewed commit: "CR fixes"

Comment thread cpp/streamer/impl/batch/batch.cc Outdated
Comment thread cpp/streamer/impl/batch/batch.cc Outdated
Comment thread cpp/streamer/impl/batch/batch.cc Outdated
coderabbitai Bot commented Mar 19, 2026

Walkthrough

Adds optional CUDA device-memory streaming: build/tooling to expose CUDA headers and a libcuda mock, a dlopen-based CUDA loader, C++ paths to read into GPU memory with async cuMemcpy, and Python changes to detect/use CUDA destinations with aligned GPU buffers.

Changes

Cohort / File(s) Summary
Infrastructure & Build
\.devcontainer/Dockerfile, cpp/WORKSPACE, cpp/third_party/cuda.BUILD, cpp/streamer/impl/cuda/BUILD, cpp/streamer/impl/batch/BUILD, cpp/streamer/impl/streamer/BUILD
Add multi-stage Docker stage to copy CUDA headers; declare local @cuda repo; add headers-only cuda_headers target; add cuda_loader target and wire cuda_loader into batch/streamer builds.
CUDA Mock
cpp/cuda/cuda_mock/BUILD, cpp/cuda/cuda_mock/libcuda.ldscript, cpp/cuda/cuda_mock/libcuda_mock.cc
New libcuda.so.1 target with linker script exporting selected driver symbols and test helper counters; implementation provides minimal ABI-compatible driver APIs (streams, host alloc/free, async HtoD, contexts) and call-count helpers.
CUDA Loader (C++)
cpp/streamer/impl/cuda/cuda_loader.h, cpp/streamer/impl/cuda/cuda_loader.cc
New dlopen/dlsym-based CudaDriver resolving required symbols (preferring _v2 variants), retains primary context, provides singleton get() and cleans up in destructor.
Batch / Read Path (C++)
cpp/streamer/impl/batch/batch.h, cpp/streamer/impl/batch/batch.cc, cpp/streamer/impl/batch/batch_cuda_test.cc
Batch gains bool cuda flag, thread-local pinned staging buffer, read_cuda() performing sequential reads, cuMemcpyHtoDAsync + stream sync, and tests for sanity and aligned destinations.
Batches Coordinator (C++)
cpp/streamer/impl/batches/batches.h, cpp/streamer/impl/batches/batches.cc
Batches constructor accepts cuda and per-tensor destination pointers; build_tasks selects per-request destination based on CUDA mode and propagates flag to worker batches.
Streamer API & C ABI (C/C++)
cpp/streamer/impl/streamer/streamer.h, cpp/streamer/impl/streamer/streamer.cc, cpp/streamer/streamer.h, cpp/streamer/streamer.cc, cpp/mock/streamer-mock.cc
Extend async_request/runai_request signatures with cuda flag; adjust dsts slicing for per-tensor CUDA pointers; initialize CudaDriver when cuda enabled.
Tests (C++)
cpp/streamer/streamer_s3_test.cc, cpp/streamer/streamer_test.cc
Update test invocations to include trailing cuda=0 for existing CPU paths.
Requests & Buffering (Python)
py/.../file_streamer/requests_iterator.py, py/.../file_streamer/file_streamer.py, py/.../file_streamer/tests/test_requests_iterator.py
Add CUDA-aware buffer allocation, alignment helpers (align_up, get_cuda_alignment), padded buffer_strides bookkeeping, GPU buffer slicing and cuda_tensor_ptrs generation, and alignment tests.
FileStreamer & Safetensors (Python)
py/.../file_streamer/file_streamer.py, py/.../safetensors_streamer/safetensors_streamer.py
FileStreamer detects NVIDIA CUDA and splits CPU/CUDA chunk paths; safetensors computes padded buffer_strides using CUDA alignment when on CUDA.
Distributed Streamer (Python)
py/.../distributed_streamer/distributed_streamer.py, py/.../distributed_streamer/tests/dist_test_distributed_streamer.py
Add CUDA-direct fast path for local files: allocate GPU receive buffers, broadcast chunks on-device, and tests that load mock libcuda and patch torch.cuda to verify cuMemcpyHtoDAsync usage and correctness.
Python ⇄ Native bindings
py/.../libstreamer/__init__.py, py/.../libstreamer/libstreamer.py
ctypes binding for runai_request extended with trailing ctypes.c_int cuda argument; libstreamer.runai_request gains cuda and cuda_tensor_ptrs params and builds destination pointers accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Worker as Worker Thread
    participant Batch as Batch
    participant CudaDriver as CudaDriver (dlopen)
    participant Host as Pinned Host Memory
    participant GPU as GPU Device Memory
    participant Stream as CUDA Stream

    Worker->>Batch: execute(config, stopped)
    Batch->>CudaDriver: CudaDriver::get()
    CudaDriver-->>Batch: CudaDriver* (symbols resolved / ctx retained)
    Batch->>CudaDriver: cuStreamCreate()
    CudaDriver-->>Stream: stream handle

    loop per chunk
        Batch->>Host: read file into pinned staging buffer
        Host-->>Batch: chunk bytes
        Batch->>CudaDriver: cuMemcpyHtoDAsync(dstDevice, hostPtr, size, stream)
        CudaDriver->>GPU: async copy enqueued
        Batch->>CudaDriver: cuStreamSynchronize(stream)
        CudaDriver-->>Batch: copy complete
        Batch->>Batch: finished_until(task.end, Success)
    end

    Batch->>CudaDriver: cuStreamDestroy_v2(stream)
    CudaDriver-->>Stream: destroy
    Batch-->>Worker: complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped through headers, dlopen'd a tune,

Mock cuda drums hum beneath the moon.
Pinned buffers cradle every shining byte,
Async hops, then sync—copies take flight.
A rabbit cheers: GPUs now dance at night! 🥕✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.10% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Pinned memory staging WIP' accurately describes the main change: adding CUDA-based pinned memory staging for asynchronous DMA transfers into GPU memory.



coderabbitai Bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (4)
.devcontainer/Dockerfile (1)

5-10: Add a non-root runtime user in the final image.

The Dockerfile has no USER directive, which means the container defaults to running as root. Consider adding a non-root user for improved security posture.

Suggested hardening patch
 FROM ubuntu:20.04
 
 # Copy CUDA headers for compile-time type checking of the driver API (cuda.h etc.).
 # The actual libcuda.so is loaded at runtime via dlopen, so no CUDA installation
 # is required on the build host or the final image.
 COPY --from=cuda-headers /usr/local/cuda/include/ /usr/local/cuda/include/
+
+# Drop root for runtime/dev shell usage
+RUN useradd --create-home --uid 10001 appuser
+USER appuser
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.devcontainer/Dockerfile around lines 5 - 10, Add a non-root runtime user by
creating a dedicated user and group in the Dockerfile (e.g., via
useradd/addgroup), set a home directory, ensure relevant files/directories (like
/usr/local/cuda/include/) are chowned to that user, and add a USER directive
near the end of the Dockerfile to switch from root to the new user; ensure any
build-time COPY steps that require root remain before the USER switch and that
runtime processes run under the new non-root account for improved security.
cpp/streamer/impl/batches/batches.h (1)

29-31: Consider accepting cuda_tensor_dsts by rvalue reference to avoid potential copy overhead.

The std::vector<void*> cuda_tensor_dsts = {} parameter is passed by value. For callers that can move the vector, this is fine, but for large vectors passed from lvalues, an unnecessary copy occurs. Consider using std::vector<void*>&& cuda_tensor_dsts = {} or providing an overload.

♻️ Alternative signature
     Batches(unsigned file_index,
            const std::vector<FileReadTask> & file_read_tasks,
            std::shared_ptr<const Config> config,
            std::shared_ptr<common::Responder> responder,
            const std::string & path,
            const common::s3::S3ClientWrapper::Params & params,
            const std::vector<size_t> & internal_sizes,
            bool cuda = false,
-           std::vector<void*> cuda_tensor_dsts = {});
+           std::vector<void*> cuda_tensor_dsts = {}); // Note: relies on copy elision or explicit std::move at call sites

If copies are a concern, you could use:

           std::vector<void*>&& cuda_tensor_dsts = {});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cpp/streamer/impl/batches/batches.h` around lines 29 - 31, The parameter
cuda_tensor_dsts in the function/constructor declaration that currently reads
"std::vector<void*> cuda_tensor_dsts = {}" should be changed to accept an rvalue
reference to avoid unnecessary copies from lvalues: replace it with
"std::vector<void*>&& cuda_tensor_dsts = {}" (or alternatively add an overload
taking "const std::vector<void*>&" and one taking "std::vector<void*>&&") and
update the corresponding implementation(s) that reference cuda_tensor_dsts
(e.g., the function/constructor in batches.h and its definition) to std::move
from the rvalue reference where appropriate.
py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py (1)

584-590: Consider whether torch.cuda.synchronize() is necessary before receive.

The torch.cuda.synchronize() call before dist.broadcast() on receiving ranks adds latency. The NCCL broadcast operation is already stream-synchronized. Unless there's a specific race condition being protected against (e.g., prior async ops on receive_buffer), this synchronization may be unnecessary overhead.

If intentional, a brief comment explaining why would help future maintainers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`
around lines 584 - 590, The explicit torch.cuda.synchronize() call before
dist.broadcast on the receiving path (around receive_buffer handling and the
broadcast call in distributed_streamer.py) introduces extra latency; either
remove the synchronize() to rely on NCCL/stream synchronization for
dist.broadcast or, if there is a specific async-write/read race you are
protecting, keep it but add a concise comment above the synchronize() explaining
the exact race and why a full device sync is required; update bookkeeping
(total_sync_time) accordingly if you remove the sync.
cpp/streamer/impl/streamer/streamer.cc (1)

150-165: Consider adding bounds validation for per-tensor CUDA destinations.

The slicing logic assumes dsts contains exactly sum(internal_sizes[i].size()) entries. If there's a mismatch (e.g., Python bindings pass wrong number of pointers), iterators could exceed dsts.end(), causing undefined behavior.

Since verify_requests already validates inputs, you could extend it to check:

if (cuda && dsts.size() > paths.size()) {
    size_t expected = std::accumulate(num_sizes.begin(), num_sizes.end(), 0u);
    ASSERT(dsts.size() == expected) << "CUDA mode: expected " << expected << " dsts, got " << dsts.size();
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cpp/streamer/impl/streamer/streamer.cc` around lines 150 - 165, The slicing
of per-tensor CUDA destinations can read past dsts and cause UB; add bounds
validation before or inside the loop (or better yet extend verify_requests) to
ensure when is_per_tensor_cuda (or cuda) is true that dsts.size() equals the sum
of internal_sizes[i].size() for all files and that tensor_offset + num_tensors
<= dsts.size() before calling file_cuda_dsts.assign; on mismatch ASSERT or
return an error (use the same ASSERT/log pattern as elsewhere) referencing dsts,
internal_sizes, tensor_offset, and Batches so the slicing cannot exceed
dsts.end().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cpp/streamer/impl/batch/batch.cc`:
- Around line 324-346: The ensure function in batch.cc does not check CUresult
from cuStreamCreate and cuMemAllocHost, which can leave ptr null while capacity
is set and cause undefined behavior; modify batch::ensure to capture and check
the CUresult of drv.cuStreamCreate(...) and drv.cuMemAllocHost(...), only set
capacity = needed after cuMemAllocHost succeeds, and return nullptr on any
failure (do not leave ptr/capacity inconsistent); then update the caller
read_cuda() to check the returned pointer from g_cuda_staging.ensure(...) and
throw or handle an error (e.g., throw
common::Exception(common::ResponseCode::UnknownError)) if ensure returns
nullptr.

In `@cpp/streamer/impl/cuda/cuda_loader.cc`:
- Around line 95-119: The loader currently retains a single primary context in
load() via cuDevicePrimaryCtxRetain and returns a process-wide singleton from
CudaDriver::get(), which prevents per-request device switching and causes
cuMemcpyHtoDAsync to operate against the wrong device; update the native ABI to
accept a device ordinal from runai_request and implement per-device context
handling by either (a) caching a map of device -> CUcontext and ensuring
cuCtxSetCurrent(ctx) is called before any stream operations (e.g., before
cuMemcpyHtoDAsync) or (b) calling cuDevicePrimaryCtxRetain/cuCtxSetCurrent
on-demand for the requested device, and ensure CudaDriver::get()/load() no
longer exposes a single fixed device context so each stream operation sets the
correct current context.

In `@cpp/WORKSPACE`:
- Around line 7-11: Replace the hard-coded new_local_repository("cuda",
path="/usr/local/cuda", build_file="//third_party:cuda.BUILD") with a
configurable approach: implement a small repository rule (or wrapper in
WORKSPACE) that looks for an environment variable (e.g., CUDA_HOME) and uses
that value as the path, falling back to "/usr/local/cuda" if unset;
alternatively document and demonstrate using Bazel's override mechanism (e.g.,
--override_repository=cuda=/your/cuda/path) and add a .bazelrc example so
callers can override the "cuda" local repository without editing WORKSPACE;
update the repository declaration referenced by name="cuda" and
build_file="//third_party:cuda.BUILD" and add a note in README or comments
explaining the CUDA_HOME and --override_repository options.

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py`:
- Around line 391-436: The test incorrectly asserts that the local
cuda_mock.get_cuMemcpyHtoDAsync_call_count() (call_count) is > 0 on every rank;
instead compute total_chunks = sum(len(req.chunks) for req in requests) and if
dist.get_world_size() > total_chunks skip or avoid the per-rank assertion,
otherwise perform an all-reduce of the local call_count (use dist.all_reduce on
a tensor/int to produce global_call_count) and assert global_call_count > 0;
update the assertion logic that references call_count and
cuda_mock.get_cuMemcpyHtoDAsync_call_count() accordingly so pure-receiver ranks
don’t cause false failures.

In `@py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py`:
- Around line 97-112: The code currently passes the original device (e.g.,
"cuda:0") into FilesRequestsIteratorWithBuffer.with_memory_mode unconditionally,
causing CUDA buffers to be allocated even when torch.cuda.is_available() is
False or when any request path is remote; fix it by computing a use_cuda_direct
boolean that requires _is_nvidia_cuda (or device startswith "cuda" and
torch.version.hip is None), torch.cuda.is_available(), and that all
file_stream_requests paths are local (no s3://, gs://, az://), then pass device
if use_cuda_direct is True otherwise pass "cpu" into
FilesRequestsIteratorWithBuffer.with_memory_mode; update references in this
block (device_str, _is_nvidia_cuda, file_stream_requests,
FilesRequestsIteratorWithBuffer.with_memory_mode) and ensure handle_object_store
is still called before the local-path check so paths reflect any translation.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 126-135: next_request() currently calls torch.cuda.synchronize()
which synchronizes the current CUDA device, but the buffer device is set in
__init__ (see self.buffer and self._is_nvidia_cuda); change the synchronization
to target the buffer's device to avoid races by calling
torch.cuda.synchronize(device=self.buffer.device) (or the equivalent device
spec) when self._is_nvidia_cuda is true and keep the logging of the elapsed
time; ensure you reference self.buffer.device (or the same attribute used to
store the explicit device in __init__) rather than relying on the current CUDA
device.

In `@py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py`:
- Around line 44-53: The CUDA fallback calls dst.data_ptr() on elements of dsts
but dsts is typed as List[memoryview], causing a latent AttributeError; update
the logic in runai_request so that when cuda is True you require
cuda_tensor_ptrs (raise a clear ValueError if cuda and cuda_tensor_ptrs is None)
and remove or disable the unreachable memoryview-based fallback that uses
dst.data_ptr(); also update the type hints to reflect that dsts (or the new
cuda_tensor_ptrs param) must be List[torch.Tensor] when cuda=True and adjust any
docstring/comments around internal_sizes and dsts accordingly.

In
`@py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py`:
- Around line 229-230: Update the inline comment above the alignment assignment
to reflect the correct default CUDA alignment: replace "default 256 bytes" with
"default 512 bytes" so it matches get_cuda_alignment() and
DEFAULT_CUDA_ALIGNMENT; this affects the comment near the assignment to
alignment which uses get_cuda_alignment() when device and
device.startswith("cuda").

---

Nitpick comments:
In @.devcontainer/Dockerfile:
- Around line 5-10: Add a non-root runtime user by creating a dedicated user and
group in the Dockerfile (e.g., via useradd/addgroup), set a home directory,
ensure relevant files/directories (like /usr/local/cuda/include/) are chowned to
that user, and add a USER directive near the end of the Dockerfile to switch
from root to the new user; ensure any build-time COPY steps that require root
remain before the USER switch and that runtime processes run under the new
non-root account for improved security.

In `@cpp/streamer/impl/batches/batches.h`:
- Around line 29-31: The parameter cuda_tensor_dsts in the function/constructor
declaration that currently reads "std::vector<void*> cuda_tensor_dsts = {}"
should be changed to accept an rvalue reference to avoid unnecessary copies from
lvalues: replace it with "std::vector<void*>&& cuda_tensor_dsts = {}" (or
alternatively add an overload taking "const std::vector<void*>&" and one taking
"std::vector<void*>&&") and update the corresponding implementation(s) that
reference cuda_tensor_dsts (e.g., the function/constructor in batches.h and its
definition) to std::move from the rvalue reference where appropriate.

In `@cpp/streamer/impl/streamer/streamer.cc`:
- Around line 150-165: The slicing of per-tensor CUDA destinations can read past
dsts and cause UB; add bounds validation before or inside the loop (or better
yet extend verify_requests) to ensure when is_per_tensor_cuda (or cuda) is true
that dsts.size() equals the sum of internal_sizes[i].size() for all files and
that tensor_offset + num_tensors <= dsts.size() before calling
file_cuda_dsts.assign; on mismatch ASSERT or return an error (use the same
ASSERT/log pattern as elsewhere) referencing dsts, internal_sizes,
tensor_offset, and Batches so the slicing cannot exceed dsts.end().

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`:
- Around line 584-590: The explicit torch.cuda.synchronize() call before
dist.broadcast on the receiving path (around receive_buffer handling and the
broadcast call in distributed_streamer.py) introduces extra latency; either
remove the synchronize() to rely on NCCL/stream synchronization for
dist.broadcast or, if there is a specific async-write/read race you are
protecting, keep it but add a concise comment above the synchronize() explaining
the exact race and why a full device sync is required; update bookkeeping
(total_sync_time) accordingly if you remove the sync.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 59617c99-6b02-4c51-b142-af834abb38dc

📥 Commits

Reviewing files that changed from the base of the PR and between f4ba176 and 53c7a21.

📒 Files selected for processing (31)
  • .devcontainer/Dockerfile
  • cpp/WORKSPACE
  • cpp/cuda/cuda_mock/BUILD
  • cpp/cuda/cuda_mock/libcuda.ldscript
  • cpp/cuda/cuda_mock/libcuda_mock.cc
  • cpp/mock/streamer-mock.cc
  • cpp/streamer/impl/batch/BUILD
  • cpp/streamer/impl/batch/batch.cc
  • cpp/streamer/impl/batch/batch.h
  • cpp/streamer/impl/batch/batch_cuda_test.cc
  • cpp/streamer/impl/batches/batches.cc
  • cpp/streamer/impl/batches/batches.h
  • cpp/streamer/impl/cuda/BUILD
  • cpp/streamer/impl/cuda/cuda_loader.cc
  • cpp/streamer/impl/cuda/cuda_loader.h
  • cpp/streamer/impl/streamer/BUILD
  • cpp/streamer/impl/streamer/streamer.cc
  • cpp/streamer/impl/streamer/streamer.h
  • cpp/streamer/streamer.cc
  • cpp/streamer/streamer.h
  • cpp/streamer/streamer_s3_test.cc
  • cpp/streamer/streamer_test.cc
  • cpp/third_party/cuda.BUILD
  • py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py
  • py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py
  • py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py
  • py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
  • py/runai_model_streamer/runai_model_streamer/file_streamer/tests/test_requests_iterator.py
  • py/runai_model_streamer/runai_model_streamer/libstreamer/__init__.py
  • py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py
  • py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py

Comment thread cpp/streamer/impl/cuda/cuda_loader.cc
Comment on lines +95 to +119
    // Retain the primary context for the detected device once per process.
    // Worker threads make it current via cuCtxSetCurrent(ctx) on first use.
    // The matching cuDevicePrimaryCtxRelease(device) is called in ~CudaDriver().
    auto cuDevicePrimaryCtxRetain =
        load_sym<CUresult(*)(CUcontext *, CUdevice)>(handle, "cuDevicePrimaryCtxRetain");
    if (!cuDevicePrimaryCtxRetain || cuDevicePrimaryCtxRetain(&d.ctx, device) != CUDA_SUCCESS)
    {
        LOG(WARNING) << "[RunAI Streamer] Could not retain CUDA primary context for device "
                     << device << "; CUDA streaming disabled";
        return {};
    }
    d.device = device;

    LOG(INFO) << "[RunAI Streamer] CUDA driver loaded successfully";
    return d;
}

} // anonymous namespace

const CudaDriver * CudaDriver::get()
{
    static const CudaDriver driver = load();
    static const bool available = driver.cuStreamCreate != nullptr;
    return available ? &driver : nullptr;
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, locate the CUDA loader file and understand the full context
find . -type f -name "cuda_loader.cc" | head -5

Repository: run-ai/runai-model-streamer

Length of output: 110


🏁 Script executed:

# Find the Python files mentioned in the review
find . -type f -name "file_streamer.py" -o -name "libstreamer.py" | head -10

Repository: run-ai/runai-model-streamer

Length of output: 222


🏁 Script executed:

# Get the full context of the cuda_loader.cc file to see load() function
if [ -f "cpp/streamer/impl/cuda/cuda_loader.cc" ]; then
  wc -l cpp/streamer/impl/cuda/cuda_loader.cc
fi

Repository: run-ai/runai-model-streamer

Length of output: 112


🏁 Script executed:

# Read the full cuda_loader.cc file
cat -n cpp/streamer/impl/cuda/cuda_loader.cc

Repository: run-ai/runai-model-streamer

Length of output: 5444


🏁 Script executed:

# Read file_streamer.py
cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py

Repository: run-ai/runai-model-streamer

Length of output: 9328


🏁 Script executed:

# Read libstreamer.py
cat -n py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py

Repository: run-ai/runai-model-streamer

Length of output: 5783


🏁 Script executed:

# Verify the native function signature by checking header file
find . -type f -name "cuda_loader.h" | head -1 | xargs cat -n 2>/dev/null || echo "Header not found, checking for streamer interface"

Repository: run-ai/runai-model-streamer

Length of output: 2086


🏁 Script executed:

# Check for any additional CUDA device handling or context management elsewhere
rg -i "cudevice|cucontext" --type cpp -A 2 -B 2 | head -100

Repository: run-ai/runai-model-streamer

Length of output: 7546


🏁 Script executed:

# Verify that no device ordinal is passed through the entire stack by checking 
# the C++ streamer implementation
find . -type f -name "*.cc" -path "*/streamer/impl/*" | grep -v test | head -10

Repository: run-ai/runai-model-streamer

Length of output: 466


🏁 Script executed:

# Look for where cuMemcpyHtoDAsync is actually called to understand the context issue
rg "cuMemcpyHtoDAsync" --type cpp -B 5 -A 5

Repository: run-ai/runai-model-streamer

Length of output: 6424


🏁 Script executed:

# Look at the batch.cc file more closely to understand the cuMemcpyHtoDAsync usage
cat -n cpp/streamer/impl/batch/batch.cc | head -150

Repository: run-ai/runai-model-streamer

Length of output: 5586


🏁 Script executed:

# Search for g_cuda_staging definition and context setup
rg "g_cuda_staging" --type cpp -B 3 -A 3

Repository: run-ai/runai-model-streamer

Length of output: 1568


🏁 Script executed:

# Check if there are any device-specific CUDA operations or per-device context management
rg "cuCtxSetCurrent|cuda::|device.*cuda|CUdevice" --type cpp | head -30

Repository: run-ai/runai-model-streamer

Length of output: 3110


🏁 Script executed:

# Check the full read_cuda implementation to see context switching
rg "read_cuda" --type cpp -A 30 cpp/streamer/impl/batch/batch.cc

Repository: run-ai/runai-model-streamer

Length of output: 2176


🏁 Script executed:

# Look for any per-device caching or alternative device context paths
rg "cuda::" --type cpp -B 2 -A 2 | head -50

Repository: run-ai/runai-model-streamer

Length of output: 2881


Singleton CUDA context doesn't support multi-device scheduling within a process.

load() retains one primary context for the device current at process startup, and get() caches it process-wide as a static singleton. The Python layer accepts per-request device ordinals (e.g., device="cuda:1"), but converts them to a boolean and passes only cuda=True/False to native code. The C++ batch reader gets the singleton driver and calls cuMemcpyHtoDAsync() on every stream, making it impossible to switch contexts when a later request targets a different GPU. This can cause GPU pointers allocated on device 1 to be written via the retained context for device 0, resulting in data corruption or crashes.

Pass the device ordinal through the native ABI (runai_request) and either maintain a per-device context cache in C++ or explicitly call cuCtxSetCurrent() for each stream's target device.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cpp/streamer/impl/cuda/cuda_loader.cc` around lines 95 - 119, The loader
currently retains a single primary context in load() via
cuDevicePrimaryCtxRetain and returns a process-wide singleton from
CudaDriver::get(), which prevents per-request device switching and causes
cuMemcpyHtoDAsync to operate against the wrong device; update the native ABI to
accept a device ordinal from runai_request and implement per-device context
handling by either (a) caching a map of device -> CUcontext and ensuring
cuCtxSetCurrent(ctx) is called before any stream operations (e.g., before
cuMemcpyHtoDAsync) or (b) calling cuDevicePrimaryCtxRetain/cuCtxSetCurrent
on-demand for the requested device, and ensure CudaDriver::get()/load() no
longer exposes a single fixed device context so each stream operation sets the
correct current context.

Comment thread cpp/WORKSPACE
Comment on lines +7 to +11
new_local_repository(
    name = "cuda",
    path = "/usr/local/cuda",
    build_file = "//third_party:cuda.BUILD",
)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Checking hard-coded CUDA repository path in WORKSPACE:"
rg -n 'new_local_repository|path\s*=\s*"/usr/local/cuda"' cpp/WORKSPACE -n -C2

echo
echo "Checking whether current environment has /usr/local/cuda:"
if [ -d /usr/local/cuda ]; then
  echo "FOUND: /usr/local/cuda"
else
  echo "MISSING: /usr/local/cuda"
fi

echo
echo "Checking for alternate CUDA path configuration hooks in repo:"
rg -n 'CUDA_HOME|CUDA_PATH|cuda.*path|new_local_repository\(.*cuda' cpp -S

Repository: run-ai/runai-model-streamer

Length of output: 856


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Checking for .bazelrc or Bazel configuration files ==="
find cpp -name ".bazelrc*" -o -name "*.bazelrc" 2>/dev/null | head -20

echo
echo "=== Checking WORKSPACE file for environment variable usage ==="
head -20 cpp/WORKSPACE

echo
echo "=== Searching for documentation mentioning CUDA or build setup ==="
find . -type f \( -name "README*" -o -name "CONTRIBUTING*" -o -name "BUILD*" -o -name "*.md" \) -not -path "./.git/*" | xargs grep -l -i "cuda\|build.*setup\|installation" 2>/dev/null | head -10

echo
echo "=== Checking third_party/cuda.BUILD to understand dependency structure ==="
if [ -f cpp/third_party/cuda.BUILD ]; then
  cat cpp/third_party/cuda.BUILD
fi

echo
echo "=== Checking if there are any repository override patterns ==="
rg -n "repository_override|local_repository|override_repository|repository_cache" cpp -A2

Repository: run-ai/runai-model-streamer

Length of output: 2169


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Reading cpp/.bazelrc ==="
cat cpp/.bazelrc

echo
echo "=== Reading installation.md for CUDA setup instructions ==="
head -100 docs/src/installation.md

echo
echo "=== Reading README.md for build/CUDA references ==="
grep -i -C5 "cuda\|build\|install" README.md | head -50

echo
echo "=== Checking if toolchain/configure.bzl handles CUDA path ==="
if [ -f cpp/toolchain/configure.bzl ]; then
  cat cpp/toolchain/configure.bzl
fi

Repository: run-ai/runai-model-streamer

Length of output: 6076


Hard-coded CUDA path lacks override mechanism for different environments.

Line 9 hard-codes /usr/local/cuda with no programmatic way to customize it. While the comment acknowledges this ("Adjust the path if your CUDA installation is in a different location"), there's no environment variable, .bazelrc option, or documented setup procedure to support non-standard CUDA installations. Users must manually edit WORKSPACE, which breaks reproducibility and complicates CI/CD pipelines. Even with runtime dlopen for libcuda.so, the headers-only dependency still requires the path to exist at build time.

Provide a mechanism for CUDA path customization (e.g., via environment variable CUDA_HOME, .bazelrc override, or startup script).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cpp/WORKSPACE` around lines 7 - 11, Replace the hard-coded
new_local_repository("cuda", path="/usr/local/cuda",
build_file="//third_party:cuda.BUILD") with a configurable approach: implement a
small repository rule (or wrapper in WORKSPACE) that looks for an environment
variable (e.g., CUDA_HOME) and uses that value as the path, falling back to
"/usr/local/cuda" if unset; alternatively document and demonstrate using Bazel's
override mechanism (e.g., --override_repository=cuda=/your/cuda/path) and add a
.bazelrc example so callers can override the "cuda" local repository without
editing WORKSPACE; update the repository declaration referenced by name="cuda"
and build_file="//third_party:cuda.BUILD" and add a note in README or comments
explaining the CUDA_HOME and --override_repository options.

Comment on lines +391 to +436
cuda_mock = self._load_cuda_mock()
if cuda_mock is None:
    self.skipTest("Mock libcuda.so.1 not available — set LD_LIBRARY_PATH to the mock build directory.")

file_specs = [{"size": 2580, "chunks": [500, 260, 260, 260, 260, 260, 260, 260, 260]}]
requests = self._prepare_file_requests(file_specs)
original_data_map = {}
for req in requests:
    with open(req.path, "rb") as f:
        original_data_map[req.id] = f.read()

cuda_mock.reset_cuMemcpyHtoDAsync_call_count()

# Redirect torch.empty(device="cuda:X") to CPU so tests run without a GPU.
_orig_empty = torch.empty
def _cpu_empty(*args, **kwargs):
    if str(kwargs.get("device", "")).startswith("cuda"):
        kwargs = {**kwargs, "device": "cpu"}
    return _orig_empty(*args, **kwargs)

reconstructed_data_map = {req.id: [None] * len(req.chunks) for req in requests}
env_vars = {"RUNAI_STREAMER_DIST": "1", "RUNAI_STREAMER_DIST_BUFFER_MIN_BYTESIZE": "0"}
with patch.dict(os.environ, env_vars), \
     patch("torch.cuda.is_available", return_value=True), \
     patch("torch.empty", side_effect=_cpu_empty), \
     patch("torch.cuda.mem_get_info", return_value=(2**33, 2**33)), \
     patch("torch.cuda.synchronize"):
    with DistributedStreamer() as streamer:
        # Bypass the gloo+non-cpu backend check in set_is_distributed so
        # distributed streaming is active even with device="cuda:0".
        def _force_distributed(is_distributed, device):
            streamer.is_distributed = (
                is_distributed
                and dist.is_initialized()
                and dist.get_world_size() > 1
            )
        with patch.object(streamer, "set_is_distributed", side_effect=_force_distributed):
            streamer.stream_files(requests, None, "cuda:0", True)
            for req_id, chunk_idx, data_tensor in streamer.get_chunks():
                reconstructed_data_map[req_id][chunk_idx] = data_tensor.cpu().numpy().tobytes()

call_count = cuda_mock.get_cuMemcpyHtoDAsync_call_count()
self.assertGreater(
    call_count, 0,
    f"Rank {self.rank}: expected cuMemcpyHtoDAsync to be called but count was {call_count}"
)

⚠️ Potential issue | 🟡 Minor

Don't require every rank to issue a CUDA copy.

This assertion only holds when each rank owns at least one source chunk. With the fixed 9-chunk fixture, larger test runs can legitimately leave some ranks as pure receivers, so call_count stays 0 even though the CUDA-direct path is working. Consider asserting on an all-reduced sum, or skip when world_size exceeds the total chunk count.
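
A minimal sketch of that suggestion, reusing the names from the test above (whether to additionally skip when the world size exceeds the chunk count is a judgment call):

```python
# Sum the per-rank mock copy counts so pure-receiver ranks don't fail the check.
local_count = torch.tensor([cuda_mock.get_cuMemcpyHtoDAsync_call_count()], dtype=torch.int64)
dist.all_reduce(local_count, op=dist.ReduceOp.SUM)
self.assertGreater(int(local_count.item()), 0,
                   "expected cuMemcpyHtoDAsync to be called on at least one rank")
```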

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py`
around lines 391 - 436, The test incorrectly asserts that the local
cuda_mock.get_cuMemcpyHtoDAsync_call_count() (call_count) is > 0 on every rank;
instead compute total_chunks = sum(len(req.chunks) for req in requests) and if
dist.get_world_size() > total_chunks skip or avoid the per-rank assertion,
otherwise perform an all-reduce of the local call_count (use dist.all_reduce on
a tensor/int to produce global_call_count) and assert global_call_count > 0;
update the assertion logic that references call_count and
cuda_mock.get_cuMemcpyHtoDAsync_call_count() accordingly so pure-receiver ranks
don’t cause false failures.

Comment on lines 97 to +112
self.device_str = device
# AMD ROCm also reports "cuda" devices but has no libcuda.so — only enable the
# direct-to-GPU C++ path for NVIDIA (torch.version.hip is set on ROCm builds).
self._is_nvidia_cuda = (
    device is not None
    and device.startswith("cuda")
    and torch.version.hip is None
)

for file_stream_request in file_stream_requests:
    self.total_size += sum(file_stream_request.chunks)
    file_stream_request.path = self.handle_object_store(file_stream_request.path, credentials)

self.requests_iterator: FilesRequestsIteratorWithBuffer = FilesRequestsIteratorWithBuffer.with_memory_mode(file_stream_requests)

self.requests_iterator: FilesRequestsIteratorWithBuffer = FilesRequestsIteratorWithBuffer.with_memory_mode(
    file_stream_requests, device=device
)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, locate the file and read the context around lines 97-112
fd "file_streamer.py" | head -5

Repository: run-ai/runai-model-streamer

Length of output: 269


🏁 Script executed:

# Find and read the relevant section
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" | sed -n '90,120p'

Repository: run-ai/runai-model-streamer

Length of output: 1698


🏁 Script executed:

# Get the full file to understand the broader context
wc -l "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py"

Repository: run-ai/runai-model-streamer

Length of output: 150


🏁 Script executed:

# Search for with_memory_mode implementation
rg "with_memory_mode" --type py -A 5

Repository: run-ai/runai-model-streamer

Length of output: 1537


🏁 Script executed:

# Search for torch.cuda.is_available usage
rg "torch\.cuda\.is_available" --type py

Repository: run-ai/runai-model-streamer

Length of output: 907


🏁 Script executed:

# Look for remote path checks (s3, gs, azure)
rg "is_s3_path|is_gs_path|is_azure_path" --type py -B 2 -A 2

Repository: run-ai/runai-model-streamer

Length of output: 9045


🏁 Script executed:

# Read the with_memory_mode implementation in full
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py" | sed -n '1,150p'

Repository: run-ai/runai-model-streamer

Length of output: 7911


🏁 Script executed:

# Search for with_memory_mode method signature and implementation
ast-grep --pattern $'def with_memory_mode($$$) {
  $$$
}'

Repository: run-ai/runai-model-streamer

Length of output: 53


🏁 Script executed:

# Get more context around torch.cuda.is_available in requests_iterator
rg "with_memory_mode" --type py -B 2 -A 20

Repository: run-ai/runai-model-streamer

Length of output: 5615


🏁 Script executed:

# Check homogeneous_paths implementation
rg "def homogeneous_paths" --type py -A 10

Repository: run-ai/runai-model-streamer

Length of output: 1254


🏁 Script executed:

# Check what happens in runai_request with cuda parameter
rg "def runai_request" --type py -A 10

Repository: run-ai/runai-model-streamer

Length of output: 1198


🏁 Script executed:

# Check if handle_object_store modifies device or returns info about remote paths
rg "def handle_object_store" --type py -A 10

Repository: run-ai/runai-model-streamer

Length of output: 1395


🏁 Script executed:

# Check if with_memory_cap is called and what it does
rg "with_memory_cap" --type py -B 2 -A 15

Repository: run-ai/runai-model-streamer

Length of output: 29960


🏁 Script executed:

# Check the full file_streamer.py around lines 88-130 for full context
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" | sed -n '85,130p'

Repository: run-ai/runai-model-streamer

Length of output: 2387


🏁 Script executed:

# Check if there are any tests that validate the CUDA availability check
rg "torch.cuda.is_available" --type py -B 5 -A 5

Repository: run-ai/runai-model-streamer

Length of output: 8159


🏁 Script executed:

# Verify if device parameter is ever validated or conditioned before passing to iterator
rg "device.*=" "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" -B 2 -A 2

Repository: run-ai/runai-model-streamer

Length of output: 1319


🏁 Script executed:

# Check if there's any validation of torch.cuda.is_available before the with_memory_mode call
rg "stream_files" --type py -A 30 | grep -A 30 "def stream_files"

Repository: run-ai/runai-model-streamer

Length of output: 19773


🏁 Script executed:

# Verify what happens if torch.cuda.is_available() returns False with cuda device
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py" | sed -n '74,110p'

Repository: run-ai/runai-model-streamer

Length of output: 2526


Gate CUDA-direct mode before creating the request buffer.

The iterator receives the CUDA device unconditionally whenever the caller passes cuda:*, even for remote paths (s3://, gs://, az://) or when torch.cuda.is_available() returns False. This allocates a CUDA buffer and later drives runai_request(..., cuda=True) instead of falling back to CPU as documented. Compute a use_cuda_direct flag that also checks torch.cuda.is_available() and validates all input paths are local, then pass "cpu" to the iterator when this flag is False.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py`
around lines 97 - 112, The code currently passes the original device (e.g.,
"cuda:0") into FilesRequestsIteratorWithBuffer.with_memory_mode unconditionally,
causing CUDA buffers to be allocated even when torch.cuda.is_available() is
False or when any request path is remote; fix it by computing a use_cuda_direct
boolean that requires _is_nvidia_cuda (or device startswith "cuda" and
torch.version.hip is None), torch.cuda.is_available(), and that all
file_stream_requests paths are local (no s3://, gs://, az://), then pass device
if use_cuda_direct is True otherwise pass "cpu" into
FilesRequestsIteratorWithBuffer.with_memory_mode; update references in this
block (device_str, _is_nvidia_cuda, file_stream_requests,
FilesRequestsIteratorWithBuffer.with_memory_mode) and ensure handle_object_store
is still called before the local-path check so paths reflect any translation.

Comment on lines +126 to +135
if self._is_nvidia_cuda:
    # Buffer is on GPU: drain all CUDA streams before reusing it for new
    # writes. Guards against two classes of async operations still in flight:
    # 1. NCCL broadcasts reading from the yielded tensor slices.
    # 2. Async user-application ops (e.g. sharding) on those slices.
    import time
    _t0 = time.perf_counter()
    torch.cuda.synchronize()
    _sync_ms = (time.perf_counter() - _t0) * 1000
    logger.info(f"[RunAI Streamer] cuda synchronize in next_request took {_sync_ms:.1f} ms")

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cd /tmp && find . -type f -name "requests_iterator.py" 2>/dev/null | head -5

Repository: run-ai/runai-model-streamer

Length of output: 53


🏁 Script executed:

cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -150

Repository: run-ai/runai-model-streamer

Length of output: 7911


🏁 Script executed:

cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | sed -n '100,200p'

Repository: run-ai/runai-model-streamer

Length of output: 6241


🏁 Script executed:

# Search for class definition and device-related code
rg -n "class FilesRequestsIteratorWithBuffer|self\._device|cuda:.*device|torch\.cuda\.synchronize" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py

Repository: run-ai/runai-model-streamer

Length of output: 153


🏁 Script executed:

# Check __init__ method to see how device is initialized
rg -A 30 "def __init__" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -60

Repository: run-ai/runai-model-streamer

Length of output: 3059


🏁 Script executed:

# Look for buffer allocation and device context
rg -B 5 -A 5 "torch.cuda.synchronize|with torch.cuda.device|current_device|cuda.current_device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py

Repository: run-ai/runai-model-streamer

Length of output: 704


🏁 Script executed:

# Check if device is stored anywhere in the class
rg -n "self\.device|self\._device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py

Repository: run-ai/runai-model-streamer

Length of output: 53


🏁 Script executed:

# Verify buffer.device access pattern in the code
rg -n "\.device|device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -20

Repository: run-ai/runai-model-streamer

Length of output: 1152


🏁 Script executed:

# Check if there's device context management anywhere
rg -n "torch.cuda.set_device|with torch.cuda.device|current_device" py/runai_model_streamer/runai_model_streamer/file_streamer/

Repository: run-ai/runai-model-streamer

Length of output: 53


Synchronize the buffer's device, not the current device.

torch.cuda.synchronize() without a device argument waits on torch.cuda.current_device(), but the buffer is created on an explicit device passed to __init__ (line 93, 97). If the current device differs from the buffer's device, next_request() can reuse the buffer while work on the actual destination device is still in flight, causing data races. Use torch.cuda.synchronize(device=self.buffer.device) instead.
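A sketch of the change, reusing the names from the quoted hunk above (only the device argument differs from the current code):

if self._is_nvidia_cuda:
    import time
    _t0 = time.perf_counter()
    # Wait on the buffer's own device, not torch.cuda.current_device().
    torch.cuda.synchronize(device=self.buffer.device)
    _sync_ms = (time.perf_counter() - _t0) * 1000
    logger.info(f"[RunAI Streamer] cuda synchronize in next_request took {_sync_ms:.1f} ms")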

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 126 - 135, next_request() currently calls torch.cuda.synchronize()
which synchronizes the current CUDA device, but the buffer device is set in
__init__ (see self.buffer and self._is_nvidia_cuda); change the synchronization
to target the buffer's device to avoid races by calling
torch.cuda.synchronize(device=self.buffer.device) (or the equivalent device
spec) when self._is_nvidia_cuda is true and keep the logging of the elapsed
time; ensure you reference self.buffer.device (or the same attribute used to
store the explicit device in __init__) rather than relying on the current CUDA
device.

Comment thread py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py Outdated
Comment on lines +229 to +230
# Compute alignment for CUDA devices (default 256 bytes, disabled on CPU).
alignment = get_cuda_alignment() if (device and device.startswith("cuda")) else 1

⚠️ Potential issue | 🟡 Minor

Comment mentions 256 bytes but default is 512.

The inline comment states "default 256 bytes" but get_cuda_alignment() in requests_iterator.py returns DEFAULT_CUDA_ALIGNMENT which is 512 bytes according to the snippet. Update the comment to match the actual default.

📝 Proposed fix
-        # Compute alignment for CUDA devices (default 256 bytes, disabled on CPU).
+        # Compute alignment for CUDA devices (default 512 bytes, disabled on CPU).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py`
around lines 229 - 230, Update the inline comment above the alignment assignment
to reflect the correct default CUDA alignment: replace "default 256 bytes" with
"default 512 bytes" so it matches get_cuda_alignment() and
DEFAULT_CUDA_ALIGNMENT; this affects the comment near the assignment to
alignment which uses get_cuda_alignment() when device and
device.startswith("cuda").


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
cpp/streamer/impl/cuda/cuda_loader.cc (1)

96-119: ⚠️ Potential issue | 🔴 Critical

Process-wide singleton context still cannot safely handle mixed-device requests.

Line 117 caches a single CudaDriver (ctx/device) per process. If later requests in the same process target another GPU, CUDA ops (e.g., async HtoD copies) can run under the wrong current context, which can corrupt data or crash.

Please pass device ordinal through the native ABI and switch/select per-device context before stream/copy operations, instead of binding context once at first get().
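One hedged way to surface the ordinal from Python so it can travel through that ABI is sketched below; the cuda_ordinal helper and the cuda_device keyword are illustrative only and do not exist in the current runai_request signature:

def cuda_ordinal(device: str) -> int:
    """Map "cuda" -> 0 and "cuda:N" -> N before calling into the C++ layer."""
    _, _, index = device.partition(":")
    return int(index) if index else 0

# Hypothetical call site (today only a boolean cuda flag is passed):
# runai_request(..., cuda=True, cuda_device=cuda_ordinal(device))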

#!/bin/bash
set -euo pipefail

# 1) Confirm singleton + single ctx/device model.
rg -n -C2 'static const CudaDriver driver|CudaDriver::get\(|CUcontext ctx|CUdevice  device' \
  cpp/streamer/impl/cuda/cuda_loader.cc cpp/streamer/impl/cuda/cuda_loader.h

# 2) Check whether native ABI carries only boolean CUDA flag (no device ordinal).
rg -n -C3 'runai_request|bool[[:space:]]+cuda|read_cuda' cpp py

# 3) Check Python call-sites where device info may be reduced before native call.
rg -n -C3 'runai_request\(|device=.*cuda|cuda=True|cuda=False' py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cpp/streamer/impl/cuda/cuda_loader.cc` around lines 96 - 119, The code
currently caches a single process-wide CudaDriver (loaded in load() and returned
by CudaDriver::get) that retains one CUcontext/CUdevice via
cuDevicePrimaryCtxRetain, which breaks when different device ordinals are later
requested; change the design to accept a device ordinal through the native ABI
and select/switch to the corresponding per-device context before any stream/copy
operations instead of relying on the singleton ctx/device in CudaDriver::get:
add a per-device lookup (or map) keyed by device ordinal, ensure
cuDevicePrimaryCtxRetain/cuDevicePrimaryCtxRelease are called per-device, and
update call sites to pass the device ordinal so functions that use
d.ctx/d.device always set the current context (e.g., via cuCtxSetCurrent) for
that requested device before performing CUDA work.
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (1)

127-135: ⚠️ Potential issue | 🟠 Major

Synchronize the buffer’s CUDA device explicitly before reuse.

At Line 133, torch.cuda.synchronize() targets the current device, not necessarily self.buffer.device. This can still race if devices diverge. Use torch.cuda.synchronize(device=self.buffer.device).

#!/bin/bash
# Verify synchronize call site and whether explicit device is used.
rg -n -C2 'torch\.cuda\.synchronize\(' py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py

# Verify buffer device is explicitly allocated from constructor device.
rg -n -C3 'self\.buffer\s*=\s*torch\.empty|device=device' py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py

Expected result: the synchronize call should include device=self.buffer.device in this method.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 127 - 135, The cuda synchronization currently calls
torch.cuda.synchronize() which synchronizes the current device and can race if
the buffer lives on another device; update the sync in the next_request path
(guarded by self._is_nvidia_cuda) to call
torch.cuda.synchronize(device=self.buffer.device) (or the integer/device value
from self.buffer.device) so the buffer's CUDA device is synchronized before
reuse and keep the existing timing/logging (logger.info) around that call.
🧹 Nitpick comments (1)
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (1)

42-45: Rename id parameter to avoid builtin shadowing.

Line 42 shadows Python’s builtin id, which is flagged by Ruff (A002). Rename to file_id for clarity and lint cleanliness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 42 - 45, The __init__ method currently uses a parameter named id
which shadows Python's builtin; rename the parameter to file_id and update the
assignment to self.file_id (and any other references inside the class: methods,
attributes, or property accessors) to avoid builtin shadowing. Change the
constructor signature in RequestsIterator.__init__ (and any instantiation sites
that pass id to this constructor) to use file_id, and run tests/linter to ensure
all usages (including imports or calls to RequestsIterator(...)) are updated
accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Line 190: The lint error is caused by an unnecessary f-string prefix on the
literal f"MemoryCapMode is Limited, but no limit supplied" in
requests_iterator.py; remove the leading f to make it a plain string literal
(i.e., replace the f-prefixed message used where the code emits this
error/warning) so Ruff F541 is satisfied while preserving the original message
and usage.
- Around line 27-30: The get_cuda_alignment function currently converts
RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR directly to int which will raise
ValueError for non-integer env values; update get_cuda_alignment to safely parse
the env var inside a try/except (catch ValueError and TypeError), falling back
to DEFAULT_CUDA_ALIGNMENT when parsing fails or the env var is missing, then
keep the existing logic that returns val if val > 1 else 1; reference
get_cuda_alignment, RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, and
DEFAULT_CUDA_ALIGNMENT when making the change.
- Around line 175-199: The cap-by-VRAM branch can reduce memory_limit below the
largest per-file stride causing empty requests and silent termination; after
computing memory_limit (in the MemoryCapMode.unlimited and MemoryCapMode.limited
branches where free_cuda_memory is applied), compare memory_limit against the
largest_chunk (compute largest_chunk = max(max(file_chunks.effective_strides,
default=0) for file_chunks in files_chunks)) and if memory_limit < largest_chunk
raise RunaiStreamerMemoryLimitException with a clear message; update the logic
around MemoryCapMode.unlimited and MemoryCapMode.limited to perform this
post-cap guard so callers aren’t left with unusable tiny buffers.

---

Duplicate comments:
In `@cpp/streamer/impl/cuda/cuda_loader.cc`:
- Around line 96-119: The code currently caches a single process-wide CudaDriver
(loaded in load() and returned by CudaDriver::get) that retains one
CUcontext/CUdevice via cuDevicePrimaryCtxRetain, which breaks when different
device ordinals are later requested; change the design to accept a device
ordinal through the native ABI and select/switch to the corresponding per-device
context before any stream/copy operations instead of relying on the singleton
ctx/device in CudaDriver::get: add a per-device lookup (or map) keyed by device
ordinal, ensure cuDevicePrimaryCtxRetain/cuDevicePrimaryCtxRelease are called
per-device, and update call sites to pass the device ordinal so functions that
use d.ctx/d.device always set the current context (e.g., via cuCtxSetCurrent)
for that requested device before performing CUDA work.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 127-135: The cuda synchronization currently calls
torch.cuda.synchronize() which synchronizes the current device and can race if
the buffer lives on another device; update the sync in the next_request path
(guarded by self._is_nvidia_cuda) to call
torch.cuda.synchronize(device=self.buffer.device) (or the integer/device value
from self.buffer.device) so the buffer's CUDA device is synchronized before
reuse and keep the existing timing/logging (logger.info) around that call.

---

Nitpick comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 42-45: The __init__ method currently uses a parameter named id
which shadows Python's builtin; rename the parameter to file_id and update the
assignment to self.file_id (and any other references inside the class: methods,
attributes, or property accessors) to avoid builtin shadowing. Change the
constructor signature in RequestsIterator.__init__ (and any instantiation sites
that pass id to this constructor) to use file_id, and run tests/linter to ensure
all usages (including imports or calls to RequestsIterator(...)) are updated
accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 918af71a-3c14-49f3-bd9c-09f914ed0a6c

📥 Commits

Reviewing files that changed from the base of the PR and between 53c7a21 and 4ef5381.

📒 Files selected for processing (5)
  • cpp/streamer/impl/batch/batch.cc
  • cpp/streamer/impl/cuda/cuda_loader.cc
  • cpp/streamer/impl/cuda/cuda_loader.h
  • py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
  • py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py
  • cpp/streamer/impl/batch/batch.cc

Comment on lines +27 to +30
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    return val if val > 1 else 1

⚠️ Potential issue | 🟡 Minor

Guard invalid CUDA alignment env values to avoid init-time crashes.

At Line 29, a non-integer RUNAI_STREAMER_CUDA_ALIGNMENT will raise ValueError and abort streaming setup. Prefer safe parsing with fallback.

Suggested fix
 def get_cuda_alignment() -> int:
     """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
-    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
+    raw = os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT))
+    try:
+        val = int(raw)
+    except (TypeError, ValueError):
+        logger.warning(
+            f"[RunAI Streamer] Invalid {RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR}={raw!r}; "
+            f"falling back to {DEFAULT_CUDA_ALIGNMENT}"
+        )
+        val = DEFAULT_CUDA_ALIGNMENT
     return val if val > 1 else 1
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 27 - 30, The get_cuda_alignment function currently converts
RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR directly to int which will raise
ValueError for non-integer env values; update get_cuda_alignment to safely parse
the env var inside a try/except (catch ValueError and TypeError), falling back
to DEFAULT_CUDA_ALIGNMENT when parsing fails or the env var is missing, then
keep the existing logic that returns val if val > 1 else 1; reference
get_cuda_alignment, RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, and
DEFAULT_CUDA_ALIGNMENT when making the change.

Comment on lines 175 to +199
if memory_mode == MemoryCapMode.unlimited:
memory_limit = sum(sum(file.chunks) for file in files_chunks)
# Use padded strides for the total size so the buffer is large enough.
total_size = sum(sum(file.effective_strides) for file in files_chunks)
if free_cuda_memory is not None:
# For NVIDIA CUDA, cap the buffer at available GPU memory so that we
# don't OOM during allocation. If the model is larger than free VRAM,
# the buffer is reused in chunks just like the limited-memory path.
memory_limit = min(total_size, free_cuda_memory)
else:
memory_limit = total_size
elif memory_mode == MemoryCapMode.largest_chunk:
memory_limit = max(max(file_chunks.chunks) for file_chunks in files_chunks)
memory_limit = max(max(file_chunks.effective_strides) for file_chunks in files_chunks)
elif memory_mode == MemoryCapMode.limited:
if user_memory_limit is None:
raise RunaiStreamerMemoryLimitException(
f"MemoryCapMode is Limited, but no limit supplied"
)
largest_chunk = max((max(file_chunks.chunks, default=0) for file_chunks in files_chunks), default=0)
largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
if user_memory_limit < largest_chunk:
raise RunaiStreamerMemoryLimitException(
f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
)
memory_limit = min(user_memory_limit, sum(sum(file.chunks) for file in files_chunks))

return FilesRequestsIteratorWithBuffer(memory_limit, files_chunks)
memory_limit = min(user_memory_limit, sum(sum(file.effective_strides) for file in files_chunks))
if free_cuda_memory is not None:
memory_limit = min(memory_limit, free_cuda_memory)

⚠️ Potential issue | 🟠 Major

Prevent silent no-progress when CUDA cap falls below largest chunk stride.

After capping by free VRAM, memory_limit may become smaller than the largest effective_stride, causing empty requests and early termination. Add a post-cap guard and raise RunaiStreamerMemoryLimitException explicitly.

Suggested fix
     def with_memory_cap(
         memory_mode: MemoryCapMode,
         files_chunks: List[FileChunks],
         user_memory_limit: Optional[int] = None,
         device: str = "cpu",
     ) -> FilesRequestsIteratorWithBuffer:
         memory_limit = 0
+        largest_stride = max(
+            (max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks),
+            default=0,
+        )
         is_nvidia_cuda = (
             device is not None
             and device.startswith("cuda")
             and torch.version.hip is None
         )
@@
         elif memory_mode == MemoryCapMode.largest_chunk:
-            memory_limit = max(max(file_chunks.effective_strides) for file_chunks in files_chunks)
+            memory_limit = largest_stride
@@
-            largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
-            if user_memory_limit < largest_chunk:
+            if user_memory_limit < largest_stride:
                 raise RunaiStreamerMemoryLimitException(
-                    f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
+                    f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_stride}"
                 )
             memory_limit = min(user_memory_limit, sum(sum(file.effective_strides) for file in files_chunks))
             if free_cuda_memory is not None:
                 memory_limit = min(memory_limit, free_cuda_memory)
+
+        if memory_limit < largest_stride:
+            raise RunaiStreamerMemoryLimitException(
+                f"Effective memory limit {memory_limit} is smaller than largest chunk stride {largest_stride}"
+            )
🧰 Tools
🪛 Ruff (0.15.6)

[error] 190-190: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 175 - 199, The cap-by-VRAM branch can reduce memory_limit below the
largest per-file stride causing empty requests and silent termination; after
computing memory_limit (in the MemoryCapMode.unlimited and MemoryCapMode.limited
branches where free_cuda_memory is applied), compare memory_limit against the
largest_chunk (compute largest_chunk = max(max(file_chunks.effective_strides,
default=0) for file_chunks in files_chunks)) and if memory_limit < largest_chunk
raise RunaiStreamerMemoryLimitException with a clear message; update the logic
around MemoryCapMode.unlimited and MemoryCapMode.limited to perform this
post-cap guard so callers aren’t left with unusable tiny buffers.

elif memory_mode == MemoryCapMode.limited:
    if user_memory_limit is None:
        raise RunaiStreamerMemoryLimitException(
            f"MemoryCapMode is Limited, but no limit supplied"

⚠️ Potential issue | 🟡 Minor

Remove extraneous f-string prefix to satisfy linting.

Line 190 uses f"..." without placeholders (Ruff F541). Use a plain string literal.

Suggested fix
-                    f"MemoryCapMode is Limited, but no limit supplied"
+                    "MemoryCapMode is Limited, but no limit supplied"
🧰 Tools
🪛 Ruff (0.15.6)

[error] 190-190: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
at line 190, The lint error is caused by an unnecessary f-string prefix on the
literal f"MemoryCapMode is Limited, but no limit supplied" in
requests_iterator.py; remove the leading f to make it a plain string literal
(i.e., replace the f-prefixed message used where the code emits this
error/warning) so Ruff F541 is satisfied while preserving the original message
and usage.

Comment on lines +554 to +589
dist.broadcast(gpu_tensor, global_broadcasting_rank, group=self.distribution_group)
total_data_bcast_time += timer() - t0
chunks_to_read[0] -= 1
total_broadcast_chunks += 1
_t_yield = timer()
yield orig_req_idx, orig_chunk_idx, gpu_tensor
_dt = timer() - _t_yield
total_inter_yield_time += _dt
max_inter_yield_time = max(max_inter_yield_time, _dt)
else:
logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: No more tensors to broadcast")

else:
t0 = timer()
dist.broadcast(received_metadata_tensor, global_broadcasting_rank, group=self.distribution_group)
total_meta_bcast_time += timer() - t0

t0 = timer()
count = received_metadata_tensor[0, 0].item()
total_meta_item_time += timer() - t0
if count == 0:
logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: No tensors from rank {global_broadcasting_rank}")
continue

orig_req_idx = received_metadata_tensor[1, 0].item()
orig_chunk_idx = received_metadata_tensor[1, 1].item()
chunk_size = received_metadata_tensor[1, 2].item()

logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: Receiving tensor {orig_req_idx}:{orig_chunk_idx} size {humanize.naturalsize(chunk_size)} from rank {global_broadcasting_rank}")

received_view = receive_buffer[:chunk_size]
t0 = timer()
torch.cuda.synchronize()
total_sync_time += timer() - t0
t0 = timer()
dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)

P0 Tensor shape mismatch breaks dist.broadcast between broadcasting and receiving ranks

dist.broadcast requires every participating rank to have a tensor with the same shape. The broadcasting rank sends gpu_tensor whose shape is (1, N) (because file_streamer._get_chunks_cuda() yields chunk_tensor.view(1, -1)), but receiving ranks allocate receive_buffer as a flat 1-D tensor and pass receive_buffer[:chunk_size] with shape (N,). NCCL/Gloo will reject the broadcast at runtime due to this shape mismatch.

The simplest fix is to reshape the receive slice to match:

# Before the broadcast on the receiving rank:
received_view = receive_buffer[:chunk_size].view(1, -1)   # shape (1, chunk_size)
torch.cuda.synchronize()
dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)
chunks_to_read[0] -= 1
total_broadcast_chunks += 1
_t_yield = timer()
yield orig_req_idx, orig_chunk_idx, received_view

Alternatively, the view(1, -1) call in file_streamer._get_chunks_cuda() (line 157 of file_streamer.py) could be dropped so both sides use 1-D tensors, but that would change the shape contract seen by downstream consumers.

Comment on lines 135 to +163
// cancel responder in case of an error - cancelled response will not delay sending the next request
utils::ScopeGuard __responder_release([&](){_responder->cancel();});

// When cuda=true and dsts has one entry per tensor (rather than one per file),
// pass only the first element to the Assigner so its size-check passes.
// The Assigner-computed destinations are overridden in Batches::build_tasks for CUDA.
const bool is_per_tensor_cuda = cuda && (dsts.size() > paths.size());
std::vector<void*> assigner_dsts = is_per_tensor_cuda ? std::vector<void*>{dsts[0]} : dsts;

// divide reading between workers
Assigner assigner(paths, file_offsets, bytesizes, dsts, _config);
Assigner assigner(paths, file_offsets, bytesizes, assigner_dsts, _config);

std::vector<Workload> workloads(assigner.num_workloads());

// Create batches for each file

size_t tensor_offset = 0;
for (size_t i = 0; i < paths.size(); ++i)
{
auto params = handle_s3(i, paths[i], credentials);
LOG(DEBUG) << "Creating batches for file index " << i << " path: " << paths[i];
Batches batches(i, assigner.file_assignments(i), _config, _responder, paths[i], params, internal_sizes[i]);

// Slice the flat per-tensor dsts into a per-file vector for this file's Batches.
std::vector<void*> file_cuda_dsts;
if (is_per_tensor_cuda)
{
const size_t num_tensors = internal_sizes[i].size();
file_cuda_dsts.assign(dsts.begin() + tensor_offset, dsts.begin() + tensor_offset + num_tensors);
tensor_offset += num_tensors;
}

P1 Per-tensor CUDA destinations broken for multi-file requests where every file has exactly one tensor

The is_per_tensor_cuda guard uses dsts.size() > paths.size() to decide whether to treat dsts as a flat per-tensor pointer array, but this check returns false whenever every file has exactly one tensor (e.g. two single-weight files). In that case total_dsts == paths.size(), so:

  • assigner_dsts = dsts (unchanged, not truncated to {dsts[0]})
  • file_cuda_dsts for every file is left empty → _cuda_tensor_dsts.empty() == true
  • build_tasks falls back to current_request_destination, which the Assigner derives from dsts[0] advanced by actual (unpadded) byte sizes

Because cuda_tensor_ptrs are spaced by padded strides (align_up(size, alignment)), the Assigner's linear offset lands at dsts[0] + actual_size_0 instead of the correct dsts[1] = dsts[0] + aligned_stride_0. Every tensor in files 1…N-1 is written to the wrong GPU address, silently corrupting the model weights.

Reproducer: two safetensors files each containing a single tensor whose byte size is not a multiple of the alignment (e.g. 500 bytes with RUNAI_STREAMER_CUDA_ALIGNMENT=512).

The fix is to treat is_per_tensor_cuda as always true when cuda != 0, since the Python side always emits one pointer per tensor for CUDA requests:

// dsts always contains one pointer per tensor for CUDA requests.
const bool is_per_tensor_cuda = (cuda != 0);
std::vector<void*> assigner_dsts = is_per_tensor_cuda ? std::vector<void*>{dsts[0]} : dsts;

and correspondingly always populate file_cuda_dsts for every file:

if (cuda != 0)
{
    const size_t num_tensors = internal_sizes[i].size();
    file_cuda_dsts.assign(dsts.begin() + tensor_offset,
                          dsts.begin() + tensor_offset + num_tensors);
    tensor_offset += num_tensors;
}

Comment on lines +394 to +437

file_specs = [{"size": 2580, "chunks": [500, 260, 260, 260, 260, 260, 260, 260, 260]}]
requests = self._prepare_file_requests(file_specs)
original_data_map = {}
for req in requests:
with open(req.path, "rb") as f:
original_data_map[req.id] = f.read()

cuda_mock.reset_cuMemcpyHtoDAsync_call_count()

# Redirect torch.empty(device="cuda:X") to CPU so tests run without a GPU.
_orig_empty = torch.empty
def _cpu_empty(*args, **kwargs):
if str(kwargs.get("device", "")).startswith("cuda"):
kwargs = {**kwargs, "device": "cpu"}
return _orig_empty(*args, **kwargs)

reconstructed_data_map = {req.id: [None] * len(req.chunks) for req in requests}
env_vars = {"RUNAI_STREAMER_DIST": "1", "RUNAI_STREAMER_DIST_BUFFER_MIN_BYTESIZE": "0"}
with patch.dict(os.environ, env_vars), \
patch("torch.cuda.is_available", return_value=True), \
patch("torch.empty", side_effect=_cpu_empty), \
patch("torch.cuda.mem_get_info", return_value=(2**33, 2**33)), \
patch("torch.cuda.synchronize"):
with DistributedStreamer() as streamer:
# Bypass the gloo+non-cpu backend check in set_is_distributed so
# distributed streaming is active even with device="cuda:0".
def _force_distributed(is_distributed, device):
streamer.is_distributed = (
is_distributed
and dist.is_initialized()
and dist.get_world_size() > 1
)
with patch.object(streamer, "set_is_distributed", side_effect=_force_distributed):
streamer.stream_files(requests, None, "cuda:0", True)
for req_id, chunk_idx, data_tensor in streamer.get_chunks():
reconstructed_data_map[req_id][chunk_idx] = data_tensor.cpu().numpy().tobytes()

call_count = cuda_mock.get_cuMemcpyHtoDAsync_call_count()
self.assertGreater(
call_count, 0,
f"Rank {self.rank}: expected cuMemcpyHtoDAsync to be called but count was {call_count}"
)


P1 torch.zeros CUDA allocation not intercepted — test will fail on non-GPU machines even with mock loaded

The test patches torch.empty to redirect CUDA tensors to CPU, but _get_chunks_cuda in distributed_streamer.py allocates two metadata tensors via torch.zeros:

batch_metadata_tensor    = torch.zeros(2, 4, dtype=torch.int64, device=self.device)   # device="cuda:0"
received_metadata_tensor = torch.zeros(2, 4, dtype=torch.int64, device=self.device)

torch.zeros is implemented in C++ and does not route through the Python-level torch.empty, so patch("torch.empty", side_effect=_cpu_empty) leaves these torch.zeros calls untouched; on a machine without a real CUDA device they raise RuntimeError: No CUDA GPUs are available, even when the mock libcuda.so.1 is present.

Fix: capture the original torch.zeros before patching and apply the same CPU redirect (redirecting torch.zeros to _orig_empty would also hand back uninitialized values where the code expects zero-filled metadata tensors):

_orig_zeros = torch.zeros
with patch("torch.zeros", side_effect=lambda *a, **kw: _orig_zeros(*a, **{**kw, "device": "cpu"}) if str(kw.get("device", "")).startswith("cuda") else _orig_zeros(*a, **kw)):
    ...

A simpler and more robust alternative is to patch torch.device so self.device resolves to torch.device("cpu"), which causes both torch.empty and torch.zeros (and any future allocations) to land on CPU without per-function patching.

Comment on lines +27 to +30
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    return val if val > 1 else 1

P2 Missing upper-bound validation — regression from the removed get_dist_buffer_alignment()

The old helper raised a ValueError for alignment values outside [0, MAX_DIST_BUFFER_ALIGNMENT] (1 MiB). The new function silently accepts any positive integer. Setting RUNAI_STREAMER_CUDA_ALIGNMENT=1073741824 (1 GiB) would cause over-allocation of the staging buffer by up to alignment - 1 bytes per tensor, potentially leading to OOM on the GPU.

Suggested change
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    return val if val > 1 else 1
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    MAX_CUDA_ALIGNMENT = 1024 * 1024  # 1 MiB — sanity ceiling
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    if val < 0 or val > MAX_CUDA_ALIGNMENT:
        raise ValueError(
            f"Invalid {RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR}: {val}, "
            f"must be between 0 and {MAX_CUDA_ALIGNMENT}"
        )
    return val if val > 1 else 1


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (2)

127-137: ⚠️ Potential issue | 🟠 Major

Still synchronize the staging buffer by self.buffer.device.

Bare torch.cuda.synchronize() has the same current-device problem here. If the iterator allocates on cuda:1 while current_device() is cuda:0, next_request() can recycle the GPU buffer before direct reads or downstream NCCL consumers on the real device finish.

Suggested fix
-            torch.cuda.synchronize()
+            torch.cuda.synchronize(device=self.buffer.device)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 127 - 137, The cuda sync call in next_request currently uses a bare
torch.cuda.synchronize(), which only syncs the current device and can race if
buffers live on another GPU; update the logic in next_request (the block gated
by self._is_nvidia_cuda and self.file_buffers) to call
torch.cuda.synchronize(device=self.buffer.device) or otherwise derive the
correct torch.device from self.buffer.device (ensuring it is an int or
torch.device) so the staging buffer's device is synchronized before recycling
the buffer; keep the timing/logging (_t0, _sync_ms, logger.info) but use the
buffer-specific device in the synchronize call.

176-203: ⚠️ Potential issue | 🟠 Major

Still reject VRAM caps that fall below the largest padded stride.

free_cuda_memory is applied after the user-limit check, so memory_limit can still drop below the largest effective_stride. FilesRequestsIterator.next_request() then returns an empty request, and the direct path either terminates immediately or fails later in distributed mode.

Suggested fix
         memory_limit = 0
+        largest_stride = max(
+            (max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks),
+            default=0,
+        )
@@
-            largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
-            if user_memory_limit < largest_chunk:
+            if user_memory_limit < largest_stride:
                 raise RunaiStreamerMemoryLimitException(
-                    f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
+                    f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_stride}"
                 )
@@
         elif memory_mode == MemoryCapMode.limited:
             ...
             if free_cuda_memory is not None:
                 memory_limit = min(memory_limit, free_cuda_memory)
+
+        if memory_limit < largest_stride:
+            raise RunaiStreamerMemoryLimitException(
+                f"Effective memory limit {memory_limit} cannot be smaller than "
+                f"the largest padded stride {largest_stride}"
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 176 - 203, Compute the largest padded stride once (largest_chunk =
max(max(file_chunks.effective_strides, default=0) for file_chunks in
files_chunks)) and ensure after applying any GPU cap (free_cuda_memory) the
resolved memory_limit is never below largest_chunk; if the min(...) that applies
free_cuda_memory would make memory_limit < largest_chunk, raise
RunaiStreamerMemoryLimitException with a clear message. Update the logic around
MemoryCapMode.unlimited, MemoryCapMode.limited (and MemoryCapMode.largest_chunk
if needed) so the GPU-cap step uses the precomputed largest_chunk check before
returning FilesRequestsIteratorWithBuffer(memory_limit,...).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`:
- Around line 372-382: The code re-invokes _use_cuda_direct() right before
calling file_streamer.stream_files which can flip after file paths are mutated;
fix this by computing and caching the CUDA-direct decision once (e.g. set
self._cuda_direct_enabled = False in __init__ and then assign it to the actual
decision early) and use that cached flag to determine read_device (and in the
other similar call site around the later stream_files usage) instead of calling
_use_cuda_direct() again; update references to use self._cuda_direct_enabled
when selecting the CPU vs CUDA path so remote-to-local cache mutations in
FileStreamer do not change the chosen path.
- Around line 588-593: The code calls torch.cuda.synchronize() which syncs the
current device, not necessarily the device of receive_buffer; change the sync to
target receive_buffer's device before reusing it (e.g. call
torch.cuda.synchronize(receive_buffer.device) or
torch.cuda.synchronize(self.device) / torch.cuda.synchronize(self.device.index)
as appropriate) so that NCCL work on the buffer's device completes before
calling dist.broadcast(received_view, global_broadcasting_rank,
group=self.distribution_group); keep the existing timing accounting
(total_sync_time) around that synchronized call.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 42-44: The constructor parameter name `id` in
RequestsIterator.__init__ shadows the builtin and must be renamed to satisfy
Ruff A002; change the parameter (e.g., to `chunk_id` or `id_`) in the __init__
signature and update any uses inside __init__ to assign the value to the
existing attribute `self.id` (leave `self.id` unchanged to preserve the public
API), ensuring all local references in requests_iterator.py to the old parameter
name are updated accordingly.

---

Duplicate comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 127-137: The cuda sync call in next_request currently uses a bare
torch.cuda.synchronize(), which only syncs the current device and can race if
buffers live on another GPU; update the logic in next_request (the block gated
by self._is_nvidia_cuda and self.file_buffers) to call
torch.cuda.synchronize(device=self.buffer.device) or otherwise derive the
correct torch.device from self.buffer.device (ensuring it is an int or
torch.device) so the staging buffer's device is synchronized before recycling
the buffer; keep the timing/logging (_t0, _sync_ms, logger.info) but use the
buffer-specific device in the synchronize call.
- Around line 176-203: Compute the largest padded stride once (largest_chunk =
max(max(file_chunks.effective_strides, default=0) for file_chunks in
files_chunks)) and ensure after applying any GPU cap (free_cuda_memory) the
resolved memory_limit is never below largest_chunk; if the min(...) that applies
free_cuda_memory would make memory_limit < largest_chunk, raise
RunaiStreamerMemoryLimitException with a clear message. Update the logic around
MemoryCapMode.unlimited, MemoryCapMode.limited (and MemoryCapMode.largest_chunk
if needed) so the GPU-cap step uses the precomputed largest_chunk check before
returning FilesRequestsIteratorWithBuffer(memory_limit,...).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a7fd87b5-8f8e-4067-bd51-9b610bff01e4

📥 Commits

Reviewing files that changed from the base of the PR and between 4ef5381 and 79240e0.

📒 Files selected for processing (3)
  • py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py
  • py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
  • py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py

Comment on lines +372 to +382
read_device = device if self._use_cuda_direct() else "cpu"
original_memory_limit = os.environ.get("RUNAI_STREAMER_MEMORY_LIMIT")
try:
# for distributed streaming only - change default memory limit to unlimited
# Change default memory limit to unlimited so the file_streamer reads as
# much as possible in one pass. For CUDA, with_memory_cap caps the buffer
# at free GPU memory automatically, preventing OOM.
if self.original_group_rank == 0:
logger.debug(f"[RunAI Streamer][Distributed] Setting memory limit to unlimited")
if original_memory_limit == None:
logger.debug(f"[RunAI Streamer][Distributed] Setting memory limit to unlimited, read device: {read_device}")
if original_memory_limit is None:
os.environ["RUNAI_STREAMER_MEMORY_LIMIT"] = "-1"
self.file_streamer.stream_files(self.rank_file_chunks_list, credentials, "cpu")
self.file_streamer.stream_files(self.rank_file_chunks_list, credentials, read_device)

⚠️ Potential issue | 🟠 Major

Cache the CUDA-direct decision instead of recomputing it from mutable paths.

stream_files() decides read_device before FileStreamer.stream_files(...), but py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py mutates remote URIs into local cache paths. Re-evaluating _use_cuda_direct() here can therefore flip a remote request from CPU mode into _get_chunks_cuda(), which breaks the “remote backends always use the CPU path” behavior and can make NCCL ranks try to broadcast CPU chunks as CUDA tensors.

Suggested fix
-        read_device = device if self._use_cuda_direct() else "cpu"
+        self._cuda_direct_enabled = self._use_cuda_direct()
+        read_device = device if self._cuda_direct_enabled else "cpu"
@@
-        if self._use_cuda_direct():
+        if self._cuda_direct_enabled:
             yield from self._get_chunks_cuda()
             return

Initialize self._cuda_direct_enabled = False in __init__ as well.

Also applies to: 396-398

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`
around lines 372 - 382, The code re-invokes _use_cuda_direct() right before
calling file_streamer.stream_files which can flip after file paths are mutated;
fix this by computing and caching the CUDA-direct decision once (e.g. set
self._cuda_direct_enabled = False in __init__ and then assign it to the actual
decision early) and use that cached flag to determine read_device (and in the
other similar call site around the later stream_files usage) instead of calling
_use_cuda_direct() again; update references to use self._cuda_direct_enabled
when selecting the CPU vs CUDA path so remote-to-local cache mutations in
FileStreamer do not change the chosen path.

Comment on lines +588 to +593
received_view = receive_buffer[:chunk_size]
t0 = timer()
torch.cuda.synchronize()
total_sync_time += timer() - t0
t0 = timer()
dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py | head -n 600 | tail -n 50

Repository: run-ai/runai-model-streamer

Length of output: 1894


🏁 Script executed:

sed -n '550,620p' py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py

Repository: run-ai/runai-model-streamer

Length of output: 4466


🏁 Script executed:

sed -n '1,600p' py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py | grep -n "receive_buffer" | head -20

Repository: run-ai/runai-model-streamer

Length of output: 572


🌐 Web query:

torch.cuda.synchronize() documentation device parameter behavior

💡 Result:

torch.cuda.synchronize(device=None) blocks the host (CPU) until all queued CUDA work on the specified CUDA device has finished—i.e., it “wait[s] for all kernels in all streams on a CUDA device to complete.” [1]

device parameter behavior:

  • Type: torch.device or int (CUDA device index). [1]
  • Default (None): synchronizes the current CUDA device, as returned by torch.cuda.current_device(). [1]
  • Scope: synchronizes all streams on that device (not a single stream). [1]
  • Caveat (multi-thread / multi-GPU): there has been a reported issue where calling torch.cuda.synchronize() in one Python thread can block CUDA execution in other threads even when they are using other devices. [2]

For per-stream synchronization, use torch.cuda.Stream.synchronize() instead (it targets a specific stream rather than “all streams on a device”). [3]

Sources: [1] PyTorch API docs, [2] PyTorch GitHub issue #24963, [3] PyTorch Forums discussion.


Synchronize the correct device before reusing receive_buffer.

torch.cuda.synchronize() without a device parameter waits on torch.cuda.current_device(), not receive_buffer.device (which is self.device). If the current device differs from the actual buffer device, the subsequent dist.broadcast() can overwrite the buffer while NCCL work on the real destination device is still in flight.

Suggested fix
-                        torch.cuda.synchronize()
+                        torch.cuda.synchronize(device=receive_buffer.device)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`
around lines 588 - 593, The code calls torch.cuda.synchronize() which syncs the
current device, not necessarily the device of receive_buffer; change the sync to
target receive_buffer's device before reusing it (e.g. call
torch.cuda.synchronize(receive_buffer.device) or
torch.cuda.synchronize(self.device) / torch.cuda.synchronize(self.device.index)
as appropriate) so that NCCL work on the buffer's device completes before
calling dist.broadcast(received_view, global_broadcasting_rank,
group=self.distribution_group); keep the existing timing accounting
(total_sync_time) around that synchronized call.

Comment on lines +42 to 44
def __init__(self, id: int, path: str, offset: int, chunks: List[int],
             buffer_strides: Optional[List[int]] = None) -> None:
    self.id = id  # the id of the file chunk must be unique in the context of a single stream_files request

⚠️ Potential issue | 🟡 Minor

Rename the id parameter to satisfy Ruff A002.

This shadows Python’s builtin id and will keep lint red. Renaming just the constructor argument is enough; self.id can stay if that attribute is part of the existing API.

Suggested fix
-    def __init__(self, id: int, path: str, offset: int, chunks: List[int],
+    def __init__(self, file_id: int, path: str, offset: int, chunks: List[int],
                  buffer_strides: Optional[List[int]] = None) -> None:
-        self.id = id
+        self.id = file_id
🧰 Tools
🪛 Ruff (0.15.6)

[error] 42-42: Function argument id is shadowing a Python builtin

(A002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 42 - 44, The constructor parameter name `id` in
RequestsIterator.__init__ shadows the builtin and must be renamed to satisfy
Ruff A002; change the parameter (e.g., to `chunk_id` or `id_`) in the __init__
signature and update any uses inside __init__ to assign the value to the
existing attribute `self.id` (leave `self.id` unchanged to preserve the public
API), ensuring all local references in requests_iterator.py to the old parameter
name are updated accordingly.
