Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
ebadb51
Unify bounce buffer management
kingcrimsontianyu Jan 30, 2026
9ba980e
Improve implementation and function naming
kingcrimsontianyu Jan 30, 2026
d90a801
Modify naming
kingcrimsontianyu Jan 30, 2026
95e57c8
Update
kingcrimsontianyu Jan 30, 2026
b13a210
Update
kingcrimsontianyu Jan 30, 2026
9393471
Merge branch 'main' into unify-bounce-buffer
kingcrimsontianyu Jan 30, 2026
a4eae68
Add return value for accumulate_and_submit_h2d
kingcrimsontianyu Jan 30, 2026
00338b3
Add reset
kingcrimsontianyu Jan 30, 2026
ee33ad8
Add clarifying comments on bounce buffer's potential multicontext use
kingcrimsontianyu Jan 30, 2026
91a6bee
Clarify the multicontext issue for bounce buffer ring
kingcrimsontianyu Jan 30, 2026
23daf51
Merge branch 'main' into unify-bounce-buffer
kingcrimsontianyu Jan 30, 2026
08dd7f9
Implement event pool
kingcrimsontianyu Feb 1, 2026
3948400
Fix stream race condition
kingcrimsontianyu Feb 2, 2026
43a38d9
Make ctor dtor private. Remove inner NVTX
kingcrimsontianyu Feb 2, 2026
bf20744
Initial impl of event pool
kingcrimsontianyu Feb 2, 2026
7e0c71f
Set a get() overload to private
kingcrimsontianyu Feb 2, 2026
74e6740
Add Doxygen comments
kingcrimsontianyu Feb 2, 2026
bc7064b
Update
kingcrimsontianyu Feb 2, 2026
812c8a9
Remove get(ctx, tid) and move its content to get()
kingcrimsontianyu Feb 3, 2026
c9d7acd
Merge branch 'fix-stream-bug' into event-pool
kingcrimsontianyu Feb 3, 2026
d878f79
Log exception msg
kingcrimsontianyu Feb 3, 2026
df8a4a8
Update cpp/src/detail/event.cpp
kingcrimsontianyu Feb 3, 2026
9d73727
Update name for clarity
kingcrimsontianyu Feb 3, 2026
b71c857
Update
kingcrimsontianyu Feb 3, 2026
7b2c32a
Set buffer ctor to private
kingcrimsontianyu Feb 3, 2026
c0d57d7
Update
kingcrimsontianyu Feb 3, 2026
dab118e
Update
kingcrimsontianyu Feb 3, 2026
3103863
Silly bug fixes
kingcrimsontianyu Feb 4, 2026
ab00a34
Leak intentionally
kingcrimsontianyu Feb 4, 2026
4208e5c
Update
kingcrimsontianyu Feb 4, 2026
b36f967
Update doxygen doc
kingcrimsontianyu Feb 4, 2026
badc794
Merge branch 'main' into event-pool
kingcrimsontianyu Feb 5, 2026
6954b3c
Merge branch 'main' into unify-bounce-buffer
kingcrimsontianyu Feb 5, 2026
1fb2d29
Merge branch 'main' into unify-bounce-buffer
kingcrimsontianyu Feb 9, 2026
b1ba9dd
Remove batch copy as it is half baked
kingcrimsontianyu Feb 9, 2026
075dbd8
Merge branch 'main' into event-pool
kingcrimsontianyu Feb 9, 2026
d327f91
Merge branch 'event-pool' into unify-bounce-buffer
kingcrimsontianyu Feb 9, 2026
64ee425
Add event query
kingcrimsontianyu Feb 9, 2026
0c6cf4a
Merge branch 'event-pool' into unify-bounce-buffer
kingcrimsontianyu Feb 9, 2026
9645424
Improve impl by using the event based approach
kingcrimsontianyu Feb 9, 2026
db169e6
Fix error
kingcrimsontianyu Feb 9, 2026
87e8fe2
Merge branch 'main' into event-pool
kingcrimsontianyu Feb 13, 2026
facf3b6
Merge branch 'main' into unify-bounce-buffer
kingcrimsontianyu Feb 13, 2026
4d98a76
Merge branch 'event-pool' into unify-bounce-buffer
kingcrimsontianyu Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ set(SOURCES
"src/file_utils.cpp"
"src/mmap.cpp"
"src/detail/env.cpp"
"src/detail/event.cpp"
"src/detail/nvtx.cpp"
"src/detail/posix_io.cpp"
"src/detail/stream.cpp"
Expand Down
292 changes: 291 additions & 1 deletion cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
*/
#pragma once

#include <cstddef>
#include <map>
#include <mutex>
#include <stack>
#include <thread>
#include <utility>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/detail/event.hpp>

namespace kvikio {

Expand Down Expand Up @@ -43,6 +48,9 @@ class PageAlignedAllocator {
* transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at
* least 256 bytes". It is NOT guaranteed to be page aligned.
*
* @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts,
* not just the one that performed the allocation. This allows the singleton BounceBufferPool to
* safely serve buffers across multiple contexts and devices.
* @note Do NOT use with Direct I/O - lacks page alignment guarantee
*/
class CudaPinnedAllocator {
Expand Down Expand Up @@ -71,6 +79,9 @@ class CudaPinnedAllocator {
* (for efficient host-device transfers). Uses std::aligned_alloc followed by
* cudaMemHostRegister to achieve both properties.
*
* @note Registration uses CU_MEMHOSTREGISTER_PORTABLE, making buffers accessible from all CUDA
* contexts, not just the one active during allocation. This allows the singleton BounceBufferPool
* to safely serve buffers across multiple contexts and devices.
* @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA
* context when allocating.
*/
Expand Down Expand Up @@ -111,6 +122,9 @@ class CudaPageAlignedPinnedAllocator {
* - CudaPinnedAllocator: For device I/O without Direct I/O
* - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O
*
* @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators
* use portable flags (CU_MEMHOSTALLOC_PORTABLE or CU_MEMHOSTREGISTER_PORTABLE), ensuring buffers
* allocated in one context can be used from any other context.
* @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static
* destructors run after CUDA context destruction
*/
Expand All @@ -134,13 +148,16 @@ class BounceBufferPool {
* @note Non-copyable but movable to allow transfer of ownership while maintaining RAII
*/
class Buffer {
friend BounceBufferPool<Allocator>;

private:
BounceBufferPool* _pool;
void* _buffer;
std::size_t _size;

public:
Buffer(BounceBufferPool<Allocator>* pool, void* buffer, std::size_t size);

public:
Buffer(Buffer const&) = delete;
Buffer& operator=(Buffer const&) = delete;
Buffer(Buffer&& o) noexcept;
Expand Down Expand Up @@ -268,4 +285,277 @@ using CudaPinnedBounceBufferPool = BounceBufferPool<CudaPinnedAllocator>;
* Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers)
*/
using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPinnedAllocator>;

/**
 * @brief Ring of k bounce buffers that pipelines file I/O with host-to-device (H2D) copies.
 *
 * The ring rotates through its buffers so that an asynchronous H2D copy out of one buffer can
 * still be in flight while the host is already filling the next buffer.
 *
 * When the ring wraps around, it synchronizes automatically so that a buffer whose transfer has
 * not finished yet is never overwritten.
 *
 * @tparam Allocator Allocation policy of the underlying bounce buffers:
 * - CudaPinnedAllocator: device I/O without Direct I/O
 * - CudaPageAlignedPinnedAllocator: device I/O with Direct I/O
 *
 * @note This class is NOT thread-safe. Use one ring per thread or per operation.
 * @note The bounce buffers are portable pinned host memory (CU_MEMHOSTALLOC_PORTABLE or
 * CU_MEMHOSTREGISTER_PORTABLE, depending on the allocator) and are therefore accessible from any
 * CUDA context. A single transfer session, i.e. everything from the first enqueue/submit up to
 * synchronize()/reset(), must nevertheless use a stream and device pointers that all belong to
 * one and the same CUDA context.
 */
template <typename Allocator = CudaPinnedAllocator>
class BounceBufferRing {
 private:
  // Bounce buffers that make up the ring.
  std::vector<typename BounceBufferPool<Allocator>::Buffer> _buffers;
  // Index into the ring of the buffer that is currently being filled.
  std::size_t _cur_buf_idx{0};
  // Fill level, in bytes, of the current buffer.
  std::size_t _cur_buffer_offset{0};
  // Events used to wait for in-flight transfers (presumably one event per buffer).
  std::vector<detail::EventPool::Event> _events;

  /**
   * @brief Move on to the next buffer of the ring.
   *
   * Clears the current buffer offset and steps to the next buffer index. If an earlier async
   * transfer out of the target buffer has not completed yet, waits on that buffer's event so
   * that the CPU can safely write into it.
   */
  void advance();

  /**
   * @brief Enqueue an async copy from the current bounce buffer to device memory.
   *
   * @param device_dst Destination in device memory.
   * @param size Number of bytes to copy out of cur_buffer().
   * @param stream CUDA stream on which the async transfer is issued.
   *
   * @note The ring is NOT advanced by this call. Use submit_h2d() for copy + advance.
   */
  void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream);

 public:
  /**
   * @brief Create a ring with the given number of bounce buffers.
   *
   * @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. A larger k
   * permits more overlap at the price of a larger memory footprint.
   */
  explicit BounceBufferRing(std::size_t num_buffers = 1);

  ~BounceBufferRing() noexcept = default;

  // The ring can neither be copied nor moved.
  BounceBufferRing(BounceBufferRing const&)            = delete;
  BounceBufferRing& operator=(BounceBufferRing const&) = delete;
  BounceBufferRing(BounceBufferRing&&)                 = delete;
  BounceBufferRing& operator=(BounceBufferRing&&)      = delete;

  /**
   * @brief Access the current bounce buffer.
   *
   * Intended for filling the buffer directly (e.g., with pread) before handing the data over to
   * the device through submit_h2d().
   *
   * @return Pointer to the beginning of the current buffer.
   */
  [[nodiscard]] void* cur_buffer() const noexcept;

  /**
   * @brief Access the current bounce buffer at a byte offset.
   *
   * Handy for partial fills or for incrementally accumulating data.
   *
   * @param offset Offset in bytes, measured from the beginning of the current buffer.
   * @return Pointer equal to cur_buffer() + offset.
   */
  [[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept;

  /**
   * @brief Size of a single bounce buffer of the ring.
   *
   * Every buffer of the ring has the same size, which is taken from
   * defaults::bounce_buffer_size() when the ring is constructed.
   *
   * @return Size of each buffer in bytes.
   */
  [[nodiscard]] std::size_t buffer_size() const noexcept;

  /**
   * @brief Number of buffers held by the ring (the k in k-way overlap).
   *
   * @return Number of bounce buffers.
   */
  [[nodiscard]] std::size_t num_buffers() const noexcept;

  /**
   * @brief Fill level of the buffer that is currently active.
   *
   * Tells how many bytes accumulate_and_submit_h2d() has gathered in the current buffer so far.
   * The value goes back to 0 after every advance().
   *
   * @return Number of bytes currently held by the buffer.
   */
  [[nodiscard]] std::size_t cur_buffer_offset() const noexcept;

  /**
   * @brief Number of bytes that can still be accumulated in the current buffer.
   */
  [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept;

  /**
   * @brief Gather host data in the bounce buffer and submit it automatically once it is full.
   *
   * The host data is copied into the internal buffer. Whenever the buffer becomes full, an async
   * H2D copy is issued and the ring advances to the next buffer. Input larger than buffer_size()
   * is split across several buffers.
   *
   * Typical usage when streaming host data to the device:
   * @code
   * while (has_more_data()) {
   *   auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream);
   *   device_ptr += submitted;
   * }
   * auto flushed = ring.flush_h2d(device_ptr, stream);
   * device_ptr += flushed;
   * ring.synchronize(stream);
   * @endcode
   *
   * @param device_dst Destination in device memory (the caller keeps track of the cumulative
   * offset).
   * @param host_src Source data residing in host memory.
   * @param size Number of bytes to copy.
   * @param stream CUDA stream on which the async H2D transfers are issued.
   * @return Number of bytes submitted to the device. This is always a multiple of buffer_size();
   * a partially filled buffer is kept back until flush_h2d() is called.
   *
   * @note The data is only guaranteed to be visible on the device after flush_h2d() followed by
   * synchronize().
   */
  std::size_t accumulate_and_submit_h2d(void* device_dst,
                                        void const* host_src,
                                        std::size_t size,
                                        CUstream stream);

  /**
   * @brief Send the content of the current buffer to the device, then advance the ring.
   *
   * Typical usage for the direct-fill pattern (e.g., pread straight into the buffer):
   * @code
   * ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset);
   * ring.submit_h2d(device_ptr, n, stream);
   * device_ptr += n;
   * @endcode
   *
   * @param device_dst Destination in device memory.
   * @param size Number of bytes that were actually written to cur_buffer().
   * @param stream CUDA stream on which the async H2D transfer is issued.
   *
   * @note If this call makes the ring wrap around, a synchronization may take place.
   * @note The data is only guaranteed to be visible on the device once synchronize() has been
   * called after all submits.
   */
  void submit_h2d(void* device_dst, std::size_t size, CUstream stream);

  /**
   * @brief Submit whatever partial data has been accumulated so far.
   *
   * To be called after accumulate_and_submit_h2d() in order to send the remainder that did not
   * fill up a whole buffer.
   *
   * @param device_dst Destination in device memory for the partial buffer.
   * @param stream CUDA stream on which the async H2D transfer is issued.
   * @return Number of bytes flushed, or 0 if the buffer was empty.
   *
   * @note synchronize() is still needed to guarantee that the data is visible.
   */
  std::size_t flush_h2d(void* device_dst, CUstream stream);

  /**
   * @brief Wait until every queued H2D transfer has completed.
   *
   * @param stream CUDA stream to synchronize.
   *
   * @note Has to be called before the transferred data is read on the device.
   * @note Once synchronize() has returned, the ring may be reused for new transfers.
   */
  void synchronize(CUstream stream);

  /**
   * @brief Synchronize pending transfers, then put the ring back into its initial state.
   *
   * Makes sure that all in-flight transfers have finished before the ring state is reset, so
   * that a new transfer session can start.
   *
   * @param stream CUDA stream to synchronize.
   */
  void reset(CUstream stream);
};

/**
 * @brief Thread-safe singleton cache for per-thread, per-context bounce buffer rings.
 *
 * Manages a collection of BounceBufferRing instances, each uniquely associated with a (CUDA
 * context, thread ID) pair. This ensures that each thread operating within a specific CUDA context
 * gets its own dedicated ring, avoiding synchronization overhead during I/O operations while
 * maintaining correct behavior across multi-context applications.
 *
 * @tparam Allocator The allocator policy for the underlying bounce buffer rings:
 * - CudaPinnedAllocator: For device I/O without Direct I/O
 * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O
 *
 * @note Rings are created lazily on first access and persist for the lifetime of the program.
 * @note The cache itself is thread-safe; individual rings are not (by design, since each ring is
 * accessed by only one thread within one context).
 * @note This class intentionally leaks its Ring allocations to avoid undefined behavior during
 * process exit. By storing raw pointers and never deleting them, no CUDA call is made during
 * static destruction; the OS reclaims all process memory at exit regardless. Without the leak,
 * the following destruction sequence causes crashes:
 * - `BounceBufferRingCachePerThreadAndContext` singleton is destroyed
 * - Its map of `Ring*` entries would be deleted
 * - Each `Ring` destructor destroys its `std::vector<Buffer>`
 * - Each `Buffer` destructor calls `BounceBufferPool::put()` to return memory
 * - `put()` may call `CudaPinnedAllocator::deallocate()`, which includes a CUDA API call
 * - The CUDA driver is already shut down, which leads to heap corruption / crash (sample error
 *   message: "free(): corrupted unsorted chunks")
 *
 * @sa BounceBufferPool, EventPool (same intentional leak pattern)
 * @sa https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization
 */
template <typename Allocator = CudaPinnedAllocator>
class BounceBufferRingCachePerThreadAndContext {
 public:
  using Ring = BounceBufferRing<Allocator>;

 private:
  // Cached rings keyed by (CUDA context, thread ID). The pointers are owning but are
  // deliberately never deleted (see the class-level note on the intentional leak).
  std::map<std::pair<CUcontext, std::thread::id>, Ring*> _rings;
  // Guards _rings; mutable so that it can also be locked from const member functions.
  mutable std::mutex _mutex;

  // Construction and destruction are private: the only instance is the one handed out by
  // instance().
  BounceBufferRingCachePerThreadAndContext()  = default;
  ~BounceBufferRingCachePerThreadAndContext() = default;

 public:
  // Non-copyable, non-movable singleton
  BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext const&) =
    delete;
  BounceBufferRingCachePerThreadAndContext& operator=(
    BounceBufferRingCachePerThreadAndContext const&) = delete;
  BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext&&) = delete;
  BounceBufferRingCachePerThreadAndContext& operator=(BounceBufferRingCachePerThreadAndContext&&) =
    delete;

  /**
   * @brief Get the bounce buffer ring for the current thread and CUDA context.
   *
   * Returns the cached ring for the calling thread's current CUDA context, creating one if it
   * doesn't exist. The ring is configured with `defaults::bounce_buffer_count()` buffers.
   *
   * @return Reference to the ring associated with (current context, current thread).
   * @exception kvikio::CUfileException if no CUDA context is current.
   */
  Ring& ring();

  /**
   * @brief Get the singleton instance of the cache.
   *
   * @return Reference to the singleton cache instance.
   */
  static BounceBufferRingCachePerThreadAndContext& instance();
};

} // namespace kvikio
7 changes: 6 additions & 1 deletion cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -116,6 +116,7 @@ class defaults {
std::size_t _task_size;
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _bounce_buffer_count;
std::size_t _http_max_attempts;
long _http_timeout;
std::vector<int> _http_status_codes;
Expand Down Expand Up @@ -302,6 +303,10 @@ class defaults {
*/
static void set_bounce_buffer_size(std::size_t nbytes);

/**
 * @brief Get the number of bounce buffers.
 *
 * NOTE(review): presumably the number of buffers (k for k-way overlap) that each
 * BounceBufferRing handed out by the ring cache is created with - confirm against the
 * implementation.
 *
 * @return The number of bounce buffers.
 */
[[nodiscard]] static std::size_t bounce_buffer_count();

/**
 * @brief Set the number of bounce buffers.
 *
 * NOTE(review): the accepted range is not visible from this declaration; BounceBufferRing
 * documents that its buffer count must be >= 1 - confirm whether this setter validates it.
 *
 * @param count The number of bounce buffers to use.
 */
static void set_bounce_buffer_count(std::size_t count);

/**
* @brief Get the maximum number of attempts per remote IO read.
*
Expand Down
Loading