Conversation
Greptile Summary

This PR introduces an NVIDIA CUDA direct-streaming path for both single-rank and distributed model loading. Instead of reading file data into a pageable CPU buffer, the C++ streamer reads into a pinned host staging buffer and copies each chunk asynchronously into pre-allocated GPU tensors via cuMemcpyHtoDAsync.

Key issues found:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
participant PY as Python (SafetensorsStreamer)
participant FS as FileStreamer
participant RI as RequestsIterator
participant CPP as C++ Streamer (runai_request)
participant BAT as Batch::read_cuda
participant GPU as GPU Memory
PY->>RI: FileChunks(chunks, buffer_strides=padded)
PY->>FS: stream_files(file_chunks, device="cuda:0")
FS->>RI: with_memory_mode(device="cuda:0")
RI->>GPU: torch.empty(buffer_size, dtype=uint8, device="cuda:0")
RI-->>FS: cuda_tensor_ptrs (aligned per-tensor GPU ptrs)
FS->>CPP: runai_request(paths, offsets, sizes, cuda_tensor_ptrs, cuda=1)
CPP->>CPP: CudaDriver::get() — init CUDA context on calling thread
loop Worker threads (per Batch)
BAT->>BAT: g_cuda_staging.ensure() → cuMemAllocHost (pinned host buf)
BAT->>BAT: reader->read(block) → staging_buf
BAT->>GPU: cuMemcpyHtoDAsync(aligned_gpu_ptr, staging_buf, block_size)
BAT->>BAT: cuStreamSynchronize → reuse staging_buf
end
FS->>FS: get_chunks() → _get_chunks_cuda()
FS-->>PY: yield (file_path, chunk_idx, gpu_tensor.view(1,-1))
Note over PY,GPU: Distributed path (CUDA direct)
participant DS as DistributedStreamer
participant R1 as Rank 0 (broadcaster)
participant R2 as Rank 1 (receiver)
DS->>R1: _get_chunks_cuda() — next tensor from file_streamer (already on GPU)
R1->>R1: dist.broadcast(metadata, src=rank0)
R2->>R2: dist.broadcast(metadata, src=rank0) — receive count+size
R2->>R2: torch.cuda.synchronize() — ensure prev receive_buffer free
R1->>R1: dist.broadcast(gpu_tensor, src=rank0)
R2->>R2: dist.broadcast(receive_buffer[:chunk_size], src=rank0)
R1-->>DS: yield orig_req_idx, orig_chunk_idx, gpu_tensor
R2-->>DS: yield orig_req_idx, orig_chunk_idx, received_view
```

Last reviewed commit: "CR fixes"
Walkthrough

Adds optional CUDA device-memory streaming: build/tooling to expose CUDA headers and a libcuda mock, a dlopen-based CUDA loader, C++ paths to read into GPU memory with async cuMemcpy, and Python changes to detect/use CUDA destinations with aligned GPU buffers.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Worker as Worker Thread
participant Batch as Batch
participant CudaDriver as CudaDriver (dlopen)
participant Host as Pinned Host Memory
participant GPU as GPU Device Memory
participant Stream as CUDA Stream
Worker->>Batch: execute(config, stopped)
Batch->>CudaDriver: CudaDriver::get()
CudaDriver-->>Batch: CudaDriver* (symbols resolved / ctx retained)
Batch->>CudaDriver: cuStreamCreate()
CudaDriver-->>Stream: stream handle
loop per chunk
Batch->>Host: read file into pinned staging buffer
Host-->>Batch: chunk bytes
Batch->>CudaDriver: cuMemcpyHtoDAsync(dstDevice, hostPtr, size, stream)
CudaDriver->>GPU: async copy enqueued
Batch->>CudaDriver: cuStreamSynchronize(stream)
CudaDriver-->>Batch: copy complete
Batch->>Batch: finished_until(task.end, Success)
end
Batch->>CudaDriver: cuStreamDestroy_v2(stream)
CudaDriver-->>Stream: destroy
Batch-->>Worker: complete
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 8
🧹 Nitpick comments (4)
.devcontainer/Dockerfile (1)
5-10: Add a non-root runtime user in the final image.

The Dockerfile has no USER directive, which means the container defaults to running as root. Consider adding a non-root user for improved security posture.

Suggested hardening patch:

```diff
 FROM ubuntu:20.04
 # Copy CUDA headers for compile-time type checking of the driver API (cuda.h etc.).
 # The actual libcuda.so is loaded at runtime via dlopen, so no CUDA installation
 # is required on the build host or the final image.
 COPY --from=cuda-headers /usr/local/cuda/include/ /usr/local/cuda/include/
+
+# Drop root for runtime/dev shell usage
+RUN useradd --create-home --uid 10001 appuser
+USER appuser
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.devcontainer/Dockerfile around lines 5 - 10, Add a non-root runtime user by creating a dedicated user and group in the Dockerfile (e.g., via useradd/addgroup), set a home directory, ensure relevant files/directories (like /usr/local/cuda/include/) are chowned to that user, and add a USER directive near the end of the Dockerfile to switch from root to the new user; ensure any build-time COPY steps that require root remain before the USER switch and that runtime processes run under the new non-root account for improved security.

cpp/streamer/impl/batches/batches.h (1)
29-31: Consider accepting cuda_tensor_dsts by rvalue reference to avoid potential copy overhead.

The std::vector<void*> cuda_tensor_dsts = {} parameter is passed by value. For callers that can move the vector, this is fine, but for large vectors passed from lvalues, an unnecessary copy occurs. Consider using std::vector<void*>&& cuda_tensor_dsts = {} or providing an overload.

♻️ Alternative signature:

```diff
 Batches(unsigned file_index, const std::vector<FileReadTask> & file_read_tasks, std::shared_ptr<const Config> config, std::shared_ptr<common::Responder> responder, const std::string & path, const common::s3::S3ClientWrapper::Params & params, const std::vector<size_t> & internal_sizes, bool cuda = false,
-        std::vector<void*> cuda_tensor_dsts = {});
+        std::vector<void*> cuda_tensor_dsts = {}); // Note: relies on copy elision or explicit std::move at call sites
```

If copies are a concern, you could use:

```cpp
std::vector<void*>&& cuda_tensor_dsts = {});
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cpp/streamer/impl/batches/batches.h` around lines 29 - 31, The parameter cuda_tensor_dsts in the function/constructor declaration that currently reads "std::vector<void*> cuda_tensor_dsts = {}" should be changed to accept an rvalue reference to avoid unnecessary copies from lvalues: replace it with "std::vector<void*>&& cuda_tensor_dsts = {}" (or alternatively add an overload taking "const std::vector<void*>&" and one taking "std::vector<void*>&&") and update the corresponding implementation(s) that reference cuda_tensor_dsts (e.g., the function/constructor in batches.h and its definition) to std::move from the rvalue reference where appropriate.

py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py (1)
584-590: Consider whether torch.cuda.synchronize() is necessary before receive.

The torch.cuda.synchronize() call before dist.broadcast() on receiving ranks adds latency. The NCCL broadcast operation is already stream-synchronized. Unless there's a specific race condition being protected against (e.g., prior async ops on receive_buffer), this synchronization may be unnecessary overhead.

If intentional, a brief comment explaining why would help future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py` around lines 584 - 590, The explicit torch.cuda.synchronize() call before dist.broadcast on the receiving path (around receive_buffer handling and the broadcast call in distributed_streamer.py) introduces extra latency; either remove the synchronize() to rely on NCCL/stream synchronization for dist.broadcast or, if there is a specific async-write/read race you are protecting, keep it but add a concise comment above the synchronize() explaining the exact race and why a full device sync is required; update bookkeeping (total_sync_time) accordingly if you remove the sync.

cpp/streamer/impl/streamer/streamer.cc (1)
150-165: Consider adding bounds validation for per-tensor CUDA destinations.

The slicing logic assumes dsts contains exactly sum(internal_sizes[i].size()) entries. If there's a mismatch (e.g., Python bindings pass wrong number of pointers), iterators could exceed dsts.end(), causing undefined behavior.

Since verify_requests already validates inputs, you could extend it to check:

```cpp
if (cuda && dsts.size() > paths.size()) {
    size_t expected = std::accumulate(num_sizes.begin(), num_sizes.end(), 0u);
    ASSERT(dsts.size() == expected) << "CUDA mode: expected " << expected << " dsts, got " << dsts.size();
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cpp/streamer/impl/streamer/streamer.cc` around lines 150 - 165, The slicing of per-tensor CUDA destinations can read past dsts and cause UB; add bounds validation before or inside the loop (or better yet extend verify_requests) to ensure when is_per_tensor_cuda (or cuda) is true that dsts.size() equals the sum of internal_sizes[i].size() for all files and that tensor_offset + num_tensors <= dsts.size() before calling file_cuda_dsts.assign; on mismatch ASSERT or return an error (use the same ASSERT/log pattern as elsewhere) referencing dsts, internal_sizes, tensor_offset, and Batches so the slicing cannot exceed dsts.end().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cpp/streamer/impl/batch/batch.cc`:
- Around line 324-346: The ensure function in batch.cc does not check CUresult
from cuStreamCreate and cuMemAllocHost, which can leave ptr null while capacity
is set and cause undefined behavior; modify batch::ensure to capture and check
the CUresult of drv.cuStreamCreate(...) and drv.cuMemAllocHost(...), only set
capacity = needed after cuMemAllocHost succeeds, and return nullptr on any
failure (do not leave ptr/capacity inconsistent); then update the caller
read_cuda() to check the returned pointer from g_cuda_staging.ensure(...) and
throw or handle an error (e.g., throw
common::Exception(common::ResponseCode::UnknownError)) if ensure returns
nullptr.
In `@cpp/streamer/impl/cuda/cuda_loader.cc`:
- Around line 95-119: The loader currently retains a single primary context in
load() via cuDevicePrimaryCtxRetain and returns a process-wide singleton from
CudaDriver::get(), which prevents per-request device switching and causes
cuMemcpyHtoDAsync to operate against the wrong device; update the native ABI to
accept a device ordinal from runai_request and implement per-device context
handling by either (a) caching a map of device -> CUcontext and ensuring
cuCtxSetCurrent(ctx) is called before any stream operations (e.g., before
cuMemcpyHtoDAsync) or (b) calling cuDevicePrimaryCtxRetain/cuCtxSetCurrent
on-demand for the requested device, and ensure CudaDriver::get()/load() no
longer exposes a single fixed device context so each stream operation sets the
correct current context.
In `@cpp/WORKSPACE`:
- Around line 7-11: Replace the hard-coded new_local_repository("cuda",
path="/usr/local/cuda", build_file="//third_party:cuda.BUILD") with a
configurable approach: implement a small repository rule (or wrapper in
WORKSPACE) that looks for an environment variable (e.g., CUDA_HOME) and uses
that value as the path, falling back to "/usr/local/cuda" if unset;
alternatively document and demonstrate using Bazel's override mechanism (e.g.,
--override_repository=cuda=/your/cuda/path) and add a .bazelrc example so
callers can override the "cuda" local repository without editing WORKSPACE;
update the repository declaration referenced by name="cuda" and
build_file="//third_party:cuda.BUILD" and add a note in README or comments
explaining the CUDA_HOME and --override_repository options.
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py`:
- Around line 391-436: The test incorrectly asserts that the local
cuda_mock.get_cuMemcpyHtoDAsync_call_count() (call_count) is > 0 on every rank;
instead compute total_chunks = sum(len(req.chunks) for req in requests) and if
dist.get_world_size() > total_chunks skip or avoid the per-rank assertion,
otherwise perform an all-reduce of the local call_count (use dist.all_reduce on
a tensor/int to produce global_call_count) and assert global_call_count > 0;
update the assertion logic that references call_count and
cuda_mock.get_cuMemcpyHtoDAsync_call_count() accordingly so pure-receiver ranks
don’t cause false failures.
In `@py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py`:
- Around line 97-112: The code currently passes the original device (e.g.,
"cuda:0") into FilesRequestsIteratorWithBuffer.with_memory_mode unconditionally,
causing CUDA buffers to be allocated even when torch.cuda.is_available() is
False or when any request path is remote; fix it by computing a use_cuda_direct
boolean that requires _is_nvidia_cuda (or device startswith "cuda" and
torch.version.hip is None), torch.cuda.is_available(), and that all
file_stream_requests paths are local (no s3://, gs://, az://), then pass device
if use_cuda_direct is True otherwise pass "cpu" into
FilesRequestsIteratorWithBuffer.with_memory_mode; update references in this
block (device_str, _is_nvidia_cuda, file_stream_requests,
FilesRequestsIteratorWithBuffer.with_memory_mode) and ensure handle_object_store
is still called before the local-path check so paths reflect any translation.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 126-135: next_request() currently calls torch.cuda.synchronize()
which synchronizes the current CUDA device, but the buffer device is set in
__init__ (see self.buffer and self._is_nvidia_cuda); change the synchronization
to target the buffer's device to avoid races by calling
torch.cuda.synchronize(device=self.buffer.device) (or the equivalent device
spec) when self._is_nvidia_cuda is true and keep the logging of the elapsed
time; ensure you reference self.buffer.device (or the same attribute used to
store the explicit device in __init__) rather than relying on the current CUDA
device.
In `@py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py`:
- Around line 44-53: The CUDA fallback calls dst.data_ptr() on elements of dsts
but dsts is typed as List[memoryview], causing a latent AttributeError; update
the logic in runai_request so that when cuda is True you require
cuda_tensor_ptrs (raise a clear ValueError if cuda and cuda_tensor_ptrs is None)
and remove or disable the unreachable memoryview-based fallback that uses
dst.data_ptr(); also update the type hints to reflect that dsts (or the new
cuda_tensor_ptrs param) must be List[torch.Tensor] when cuda=True and adjust any
docstring/comments around internal_sizes and dsts accordingly.
In
`@py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py`:
- Around line 229-230: Update the inline comment above the alignment assignment
to reflect the correct default CUDA alignment: replace "default 256 bytes" with
"default 512 bytes" so it matches get_cuda_alignment() and
DEFAULT_CUDA_ALIGNMENT; this affects the comment near the assignment to
alignment which uses get_cuda_alignment() when device and
device.startswith("cuda").
---
Nitpick comments:
In @.devcontainer/Dockerfile:
- Around line 5-10: Add a non-root runtime user by creating a dedicated user and
group in the Dockerfile (e.g., via useradd/addgroup), set a home directory,
ensure relevant files/directories (like /usr/local/cuda/include/) are chowned to
that user, and add a USER directive near the end of the Dockerfile to switch
from root to the new user; ensure any build-time COPY steps that require root
remain before the USER switch and that runtime processes run under the new
non-root account for improved security.
In `@cpp/streamer/impl/batches/batches.h`:
- Around line 29-31: The parameter cuda_tensor_dsts in the function/constructor
declaration that currently reads "std::vector<void*> cuda_tensor_dsts = {}"
should be changed to accept an rvalue reference to avoid unnecessary copies from
lvalues: replace it with "std::vector<void*>&& cuda_tensor_dsts = {}" (or
alternatively add an overload taking "const std::vector<void*>&" and one taking
"std::vector<void*>&&") and update the corresponding implementation(s) that
reference cuda_tensor_dsts (e.g., the function/constructor in batches.h and its
definition) to std::move from the rvalue reference where appropriate.
In `@cpp/streamer/impl/streamer/streamer.cc`:
- Around line 150-165: The slicing of per-tensor CUDA destinations can read past
dsts and cause UB; add bounds validation before or inside the loop (or better
yet extend verify_requests) to ensure when is_per_tensor_cuda (or cuda) is true
that dsts.size() equals the sum of internal_sizes[i].size() for all files and
that tensor_offset + num_tensors <= dsts.size() before calling
file_cuda_dsts.assign; on mismatch ASSERT or return an error (use the same
ASSERT/log pattern as elsewhere) referencing dsts, internal_sizes,
tensor_offset, and Batches so the slicing cannot exceed dsts.end().
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`:
- Around line 584-590: The explicit torch.cuda.synchronize() call before
dist.broadcast on the receiving path (around receive_buffer handling and the
broadcast call in distributed_streamer.py) introduces extra latency; either
remove the synchronize() to rely on NCCL/stream synchronization for
dist.broadcast or, if there is a specific async-write/read race you are
protecting, keep it but add a concise comment above the synchronize() explaining
the exact race and why a full device sync is required; update bookkeeping
(total_sync_time) accordingly if you remove the sync.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 59617c99-6b02-4c51-b142-af834abb38dc
📒 Files selected for processing (31)
- .devcontainer/Dockerfile
- cpp/WORKSPACE
- cpp/cuda/cuda_mock/BUILD
- cpp/cuda/cuda_mock/libcuda.ldscript
- cpp/cuda/cuda_mock/libcuda_mock.cc
- cpp/mock/streamer-mock.cc
- cpp/streamer/impl/batch/BUILD
- cpp/streamer/impl/batch/batch.cc
- cpp/streamer/impl/batch/batch.h
- cpp/streamer/impl/batch/batch_cuda_test.cc
- cpp/streamer/impl/batches/batches.cc
- cpp/streamer/impl/batches/batches.h
- cpp/streamer/impl/cuda/BUILD
- cpp/streamer/impl/cuda/cuda_loader.cc
- cpp/streamer/impl/cuda/cuda_loader.h
- cpp/streamer/impl/streamer/BUILD
- cpp/streamer/impl/streamer/streamer.cc
- cpp/streamer/impl/streamer/streamer.h
- cpp/streamer/streamer.cc
- cpp/streamer/streamer.h
- cpp/streamer/streamer_s3_test.cc
- cpp/streamer/streamer_test.cc
- cpp/third_party/cuda.BUILD
- py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py
- py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py
- py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py
- py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
- py/runai_model_streamer/runai_model_streamer/file_streamer/tests/test_requests_iterator.py
- py/runai_model_streamer/runai_model_streamer/libstreamer/__init__.py
- py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py
- py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py
```cpp
    // Retain the primary context for the detected device once per process.
    // Worker threads make it current via cuCtxSetCurrent(ctx) on first use.
    // The matching cuDevicePrimaryCtxRelease(device) is called in ~CudaDriver().
    auto cuDevicePrimaryCtxRetain =
        load_sym<CUresult(*)(CUcontext *, CUdevice)>(handle, "cuDevicePrimaryCtxRetain");
    if (!cuDevicePrimaryCtxRetain || cuDevicePrimaryCtxRetain(&d.ctx, device) != CUDA_SUCCESS)
    {
        LOG(WARNING) << "[RunAI Streamer] Could not retain CUDA primary context for device "
                     << device << "; CUDA streaming disabled";
        return {};
    }
    d.device = device;

    LOG(INFO) << "[RunAI Streamer] CUDA driver loaded successfully";
    return d;
}

} // anonymous namespace

const CudaDriver * CudaDriver::get()
{
    static const CudaDriver driver = load();
    static const bool available = driver.cuStreamCreate != nullptr;
    return available ? &driver : nullptr;
}
```
🧩 Analysis chain
🏁 Script executed:
# First, locate the CUDA loader file and understand the full context
find . -type f -name "cuda_loader.cc" | head -5Repository: run-ai/runai-model-streamer
Length of output: 110
🏁 Script executed:
# Find the Python files mentioned in the review
find . -type f -name "file_streamer.py" -o -name "libstreamer.py" | head -10Repository: run-ai/runai-model-streamer
Length of output: 222
🏁 Script executed:
# Get the full context of the cuda_loader.cc file to see load() function
if [ -f "cpp/streamer/impl/cuda/cuda_loader.cc" ]; then
wc -l cpp/streamer/impl/cuda/cuda_loader.cc
fi
Length of output: 112
🏁 Script executed:
# Read the full cuda_loader.cc file
cat -n cpp/streamer/impl/cuda/cuda_loader.cc
Length of output: 5444
🏁 Script executed:
# Read file_streamer.py
cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py
Length of output: 9328
🏁 Script executed:
# Read libstreamer.py
cat -n py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py
Length of output: 5783
🏁 Script executed:
# Verify the native function signature by checking header file
find . -type f -name "cuda_loader.h" | head -1 | xargs cat -n 2>/dev/null || echo "Header not found, checking for streamer interface"Repository: run-ai/runai-model-streamer
Length of output: 2086
🏁 Script executed:
# Check for any additional CUDA device handling or context management elsewhere
rg -i "cudevice|cucontext" --type cpp -A 2 -B 2 | head -100Repository: run-ai/runai-model-streamer
Length of output: 7546
🏁 Script executed:
# Verify that no device ordinal is passed through the entire stack by checking
# the C++ streamer implementation
find . -type f -name "*.cc" -path "*/streamer/impl/*" | grep -v test | head -10Repository: run-ai/runai-model-streamer
Length of output: 466
🏁 Script executed:
# Look for where cuMemcpyHtoDAsync is actually called to understand the context issue
rg "cuMemcpyHtoDAsync" --type cpp -B 5 -A 5Repository: run-ai/runai-model-streamer
Length of output: 6424
🏁 Script executed:
# Look at the batch.cc file more closely to understand the cuMemcpyHtoDAsync usage
cat -n cpp/streamer/impl/batch/batch.cc | head -150
Length of output: 5586
🏁 Script executed:
# Search for g_cuda_staging definition and context setup
rg "g_cuda_staging" --type cpp -B 3 -A 3Repository: run-ai/runai-model-streamer
Length of output: 1568
🏁 Script executed:
# Check if there are any device-specific CUDA operations or per-device context management
rg "cuCtxSetCurrent|cuda::|device.*cuda|CUdevice" --type cpp | head -30Repository: run-ai/runai-model-streamer
Length of output: 3110
🏁 Script executed:
# Check the full read_cuda implementation to see context switching
rg "read_cuda" --type cpp -A 30 cpp/streamer/impl/batch/batch.ccRepository: run-ai/runai-model-streamer
Length of output: 2176
🏁 Script executed:
# Look for any per-device caching or alternative device context paths
rg "cuda::" --type cpp -B 2 -A 2 | head -50Repository: run-ai/runai-model-streamer
Length of output: 2881
Singleton CUDA context doesn't support multi-device scheduling within a process.
load() retains one primary context for the device current at process startup, and get() caches it process-wide as a static singleton. The Python layer accepts per-request device ordinals (e.g., device="cuda:1"), but converts them to a boolean and passes only cuda=True/False to native code. The C++ batch reader gets the singleton driver and calls cuMemcpyHtoDAsync() on every stream, making it impossible to switch contexts when a later request targets a different GPU. This can cause GPU pointers allocated on device 1 to be written via the retained context for device 0, resulting in data corruption or crashes.
Pass the device ordinal through the native ABI (runai_request) and either maintain a per-device context cache in C++ or explicitly call cuCtxSetCurrent() for each stream's target device.
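On the Python side, the ordinal is easy to recover before the call into native code. A minimal hypothetical sketch (the device_index argument shown for runai_request does not exist in the current ABI and is illustrative only):

```python
# Hypothetical sketch: derive the CUDA device ordinal so it could be forwarded
# through the native ABI alongside the existing cuda flag. The `device_index`
# parameter of runai_request is an assumption, not the current API.
import torch

def _cuda_device_index(device: str) -> int:
    """Map "cuda" -> 0 and "cuda:N" -> N."""
    idx = torch.device(device).index
    return 0 if idx is None else idx

# e.g. libstreamer.runai_request(..., cuda=True, device_index=_cuda_device_index("cuda:1"))
```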
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cpp/streamer/impl/cuda/cuda_loader.cc` around lines 95 - 119, The loader
currently retains a single primary context in load() via
cuDevicePrimaryCtxRetain and returns a process-wide singleton from
CudaDriver::get(), which prevents per-request device switching and causes
cuMemcpyHtoDAsync to operate against the wrong device; update the native ABI to
accept a device ordinal from runai_request and implement per-device context
handling by either (a) caching a map of device -> CUcontext and ensuring
cuCtxSetCurrent(ctx) is called before any stream operations (e.g., before
cuMemcpyHtoDAsync) or (b) calling cuDevicePrimaryCtxRetain/cuCtxSetCurrent
on-demand for the requested device, and ensure CudaDriver::get()/load() no
longer exposes a single fixed device context so each stream operation sets the
correct current context.
```starlark
new_local_repository(
    name = "cuda",
    path = "/usr/local/cuda",
    build_file = "//third_party:cuda.BUILD",
)
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Checking hard-coded CUDA repository path in WORKSPACE:"
rg -n 'new_local_repository|path\s*=\s*"/usr/local/cuda"' cpp/WORKSPACE -n -C2
echo
echo "Checking whether current environment has /usr/local/cuda:"
if [ -d /usr/local/cuda ]; then
echo "FOUND: /usr/local/cuda"
else
echo "MISSING: /usr/local/cuda"
fi
echo
echo "Checking for alternate CUDA path configuration hooks in repo:"
rg -n 'CUDA_HOME|CUDA_PATH|cuda.*path|new_local_repository\(.*cuda' cpp -S
Length of output: 856
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Checking for .bazelrc or Bazel configuration files ==="
find cpp -name ".bazelrc*" -o -name "*.bazelrc" 2>/dev/null | head -20
echo
echo "=== Checking WORKSPACE file for environment variable usage ==="
head -20 cpp/WORKSPACE
echo
echo "=== Searching for documentation mentioning CUDA or build setup ==="
find . -type f \( -name "README*" -o -name "CONTRIBUTING*" -o -name "BUILD*" -o -name "*.md" \) -not -path "./.git/*" | xargs grep -l -i "cuda\|build.*setup\|installation" 2>/dev/null | head -10
echo
echo "=== Checking third_party/cuda.BUILD to understand dependency structure ==="
if [ -f cpp/third_party/cuda.BUILD ]; then
cat cpp/third_party/cuda.BUILD
fi
echo
echo "=== Checking if there are any repository override patterns ==="
rg -n "repository_override|local_repository|override_repository|repository_cache" cpp -A2Repository: run-ai/runai-model-streamer
Length of output: 2169
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Reading cpp/.bazelrc ==="
cat cpp/.bazelrc
echo
echo "=== Reading installation.md for CUDA setup instructions ==="
head -100 docs/src/installation.md
echo
echo "=== Reading README.md for build/CUDA references ==="
grep -i -C5 "cuda\|build\|install" README.md | head -50
echo
echo "=== Checking if toolchain/configure.bzl handles CUDA path ==="
if [ -f cpp/toolchain/configure.bzl ]; then
cat cpp/toolchain/configure.bzl
fi
Length of output: 6076
Hard-coded CUDA path lacks override mechanism for different environments.
Line 9 hard-codes /usr/local/cuda with no programmatic way to customize it. While the comment acknowledges this ("Adjust the path if your CUDA installation is in a different location"), there's no environment variable, .bazelrc option, or documented setup procedure to support non-standard CUDA installations. Users must manually edit WORKSPACE, which breaks reproducibility and complicates CI/CD pipelines. Even with runtime dlopen for libcuda.so, the headers-only dependency still requires the path to exist at build time.
Provide a mechanism for CUDA path customization (e.g., via environment variable CUDA_HOME, .bazelrc override, or startup script).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cpp/WORKSPACE` around lines 7 - 11, Replace the hard-coded
new_local_repository("cuda", path="/usr/local/cuda",
build_file="//third_party:cuda.BUILD") with a configurable approach: implement a
small repository rule (or wrapper in WORKSPACE) that looks for an environment
variable (e.g., CUDA_HOME) and uses that value as the path, falling back to
"/usr/local/cuda" if unset; alternatively document and demonstrate using Bazel's
override mechanism (e.g., --override_repository=cuda=/your/cuda/path) and add a
.bazelrc example so callers can override the "cuda" local repository without
editing WORKSPACE; update the repository declaration referenced by name="cuda"
and build_file="//third_party:cuda.BUILD" and add a note in README or comments
explaining the CUDA_HOME and --override_repository options.
```python
cuda_mock = self._load_cuda_mock()
if cuda_mock is None:
    self.skipTest("Mock libcuda.so.1 not available — set LD_LIBRARY_PATH to the mock build directory.")

file_specs = [{"size": 2580, "chunks": [500, 260, 260, 260, 260, 260, 260, 260, 260]}]
requests = self._prepare_file_requests(file_specs)
original_data_map = {}
for req in requests:
    with open(req.path, "rb") as f:
        original_data_map[req.id] = f.read()

cuda_mock.reset_cuMemcpyHtoDAsync_call_count()

# Redirect torch.empty(device="cuda:X") to CPU so tests run without a GPU.
_orig_empty = torch.empty
def _cpu_empty(*args, **kwargs):
    if str(kwargs.get("device", "")).startswith("cuda"):
        kwargs = {**kwargs, "device": "cpu"}
    return _orig_empty(*args, **kwargs)

reconstructed_data_map = {req.id: [None] * len(req.chunks) for req in requests}
env_vars = {"RUNAI_STREAMER_DIST": "1", "RUNAI_STREAMER_DIST_BUFFER_MIN_BYTESIZE": "0"}
with patch.dict(os.environ, env_vars), \
        patch("torch.cuda.is_available", return_value=True), \
        patch("torch.empty", side_effect=_cpu_empty), \
        patch("torch.cuda.mem_get_info", return_value=(2**33, 2**33)), \
        patch("torch.cuda.synchronize"):
    with DistributedStreamer() as streamer:
        # Bypass the gloo+non-cpu backend check in set_is_distributed so
        # distributed streaming is active even with device="cuda:0".
        def _force_distributed(is_distributed, device):
            streamer.is_distributed = (
                is_distributed
                and dist.is_initialized()
                and dist.get_world_size() > 1
            )
        with patch.object(streamer, "set_is_distributed", side_effect=_force_distributed):
            streamer.stream_files(requests, None, "cuda:0", True)
            for req_id, chunk_idx, data_tensor in streamer.get_chunks():
                reconstructed_data_map[req_id][chunk_idx] = data_tensor.cpu().numpy().tobytes()

call_count = cuda_mock.get_cuMemcpyHtoDAsync_call_count()
self.assertGreater(
    call_count, 0,
    f"Rank {self.rank}: expected cuMemcpyHtoDAsync to be called but count was {call_count}"
)
```
Don't require every rank to issue a CUDA copy.
This assertion only holds when each rank owns at least one source chunk. With the fixed 9-chunk fixture, larger test runs can legitimately leave some ranks as pure receivers, so call_count stays 0 even though the CUDA-direct path is working. Consider asserting on an all-reduced sum, or skip when world_size exceeds the total chunk count.
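A minimal sketch of that adjustment, assuming the requests, cuda_mock, and initialized process group already set up by the surrounding test:

```python
# Skip when ranks outnumber source chunks, otherwise assert on the sum of
# cuMemcpyHtoDAsync calls across all ranks instead of per rank.
import torch
import torch.distributed as dist

total_chunks = sum(len(req.chunks) for req in requests)
if dist.get_world_size() > total_chunks:
    self.skipTest("more ranks than source chunks; some ranks are pure receivers")

local_count = torch.tensor([cuda_mock.get_cuMemcpyHtoDAsync_call_count()], dtype=torch.int64)
dist.all_reduce(local_count, op=dist.ReduceOp.SUM)  # global call count across all ranks
self.assertGreater(int(local_count.item()), 0,
                   "expected at least one cuMemcpyHtoDAsync call across all ranks")
```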
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/tests/dist_test_distributed_streamer.py`
around lines 391 - 436, The test incorrectly asserts that the local
cuda_mock.get_cuMemcpyHtoDAsync_call_count() (call_count) is > 0 on every rank;
instead compute total_chunks = sum(len(req.chunks) for req in requests) and if
dist.get_world_size() > total_chunks skip or avoid the per-rank assertion,
otherwise perform an all-reduce of the local call_count (use dist.all_reduce on
a tensor/int to produce global_call_count) and assert global_call_count > 0;
update the assertion logic that references call_count and
cuda_mock.get_cuMemcpyHtoDAsync_call_count() accordingly so pure-receiver ranks
don’t cause false failures.
```python
self.device_str = device
# AMD ROCm also reports "cuda" devices but has no libcuda.so — only enable the
# direct-to-GPU C++ path for NVIDIA (torch.version.hip is set on ROCm builds).
self._is_nvidia_cuda = (
    device is not None
    and device.startswith("cuda")
    and torch.version.hip is None
)

for file_stream_request in file_stream_requests:
    self.total_size += sum(file_stream_request.chunks)
    file_stream_request.path = self.handle_object_store(file_stream_request.path, credentials)

self.requests_iterator: FilesRequestsIteratorWithBuffer = FilesRequestsIteratorWithBuffer.with_memory_mode(file_stream_requests)

self.requests_iterator: FilesRequestsIteratorWithBuffer = FilesRequestsIteratorWithBuffer.with_memory_mode(
    file_stream_requests, device=device
)
```
🧩 Analysis chain
🏁 Script executed:
# First, locate the file and read the context around lines 97-112
fd "file_streamer.py" | head -5Repository: run-ai/runai-model-streamer
Length of output: 269
🏁 Script executed:
# Find and read the relevant section
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" | sed -n '90,120p'Repository: run-ai/runai-model-streamer
Length of output: 1698
🏁 Script executed:
# Get the full file to understand the broader context
wc -l "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py"Repository: run-ai/runai-model-streamer
Length of output: 150
🏁 Script executed:
# Search for with_memory_mode implementation
rg "with_memory_mode" --type py -A 5Repository: run-ai/runai-model-streamer
Length of output: 1537
🏁 Script executed:
# Search for torch.cuda.is_available usage
rg "torch\.cuda\.is_available" --type pyRepository: run-ai/runai-model-streamer
Length of output: 907
🏁 Script executed:
# Look for remote path checks (s3, gs, azure)
rg "is_s3_path|is_gs_path|is_azure_path" --type py -B 2 -A 2Repository: run-ai/runai-model-streamer
Length of output: 9045
🏁 Script executed:
# Read the with_memory_mode implementation in full
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py" | sed -n '1,150p'Repository: run-ai/runai-model-streamer
Length of output: 7911
🏁 Script executed:
# Search for with_memory_mode method signature and implementation
ast-grep --pattern $'def with_memory_mode($$$) {
$$$
}'
Length of output: 53
🏁 Script executed:
# Get more context around torch.cuda.is_available in requests_iterator
rg "with_memory_mode" --type py -B 2 -A 20Repository: run-ai/runai-model-streamer
Length of output: 5615
🏁 Script executed:
# Check homogeneous_paths implementation
rg "def homogeneous_paths" --type py -A 10Repository: run-ai/runai-model-streamer
Length of output: 1254
🏁 Script executed:
# Check what happens in runai_request with cuda parameter
rg "def runai_request" --type py -A 10Repository: run-ai/runai-model-streamer
Length of output: 1198
🏁 Script executed:
# Check if handle_object_store modifies device or returns info about remote paths
rg "def handle_object_store" --type py -A 10Repository: run-ai/runai-model-streamer
Length of output: 1395
🏁 Script executed:
# Check if with_memory_cap is called and what it does
rg "with_memory_cap" --type py -B 2 -A 15Repository: run-ai/runai-model-streamer
Length of output: 29960
🏁 Script executed:
# Check the full file_streamer.py around lines 88-130 for full context
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" | sed -n '85,130p'Repository: run-ai/runai-model-streamer
Length of output: 2387
🏁 Script executed:
# Check if there are any tests that validate the CUDA availability check
rg "torch.cuda.is_available" --type py -B 5 -A 5Repository: run-ai/runai-model-streamer
Length of output: 8159
🏁 Script executed:
# Verify if device parameter is ever validated or conditioned before passing to iterator
rg "device.*=" "py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py" -B 2 -A 2Repository: run-ai/runai-model-streamer
Length of output: 1319
🏁 Script executed:
# Check if there's any validation of torch.cuda.is_available before the with_memory_mode call
rg "stream_files" --type py -A 30 | grep -A 30 "def stream_files"Repository: run-ai/runai-model-streamer
Length of output: 19773
🏁 Script executed:
# Verify what happens if torch.cuda.is_available() returns False with cuda device
cat -n "py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py" | sed -n '74,110p'Repository: run-ai/runai-model-streamer
Length of output: 2526
Gate CUDA-direct mode before creating the request buffer.
The iterator receives the CUDA device unconditionally whenever the caller passes cuda:*, even for remote paths (s3://, gs://, az://) or when torch.cuda.is_available() returns False. This allocates a CUDA buffer and later drives runai_request(..., cuda=True) instead of falling back to CPU as documented. Compute a use_cuda_direct flag that also checks torch.cuda.is_available() and validates all input paths are local, then pass "cpu" to the iterator when this flag is False.
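A minimal sketch of that gating, reusing the device, file_stream_requests, and FilesRequestsIteratorWithBuffer names from the surrounding __init__; the _REMOTE_PREFIXES tuple is introduced here only for illustration:

```python
import torch

_REMOTE_PREFIXES = ("s3://", "gs://", "az://")  # illustrative; match whatever the project treats as remote

use_cuda_direct = (
    self._is_nvidia_cuda                           # NVIDIA "cuda:*" device, not ROCm
    and torch.cuda.is_available()                  # no usable GPU -> stay on the CPU buffer path
    and all(not req.path.startswith(_REMOTE_PREFIXES) for req in file_stream_requests)
)

self.requests_iterator = FilesRequestsIteratorWithBuffer.with_memory_mode(
    file_stream_requests, device=device if use_cuda_direct else "cpu"
)
```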
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py`
around lines 97 - 112, The code currently passes the original device (e.g.,
"cuda:0") into FilesRequestsIteratorWithBuffer.with_memory_mode unconditionally,
causing CUDA buffers to be allocated even when torch.cuda.is_available() is
False or when any request path is remote; fix it by computing a use_cuda_direct
boolean that requires _is_nvidia_cuda (or device startswith "cuda" and
torch.version.hip is None), torch.cuda.is_available(), and that all
file_stream_requests paths are local (no s3://, gs://, az://), then pass device
if use_cuda_direct is True otherwise pass "cpu" into
FilesRequestsIteratorWithBuffer.with_memory_mode; update references in this
block (device_str, _is_nvidia_cuda, file_stream_requests,
FilesRequestsIteratorWithBuffer.with_memory_mode) and ensure handle_object_store
is still called before the local-path check so paths reflect any translation.
```python
if self._is_nvidia_cuda:
    # Buffer is on GPU: drain all CUDA streams before reusing it for new
    # writes. Guards against two classes of async operations still in flight:
    # 1. NCCL broadcasts reading from the yielded tensor slices.
    # 2. Async user-application ops (e.g. sharding) on those slices.
    import time
    _t0 = time.perf_counter()
    torch.cuda.synchronize()
    _sync_ms = (time.perf_counter() - _t0) * 1000
    logger.info(f"[RunAI Streamer] cuda synchronize in next_request took {_sync_ms:.1f} ms")
```
🧩 Analysis chain
🏁 Script executed:
cd /tmp && find . -type f -name "requests_iterator.py" 2>/dev/null | head -5
Length of output: 53
🏁 Script executed:
cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -150
Length of output: 7911
🏁 Script executed:
cat -n py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | sed -n '100,200p'
Length of output: 6241
🏁 Script executed:
# Search for class definition and device-related code
rg -n "class FilesRequestsIteratorWithBuffer|self\._device|cuda:.*device|torch\.cuda\.synchronize" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.pyRepository: run-ai/runai-model-streamer
Length of output: 153
🏁 Script executed:
# Check __init__ method to see how device is initialized
rg -A 30 "def __init__" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -60Repository: run-ai/runai-model-streamer
Length of output: 3059
🏁 Script executed:
# Look for buffer allocation and device context
rg -B 5 -A 5 "torch.cuda.synchronize|with torch.cuda.device|current_device|cuda.current_device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.pyRepository: run-ai/runai-model-streamer
Length of output: 704
🏁 Script executed:
# Check if device is stored anywhere in the class
rg -n "self\.device|self\._device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.pyRepository: run-ai/runai-model-streamer
Length of output: 53
🏁 Script executed:
# Verify buffer.device access pattern in the code
rg -n "\.device|device" py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py | head -20Repository: run-ai/runai-model-streamer
Length of output: 1152
🏁 Script executed:
# Check if there's device context management anywhere
rg -n "torch.cuda.set_device|with torch.cuda.device|current_device" py/runai_model_streamer/runai_model_streamer/file_streamer/Repository: run-ai/runai-model-streamer
Length of output: 53
Synchronize the buffer's device, not the current device.
torch.cuda.synchronize() without a device argument waits on torch.cuda.current_device(), but the buffer is created on an explicit device passed to __init__ (line 93, 97). If the current device differs from the buffer's device, next_request() can reuse the buffer while work on the actual destination device is still in flight, causing data races. Use torch.cuda.synchronize(device=self.buffer.device) instead.
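A minimal sketch of the change inside next_request(), assuming self.buffer, self._is_nvidia_cuda, and the module-level logger from the surrounding file:

```python
import time
import torch

if self._is_nvidia_cuda:
    _t0 = time.perf_counter()
    # Wait on the device the buffer actually lives on, not whichever device is current.
    torch.cuda.synchronize(device=self.buffer.device)
    _sync_ms = (time.perf_counter() - _t0) * 1000
    logger.info(f"[RunAI Streamer] cuda synchronize in next_request took {_sync_ms:.1f} ms")
```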
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 126 - 135, next_request() currently calls torch.cuda.synchronize()
which synchronizes the current CUDA device, but the buffer device is set in
__init__ (see self.buffer and self._is_nvidia_cuda); change the synchronization
to target the buffer's device to avoid races by calling
torch.cuda.synchronize(device=self.buffer.device) (or the equivalent device
spec) when self._is_nvidia_cuda is true and keep the logging of the elapsed
time; ensure you reference self.buffer.device (or the same attribute used to
store the explicit device in __init__) rather than relying on the current CUDA
device.
```python
# Compute alignment for CUDA devices (default 256 bytes, disabled on CPU).
alignment = get_cuda_alignment() if (device and device.startswith("cuda")) else 1
```
Comment mentions 256 bytes but default is 512.
The inline comment states "default 256 bytes" but get_cuda_alignment() in requests_iterator.py returns DEFAULT_CUDA_ALIGNMENT which is 512 bytes according to the snippet. Update the comment to match the actual default.
📝 Proposed fix
```diff
- # Compute alignment for CUDA devices (default 256 bytes, disabled on CPU).
+ # Compute alignment for CUDA devices (default 512 bytes, disabled on CPU).
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-# Compute alignment for CUDA devices (default 256 bytes, disabled on CPU).
+# Compute alignment for CUDA devices (default 512 bytes, disabled on CPU).
 alignment = get_cuda_alignment() if (device and device.startswith("cuda")) else 1
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py`
around lines 229 - 230, Update the inline comment above the alignment assignment
to reflect the correct default CUDA alignment: replace "default 256 bytes" with
"default 512 bytes" so it matches get_cuda_alignment() and
DEFAULT_CUDA_ALIGNMENT; this affects the comment near the assignment to
alignment which uses get_cuda_alignment() when device and
device.startswith("cuda").
Actionable comments posted: 3
♻️ Duplicate comments (2)
cpp/streamer/impl/cuda/cuda_loader.cc (1)
96-119: ⚠️ Potential issue | 🔴 Critical

Process-wide singleton context still cannot safely handle mixed-device requests.

Line 117 caches a single CudaDriver (ctx/device) per process. If later requests in the same process target another GPU, CUDA ops (e.g., async HtoD copies) can run under the wrong current context, which can corrupt data or crash.

Please pass the device ordinal through the native ABI and switch/select the per-device context before stream/copy operations, instead of binding the context once at first get().

```shell
#!/bin/bash
set -euo pipefail
# 1) Confirm singleton + single ctx/device model.
rg -n -C2 'static const CudaDriver driver|CudaDriver::get\(|CUcontext ctx|CUdevice device' \
  cpp/streamer/impl/cuda/cuda_loader.cc cpp/streamer/impl/cuda/cuda_loader.h
# 2) Check whether native ABI carries only boolean CUDA flag (no device ordinal).
rg -n -C3 'runai_request|bool[[:space:]]+cuda|read_cuda' cpp py
# 3) Check Python call-sites where device info may be reduced before native call.
rg -n -C3 'runai_request\(|device=.*cuda|cuda=True|cuda=False' py
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cpp/streamer/impl/cuda/cuda_loader.cc` around lines 96 - 119, The code currently caches a single process-wide CudaDriver (loaded in load() and returned by CudaDriver::get) that retains one CUcontext/CUdevice via cuDevicePrimaryCtxRetain, which breaks when different device ordinals are later requested; change the design to accept a device ordinal through the native ABI and select/switch to the corresponding per-device context before any stream/copy operations instead of relying on the singleton ctx/device in CudaDriver::get: add a per-device lookup (or map) keyed by device ordinal, ensure cuDevicePrimaryCtxRetain/cuDevicePrimaryCtxRelease are called per-device, and update call sites to pass the device ordinal so functions that use d.ctx/d.device always set the current context (e.g., via cuCtxSetCurrent) for that requested device before performing CUDA work.

py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (1)
127-135: ⚠️ Potential issue | 🟠 Major

Synchronize the buffer's CUDA device explicitly before reuse.

At Line 133, torch.cuda.synchronize() targets the current device, not necessarily self.buffer.device. This can still race if devices diverge. Use torch.cuda.synchronize(device=self.buffer.device).

```shell
#!/bin/bash
# Verify synchronize call site and whether explicit device is used.
rg -n -C2 'torch\.cuda\.synchronize\(' py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
# Verify buffer device is explicitly allocated from constructor device.
rg -n -C3 'self\.buffer\s*=\s*torch\.empty|device=device' py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
```

Expected result: the synchronize call should include device=self.buffer.device in this method.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py` around lines 127 - 135, The cuda synchronization currently calls torch.cuda.synchronize() which synchronizes the current device and can race if the buffer lives on another device; update the sync in the next_request path (guarded by self._is_nvidia_cuda) to call torch.cuda.synchronize(device=self.buffer.device) (or the integer/device value from self.buffer.device) so the buffer's CUDA device is synchronized before reuse and keep the existing timing/logging (logger.info) around that call.
🧹 Nitpick comments (1)
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (1)
42-45: Rename id parameter to avoid builtin shadowing.

Line 42 shadows Python's builtin id, which is flagged by Ruff (A002). Rename to file_id for clarity and lint cleanliness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py` around lines 42 - 45, The __init__ method currently uses a parameter named id which shadows Python's builtin; rename the parameter to file_id and update the assignment to self.file_id (and any other references inside the class: methods, attributes, or property accessors) to avoid builtin shadowing. Change the constructor signature in RequestsIterator.__init__ (and any instantiation sites that pass id to this constructor) to use file_id, and run tests/linter to ensure all usages (including imports or calls to RequestsIterator(...)) are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Line 190: The lint error is caused by an unnecessary f-string prefix on the
literal f"MemoryCapMode is Limited, but no limit supplied" in
requests_iterator.py; remove the leading f to make it a plain string literal
(i.e., replace the f-prefixed message used where the code emits this
error/warning) so Ruff F541 is satisfied while preserving the original message
and usage.
- Around line 27-30: The get_cuda_alignment function currently converts
RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR directly to int which will raise
ValueError for non-integer env values; update get_cuda_alignment to safely parse
the env var inside a try/except (catch ValueError and TypeError), falling back
to DEFAULT_CUDA_ALIGNMENT when parsing fails or the env var is missing, then
keep the existing logic that returns val if val > 1 else 1; reference
get_cuda_alignment, RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, and
DEFAULT_CUDA_ALIGNMENT when making the change.
- Around line 175-199: The cap-by-VRAM branch can reduce memory_limit below the
largest per-file stride causing empty requests and silent termination; after
computing memory_limit (in the MemoryCapMode.unlimited and MemoryCapMode.limited
branches where free_cuda_memory is applied), compare memory_limit against the
largest_chunk (compute largest_chunk = max(max(file_chunks.effective_strides,
default=0) for file_chunks in files_chunks)) and if memory_limit < largest_chunk
raise RunaiStreamerMemoryLimitException with a clear message; update the logic
around MemoryCapMode.unlimited and MemoryCapMode.limited to perform this
post-cap guard so callers aren’t left with unusable tiny buffers.
---
Duplicate comments:
In `@cpp/streamer/impl/cuda/cuda_loader.cc`:
- Around line 96-119: The code currently caches a single process-wide CudaDriver
(loaded in load() and returned by CudaDriver::get) that retains one
CUcontext/CUdevice via cuDevicePrimaryCtxRetain, which breaks when different
device ordinals are later requested; change the design to accept a device
ordinal through the native ABI and select/switch to the corresponding per-device
context before any stream/copy operations instead of relying on the singleton
ctx/device in CudaDriver::get: add a per-device lookup (or map) keyed by device
ordinal, ensure cuDevicePrimaryCtxRetain/cuDevicePrimaryCtxRelease are called
per-device, and update call sites to pass the device ordinal so functions that
use d.ctx/d.device always set the current context (e.g., via cuCtxSetCurrent)
for that requested device before performing CUDA work.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 127-135: The cuda synchronization currently calls
torch.cuda.synchronize() which synchronizes the current device and can race if
the buffer lives on another device; update the sync in the next_request path
(guarded by self._is_nvidia_cuda) to call
torch.cuda.synchronize(device=self.buffer.device) (or the integer/device value
from self.buffer.device) so the buffer's CUDA device is synchronized before
reuse and keep the existing timing/logging (logger.info) around that call.
---
Nitpick comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 42-45: The __init__ method currently uses a parameter named id
which shadows Python's builtin; rename the parameter to file_id and update the
assignment to self.file_id (and any other references inside the class: methods,
attributes, or property accessors) to avoid builtin shadowing. Change the
constructor signature in RequestsIterator.__init__ (and any instantiation sites
that pass id to this constructor) to use file_id, and run tests/linter to ensure
all usages (including imports or calls to RequestsIterator(...)) are updated
accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 918af71a-3c14-49f3-bd9c-09f914ed0a6c
📒 Files selected for processing (5)
- cpp/streamer/impl/batch/batch.cc
- cpp/streamer/impl/cuda/cuda_loader.cc
- cpp/streamer/impl/cuda/cuda_loader.h
- py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
- py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py
🚧 Files skipped from review as they are similar to previous changes (2)
- py/runai_model_streamer/runai_model_streamer/safetensors_streamer/safetensors_streamer.py
- cpp/streamer/impl/batch/batch.cc
```python
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    return val if val > 1 else 1
```
Guard invalid CUDA alignment env values to avoid init-time crashes.
At Line 29, a non-integer RUNAI_STREAMER_CUDA_ALIGNMENT will raise ValueError and abort streaming setup. Prefer safe parsing with fallback.
Suggested fix
```diff
 def get_cuda_alignment() -> int:
     """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
-    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
+    raw = os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT))
+    try:
+        val = int(raw)
+    except (TypeError, ValueError):
+        logger.warning(
+            f"[RunAI Streamer] Invalid {RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR}={raw!r}; "
+            f"falling back to {DEFAULT_CUDA_ALIGNMENT}"
+        )
+        val = DEFAULT_CUDA_ALIGNMENT
     return val if val > 1 else 1
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 27 - 30, The get_cuda_alignment function currently converts
RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR directly to int which will raise
ValueError for non-integer env values; update get_cuda_alignment to safely parse
the env var inside a try/except (catch ValueError and TypeError), falling back
to DEFAULT_CUDA_ALIGNMENT when parsing fails or the env var is missing, then
keep the existing logic that returns val if val > 1 else 1; reference
get_cuda_alignment, RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, and
DEFAULT_CUDA_ALIGNMENT when making the change.
```diff
         if memory_mode == MemoryCapMode.unlimited:
-            memory_limit = sum(sum(file.chunks) for file in files_chunks)
+            # Use padded strides for the total size so the buffer is large enough.
+            total_size = sum(sum(file.effective_strides) for file in files_chunks)
+            if free_cuda_memory is not None:
+                # For NVIDIA CUDA, cap the buffer at available GPU memory so that we
+                # don't OOM during allocation. If the model is larger than free VRAM,
+                # the buffer is reused in chunks just like the limited-memory path.
+                memory_limit = min(total_size, free_cuda_memory)
+            else:
+                memory_limit = total_size
         elif memory_mode == MemoryCapMode.largest_chunk:
-            memory_limit = max(max(file_chunks.chunks) for file_chunks in files_chunks)
+            memory_limit = max(max(file_chunks.effective_strides) for file_chunks in files_chunks)
         elif memory_mode == MemoryCapMode.limited:
             if user_memory_limit is None:
                 raise RunaiStreamerMemoryLimitException(
                     f"MemoryCapMode is Limited, but no limit supplied"
                 )
-            largest_chunk = max((max(file_chunks.chunks, default=0) for file_chunks in files_chunks), default=0)
+            largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
             if user_memory_limit < largest_chunk:
                 raise RunaiStreamerMemoryLimitException(
                     f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
                 )
-            memory_limit = min(user_memory_limit, sum(sum(file.chunks) for file in files_chunks))
-
-        return FilesRequestsIteratorWithBuffer(memory_limit, files_chunks)
+            memory_limit = min(user_memory_limit, sum(sum(file.effective_strides) for file in files_chunks))
+            if free_cuda_memory is not None:
+                memory_limit = min(memory_limit, free_cuda_memory)
```
Prevent silent no-progress when CUDA cap falls below largest chunk stride.
After capping by free VRAM, memory_limit may become smaller than the largest effective_stride, causing empty requests and early termination. Add a post-cap guard and raise RunaiStreamerMemoryLimitException explicitly.
Suggested fix

```diff
 def with_memory_cap(
     memory_mode: MemoryCapMode,
     files_chunks: List[FileChunks],
     user_memory_limit: Optional[int] = None,
     device: str = "cpu",
 ) -> FilesRequestsIteratorWithBuffer:
     memory_limit = 0
+    largest_stride = max(
+        (max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks),
+        default=0,
+    )
     is_nvidia_cuda = (
         device is not None
         and device.startswith("cuda")
         and torch.version.hip is None
     )
@@
     elif memory_mode == MemoryCapMode.largest_chunk:
-        memory_limit = max(max(file_chunks.effective_strides) for file_chunks in files_chunks)
+        memory_limit = largest_stride
@@
-        largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
-        if user_memory_limit < largest_chunk:
+        if user_memory_limit < largest_stride:
             raise RunaiStreamerMemoryLimitException(
-                f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
+                f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_stride}"
             )
         memory_limit = min(user_memory_limit, sum(sum(file.effective_strides) for file in files_chunks))
         if free_cuda_memory is not None:
             memory_limit = min(memory_limit, free_cuda_memory)
+
+    if memory_limit < largest_stride:
+        raise RunaiStreamerMemoryLimitException(
+            f"Effective memory limit {memory_limit} is smaller than largest chunk stride {largest_stride}"
+        )
```
🧰 Tools
🪛 Ruff (0.15.6)
[error] 190-190: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 175 - 199, The cap-by-VRAM branch can reduce memory_limit below the
largest per-file stride causing empty requests and silent termination; after
computing memory_limit (in the MemoryCapMode.unlimited and MemoryCapMode.limited
branches where free_cuda_memory is applied), compare memory_limit against the
largest_chunk (compute largest_chunk = max(max(file_chunks.effective_strides,
default=0) for file_chunks in files_chunks)) and if memory_limit < largest_chunk
raise RunaiStreamerMemoryLimitException with a clear message; update the logic
around MemoryCapMode.unlimited and MemoryCapMode.limited to perform this
post-cap guard so callers aren’t left with unusable tiny buffers.
```python
elif memory_mode == MemoryCapMode.limited:
    if user_memory_limit is None:
        raise RunaiStreamerMemoryLimitException(
            f"MemoryCapMode is Limited, but no limit supplied"
```
Remove extraneous f-string prefix to satisfy linting.
Line 190 uses f"..." without placeholders (Ruff F541). Use a plain string literal.
Suggested fix

```diff
-            f"MemoryCapMode is Limited, but no limit supplied"
+            "MemoryCapMode is Limited, but no limit supplied"
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
            "MemoryCapMode is Limited, but no limit supplied"
```
🧰 Tools
🪛 Ruff (0.15.6)
[error] 190-190: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
at line 190, The lint error is caused by an unnecessary f-string prefix on the
literal f"MemoryCapMode is Limited, but no limit supplied" in
requests_iterator.py; remove the leading f to make it a plain string literal
(i.e., replace the f-prefixed message used where the code emits this
error/warning) so Ruff F541 is satisfied while preserving the original message
and usage.
```python
                dist.broadcast(gpu_tensor, global_broadcasting_rank, group=self.distribution_group)
                total_data_bcast_time += timer() - t0
                chunks_to_read[0] -= 1
                total_broadcast_chunks += 1
                _t_yield = timer()
                yield orig_req_idx, orig_chunk_idx, gpu_tensor
                _dt = timer() - _t_yield
                total_inter_yield_time += _dt
                max_inter_yield_time = max(max_inter_yield_time, _dt)
            else:
                logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: No more tensors to broadcast")

        else:
            t0 = timer()
            dist.broadcast(received_metadata_tensor, global_broadcasting_rank, group=self.distribution_group)
            total_meta_bcast_time += timer() - t0

            t0 = timer()
            count = received_metadata_tensor[0, 0].item()
            total_meta_item_time += timer() - t0
            if count == 0:
                logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: No tensors from rank {global_broadcasting_rank}")
                continue

            orig_req_idx = received_metadata_tensor[1, 0].item()
            orig_chunk_idx = received_metadata_tensor[1, 1].item()
            chunk_size = received_metadata_tensor[1, 2].item()

            logger.debug(f"[RunAI Streamer][Distributed] Rank {self.original_group_rank}: Receiving tensor {orig_req_idx}:{orig_chunk_idx} size {humanize.naturalsize(chunk_size)} from rank {global_broadcasting_rank}")

            received_view = receive_buffer[:chunk_size]
            t0 = timer()
            torch.cuda.synchronize()
            total_sync_time += timer() - t0
            t0 = timer()
            dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)
```
Tensor shape mismatch breaks `dist.broadcast` between broadcasting and receiving ranks

`dist.broadcast` requires every participating rank to have a tensor with the same shape. The broadcasting rank sends `gpu_tensor`, whose shape is `(1, N)` (because `file_streamer._get_chunks_cuda()` yields `chunk_tensor.view(1, -1)`), but receiving ranks allocate `receive_buffer` as a flat 1-D tensor and pass `receive_buffer[:chunk_size]` with shape `(N,)`. NCCL/Gloo will reject the broadcast at runtime due to this shape mismatch.

The simplest fix is to reshape the receive slice to match:

```python
# Before the broadcast on the receiving rank:
received_view = receive_buffer[:chunk_size].view(1, -1)  # shape (1, chunk_size)
torch.cuda.synchronize()
dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)
chunks_to_read[0] -= 1
total_broadcast_chunks += 1
_t_yield = timer()
yield orig_req_idx, orig_chunk_idx, received_view
```

Alternatively, the `view(1, -1)` call in `file_streamer._get_chunks_cuda()` (line 157 of file_streamer.py) could be dropped so both sides use 1-D tensors, but that would change the shape contract seen by downstream consumers.
```diff
 // cancel responder in case of an error - cancelled response will not delay sending the next request
 utils::ScopeGuard __responder_release([&](){_responder->cancel();});

+// When cuda=true and dsts has one entry per tensor (rather than one per file),
+// pass only the first element to the Assigner so its size-check passes.
+// The Assigner-computed destinations are overridden in Batches::build_tasks for CUDA.
+const bool is_per_tensor_cuda = cuda && (dsts.size() > paths.size());
+std::vector<void*> assigner_dsts = is_per_tensor_cuda ? std::vector<void*>{dsts[0]} : dsts;

 // divide reading between workers
-Assigner assigner(paths, file_offsets, bytesizes, dsts, _config);
+Assigner assigner(paths, file_offsets, bytesizes, assigner_dsts, _config);

 std::vector<Workload> workloads(assigner.num_workloads());

 // Create batches for each file

+size_t tensor_offset = 0;
 for (size_t i = 0; i < paths.size(); ++i)
 {
     auto params = handle_s3(i, paths[i], credentials);
     LOG(DEBUG) << "Creating batches for file index " << i << " path: " << paths[i];
     Batches batches(i, assigner.file_assignments(i), _config, _responder, paths[i], params, internal_sizes[i]);

+    // Slice the flat per-tensor dsts into a per-file vector for this file's Batches.
+    std::vector<void*> file_cuda_dsts;
+    if (is_per_tensor_cuda)
+    {
+        const size_t num_tensors = internal_sizes[i].size();
+        file_cuda_dsts.assign(dsts.begin() + tensor_offset, dsts.begin() + tensor_offset + num_tensors);
+        tensor_offset += num_tensors;
+    }
```
Per-tensor CUDA destinations broken for multi-file requests where every file has exactly one tensor

The `is_per_tensor_cuda` guard uses `dsts.size() > paths.size()` to decide whether to treat `dsts` as a flat per-tensor pointer array, but this check returns false whenever every file has exactly one tensor (e.g. two single-weight files). In that case `total_dsts == paths.size()`, so:

- `assigner_dsts = dsts` (unchanged, not truncated to `{dsts[0]}`)
- `file_cuda_dsts` for every file is left empty, so `_cuda_tensor_dsts.empty() == true`
- `build_tasks` falls back to `current_request_destination`, which the Assigner derives from `dsts[0]` advanced by actual (unpadded) byte sizes

Because `cuda_tensor_ptrs` are spaced by padded strides (`align_up(size, alignment)`), the Assigner's linear offset lands at `dsts[0] + actual_size_0` instead of the correct `dsts[1] = dsts[0] + aligned_stride_0`. Every tensor in files 1…N-1 is written to the wrong GPU address, silently corrupting the model weights.

Reproducer: two safetensors files each containing a single tensor whose byte size is not a multiple of the alignment (e.g. 500 bytes with `RUNAI_STREAMER_CUDA_ALIGNMENT=512`).
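For illustration only, a small hedged sketch of that offset mismatch; the `align_up` helper and the sizes are assumptions taken from the reproducer, not the project's code:

```python
def align_up(size: int, alignment: int) -> int:
    # Round size up to the next multiple of alignment.
    return ((size + alignment - 1) // alignment) * alignment

alignment = 512
sizes = [500, 500]  # two single-tensor files, sizes not multiples of 512

# Python side: per-tensor GPU pointers are spaced by padded strides.
padded_offsets = [0, align_up(sizes[0], alignment)]   # [0, 512]

# Assigner fallback: advances by the actual (unpadded) byte sizes.
unpadded_offsets = [0, sizes[0]]                       # [0, 500]

# The second tensor would be written 12 bytes before its real destination.
print(padded_offsets[1] - unpadded_offsets[1])         # 12
```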
The fix is to treat `is_per_tensor_cuda` as always true when `cuda != 0`, since the Python side always emits one pointer per tensor for CUDA requests:

```cpp
// dsts always contains one pointer per tensor for CUDA requests.
const bool is_per_tensor_cuda = (cuda != 0);
std::vector<void*> assigner_dsts = is_per_tensor_cuda ? std::vector<void*>{dsts[0]} : dsts;
```

and correspondingly always populate `file_cuda_dsts` for every file:
```cpp
if (cuda != 0)
{
    const size_t num_tensors = internal_sizes[i].size();
    file_cuda_dsts.assign(dsts.begin() + tensor_offset,
                          dsts.begin() + tensor_offset + num_tensors);
    tensor_offset += num_tensors;
}
```
```python
file_specs = [{"size": 2580, "chunks": [500, 260, 260, 260, 260, 260, 260, 260, 260]}]
requests = self._prepare_file_requests(file_specs)
original_data_map = {}
for req in requests:
    with open(req.path, "rb") as f:
        original_data_map[req.id] = f.read()

cuda_mock.reset_cuMemcpyHtoDAsync_call_count()

# Redirect torch.empty(device="cuda:X") to CPU so tests run without a GPU.
_orig_empty = torch.empty
def _cpu_empty(*args, **kwargs):
    if str(kwargs.get("device", "")).startswith("cuda"):
        kwargs = {**kwargs, "device": "cpu"}
    return _orig_empty(*args, **kwargs)

reconstructed_data_map = {req.id: [None] * len(req.chunks) for req in requests}
env_vars = {"RUNAI_STREAMER_DIST": "1", "RUNAI_STREAMER_DIST_BUFFER_MIN_BYTESIZE": "0"}
with patch.dict(os.environ, env_vars), \
     patch("torch.cuda.is_available", return_value=True), \
     patch("torch.empty", side_effect=_cpu_empty), \
     patch("torch.cuda.mem_get_info", return_value=(2**33, 2**33)), \
     patch("torch.cuda.synchronize"):
    with DistributedStreamer() as streamer:
        # Bypass the gloo+non-cpu backend check in set_is_distributed so
        # distributed streaming is active even with device="cuda:0".
        def _force_distributed(is_distributed, device):
            streamer.is_distributed = (
                is_distributed
                and dist.is_initialized()
                and dist.get_world_size() > 1
            )
        with patch.object(streamer, "set_is_distributed", side_effect=_force_distributed):
            streamer.stream_files(requests, None, "cuda:0", True)
            for req_id, chunk_idx, data_tensor in streamer.get_chunks():
                reconstructed_data_map[req_id][chunk_idx] = data_tensor.cpu().numpy().tobytes()

call_count = cuda_mock.get_cuMemcpyHtoDAsync_call_count()
self.assertGreater(
    call_count, 0,
    f"Rank {self.rank}: expected cuMemcpyHtoDAsync to be called but count was {call_count}"
)
```
torch.zeros CUDA allocation not intercepted: the test will fail on non-GPU machines even with the mock loaded

The test patches `torch.empty` to redirect CUDA tensors to CPU, but `_get_chunks_cuda` in distributed_streamer.py allocates two metadata tensors via `torch.zeros`:

```python
batch_metadata_tensor = torch.zeros(2, 4, dtype=torch.int64, device=self.device)  # device="cuda:0"
received_metadata_tensor = torch.zeros(2, 4, dtype=torch.int64, device=self.device)
```

`torch.zeros` is implemented in C++ and does not route through the Python-level `torch.empty`; it calls the C++ allocator directly. So the `patch("torch.empty", side_effect=_cpu_empty)` leaves these `torch.zeros` calls untouched, and on a machine without a real CUDA device they raise `RuntimeError: No CUDA GPUs are available`, even when the mock libcuda.so.1 is present.

Fix: patch `torch.zeros` with the same redirect (capturing the original function first, so the patched side effect does not call back into the mock), or use `unittest.mock.patch.object` on torch:

```python
_orig_zeros = torch.zeros
with patch("torch.zeros", side_effect=lambda *a, **kw: _orig_zeros(*a, **{**kw, "device": "cpu"}) if str(kw.get("device", "")).startswith("cuda") else _orig_zeros(*a, **kw)):
    ...
```

A simpler and more robust alternative is to patch `torch.device` so `self.device` resolves to `torch.device("cpu")`, which causes both `torch.empty` and `torch.zeros` (and any future allocations) to land on CPU without per-function patching.
```python
def get_cuda_alignment() -> int:
    """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
    val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
    return val if val > 1 else 1
```
Missing upper-bound validation: a regression from the removed `get_dist_buffer_alignment()`

The old helper raised a `ValueError` for alignment values outside `[0, MAX_DIST_BUFFER_ALIGNMENT]` (1 MiB). The new function silently accepts any positive integer. Setting `RUNAI_STREAMER_CUDA_ALIGNMENT=1073741824` (1 GiB) would cause over-allocation of the staging buffer by up to `alignment - 1` bytes per tensor, potentially leading to OOM on the GPU.
```diff
 def get_cuda_alignment() -> int:
     """Return the CUDA buffer alignment in bytes (default 512). Set to 0 or 1 to disable."""
+    MAX_CUDA_ALIGNMENT = 1024 * 1024  # 1 MiB sanity ceiling
     val = int(os.getenv(RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR, str(DEFAULT_CUDA_ALIGNMENT)))
+    if val < 0 or val > MAX_CUDA_ALIGNMENT:
+        raise ValueError(
+            f"Invalid {RUNAI_STREAMER_CUDA_ALIGNMENT_ENV_VAR}: {val}, "
+            f"must be between 0 and {MAX_CUDA_ALIGNMENT}"
+        )
     return val if val > 1 else 1
```
Actionable comments posted: 3
♻️ Duplicate comments (2)
py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py (2)
127-137: ⚠️ Potential issue | 🟠 Major

Still synchronize the staging buffer by `self.buffer.device`.

Bare `torch.cuda.synchronize()` has the same current-device problem here. If the iterator allocates on `cuda:1` while `current_device()` is `cuda:0`, `next_request()` can recycle the GPU buffer before direct reads or downstream NCCL consumers on the real device finish.

Suggested fix

```diff
- torch.cuda.synchronize()
+ torch.cuda.synchronize(device=self.buffer.device)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py` around lines 127 - 137, The cuda sync call in next_request currently uses a bare torch.cuda.synchronize(), which only syncs the current device and can race if buffers live on another GPU; update the logic in next_request (the block gated by self._is_nvidia_cuda and self.file_buffers) to call torch.cuda.synchronize(device=self.buffer.device) or otherwise derive the correct torch.device from self.buffer.device (ensuring it is an int or torch.device) so the staging buffer's device is synchronized before recycling the buffer; keep the timing/logging (_t0, _sync_ms, logger.info) but use the buffer-specific device in the synchronize call.
176-203: ⚠️ Potential issue | 🟠 Major

Still reject VRAM caps that fall below the largest padded stride.

`free_cuda_memory` is applied after the user-limit check, so `memory_limit` can still drop below the largest `effective_stride`. `FilesRequestsIterator.next_request()` then returns an empty request, and the direct path either terminates immediately or fails later in distributed mode.

Suggested fix

```diff
     memory_limit = 0
+    largest_stride = max(
+        (max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks),
+        default=0,
+    )
@@
-    largest_chunk = max((max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks), default=0)
-    if user_memory_limit < largest_chunk:
+    if user_memory_limit < largest_stride:
         raise RunaiStreamerMemoryLimitException(
-            f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_chunk}"
+            f"Memory limit supplied: {user_memory_limit} cannot be smaller than: {largest_stride}"
         )
@@
 elif memory_mode == MemoryCapMode.limited:
     ...
     if free_cuda_memory is not None:
         memory_limit = min(memory_limit, free_cuda_memory)
+
+    if memory_limit < largest_stride:
+        raise RunaiStreamerMemoryLimitException(
+            f"Effective memory limit {memory_limit} cannot be smaller than "
+            f"the largest padded stride {largest_stride}"
+        )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py` around lines 176 - 203, Compute the largest padded stride once (largest_chunk = max(max(file_chunks.effective_strides, default=0) for file_chunks in files_chunks)) and ensure after applying any GPU cap (free_cuda_memory) the resolved memory_limit is never below largest_chunk; if the min(...) that applies free_cuda_memory would make memory_limit < largest_chunk, raise RunaiStreamerMemoryLimitException with a clear message. Update the logic around MemoryCapMode.unlimited, MemoryCapMode.limited (and MemoryCapMode.largest_chunk if needed) so the GPU-cap step uses the precomputed largest_chunk check before returning FilesRequestsIteratorWithBuffer(memory_limit,...).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`:
- Around line 372-382: The code re-invokes _use_cuda_direct() right before
calling file_streamer.stream_files which can flip after file paths are mutated;
fix this by computing and caching the CUDA-direct decision once (e.g. set
self._cuda_direct_enabled = False in __init__ and then assign it to the actual
decision early) and use that cached flag to determine read_device (and in the
other similar call site around the later stream_files usage) instead of calling
_use_cuda_direct() again; update references to use self._cuda_direct_enabled
when selecting the CPU vs CUDA path so remote-to-local cache mutations in
FileStreamer do not change the chosen path.
- Around line 588-593: The code calls torch.cuda.synchronize() which syncs the
current device, not necessarily the device of receive_buffer; change the sync to
target receive_buffer's device before reusing it (e.g. call
torch.cuda.synchronize(receive_buffer.device) or
torch.cuda.synchronize(self.device) / torch.cuda.synchronize(self.device.index)
as appropriate) so that NCCL work on the buffer's device completes before
calling dist.broadcast(received_view, global_broadcasting_rank,
group=self.distribution_group); keep the existing timing accounting
(total_sync_time) around that synchronized call.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 42-44: The constructor parameter name `id` in
RequestsIterator.__init__ shadows the builtin and must be renamed to satisfy
Ruff A002; change the parameter (e.g., to `chunk_id` or `id_`) in the __init__
signature and update any uses inside __init__ to assign the value to the
existing attribute `self.id` (leave `self.id` unchanged to preserve the public
API), ensuring all local references in requests_iterator.py to the old parameter
name are updated accordingly.
---
Duplicate comments:
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`:
- Around line 127-137: The cuda sync call in next_request currently uses a bare
torch.cuda.synchronize(), which only syncs the current device and can race if
buffers live on another GPU; update the logic in next_request (the block gated
by self._is_nvidia_cuda and self.file_buffers) to call
torch.cuda.synchronize(device=self.buffer.device) or otherwise derive the
correct torch.device from self.buffer.device (ensuring it is an int or
torch.device) so the staging buffer's device is synchronized before recycling
the buffer; keep the timing/logging (_t0, _sync_ms, logger.info) but use the
buffer-specific device in the synchronize call.
- Around line 176-203: Compute the largest padded stride once (largest_chunk =
max(max(file_chunks.effective_strides, default=0) for file_chunks in
files_chunks)) and ensure after applying any GPU cap (free_cuda_memory) the
resolved memory_limit is never below largest_chunk; if the min(...) that applies
free_cuda_memory would make memory_limit < largest_chunk, raise
RunaiStreamerMemoryLimitException with a clear message. Update the logic around
MemoryCapMode.unlimited, MemoryCapMode.limited (and MemoryCapMode.largest_chunk
if needed) so the GPU-cap step uses the precomputed largest_chunk check before
returning FilesRequestsIteratorWithBuffer(memory_limit,...).
📒 Files selected for processing (3)
- py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py
- py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py
- py/runai_model_streamer/runai_model_streamer/libstreamer/libstreamer.py
```diff
+read_device = device if self._use_cuda_direct() else "cpu"
 original_memory_limit = os.environ.get("RUNAI_STREAMER_MEMORY_LIMIT")
 try:
-    # for distributed streaming only - change default memory limit to unlimited
+    # Change default memory limit to unlimited so the file_streamer reads as
+    # much as possible in one pass. For CUDA, with_memory_cap caps the buffer
+    # at free GPU memory automatically, preventing OOM.
     if self.original_group_rank == 0:
-        logger.debug(f"[RunAI Streamer][Distributed] Setting memory limit to unlimited")
+        logger.debug(f"[RunAI Streamer][Distributed] Setting memory limit to unlimited, read device: {read_device}")
-    if original_memory_limit == None:
+    if original_memory_limit is None:
         os.environ["RUNAI_STREAMER_MEMORY_LIMIT"] = "-1"
-    self.file_streamer.stream_files(self.rank_file_chunks_list, credentials, "cpu")
+    self.file_streamer.stream_files(self.rank_file_chunks_list, credentials, read_device)
```
Cache the CUDA-direct decision instead of recomputing it from mutable paths.

`stream_files()` decides `read_device` before `FileStreamer.stream_files(...)`, but py/runai_model_streamer/runai_model_streamer/file_streamer/file_streamer.py mutates remote URIs into local cache paths. Re-evaluating `_use_cuda_direct()` here can therefore flip a remote request from CPU mode into `_get_chunks_cuda()`, which breaks the "remote backends always use the CPU path" behavior and can make NCCL ranks try to broadcast CPU chunks as CUDA tensors.

Suggested fix

```diff
-        read_device = device if self._use_cuda_direct() else "cpu"
+        self._cuda_direct_enabled = self._use_cuda_direct()
+        read_device = device if self._cuda_direct_enabled else "cpu"
@@
-        if self._use_cuda_direct():
+        if self._cuda_direct_enabled:
             yield from self._get_chunks_cuda()
             return
```

Initialize `self._cuda_direct_enabled = False` in `__init__` as well.
Also applies to: 396-398
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`
around lines 372 - 382, The code re-invokes _use_cuda_direct() right before
calling file_streamer.stream_files which can flip after file paths are mutated;
fix this by computing and caching the CUDA-direct decision once (e.g. set
self._cuda_direct_enabled = False in __init__ and then assign it to the actual
decision early) and use that cached flag to determine read_device (and in the
other similar call site around the later stream_files usage) instead of calling
_use_cuda_direct() again; update references to use self._cuda_direct_enabled
when selecting the CPU vs CUDA path so remote-to-local cache mutations in
FileStreamer do not change the chosen path.
```python
received_view = receive_buffer[:chunk_size]
t0 = timer()
torch.cuda.synchronize()
total_sync_time += timer() - t0
t0 = timer()
dist.broadcast(received_view, global_broadcasting_rank, group=self.distribution_group)
```
🧩 Analysis chain

🏁 Script executed:
```shell
cat py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py | head -n 600 | tail -n 50
```
Repository: run-ai/runai-model-streamer
Length of output: 1894

🏁 Script executed:
```shell
sed -n '550,620p' py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py
```
Repository: run-ai/runai-model-streamer
Length of output: 4466

🏁 Script executed:
```shell
sed -n '1,600p' py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py | grep -n "receive_buffer" | head -20
```
Repository: run-ai/runai-model-streamer
Length of output: 572
🌐 Web query:
torch.cuda.synchronize() documentation device parameter behavior

💡 Result:
`torch.cuda.synchronize(device=None)` blocks the host (CPU) until all queued CUDA work on the specified CUDA device has finished, i.e. it "wait[s] for all kernels in all streams on a CUDA device to complete." [1]

`device` parameter behavior:
- Type: `torch.device` or `int` (CUDA device index). [1]
- Default (`None`): synchronizes the current CUDA device, as returned by `torch.cuda.current_device()`. [1]
- Scope: synchronizes all streams on that device (not a single stream). [1]
- Caveat (multi-thread / multi-GPU): there has been a reported issue where calling `torch.cuda.synchronize()` in one Python thread can block CUDA execution in other threads even when they are using other devices. [2]

For per-stream synchronization, use `torch.cuda.Stream.synchronize()` instead (it targets a specific stream rather than "all streams on a device"). [3]

Sources: [1] PyTorch API docs, [2] PyTorch GitHub issue #24963, [3] PyTorch Forums discussion.
Synchronize the correct device before reusing receive_buffer.

`torch.cuda.synchronize()` without a device parameter waits on `torch.cuda.current_device()`, not `receive_buffer.device` (which is `self.device`). If the current device differs from the actual buffer device, the subsequent `dist.broadcast()` can overwrite the buffer while NCCL work on the real destination device is still in flight.

Suggested fix

```diff
-            torch.cuda.synchronize()
+            torch.cuda.synchronize(device=receive_buffer.device)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/distributed_streamer/distributed_streamer.py`
around lines 588 - 593, The code calls torch.cuda.synchronize() which syncs the
current device, not necessarily the device of receive_buffer; change the sync to
target receive_buffer's device before reusing it (e.g. call
torch.cuda.synchronize(receive_buffer.device) or
torch.cuda.synchronize(self.device) / torch.cuda.synchronize(self.device.index)
as appropriate) so that NCCL work on the buffer's device completes before
calling dist.broadcast(received_view, global_broadcasting_rank,
group=self.distribution_group); keep the existing timing accounting
(total_sync_time) around that synchronized call.
```python
def __init__(self, id: int, path: str, offset: int, chunks: List[int],
             buffer_strides: Optional[List[int]] = None) -> None:
    self.id = id  # the id of the file chunk must be unique in the context of a single stream_files request
```
Rename the id parameter to satisfy Ruff A002.
This shadows Python’s builtin id and will keep lint red. Renaming just the constructor argument is enough; self.id can stay if that attribute is part of the existing API.
Suggested fix

```diff
-    def __init__(self, id: int, path: str, offset: int, chunks: List[int],
+    def __init__(self, file_id: int, path: str, offset: int, chunks: List[int],
                  buffer_strides: Optional[List[int]] = None) -> None:
-        self.id = id
+        self.id = file_id
```
🧰 Tools
🪛 Ruff (0.15.6)
[error] 42-42: Function argument id is shadowing a Python builtin
(A002)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@py/runai_model_streamer/runai_model_streamer/file_streamer/requests_iterator.py`
around lines 42 - 44, The constructor parameter name `id` in
RequestsIterator.__init__ shadows the builtin and must be renamed to satisfy
Ruff A002; change the parameter (e.g., to `chunk_id` or `id_`) in the __init__
signature and update any uses inside __init__ to assign the value to the
existing attribute `self.id` (leave `self.id` unchanged to preserve the public
API), ensuring all local references in requests_iterator.py to the old parameter
name are updated accordingly.
Direct GPU Streaming via Pinned Staging Buffer
Overview
This branch adds NVIDIA CUDA direct streaming: instead of reading file data into a CPU buffer and then copying it to the GPU, the C++ worker threads read each block into a thread-local pinned (page-locked) host staging buffer and DMA it straight to a pre-allocated PyTorch CUDA tensor via `cuMemcpyHtoDAsync`. The tensor lives in device memory for its entire lifetime and is yielded directly to the caller: no extra host-to-device copy, no intermediate NumPy array.

The yielded GPU tensor has an aligned address in GPU memory, similar to a natively allocated PyTorch tensor.
This path is NVIDIA CUDA only. AMD ROCm and all other device types fall back to the
CPU path (read to CPU, then copy to device). Remote backends (S3, GCS, Azure) also
always use the CPU path regardless of device.
For distributed streaming the feature is transparent: no changes are required in
the caller (e.g. vLLM). The distributed streamer detects the device and switches
paths internally, and each rank receives GPU tensors directly without any API change.
For TP=1 (non-distributed) usage, integration changes in the caller are required
to pass a CUDA device string and consume the yielded GPU tensors directly.
Direct streaming to the GPU also reduces CPU memory requirements. The minimum additional device memory required is twice the size of the largest tensor.
Benchmarks
Cluster: NVIDIA H200, 8 GPUs per node
Storage: Nebius shared filesystem
Qwen1.5-110B-Chat — TP=8 (25.91 GiB per GPU)
Qwen1.5-110B-Chat — TP=4 (51.8 GiB per GPU)
Qwen3-235B-A22B — TP=8 (54.92 GiB per GPU)
TP=1 (non-distributed) streaming
- `FilesRequestsIteratorWithBuffer` accepts a `device` argument. When the device is an NVIDIA CUDA device (`device.startswith("cuda")` and `torch.version.hip is None`):
  - The buffer is allocated with `torch.empty(..., device=device)` instead of `np.empty(...)`. Slices of this buffer become the yielded tensors.
  - `runai_request` is called with `cuda=True`, routing C++ worker threads into `Batch::read_cuda()`.
  - `next_request()` calls `torch.cuda.synchronize()` before reusing the buffer for the next batch, ensuring any async operations still holding a reference to the previous slice (e.g. dtype casts, user application ops) have completed. A minimal sketch of this buffer layout follows.
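As a rough illustration only, not the project's actual code (the `align_up` helper and the example sizes are assumptions), the per-tensor GPU views described above could be carved out of one flat buffer like this:

```python
import torch

def align_up(size: int, alignment: int) -> int:
    return ((size + alignment - 1) // alignment) * alignment

device = "cuda:0" if torch.cuda.is_available() else "cpu"  # CPU fallback so the sketch runs anywhere
alignment = 512
tensor_sizes = [500, 260, 260]  # byte sizes of three tensors

strides = [align_up(s, alignment) for s in tensor_sizes]
buffer = torch.empty(sum(strides), dtype=torch.uint8, device=device)

# Each tensor view starts on an aligned offset inside the flat buffer.
views, offset = [], 0
for size, stride in zip(tensor_sizes, strides):
    views.append(buffer[offset:offset + size])
    offset += stride

print([v.numel() for v in views])  # [500, 260, 260]
```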
In C++, each worker thread owns a `thread_local CudaStagingBuffer`: a pinned host buffer (`cuMemAllocHost`) and a private CUDA stream (`cuStreamCreate`). `read_cuda` reads the file in `fs_block_bytesize` chunks, copies each to the device pointer via `cuMemcpyHtoDAsync`, and synchronizes the stream after every chunk so the staging buffer can be immediately reused for the next read.
Distributed streaming
When `_use_cuda_direct()` is true (NVIDIA CUDA, local paths, no ROCm), the distributed streamer takes a new code path with no API changes required in the caller:

- `stream_files()` passes the real CUDA device string to `file_streamer.stream_files()` instead of `"cpu"`, so tensors land in GPU memory from the start.
- `get_chunks()` delegates to the new `_get_chunks_cuda()` method instead of the existing prefill/broadcast loop.
- `_get_chunks_cuda()` broadcasts one tensor at a time across the distribution group, in lockstep:
  - The broadcasting rank pulls the next tensor from `file_streamer.get_chunks()` (already on the GPU), fills a two-row metadata tensor `[count | orig_req_idx, orig_chunk_idx, size, 0]`, and does two `dist.broadcast` calls: first the metadata, then the GPU tensor itself.
  - Receiving ranks read `count` and `size`, call `torch.cuda.synchronize()` to ensure the previous receive buffer is no longer in use, then broadcast into a pre-allocated `receive_buffer[:chunk_size]` view and yield it.
The receive buffer is sized to hold one tensor at a time (`max_chunk` bytes), so GPU memory usage on receiving ranks is proportional to the largest single tensor rather than to the full batch buffer used by the CPU path.
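A minimal, hedged sketch of that lockstep protocol using `torch.distributed`; the names and the metadata layout are simplified assumptions, and the actual implementation also tracks timing and iterates over broadcasting ranks:

```python
import torch
import torch.distributed as dist

def broadcast_one_tensor(src_rank, group, device, receive_buffer, gpu_tensor=None):
    # Row 0 holds the remaining-count flag, row 1 holds (req_idx, chunk_idx, size, 0).
    meta = torch.zeros(2, 4, dtype=torch.int64, device=device)
    if dist.get_rank() == src_rank:
        meta[0, 0] = 1
        meta[1, 2] = gpu_tensor.numel()
    dist.broadcast(meta, src_rank, group=group)       # step 1: metadata

    if meta[0, 0].item() == 0:
        return None                                    # nothing left to send

    size = meta[1, 2].item()
    if dist.get_rank() == src_rank:
        payload = gpu_tensor
    else:
        torch.cuda.synchronize(device)                 # previous receive no longer in use
        payload = receive_buffer[:size]
    dist.broadcast(payload, src_rank, group=group)     # step 2: the tensor bytes
    return payload
```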
Fallback conditions
The CUDA direct path is skipped and the original CPU path is used when:
- `device` is `None` or does not start with `"cuda"`
- AMD ROCm build (`torch.version.hip is not None`)
- CUDA is not available (`not torch.cuda.is_available()`)
- Remote backends (`s3://`, `gs://`, `az://`)

Summary by CodeRabbit
New Features
Tests
Chores