PartitionedOutput Improvements: Introduce PartitionedVector #1703

@yingsu00

Description

PartitionedVector: Design Document for In-Place Vector Partitioning

Problem Statement

In data processing systems like Velox, there's a frequent need to partition vectors (columns of data) into multiple groups based on partition IDs for operations such as:

  • Hash-based shuffling for distributed joins
  • Group-by aggregation with partitioning
  • Optimized data exchange between operators (LocalExchange)

Currently, partitioning vectors often involves:

  1. Creating multiple copies of the vector in VectorStream
  2. Allocating new memory for each partition
  3. Performing expensive data movement operations
  4. Little opportunity to reuse buffers across operations

This leads to:

  • High memory overhead (O(n×p) where n=rows, p=partitions)
  • Increased memory allocator pressure
  • Suboptimal CPU cache utilization

Proposed Solution

Introduce PartitionedVector - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs.

Key Design Principles

  1. In-place rearrangement: Rearrange vector data in memory without creating multiple copies
  2. Buffer reuse: Allow reuse of temporary buffers across multiple partitioning operations
  3. Minimal abstraction: Similar to DecodedVector, focus on efficient execution rather than operator semantics
  4. Thread-unsafe by design: Optimized for single-threaded execution contexts

Design Details

Class Architecture

PartitionedVector (abstract base)
├── PartitionedFlatVector<T> (template for flat vectors)
└── Future: PartitionedComplexVector (for arrays, maps, rows)

Core Components

1. PartitionBuildContext

Transient execution context only valid during construction:

struct PartitionBuildContext {
  // Start offsets of each partition in the rearranged vector; reusable across calls.
  BufferPtr beginPartitionOffsetsBuffer;
  // Scratch buffer used by the swap-based rearrangement; reusable across calls.
  BufferPtr swappingBuffer;
  // First row of the input range to partition.
  vector_size_t firstRow{0};
  // Whether the in-place swapping path is used.
  bool useSwapping = false;
};

2. PartitionedVector

Key methods:

  • create(): Factory method that partitions a vector in-place
  • partitionAt(): Extract a specific partition as a view/slice
  • baseVector(): Access the underlying partitioned vector

3. Partitioning Algorithms

  • Fixed-width types: In-place swapping using cyclic permutation
  • Nulls bitmap: Bit-level swapping algorithm; more SIMD variants will be added in the future
  • Dictionary vectors: Future extension via dictionary wrapping
  • Complex types: Future extension for arrays, maps, and rows with direct in-place swapping of the internal offsets vectors
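To illustrate the bit-level nulls handling, the following sketch (helper names are hypothetical, not the Velox API) swaps two bits inside a packed 64-bit nulls bitmap, which is the primitive the swap-based algorithm applies to the nulls buffer when it swaps rows:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: swap bit i and bit j in a packed uint64_t bitmap.
// Only toggles the words when the two bits differ.
inline void swapBits(std::vector<uint64_t>& bits, size_t i, size_t j) {
  const uint64_t bi = (bits[i / 64] >> (i % 64)) & 1;
  const uint64_t bj = (bits[j / 64] >> (j % 64)) & 1;
  if (bi != bj) {
    bits[i / 64] ^= uint64_t(1) << (i % 64);
    bits[j / 64] ^= uint64_t(1) << (j % 64);
  }
}

// Hypothetical helper: read bit i of the packed bitmap.
inline bool testBit(const std::vector<uint64_t>& bits, size_t i) {
  return (bits[i / 64] >> (i % 64)) & 1;
}
```

A scalar version like this moves one bit pair at a time; the SIMD variants mentioned above would process many bit pairs per instruction.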

In-Place Partitioning Algorithm

The core of the design is an in-place partitioning algorithm that rearranges elements without extra O(n) storage:

// For each partition, scan its range and route misplaced elements
// to the next free slot of their target partition.
for (current_partition = 0; current_partition < num_partitions; current_partition++) {
    position = partition_begin[current_partition];
    partition_end = partition_begin[current_partition] + partition_size[current_partition];
    while (position < partition_end) {
        target_partition = partition_id[position];
        while (target_partition != current_partition) {
            // Swap the element (and its partition ID, to keep the ID array
            // consistent with the data) into its target partition.
            destination = next_free_slot[target_partition]++;
            swap(elements[position], elements[destination]);
            swap(partition_id[position], partition_id[destination]);
            target_partition = partition_id[position];
        }
        position++;
    }
}
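The pseudocode above can be made concrete with a small self-contained C++ sketch (names are illustrative, not the actual Velox API). It keeps a per-partition begin cursor instead of a separate scan position, and swaps partition IDs along with the elements so the ID array stays consistent with the data:

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative sketch of the in-place swap algorithm. `begin[p]` is the next
// free slot of partition p and is advanced as elements are placed; `end[p]`
// is one past its last slot.
template <typename T>
void partitionInPlace(
    std::vector<T>& elements,
    std::vector<uint32_t>& partitionIds,
    std::vector<uint32_t>& begin,
    const std::vector<uint32_t>& end) {
  for (uint32_t p = 0; p < begin.size(); ++p) {
    while (begin[p] < end[p]) {
      const uint32_t pos = begin[p];
      const uint32_t target = partitionIds[pos];
      if (target == p) {
        ++begin[p]; // Already in its own partition.
      } else {
        // Move the element into the next free slot of its target partition;
        // that slot is final, so at most n swaps happen overall.
        const uint32_t dest = begin[target]++;
        std::swap(elements[pos], elements[dest]);
        std::swap(partitionIds[pos], partitionIds[dest]);
      }
    }
  }
}
```

On the eight-row example used below, this groups all rows by partition ID in a single pass, though the order of rows inside each partition differs from their input order, since swap-based partitioning is not stable.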

This algorithm guarantees:

  • Every swap places at least one element into its final partition, so at most n swaps are performed
  • O(n) time complexity with minimal constant factors
  • Note that relative order within a partition is not preserved; swap-based partitioning is not stable

Memory Layout

Before partitioning: [a0, a1, a2, a3, a4, a5, a6, a7]
Partition IDs:        [0,  1,  0,  2,  1,  0,  2,  1]

After partitioning:  [a0, a2, a5, a1, a4, a7, a3, a6]
                      └───┬───┘ └───┬───┘ └───┬───┘
                         p=0        p=1       p=2
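The per-partition begin offsets that drive the layout above can be derived by counting rows per partition and taking an exclusive prefix sum. A minimal sketch (the function name is illustrative, not the Velox API):

```cpp
#include <cstdint>
#include <vector>

// Count rows per partition, then convert counts to start offsets via an
// exclusive prefix sum. For partition IDs {0, 1, 0, 2, 1, 0, 2, 1} and 3
// partitions, counts are {3, 3, 2} and offsets are {0, 3, 6}.
std::vector<uint32_t> computeBeginOffsets(
    const std::vector<uint32_t>& partitionIds,
    uint32_t numPartitions) {
  std::vector<uint32_t> counts(numPartitions, 0);
  for (auto id : partitionIds) {
    ++counts[id];
  }
  std::vector<uint32_t> offsets(numPartitions, 0);
  uint32_t running = 0;
  for (uint32_t p = 0; p < numPartitions; ++p) {
    offsets[p] = running;
    running += counts[p];
  }
  return offsets;
}
```

These offsets are exactly the partition boundaries shown in the diagram, and a copy of them serves as the per-partition cursors consumed by the swap pass.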

API Usage Example

// Setup
VectorPtr data = makeFlatVector<int64_t>(1000, [](auto row) { return row; });
std::vector<uint32_t> partitions = generatePartitionIds(1000, 8);
BufferPtr partitionOffsets;
PartitionBuildContext ctx;

// Create partitioned vector
auto pvector = PartitionedVector::create(
    data, partitions, 8, partitionOffsets, ctx, pool);

// Access partitions
for (int i = 0; i < 8; i++) {
    VectorPtr partition = pvector->partitionAt(i);
    processPartition(partition);
}

Alternatives Considered

Alternative 1: Multiple Vector Copies

std::vector<VectorPtr> partitions(num_partitions);
for (auto row : rows) {
    auto partition = partition_id[row];
    partitions[partition]->append(row_data);
}

Pros: Simple implementation
Cons: High memory overhead, poor cache locality

Alternative 2: Dictionary Views

for (auto partition : partitions) {
    auto indices = getPartitionIndices(partition);
    auto dict =
        BaseVector::wrapInDictionary(nulls, indices, partitionSize, vector);
}

Pros: Zero-copy, good for read-only scenarios
Cons: Multiple dictionary layers degrade performance, doesn't solve write scenarios

Alternative 3: External Memory Buffer

BufferPtr partitionedBuffer = allocateAligned(total_size);
for (auto row : rows) {
    auto offset = partition_offsets[partition_id[row]]++ * row_size;
    memcpy(partitionedBuffer + offset, row_data, row_size);
}

Pros: Contiguous output, good for serialization
Cons: Requires full copy, doubles memory usage during partitioning

Testing Strategy

Unit Tests

  1. Basic functionality: Verify partitioning correctness for various partition counts
  2. Edge cases: Empty partitions, single partition, one-row-per-partition
  3. Data types: All supported fixed-width types (int8, int16, int32, int64, float, double)
  4. Null handling: Vectors with different null patterns (every Nth null, all nulls, no nulls)

Performance Benchmarks

// Benchmark metrics
- Throughput (rows/sec)
- Memory allocation count/size  
- Cache miss rates
- Comparison with baseline (naive copy approach)

Performance Considerations

Expected Improvements

  • Memory: Reduced peak memory usage compared to copying
  • CPU: Better cache locality through in-place rearrangement
  • Allocation: Buffer reuse eliminates malloc/free overhead

The existing ExchangeBenchmark shows over a 20x improvement over the current implementation on FlatVectors.

Optimization Opportunities

  1. SIMD acceleration: Use AVX2/AVX-512 for bulk bit manipulation
  2. Pre-compute permutation: For tables with many columns, we can calculate the shuffle permutation once and apply to all columns
  3. Prefetching: Hardware prefetch hints for better cache performance
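Opportunity 2 above can be sketched as follows: once the row-to-destination mapping has been derived from the partition IDs, each additional column is rearranged with a single gather pass instead of re-running the swap algorithm per column (illustrative code, assuming an out-of-place scratch buffer is acceptable for applying the permutation):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Apply a precomputed permutation to one column: destIndex[row] is the slot
// the row occupies after partitioning. Each row is written exactly once.
template <typename T>
std::vector<T> applyPermutation(
    const std::vector<T>& column,
    const std::vector<uint32_t>& destIndex) {
  std::vector<T> out(column.size());
  for (size_t row = 0; row < column.size(); ++row) {
    out[destIndex[row]] = column[row];
  }
  return out;
}
```

For the memory-layout example above, destIndex {0, 3, 1, 6, 4, 2, 7, 5} sends each source row to its slot in [a0, a2, a5, a1, a4, a7, a3, a6]. Applying a permutation strictly in place is also possible but requires cycle walking; the scratch-buffer form shown here is the simpler trade-off when the buffer can be reused across columns.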

Implementation Plan

Phase 1: Core Infrastructure (Current PR)

  • Basic PartitionedVector and PartitionedFlatVector classes
  • In-place partitioning algorithms for fixed-width types
  • Bit-level nulls partitioning
  • Basic test suite

Phase 2: New PrestoIterativePartitionedSerializer and OptimizedPartitionedOutput operator

  • Introduce PrestoIterativePartitionedSerializer using PartitionedVector
  • Introduce OptimizedPartitionedOutput operator using PrestoIterativePartitionedSerializer
  • Integration test that does e2e shuffle of FlatVector

Phase 3: Enhanced Functionality

  • Support for dictionary vectors (wrap in dictionary rather than copy)
  • Support for string vectors (variable-width types)
  • Support for complex types (arrays, maps, rows)

Phase 4: Optimization

  • SIMD-accelerated bit manipulation
  • Adaptive algorithms based on data distribution
  • Parallel partitioning for large vectors
  • Integration with operator memory management

Risks and Mitigations

Risk 1: Algorithm Complexity

The in-place swapping algorithm is non-trivial to implement correctly.

Mitigation:

  • Extensive unit testing with randomized inputs
  • Formal proof of correctness for the algorithm
  • Comparison with reference implementation

Risk 2: Thread Safety

The class is intentionally not thread-safe for performance reasons.

Mitigation:

  • Clear documentation of thread safety constraints
  • Assertions in debug builds to catch concurrent access
  • Separate thread-safe wrapper if needed

Risk 3: Memory Corruption

In-place manipulation risks corrupting data if bugs exist.

Mitigation:

  • Use of VELOX_CHECK for preconditions
  • Memory sanitizer tests
  • Fuzz testing with random inputs

Conclusion

The PartitionedVector abstraction provides a performant foundation for vector partitioning operations in Velox. By enabling in-place rearrangement with buffer reuse, it addresses key performance bottlenecks in shuffle, aggregation, and other partitioning-heavy operations.

The design follows Velox's philosophy of providing low-level, efficient building blocks that higher-level operators can compose. Future extensions will support more vector types and optimizations while maintaining backward compatibility.

Next Steps:

  1. Review and merge current implementation
  2. Integrate with PartitionedOutput operator
  3. Add support for dictionary vectors
  4. Add support for RLE vectors
  5. Add support for Row vectors
  6. Add support for Array vectors
  7. Add support for Bias vectors
  8. Add support for Map vectors
  9. Profile and optimize based on real workloads
