[feature](be) Implement adaptive batch size with EWMA prediction for byte-budget control#61535
mrhhsg wants to merge 7 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall

/review
Code Review Summary
This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts SegmentIterator chunk row counts to target a preferred block byte size. The design is reasonable and the overall approach is sound. However, there are several issues that need to be addressed.
Critical Checkpoints
Goal and correctness: The feature aims to dynamically adjust batch sizes to target byte-based block sizes instead of fixed row counts. The implementation adds prediction at the SegmentIterator level and byte-budget stops at BlockReader and VCollectIterator. There are correctness bugs in the record_rowids path (see inline comments).
Concurrency: The predictor is single-threaded per instance and used within a single SegmentIterator — no concurrency concern.
Lifecycle / SIOF: No static initialization order issues. The predictor is created as a unique_ptr member.
Config items: enable_adaptive_batch_size is dynamic (mutable), which is appropriate. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are correctly propagated via Thrift.
Incompatible changes: New Thrift field IDs (210, 211) have defaults matching the session variable defaults. Backward compatible.
Parallel code paths: The byte-budget check is added to _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and VCollectIterator::_merge_next. The _direct_next_block path (DUP KEY) relies on the SegmentIterator-level predictor limiting block_row_max, which is correct by design.
Test coverage: Regression tests cover DUP, AGG, and UNIQUE key table types. Unit tests are thorough for the predictor class itself. However, no test covers the record_rowids path which has bugs.
Observability: Profile counters AdaptiveBatchPredictMinRows and AdaptiveBatchPredictMaxRows are added. However, a LOG(INFO) debug log is left in the hot path (see inline).
Performance: Block::columns_byte_size() is called per-row in some paths. Each call is O(num_columns) with O(1) per column. Acceptable for typical column counts but could be a concern for very wide tables (hundreds of columns).
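For reference, the EWMA prediction scheme summarized above can be sketched roughly as follows. This is a minimal illustration, not the actual Doris implementation; the class name, member names, and the smoothing factor are hypothetical:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical sketch of EWMA-based row-count prediction toward a byte budget.
class AdaptiveBlockSizePredictorSketch {
public:
    explicit AdaptiveBlockSizePredictorSketch(size_t block_size_bytes, double alpha = 0.3)
            : _block_size_bytes(block_size_bytes), _alpha(alpha) {}

    // Fold the observed bytes/row of the last batch into the EWMA estimate.
    void update(size_t batch_rows, size_t batch_bytes) {
        if (batch_rows == 0) return;
        double observed = static_cast<double>(batch_bytes) / batch_rows;
        _ewma_bytes_per_row = _ewma_bytes_per_row <= 0.0
                ? observed // first observation seeds the average
                : _alpha * observed + (1.0 - _alpha) * _ewma_bytes_per_row;
    }

    // Predict how many rows fit in the byte budget, clamped to [min_rows, max_rows].
    size_t predict_next_rows(size_t min_rows, size_t max_rows) const {
        if (_ewma_bytes_per_row <= 0.0) return max_rows; // no history yet
        auto predicted = static_cast<size_t>(_block_size_bytes / _ewma_bytes_per_row);
        return std::clamp(predicted, min_rows, max_rows);
    }

private:
    size_t _block_size_bytes;
    double _alpha;
    double _ewma_bytes_per_row = 0.0;
};
```

With an 8 MiB budget and an observed 1 KiB/row, the next prediction lands near 8192 rows, which matches the behavior the PR description advertises.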
Issues Found
- [Bug] `_replace_key_next_block` and `_unique_key_next_block`: missing `_block_row_locations.resize(target_block_row)` when the byte-budget break triggers with `record_rowids` enabled. This will cause the DCHECK at line 604 to fail and can return stale/incorrect row locations.
- [Bug/Severity: High] `LOG(INFO)` with a `******` prefix on every `predict_next_rows()` call: a debug log accidentally left in production code. It will spam logs on every batch of every segment scan.
- [Minor] The `_replace_key_next_block` function does not call `block->set_columns()` after mutating columns; this is pre-existing behavior but worth noting for consistency.
- [Observation] In `VCollectIterator::_merge_next`, the byte-budget check uses `Block::columns_byte_size(target_columns)` before flushing `continuous_row_in_block` pending rows, so the actual output block can exceed the budget. This is a conservative design choice, not a correctness bug.
```cpp
        static_cast<size_t>(static_cast<double>(_block_size_bytes) / estimated_bytes_per_row);
LOG(INFO) << "****** predicted rows: " << predicted
          << ", estimated_bytes_per_row: " << estimated_bytes_per_row
          << ", _block_size_bytes: " << _block_size_bytes;
```
[Bug/High] This LOG(INFO) with ****** prefix is called on every predict_next_rows() invocation — i.e., every batch in every segment scan. This will generate massive log spam in production.
This should be changed to VLOG_DEBUG or removed entirely. At minimum, this is clearly a debug statement accidentally left in:
```cpp
VLOG_DEBUG << "predicted rows: " << predicted
           << ", estimated_bytes_per_row: " << estimated_bytes_per_row
           << ", _block_size_bytes: " << _block_size_bytes;
```

```cpp
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
        break;
    }
```
[Bug] When record_rowids is enabled, this byte-budget break exits the outer while loop without resizing _block_row_locations to target_block_row. At line 349, _block_row_locations was resized to batch_size(), so after this break it will still have batch_size() entries instead of target_block_row entries.
The EOF path at lines 372-374 correctly does _block_row_locations.resize(target_block_row). This break path needs the same treatment:
```cpp
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}
```

Without this fix, `current_block_row_locations()` returns a vector with stale entries, and the DCHECK at line 604 (`DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count)`) will fail.
```cpp
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
        break;
    }
```
[Bug] Same issue as in _replace_key_next_block: when record_rowids is enabled, this byte-budget break exits without resizing _block_row_locations to target_block_row. At line 530, _block_row_locations was resized to _reader_context.batch_size. The EOF path at lines 548-550 correctly resizes.
Fix:
```cpp
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}
```

Note: in `_unique_key_next_block`, the `_delete_sign_available` filtering path (starting at line 566) also uses `target_block_row` and `_block_row_locations`. An incorrectly sized `_block_row_locations` could corrupt the filter logic.
TPC-H: Total hot run time: 30963 ms

TPC-DS: Total hot run time: 173281 ms

BE UT Coverage Report: increment line coverage report

BE Regression && UT Coverage Report: increment line coverage report

Force-pushed daebf3f to f404b06

run buildall

/review
Code Review Summary: feat Implement adaptive batch size for SegmentIterator
This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts chunk row counts so each output Block stays near a configurable byte target. The overall design is sound, with proper plumbing from FE session variables through Thrift to the storage layer. The byte-budget checks in BlockReader and VCollectIterator are placed at safe boundaries.
However, I found several issues ranging from a unit test correctness bug to regression test standard violations.
Critical Checkpoints
1. Goal & Correctness: The PR achieves its stated goal of adaptive batch sizing. The EWMA predictor logic is correct. Byte-budget stops in _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and _merge_next are all placed at safe row boundaries. No data correctness issues found in the core logic.
2. Focused & Minimal: The change is reasonably focused. Adding new session variables, config flags, thrift fields, the predictor, and byte-budget stops across multiple layers is inherent to the feature.
3. Concurrency: The TOCTOU race in _collect_profile_before_close for min/max counter updates is benign (display-only counters) but should be documented or fixed. See inline comment.
4. Lifecycle: No lifecycle issues. The predictor is owned by SegmentIterator and destroyed with it.
5. Configuration: enable_adaptive_batch_size is mutable and appropriately documented. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are properly forwarded.
6. Incompatible changes: Thrift fields 210/211 are optional with correct IDs. No compatibility issues. (Note: PR description incorrectly states fields 204/205.)
7. Parallel code paths: The three BlockReader paths (_replace_key, _agg_key, _unique_key) are all covered. VCollectIterator merge path is covered. Compaction paths are not affected (they don't set preferred_block_size_bytes).
8. Test coverage: Unit tests are comprehensive (19+ test cases) but have a critical mock bug that makes metadata-hint tests read indeterminate memory. Regression tests have standard violations. See inline comments.
9. Observability: Profile counters AdaptiveBatchPredictMinRows/MaxRows are added. The else branch stats overwrite is problematic. See inline comment.
10. Performance: Block::columns_byte_size() is O(num_columns) per call (all byte_size() implementations are O(1)). Called per-row in _unique_key_next_block and _merge_next, this adds measurable but acceptable overhead for wide tables.
Issues Found
| # | Severity | File | Issue |
|---|---|---|---|
| 1 | High | Unit test | MockSegment::num_rows() mock is broken — Segment::num_rows() is non-virtual, so ON_CALL has no effect through Segment& reference. Tests using metadata hints read indeterminate _num_rows. |
| 2 | Medium | Regression test | Uses qt_ prefix instead of order_qt_ (violates regression test standards) |
| 3 | Medium | Regression test | Drops 4 of 5 tables after use (should only drop before, to preserve debug state) |
| 4 | Low | olap_scanner.cpp | TOCTOU race on min/max counter updates (benign for display-only counters) |
| 5 | Low | segment_iterator.cpp | else branch overwrites stats every call when predictor is inactive, producing misleading values |
| 6 | Info | PR description | States thrift fields 204/205 but actual code uses 210/211 |
```cpp
// Set up num_rows mock.
ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows));
```
Bug (High): Segment::num_rows() is non-virtual (declared as uint32_t num_rows() const { return _num_rows; } in segment.h:119). The ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows)) configures gmock's virtual mock method, but when predict_next_rows() calls segment.num_rows() through a Segment& reference, the compiler statically dispatches to the base class's non-virtual method, bypassing the mock entirely.
The actual value returned will be whatever _num_rows was left at by the base Segment constructor — which does not initialize _num_rows in its member initializer list (see segment.cpp:168-174). This means tests like PredictNoHistoryMetadataHint are reading indeterminate memory (undefined behavior).
Fix: Since MockSegment is declared as a friend of Segment, you can directly set the member:
```cpp
seg->_num_rows = num_rows;
```

instead of (or in addition to) the `ON_CALL` mock.
be/src/exec/scan/olap_scanner.cpp (outdated)

```cpp
if (stats.adaptive_batch_size_predict_max_rows > 0) {
    auto cur_min = local_state->_adaptive_batch_predict_min_rows_counter->value();
    if (cur_min == 0 || stats.adaptive_batch_size_predict_min_rows < cur_min) {
```
Minor (Low): This read-check-store pattern on shared RuntimeProfile::Counter is a classic TOCTOU race. Multiple scanners call _collect_profile_before_close concurrently on the same local_state counters. Thread A can read a stale cur_min, pass the check, and overwrite thread B's correct value.
Since these are display-only profile counters, this is benign — the worst outcome is a slightly inaccurate min/max in the query profile. But consider documenting this or using COUNTER_UPDATE with atomic min/max if RuntimeProfile::Counter supports it.
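If the counter storage can be made atomic, the race-free min update suggested above can be sketched with a compare-exchange loop. This assumes a `std::atomic<int64_t>` backing store and keeps the original code's convention of `0` meaning "unset"; `RuntimeProfile::Counter`'s actual storage may differ:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Race-free alternative to the read-check-store pattern: retry the CAS until
// either our candidate is no longer smaller, or we successfully installed it.
// A value of 0 is treated as "not yet set", mirroring the code under review.
inline void update_min(std::atomic<int64_t>& counter, int64_t candidate) {
    int64_t cur = counter.load(std::memory_order_relaxed);
    while ((cur == 0 || candidate < cur) &&
           !counter.compare_exchange_weak(cur, candidate, std::memory_order_relaxed)) {
        // On CAS failure, cur is reloaded with the latest value and the
        // predicate is re-evaluated, so a concurrent smaller min is never lost.
    }
}
```

The symmetric max update replaces `candidate < cur` with `candidate > cur` and drops the zero-sentinel check.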
```groovy
def res_enabled = sql "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"

qt_wide "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4 limit 50"
```
Standards violation (Medium): Per regression test standards, use order_qt_ prefix instead of qt_ to ensure deterministic ordered output. This applies to all query tags in this file (qt_wide, qt_narrow, qt_agg, qt_unique, qt_flag).
While some of these queries have explicit ORDER BY or return single rows, the standard convention is to consistently use order_qt_ prefix.
```groovy
// toward returning close to max_rows (batch is still row-limited).

sql "drop table if exists abs_narrow_table"
sql """
```
Standards violation (Medium): Per regression test standards: "After completing tests, do not drop tables; instead drop tables before using them in tests, to preserve the environment for debugging."
This drop table after use (and similar ones for abs_agg_table, abs_unique_table, abs_flag_table) should be removed. The drop table if exists before CREATE TABLE at the beginning of each test case is the correct pattern (and is already present).
```cpp
                    static_cast<int64_t>(predicted));
} else {
    _opts.stats->adaptive_batch_size_predict_min_rows = _opts.block_row_max;
    _opts.stats->adaptive_batch_size_predict_max_rows = _opts.block_row_max;
```
Minor (Low): When _block_size_predictor is null (feature disabled), this else branch unconditionally overwrites adaptive_batch_size_predict_min_rows and adaptive_batch_size_predict_max_rows with _opts.block_row_max on every next_batch() call. Since OlapReaderStatistics is shared across segment iterators for the same scanner, and adaptive_batch_size_predict_min_rows is initialized to INT64_MAX, the first segment iterator's overwrite clobbers the sentinel.
This means the profile counters will show misleading values when the feature is disabled. Consider only setting these once, or guarding with a check (e.g., only set if currently INT64_MAX / 0).
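The "only set if currently INT64_MAX / 0" guard suggested above could look roughly like this. The struct and function names are hypothetical stand-ins for the real `OlapReaderStatistics` fields:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical slice of the stats struct: min starts at the INT64_MAX sentinel,
// max starts at 0, matching the initialization described in the review comment.
struct StatsSketch {
    int64_t predict_min_rows = std::numeric_limits<int64_t>::max();
    int64_t predict_max_rows = 0;
};

// Write the fallback block_row_max only while the sentinels are still intact,
// so a disabled predictor never clobbers values recorded by another iterator.
inline void set_defaults_once(StatsSketch& stats, int64_t block_row_max) {
    if (stats.predict_min_rows == std::numeric_limits<int64_t>::max()) {
        stats.predict_min_rows = block_row_max;
    }
    if (stats.predict_max_rows == 0) {
        stats.predict_max_rows = block_row_max;
    }
}
```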
Cloud UT Coverage Report: increment line coverage report

TPC-H: Total hot run time: 27323 ms

TPC-DS: Total hot run time: 169237 ms

BE UT Coverage Report: increment line coverage report

BE Regression && UT Coverage Report: increment line coverage report

Force-pushed f404b06 to ae43fe9

run buildall

Cloud UT Coverage Report: increment line coverage report

BE UT Coverage Report: increment line coverage report

TPC-H: Total hot run time: 26528 ms

TPC-DS: Total hot run time: 169678 ms

BE Regression && UT Coverage Report: increment line coverage report
```cpp
RuntimeProfile::Counter* _variant_doc_value_column_iter_count = nullptr;

RuntimeProfile::Counter* _adaptive_batch_predict_min_rows_counter = nullptr;
RuntimeProfile::Counter* _adaptive_batch_predict_max_rows_counter = nullptr;
```
Use HighWaterMarkCounter
```cpp
DEFINE_mBool(enable_low_cardinality_optimize, "true");
DEFINE_Bool(enable_low_cardinality_cache_code, "true");

DEFINE_mBool(enable_adaptive_batch_size, "true");
```
Could we use a session variable instead?
be/src/runtime/runtime_state.h (outdated)

```cpp
    auto v = _query_options.preferred_block_size_bytes;
    return v > 0 ? static_cast<size_t>(v) : 0;
}
return 8388608UL; // 8MB default
```
Maybe we should not enable this feature by default
```cpp
}
// Byte-budget check: after the inner loop _next_row is either EOF or the next different
// key, so it is safe to stop accumulating here without repeating any row.
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
```
Extract a common function to do this check.
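One possible shape for that shared helper, as a sketch: it bundles the three-part predicate that is currently duplicated across the `*_next_block` paths. The free-function form and parameter list here are hypothetical; the real helper would likely be a method with access to the config flag and reader context:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical extraction of the duplicated byte-budget predicate.
// `enable_adaptive_batch_size` stands in for config::enable_adaptive_batch_size,
// and `columns_byte_size` for the result of Block::columns_byte_size(target_columns).
inline bool reached_byte_budget(bool enable_adaptive_batch_size,
                                size_t preferred_block_size_bytes,
                                size_t columns_byte_size) {
    return enable_adaptive_batch_size && preferred_block_size_bytes > 0 &&
           columns_byte_size >= preferred_block_size_bytes;
}
```

A single helper also makes it harder for the `record_rowids` resize fix to be applied in some call sites but forgotten in others.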
```cpp
// Adaptive batch size: target output block byte budget. 0 = disabled.
size_t preferred_block_size_bytes = 0;
// Per-column byte limit for the adaptive predictor. 0 = no column limit.
size_t preferred_max_col_bytes = 1048576;
```
```cpp
    }
}
if (matched_cols > 0 && total_bytes > 0.0) {
    _metadata_hint_bytes_per_row = (total_bytes / static_cast<double>(seg_rows)) * 1.2;
```
Force-pushed ae43fe9 to 841c2a7

TPC-H: Total hot run time: 36837 ms

BE UT Coverage Report: increment line coverage report

Force-pushed a6852d1 to c8bc656
…byte-budget control

Issue Number: close #xxx

Problem Summary: Fixed-row-count batch sizing causes memory spikes for wide tables and under-utilization for narrow tables. This PR adds adaptive batch sizing that dynamically adjusts row counts using EWMA (Exponential Weighted Moving Average) prediction so that each output block stays near a configurable byte budget (default 8MB).

**Key changes:**
1. **Storage layer**: New `AdaptiveBlockSizePredictor` class in SegmentIterator uses segment column metadata for initial estimation and EWMA for runtime adjustment. BlockReader and VCollectIterator gain byte-budget awareness.
2. **Operator layer**: All pipeline operators (aggregation, sort, join, set, union, exchange, table function, schema scan) switch from `batch_size()` to `block_max_rows()` / `block_max_bytes()` for dual row+byte control. Hash join probe adds a per-instance `batch_size_limit` parameter to `find_batch` for thread-safe adaptive control without modifying shared hash table state.
3. **Scan/Format layer**: FileScanner integrates EWMA prediction for CSV, JSON, ORC, and Parquet readers. OlapScanner propagates the byte budget to the storage layer.
4. **FE/Thrift**: Three new session variables (`preferred_block_size_bytes`, `preferred_block_size_rows`, `preferred_max_column_in_block_size_bytes`) with BE config `enable_adaptive_batch_size` (mBool, default true).

Added adaptive batch size feature that dynamically adjusts block row counts to target a byte budget (default 8MB), improving memory efficiency for wide tables and throughput for narrow tables. Controlled by session variables `preferred_block_size_bytes` (default 8MB), `preferred_block_size_rows` (default 65535), and `preferred_max_column_in_block_size_bytes` (default 1MB), plus BE config `enable_adaptive_batch_size` (default true).

- Test: Unit Test (full BE UT pass) / Regression test (adaptive_batch_size suite)
- Behavior changed: Yes, block sizes are now adaptively controlled by byte budget when enabled; disable with `SET enable_adaptive_batch_size=false` or `preferred_block_size_bytes=0`
- Does this need documentation: Yes

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed c8bc656 to 661e4ee

/review
Pull request overview
Copilot reviewed 98 out of 98 changed files in this pull request and generated 6 comments.
```cpp
// Return the effective maximum row count for output blocks.
// When adaptive batch sizing is enabled, prefer preferred_block_size_rows;
// otherwise fall back to the traditional batch_size.
size_t batch_max_rows() const {
    if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_rows > 0) {
        return _reader_context.preferred_block_size_rows;
    }
    return _reader_context.batch_size;
}
```
batch_max_rows() switches to preferred_block_size_rows whenever enable_adaptive_batch_size is true, regardless of whether preferred_block_size_bytes is 0. If preferred_block_size_bytes=0 is intended to fully disable adaptive sizing (per PR description), this still changes behavior vs the legacy fixed batch_size row cap. Consider also gating the preferred_block_size_rows branch on preferred_block_size_bytes>0 (or explicitly documenting that rows-only expansion remains enabled when bytes is 0).
```groovy
    assertEquals(res_enabled[i].toString(), res_disabled[i].toString())
}

// sql "drop table abs_wide_table"
```
abs_wide_table is created and populated, but the DROP at the end of Test 1 is commented out. Leaving the table behind can pollute the test environment and potentially affect reruns or later suites. Consider dropping abs_wide_table (like the other test tables) during cleanup.
```cpp
MOCK_FUNCTION int block_max_rows() const {
    return config::enable_adaptive_batch_size
            ? std::max(_query_options.batch_size, int(preferred_block_size_rows()))
            : _query_options.batch_size;
}
```
block_max_rows() is currently gated only by config::enable_adaptive_batch_size, not by whether the query’s byte budget is enabled. As a result, preferred_block_size_bytes = 0 (the documented disable switch) still leaves block_max_rows() potentially much larger than batch_size (e.g. 65535), which can change legacy behavior and risk large blocks for wide rows. Consider gating the preferred_block_size_rows path on preferred_block_size_bytes()>0 (or ensuring FE sets preferred_block_size_rows=0 when bytes=0) so “disable” truly restores the fixed-row behavior.
```groovy
def set_adaptive = { enabled ->
    // preferred_block_size_bytes controls the feature end-to-end.
    // Setting it to a very small value (e.g. 1 byte) forces the predictor to
    // return row count = 1 every time, which is a good stress test.
    if (enabled) {
        sql "set preferred_block_size_bytes = 8388608" // 8 MB (default)
        sql "set batch_size = 65535"
        sql "set preferred_max_column_in_block_size_bytes = 1048576" // 1 MB
    } else {
        // Setting to 0 disables the byte-limit path at the accumulation layer.
        sql "set preferred_block_size_bytes = 0"
        sql "set batch_size = 4096"
        sql "set preferred_max_column_in_block_size_bytes = 0"
    }
```
This suite modifies multiple session variables (e.g. batch_size) but only resets preferred_block_size_bytes and preferred_max_column_in_block_size_bytes at the end. Leaving batch_size / preferred_block_size_rows at non-default values can leak into subsequent suites if they share the same session. Suggest resetting all modified vars back to defaults in the cleanup section.
```groovy
} else {
    // Setting to 0 disables the byte-limit path at the accumulation layer.
    sql "set preferred_block_size_bytes = 0"
    sql "set batch_size = 4096"
    sql "set preferred_max_column_in_block_size_bytes = 0"
}
```
The disabled branch only sets preferred_block_size_bytes=0, but does not reset preferred_block_size_rows. Since many operators now use block_max_rows(), leaving preferred_block_size_rows at its default (65535) can still change behavior compared to legacy fixed batch_size=4096. If the intent is “fixed row count only” when disabled, consider also setting preferred_block_size_rows to 0 (or to batch_size) in the disabled branch.
```cpp
// Base TabletReader always returns 0 (feature unsupported at base level).
TEST_F(BlockReaderByteBudgetTest, BaseTabletReaderReturnsZero) {
    config::enable_adaptive_batch_size = true;
    // TabletReader is abstract; use BlockReader as base pointer to test virtual dispatch.
    BlockReader concrete;
    concrete._reader_context.preferred_block_size_bytes = 99999;
    TabletReader* base = &concrete;
    // Through the virtual dispatch, BlockReader's override should be called.
    EXPECT_EQ(base->preferred_block_size_bytes(), 99999);
```
The test name/comment says “Base TabletReader always returns 0”, but the assertion is checking virtual dispatch through a TabletReader* to BlockReader’s override and expects 99999. Suggest updating the comment (or renaming the test) to reflect what is actually being validated.
I found 2 issues in the adaptive batch size implementation.
- `SET preferred_block_size_bytes = 0` does not actually restore the legacy fixed-row behavior. `RuntimeState::block_max_rows()` and `TabletReader::batch_max_rows()` still switch to `preferred_block_size_rows` whenever the global config is enabled, so many operators now produce up to 65535 rows even though the session-level byte-budget flag is meant to disable the feature. The regression test only checks result equivalence, so it misses this behavioral regression.
- The OLAP scan path cannot grow beyond the legacy reader batch size. `BetaRowsetReader` still seeds `StorageReadOptions::block_row_max` from `read_context->batch_size`, and `SegmentIterator` then clamps every prediction to `_initial_block_row_max`. For narrow tables the predictor can therefore only shrink batches, never grow toward `preferred_block_size_rows`, which means the storage layer does not deliver the advertised adaptive growth behavior.
Critical checkpoint conclusions:
- Goal / tests: Partially met. The PR wires adaptive sizing through many paths and adds broad tests, but the current tests do not prove the documented disable semantics or OLAP narrow-row growth.
- Small / focused change: Not especially small; the feature spans many operators and the reviewed paths are not fully consistent yet.
- Concurrency: No new lock-order or thread-safety bug was identified in the reviewed paths.
- Lifecycle / static initialization: No new lifecycle or SIOF issue found in the reviewed paths.
- Configuration: New session/config knobs are wired through FE->BE, but the session-level disable path is not honored consistently.
- Compatibility: No storage-format or FE/BE compatibility break was identified from the reviewed diff.
- Parallel code paths: Not all equivalent paths behave the same yet; OLAP scan still keeps the old hard cap while downstream operators use the new cap.
- Special conditions: The new row-cap clamp in `SegmentIterator` defeats the intended adaptive growth behavior.
- Test coverage: Good breadth, but missing assertions for the two regressions above.
- Observability: Added FileScanner counters help there; nothing additional blocking found.
- Transaction / persistence / data-write correctness: Not applicable for the reviewed changes; no issue found there.
- Performance: The OLAP scan clamp leaves narrow-table performance gains unrealized.
- Other issues: None beyond the 2 findings above from the reviewed paths.
be/src/runtime/runtime_state.h (outdated)

```cpp
void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }

MOCK_FUNCTION int block_max_rows() const {
    return config::enable_adaptive_batch_size
```
preferred_block_size_bytes = 0 is documented as the session-level switch to disable adaptive sizing, but block_max_rows() still returns max(batch_size, preferred_block_size_rows()) whenever the global config is on. After this PR, many operators use state->block_max_rows(), so SET preferred_block_size_bytes = 0 still inflates the row cap from the legacy batch_size to 65535 by default instead of restoring the old fixed-row behavior. The same issue exists in TabletReader::batch_max_rows(), so the disable path is inconsistent across BE.
```cpp
_init_virtual_columns(block);
auto status = [&]() {
    RETURN_IF_CATCH_EXCEPTION({
        // Adaptive batch size: predict how many rows this batch should read.
```
This clamp prevents the storage-side predictor from ever growing past the legacy reader cap. _initial_block_row_max is captured from _opts.block_row_max, and that value still comes from read_context->batch_size in BetaRowsetReader. For narrow OLAP scans, predict_next_rows() may return a much larger value, but std::min(predicted, _initial_block_row_max) forces it back down to the old fixed batch size, so this path can only shrink batches, never grow toward preferred_block_size_rows().
### What problem does this PR solve?

Issue Number: close #N/A

Related PR: apache#61535

Problem Summary: Address code review findings on the adaptive batch size feature:

1. `RuntimeState::block_max_rows()` now gates on `preferred_block_size_bytes() > 0`, so `SET preferred_block_size_bytes = 0` correctly restores legacy fixed-row behavior. Also fixes the `std::max` semantics that turned `preferred_block_size_rows` into a lower bound instead of an upper bound, and fixes a `size_t` to `int` cast overflow risk.
2. `TabletReader::batch_max_rows()` now also gates on `preferred_block_size_bytes > 0`.
3. `BetaRowsetReader` now seeds `block_row_max` from `preferred_block_size_rows` when adaptive sizing is active, so the OLAP scan path can grow beyond the legacy `batch_size` (1024) for narrow tables. `SegmentIterator::_initial_block_row_max` then correctly reflects the larger cap.
4. `VSortedRunMerger` single-run fast path now falls through to the general merge path when the byte budget is active, preventing oversized output blocks.
5. Regression test: renamed `qt_` to `order_qt_` prefix per standard, removed post-test `drop table` statements (keeping the drop-before-create pattern per standard).
6. Unit tests updated to reflect the new gating semantics, including a new test case for the `preferred_block_size_bytes=0` fallback.

### Release note

None

### Check List (For Author)

- Test: Unit Test (71+110 tests passed including all adaptive batch size, sort merger, block reader, collect iterator, and operator tests)
- Behavior changed: Yes, `preferred_block_size_bytes = 0` now correctly disables adaptive row cap expansion (previously it still used preferred_block_size_rows=65535)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary: Follow-up fixes for remaining review comments on PR apache#61535:

1. Removed the stale commented-out DROP TABLE in the regression test
2. Reset all modified session variables (batch_size, preferred_block_size_rows) in the regression test cleanup to prevent leaking into subsequent suites
3. Renamed the misleading test "BaseTabletReaderReturnsZero" to "VirtualDispatchThroughTabletReaderPtr" with an accurate comment
4. Applied clang-format to beta_rowset_reader.cpp

### Release note

None

### Check List (For Author)

- Test: Unit Test (BlockReaderBatchMaxRows, BlockReaderByteBudget, EstimateCollectedEnough: 27 tests passed)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary: Two code paths lacked byte-budget enforcement for the adaptive batch size feature:

1. **Distinct Streaming Aggregation**: The split logic in `_distinct_pre_agg_with_serialized_key` only enforced the row-count cap (batch_size). For wide grouping keys, this could produce blocks exceeding `preferred_block_size_bytes` within a single push(). Fix: added a byte-budget-aware split that estimates the remaining byte budget using the average row size and caps `max_rows_to_add` accordingly. Excess rows go to `_cache_block` for the next iteration.
2. **TopN storage path** (`_topn_next`): The one-shot topN computation returned the entire result in a single block without consulting the byte budget. For large LIMIT values on wide tables, this could produce blocks far exceeding `preferred_block_size_bytes`. Fix: after computing the full topN result, if it exceeds the byte budget, the result is stored and returned in chunks. Each chunk respects both `batch_max_rows` and the byte budget. When the byte budget is disabled, behavior is unchanged (zero overhead).

### Release note

None

### Check List (For Author)

- Test: Unit Test (78 + 105 = 183 tests passed including DistinctStreamingAgg, Sorter, Operator, and adaptive batch size tests)
- Behavior changed: No (feature is gated on preferred_block_size_bytes > 0)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
/review |
Findings:
- be/src/exec/operator/nested_loop_join_probe_operator.cpp: the byte budget is only translated into effective_max_rows after the first process append, so the first build or probe chunk can still overshoot badly on wide rows.
- be/src/exec/operator/distinct_streaming_aggregation_operator.cpp: the new split logic uses the average size of rows already buffered in out_block, which does not bound the bytes contributed by the candidate distinct rows being appended.
- be/src/storage/iterator/vcollect_iterator.cpp: _topn_next_chunk() slices by the full result average row size, so a chunk that starts with a few very wide rows can still exceed preferred_block_size_bytes.
Critical checkpoints:
- Goal / correctness: Partially met. Adaptive sizing is broadly wired through and tested, but the three paths above still violate the advertised byte-budget guarantee on skewed-width data. I did not see regression coverage for those cases.
- Minimal / focused: Reasonably focused for a cross-cutting feature.
- Concurrency: Reviewed the touched concurrent paths; no new lock-order or shared-state bug found in the modified code.
- Lifecycle / static initialization: No issue found.
- Configuration: FE/BE/thrift propagation for the new knobs looks complete, and preferred_block_size_bytes = 0 correctly gates the feature off.
- Compatibility: New thrift fields are appended with new IDs; no compatibility issue found in the touched protocol changes.
- Parallel code paths: Not fully consistent yet; the three findings are remaining byte-budget gaps in parallel paths.
- Special conditions / comments: The new average-based checks are not strong enough to justify the claimed byte-budget enforcement in these paths.
- Test coverage: Broad positive coverage exists, but adversarial skewed-row-size tests are still missing for these code paths.
- Observability: Added scanner counters are useful; no blocker found.
- Transaction / persistence: Not applicable.
- Data writes / atomicity: Not applicable.
- FE-BE variable passing: Complete in the touched code.
- Performance: Overall reasonable; the current blockers are correctness gaps, not overhead.
- Other issues: None beyond the findings above.
```cpp
while (_join_block.rows() + add_rows() <= state->batch_size()) {
    const auto block_max_bytes = state->block_max_bytes();
    size_t effective_max_rows = state->block_max_rows();
    bool bytes_estimated = false;
```
effective_max_rows only becomes byte-aware after the first process_probe_block() or process_build_block() append. That means the first iteration can still materialize a full build or probe chunk based only on row count and blow far past preferred_block_size_bytes on wide rows. The same late adjustment pattern is duplicated in _generate_block_base_build(), so this is still not a real byte-budget bound for nested-loop join output.
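One way to close this gap is to seed the per-row estimate from an input that is already materialized (for example, the build side) before the first append. A minimal sketch under that assumption — all names here are hypothetical, not the PR's actual code:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: derive a byte-aware row cap for the *first* chunk from
// the average row width of an already-materialized input block, instead of
// waiting for the first append to make effective_max_rows byte-aware.
size_t first_chunk_max_rows(size_t block_max_rows, size_t block_max_bytes,
                            size_t input_rows, size_t input_bytes) {
    if (block_max_bytes == 0 || input_rows == 0) {
        return block_max_rows; // byte budget disabled, or nothing to estimate from
    }
    size_t est_row_bytes = std::max<size_t>(1, input_bytes / input_rows);
    // Always allow at least one row so the operator makes progress.
    return std::min(block_max_rows,
                    std::max<size_t>(1, block_max_bytes / est_row_bytes));
}
```

With 64-byte rows observed on the build side and an 8 KB budget, the first chunk would be capped at 128 rows instead of the full row-count limit.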
```cpp
} else {
    size_t avg_row_bytes = current_bytes / out_block->rows();
    if (avg_row_bytes > 0) {
        size_t rows_for_budget = (block_max_bytes - current_bytes) / avg_row_bytes;
```
Using out_block->bytes() / out_block->rows() here does not actually bound the bytes added by the rows in _distinct_row. If the buffered rows are narrow but the next distinct key is wide, such as large STRING, JSON, or VARIANT values, rows_for_budget can still be at least 1 and this push emits a block far above preferred_block_size_bytes. To preserve the new contract, this split needs to account for the candidate rows bytes themselves, not just the historical average of the already-buffered rows.
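A sketch of the suggested direction: bound the split by the width of the candidate rows themselves (for instance, the serialized size of the next distinct key), not only the historical average of rows already buffered. Every name below is illustrative:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical split helper: cap the number of rows to append using the
// candidate rows' own byte size, so one wide STRING/JSON/VARIANT key cannot
// push the emitted block far past the byte budget.
size_t rows_within_budget(size_t current_bytes, size_t max_bytes,
                          size_t candidate_row_bytes, size_t max_rows_to_add) {
    if (max_bytes == 0) return max_rows_to_add; // byte budget disabled
    if (current_bytes >= max_bytes) return 0;   // block is already full
    size_t per_row = std::max<size_t>(1, candidate_row_bytes);
    size_t fit = (max_bytes - current_bytes) / per_row;
    // Emit at least one row while the block is not yet full, so a single
    // oversized row cannot stall the pipeline.
    return std::min(max_rows_to_add, std::max<size_t>(1, fit));
}
```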
```cpp
// Cap by batch_max_rows.
chunk_rows = std::min(chunk_rows, (size_t)_reader->batch_max_rows());

// Cap by byte budget.
```
This still does not guarantee the chunk respects preferred_block_size_bytes. chunk_rows is derived from the full result average row size, so a skewed TopN result, for example a few very wide rows followed by many narrow rows, can produce a first chunk whose real bytes are far above the budget even though the average says it is safe. If this path is supposed to enforce the byte cap, it needs a byte-aware cut over the actual rows being copied, not a global average.
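A byte-aware cut could walk the actual per-row sizes of the buffered result rather than trusting the global average. This is only a sketch of that idea with hypothetical names, assuming per-row sizes are available (or cheaply computable) for the TopN buffer:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical chunk cut: accumulate real row sizes until the budget is hit,
// so a few very wide leading rows cannot push a chunk past max_bytes.
size_t rows_for_next_chunk(const std::vector<size_t>& row_bytes, size_t start,
                           size_t max_rows, size_t max_bytes) {
    size_t rows = 0;
    size_t bytes = 0;
    while (start + rows < row_bytes.size() && rows < max_rows) {
        bytes += row_bytes[start + rows];
        ++rows; // count the row first: a chunk always contains >= 1 row
        if (max_bytes != 0 && bytes >= max_bytes) break;
    }
    return rows;
}
```

For a result of two ~5 KB rows followed by narrow rows and a 4 KB budget, the first chunk is cut after a single wide row, while the narrow tail still packs several rows per chunk.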
Pull request overview
Copilot reviewed 99 out of 99 changed files in this pull request and generated 3 comments.
```cpp
// Byte-budget check: _merge_next() has already advanced _ref to the next unread row,
// so it is safe to stop here without duplicating any data.
if (collected_enough_rows(target_columns, continuous_row_in_block)) {
    if (continuous_row_in_block > 0) {
```
In Level1Iterator::_merge_next(), collected_enough_rows() is evaluated on every loop iteration. That call recomputes Block::columns_byte_size(columns) by summing byte_size() across all columns each time, which makes the merge loop O(rows * columns) and can be a noticeable CPU regression on wide schemas. Consider checking the byte budget only every N rows (similar to BlockReader’s interval checks), or maintaining an incremental byte counter updated when ranges are inserted, so you avoid rescanning all columns per row.
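The interval-check variant suggested above can be sketched as follows — the names and the interval constant are illustrative, not Doris APIs. The point is that the O(columns) byte-size scan runs only once every N rows instead of once per merged row:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical amortized budget check for a merge accumulation loop.
constexpr size_t kCheckInterval = 64;

template <typename ByteSizeFn>
bool budget_exhausted(size_t rows_collected, size_t max_bytes,
                      ByteSizeFn current_bytes) {
    if (max_bytes == 0) return false;                       // budget disabled
    if (rows_collected % kCheckInterval != 0) return false; // skip most rows
    return current_bytes() >= max_bytes; // O(columns) scan, amortized over N rows
}
```

An incremental byte counter updated on each range insert would avoid the scan entirely, at the cost of threading the counter through every insertion path.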
```groovy
// Insert 1000 rows with ~3 KB data each.
def wide_rows = (1..1000).collect { i ->
    "(${i}, '${('a' * 1000)}', '${('b' * 1000)}', '${('c' * 1000)}')"
}
sql "insert into abs_wide_table values ${wide_rows.join(',')}"
```
The wide-table data load builds a single INSERT statement with 1000 rows of ~3KB string literals (wide_rows.join(',')), which can produce a multi‑MB SQL string. This risks exceeding max query length / max_packet limits in some CI environments and can make the regression suite flaky/slow. Prefer generating the data on the BE side (e.g., INSERT…SELECT from numbers() with repeat()/lpad(), or inserting in smaller batches) to keep the statement size bounded.
```cpp
MOCK_FUNCTION size_t preferred_block_size_bytes() const {
    if (_query_options.__isset.preferred_block_size_bytes) {
        auto v = _query_options.preferred_block_size_bytes;
        return v > 0 ? static_cast<size_t>(v) : 0;
    }
    return 0;
}
```
RuntimeState::preferred_block_size_bytes() returns 0 when the query option is not explicitly set (__isset is false). This makes adaptive batch sizing effectively disabled by default, which is inconsistent with the stated default of 8MB (and also inconsistent with preferred_block_size_rows()/preferred_max_column_in_block_size_bytes(), which do return defaults when unset). Consider returning 8MB when __isset is false (similar to spill_buffer_size_bytes()), so the BE-side default behavior matches the documented defaults even if an older FE doesn’t forward the new field.
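The suggested behavior can be sketched with the Thrift plumbing reduced to two plain parameters (hypothetical — the real accessor reads `_query_options`): when the field was never forwarded by the FE, fall back to the documented 8 MB default instead of silently disabling the feature, while an explicit 0 still means "disabled".

```cpp
#include <cassert>
#include <cstddef>

constexpr size_t kDefaultPreferredBlockSizeBytes = 8UL * 1024 * 1024;

// isset mirrors __isset.preferred_block_size_bytes; value mirrors the field.
size_t preferred_block_size_bytes(bool isset, long value) {
    if (isset) {
        // An explicit 0 (or negative value) still disables the feature.
        return value > 0 ? static_cast<size_t>(value) : 0;
    }
    return kDefaultPreferredBlockSizeBytes; // unset: use the documented default
}
```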
run buildall |
…control

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: apache#61535

Problem Summary: Multiple operators (aggregation_source, streaming_aggregation, set_source, distinct_streaming_aggregation, union_source, table_function, nested_loop_join_probe, schema_scan) had identical copy-pasted logic for computing effective max rows from byte budgets, checking within-budget conditions, and computing remaining row capacity. This duplication increases maintenance burden and inconsistency risk.

Introduce a `BlockBudget` struct in `be/src/common/block_budget.h` that centralizes the 4 common patterns:

- `effective_max_rows(estimated_row_bytes)` — Pattern A (5 call sites)
- `within_budget(rows, bytes)` — Pattern B (6 call sites)
- `exceeded(rows, bytes)` — break conditions
- `remaining_rows(rows, bytes)` — Pattern C (2 call sites)

Net effect: -33 lines across 8 operator files, replaced with concise single-line calls. All 59 related unit tests pass.

### Release note

None

### Check List (For Author)

- Test: Unit Test (59 tests including 14 new BlockBudget tests)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
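The four patterns named in the commit message might look roughly like this. This is an illustrative sketch only — the real struct lives in `be/src/common/block_budget.h` and may differ in detail:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Sketch of a BlockBudget-style helper: a row cap plus an optional byte cap
// (0 = bytes unbounded), with the four common budget queries.
struct BlockBudget {
    size_t max_rows = 0;
    size_t max_bytes = 0;

    // Pattern A: translate the byte budget into a row cap.
    size_t effective_max_rows(size_t estimated_row_bytes) const {
        if (max_bytes == 0 || estimated_row_bytes == 0) return max_rows;
        return std::min(max_rows,
                        std::max<size_t>(1, max_bytes / estimated_row_bytes));
    }

    // Pattern B: is there room to keep appending?
    bool within_budget(size_t rows, size_t bytes) const {
        return rows < max_rows && (max_bytes == 0 || bytes < max_bytes);
    }

    // Break condition for accumulation loops.
    bool exceeded(size_t rows, size_t bytes) const {
        return !within_budget(rows, bytes);
    }

    // Pattern C: remaining row capacity, given the current average row size.
    size_t remaining_rows(size_t rows, size_t bytes) const {
        if (rows >= max_rows) return 0;
        size_t by_rows = max_rows - rows;
        if (max_bytes == 0 || rows == 0 || bytes == 0) return by_rows;
        if (bytes >= max_bytes) return 0;
        size_t avg = std::max<size_t>(1, bytes / rows);
        return std::min(by_rows, (max_bytes - bytes) / avg);
    }
};
```

Centralizing these queries keeps the "at least one row of progress" and "0 disables bytes" conventions identical across all eight operators, which is exactly the inconsistency risk the refactoring targets.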
FE UT Coverage Report: increment line coverage |
Force-pushed from 784300c to fcabf4c
Problem Summary: The BlockBudget refactoring lacked operator-level byte-budget tests. This commit adds targeted unit tests for aggregation, streaming aggregation, distinct streaming aggregation, set (except), union (const-expr path), and table function operators to verify that the byte budget correctly limits output rows/blocks.

### Release note

None

### Check List (For Author)

- Test: Unit Test (all 104 operator tests pass including 21 ByteBudget-specific tests)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from fcabf4c to d8bccbf
run buildall |
Cloud UT Coverage Report: increment line coverage; increment coverage report
FE UT Coverage Report: increment line coverage |
BE Regression && UT Coverage Report: increment line coverage; increment coverage report
…tive batch size

### What problem does this PR solve?

Issue Number: close #xxx

Problem Summary: When the adaptive batch size predictor reduces `block_row_max` before the first `_lazy_init` call, columns are reserved for the reduced prediction (e.g., 256 rows). On subsequent batches, if the predictor increases `block_row_max` (e.g., to 4096), the segment iterator tries to read more rows than the column capacity, causing a heap-buffer-overflow in `ColumnDictI32::insert_many_dict_data`.

The fix uses `_initial_block_row_max` (the original unreduced ceiling) for column reservation in `_lazy_init`, ensuring columns always have enough capacity for the maximum possible batch size.

### Release note

None

### Check List (For Author)

- Test: Unit Test (38 AdaptiveBlockSizePredictor + 29 BlockBudget/ByteBudget tests pass)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Summary
Implement adaptive batch sizing using EWMA (Exponential Weighted Moving Average) prediction so each output Block stays near a configurable byte budget (default 8 MB). This replaces the fixed-row-count approach that causes memory spikes for wide tables and under-utilization for narrow tables.
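The EWMA mechanism described above can be sketched in a few lines. This is a minimal illustration, not the PR's actual `AdaptiveBlockSizePredictor` class; the names and constants (alpha = 0.9 on history, beta = 0.1 on the newest sample, matching the weights stated below) are taken from the description:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Minimal EWMA-based row predictor: track an exponentially weighted estimate
// of bytes-per-row and size the next batch so it lands near the byte budget.
class EwmaRowPredictor {
public:
    EwmaRowPredictor(size_t byte_budget, size_t max_rows)
            : _byte_budget(byte_budget), _max_rows(max_rows) {}

    // Feed the observed (rows, bytes) of the batch that was just produced.
    void update(size_t rows, size_t bytes) {
        if (rows == 0) return;
        double observed = static_cast<double>(bytes) / static_cast<double>(rows);
        _ewma_row_bytes = _ewma_row_bytes > 0.0
                                  ? 0.9 * _ewma_row_bytes + 0.1 * observed
                                  : observed; // bootstrap on the first sample
    }

    // Predict how many rows of the next batch fit in the byte budget.
    size_t predict_next_rows() const {
        if (_ewma_row_bytes <= 0.0) return _max_rows; // no estimate yet
        auto rows = static_cast<size_t>(_byte_budget / _ewma_row_bytes);
        return std::clamp<size_t>(rows, 1, _max_rows);
    }

private:
    size_t _byte_budget;
    size_t _max_rows;
    double _ewma_row_bytes = 0.0; // EWMA of bytes per row
};
```

With an 8 MB budget, observing 2 KB rows drives the prediction toward ~4096 rows per batch; wider observations shrink the next batch, narrower ones grow it, and the 0.9/0.1 weighting damps oscillation on noisy row widths.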
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
The current fixed-row-count batch sizing (default 4096 rows) does not account for row width. For wide tables with hundreds of columns, a single 4096-row block can consume hundreds of MBs, leading to memory spikes and OOM. For narrow tables, the same block size produces tiny blocks, under-utilizing memory bandwidth and increasing per-block overhead.
This PR introduces dual-dimension control (rows + bytes) across the entire pipeline so that every operator respects both `block_max_rows()` and `block_max_bytes()` limits.

Key Changes
1. Core Predictor (`adaptive_block_size_predictor.{h,cpp}`)

- `AdaptiveBlockSizePredictor` class with EWMA (alpha=0.9 old, beta=0.1 new)
- `predict_next_rows()` returns the predicted row count for the next batch given the byte budget
- `update()` feeds actual (rows, bytes) observations to refine the EWMA estimate
- `compute_metadata_hints()` bootstraps from segment column metadata for accurate first-batch estimation

2. Runtime State (`runtime_state.h`)

- `block_max_rows()` -- upper row limit (returns `preferred_block_size_rows` when the byte budget is active, else `batch_size`)
- `block_max_bytes()` -- byte budget (0 when adaptive sizing is disabled)
- `preferred_block_size_bytes()` / `preferred_block_size_rows()` accessors

3. Storage Layer

- SegmentIterator predicts the row count for each `next_batch`; updates EWMA on success
- Byte-budget stops in `_replace_key_next_block`, `_unique_key_next_block`, `_agg_key_next_block`
- `collected_enough_rows()` and `estimate_collected_enough()` for byte-budget checks in merge accumulation loops; the TopN path returns results in byte-budget-sized chunks
- `block_row_max` comes from `preferred_block_size_rows` when adaptive sizing is active, allowing the predictor to grow beyond the legacy `batch_size`

4. Operator Layer (25 files)

All pipeline operators switch from `batch_size()` to `block_max_rows()` / `block_max_bytes()`; a `batch_size_limit` parameter is passed to `find_batch` / `iterate_map` for thread-safe adaptive control.

5. Scan/Format Layer

- Byte-budget settings carried through `ReaderParams`

6. FE/Thrift

- Session variable `preferred_block_size_bytes` (default 8 MB)
- Session variable `preferred_block_size_rows` (default 65535)
- Session variable `preferred_max_column_in_block_size_bytes` (default 1 MB)
- BE config `enable_adaptive_batch_size` (mBool, default true)

Release note
Added adaptive batch size feature that dynamically adjusts block row counts to target a byte budget (default 8 MB), improving memory efficiency for wide tables and throughput for narrow tables. Controlled by session variables `preferred_block_size_bytes` (default 8 MB), `preferred_block_size_rows` (default 65535), and `preferred_max_column_in_block_size_bytes` (default 1 MB), plus BE config `enable_adaptive_batch_size` (default true). Disable with `SET preferred_block_size_bytes = 0` or `SET enable_adaptive_batch_size = false`.

Check List (For Author)

- Test: Regression test (`adaptive_batch_size.groovy`); the feature can be turned off with `SET preferred_block_size_bytes = 0`.

Check List (For Reviewer who merge this PR)