PartitionedVector: Design Document for In-Place Vector Partitioning
Problem Statement
In data processing systems like Velox, there's a frequent need to partition vectors (columns of data) into multiple groups based on partition IDs for operations such as:
- Hash-based shuffling for distributed joins
- Group-by aggregation with partitioning
- Optimized data exchange between operators (LocalExchange)
Currently, partitioning vectors often requires:
- Creating multiple copies of the vector in VectorStream
- Allocating new memory for each partition
- Performing expensive data movement operations
- Forgoing buffer reuse, which is difficult across operations
This leads to:
- High memory overhead (O(n×p) where n=rows, p=partitions)
- Increased memory allocator pressure
- Suboptimal CPU cache utilization
Proposed Solution
Introduce PartitionedVector - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs.
Key Design Principles
- In-place rearrangement: Rearrange vector data in memory without creating multiple copies
- Buffer reuse: Allow reuse of temporary buffers across multiple partitioning operations
- Minimal abstraction: Similar to DecodedVector, focus on efficient execution rather than operator semantics
- Thread-unsafe by design: Optimized for single-threaded execution contexts
Design Details
Class Architecture
PartitionedVector (abstract base)
├── PartitionedFlatVector<T> (template for flat vectors)
└── Future: PartitionedComplexVector (for arrays, maps, rows)
Core Components
1. PartitionBuildContext
Transient execution context only valid during construction:
struct PartitionBuildContext {
  BufferPtr beginPartitionOffsetsBuffer;
  BufferPtr swappingBuffer;
  vector_size_t firstRow{0};
  bool useSwapping{false};
};
2. PartitionedVector
Key methods:
- create(): Factory method that partitions a vector in-place
- partitionAt(): Extract a specific partition as a view/slice
- baseVector(): Access the underlying partitioned vector
3. Partitioning Algorithms
- Fixed-width types: In-place swapping using cyclic permutation
- Nulls bitmap: Bit-level swapping algorithm; SIMD variants will be added in the future
- Dictionary vectors: Future extension via dictionary wrapping
- Complex types: Future extension for arrays, maps, and rows, with direct in-place swapping of the internal offsets vectors
In-Place Partitioning Algorithm
At the core is an in-place partitioning algorithm that rearranges elements without extra O(n) storage. Note that the partition ids must be swapped alongside the elements so that ids stay aligned with rows as they move:

  // Precondition: begin offsets per partition are computed via a counting
  // pass plus an exclusive prefix sum; start_of_target_partition[p] starts
  // at partition p's begin offset and advances as slots are filled.
  for (current_partition = 0; current_partition < num_partitions; current_partition++) {
    position = start_of_target_partition[current_partition];
    // For each position in current partition's range
    while (position < partition_end[current_partition]) {
      target_partition = partition_id[position];
      while (target_partition != current_partition) {
        // Swap the element (and its partition id) to its target partition
        destination = start_of_target_partition[target_partition]++;
        swap(elements[destination], elements[position]);
        swap(partition_id[destination], partition_id[position]);
        target_partition = partition_id[position];
      }
      position++;
    }
  }
This algorithm guarantees:
- At most n swaps in total: each swap places one element into its final partition
- O(n) time complexity with minimal constant factors
- Note that cyclic swapping does not preserve relative order within partitions; when stable partitioning is required, a separate stable variant is needed
Memory Layout
Before partitioning: [a0, a1, a2, a3, a4, a5, a6, a7]
Partition IDs:       [ 0,  1,  0,  2,  1,  0,  2,  1]
After partitioning:  [a0, a2, a5, a1, a4, a7, a3, a6]
                      └───┬────┘  └───┬────┘  └─┬──┘
                          p=0         p=1       p=2
API Usage Example
// Setup
VectorPtr data = makeFlatVector<int64_t>(1000, [](auto row) { return row; });
std::vector<uint32_t> partitions = generatePartitionIds(1000, 8);
BufferPtr partitionOffsets;
PartitionBuildContext ctx;

// Create partitioned vector
auto pvector = PartitionedVector::create(
    data, partitions, 8, partitionOffsets, ctx, pool);

// Access partitions
for (int i = 0; i < 8; i++) {
  VectorPtr partition = pvector->partitionAt(i);
  processPartition(partition);
}
Alternatives Considered
Alternative 1: Multiple Vector Copies
std::vector<VectorPtr> partitions(num_partitions);
for (auto row : rows) {
  auto partition = partition_id[row];
  partitions[partition]->append(row_data);
}
Pros: Simple implementation
Cons: High memory overhead, poor cache locality
Alternative 2: Dictionary Views
for (auto partition : partitions) {
  auto indices = getPartitionIndices(partition);
  auto dict = BaseVector::wrapInDictionary(nulls, indices, size, vector);
}
Pros: Zero-copy, good for read-only scenarios
Cons: Multiple dictionary layers degrade performance, doesn't solve write scenarios
Alternative 3: External Memory Buffer
BufferPtr partitionedBuffer = allocateAligned(total_size);
for (auto row : rows) {
  auto offset = partition_offsets[partition_id[row]]++;
  memcpy(partitionedBuffer + offset, row_data, row_size);
}
Pros: Contiguous output, good for serialization
Cons: Requires full copy, doubles memory usage during partitioning
Testing Strategy
Unit Tests
- Basic functionality: Verify partitioning correctness for various partition counts
- Edge cases: Empty partitions, single partition, one-row-per-partition
- Data types: All supported fixed-width types (int8, int16, int32, int64, float, double)
- Null handling: Vectors with different null patterns (every Nth null, all nulls, no nulls)
Performance Benchmarks
// Benchmark metrics
- Throughput (rows/sec)
- Memory allocation count/size
- Cache miss rates
- Comparison with baseline (naive copy approach)
Performance Considerations
Expected Improvements
- Memory: Reduce peak memory usage compared to copying
- CPU: Better cache locality through in-place rearrangement
- Allocation: Buffer reuse eliminates malloc/free overhead
The existing ExchangeBenchmark shows an over 20x improvement over the current implementation on FlatVectors.
Optimization Opportunities
- SIMD acceleration: Use AVX2/AVX-512 for bulk bit manipulation
- Pre-compute permutation: For tables with many columns, we can calculate the shuffle permutation once and apply to all columns
- Prefetching: Hardware prefetch hints for better cache performance
Dependencies
Implementation Plan
Phase 1: Core Infrastructure (Current PR)
- PartitionedVector and PartitionedFlatVector classes
Phase 2: New PrestoIterativePartitionedSerializer and OptimizedPartitionedOutput operator
- Built on PartitionedVector
Phase 3: Enhanced Functionality
Phase 4: Optimization
Risks and Mitigations
Risk 1: Algorithm Complexity
The in-place swapping algorithm is non-trivial to implement correctly.
Mitigation:
- Extensive unit testing with randomized inputs
- Formal proof of correctness for the algorithm
- Comparison with reference implementation
Risk 2: Thread Safety
The class is intentionally not thread-safe for performance reasons.
Mitigation:
- Clear documentation of thread safety constraints
- Assertions in debug builds to catch concurrent access
- Separate thread-safe wrapper if needed
Risk 3: Memory Corruption
In-place manipulation risks corrupting data if bugs exist.
Mitigation:
- Use of VELOX_CHECK for preconditions
- Memory sanitizer tests
- Fuzz testing with random inputs
Conclusion
The PartitionedVector abstraction provides a performant foundation for vector partitioning operations in Velox. By enabling in-place rearrangement with buffer reuse, it addresses key performance bottlenecks in shuffle, aggregation, and other partitioning-heavy operations.
The design follows Velox's philosophy of providing low-level, efficient building blocks that higher-level operators can compose. Future extensions will support more vector types and optimizations while maintaining backward compatibility.
Next Steps:
- Review and merge current implementation
- Integrate with PartitionedOutput operator
- Add support for dictionary vectors
- Add support for RLE vectors
- Add support for Row vectors
- Add support for Array vectors
- Add support for Bias vectors
- Add support for Map vectors
- Profile and optimize based on real workloads