
[feature](be) Implement adaptive batch size with EWMA prediction for byte-budget control#61535

Open
mrhhsg wants to merge 7 commits into apache:master from mrhhsg:adaptive_batch_size

Conversation

@mrhhsg
Member

@mrhhsg commented Mar 20, 2026

Summary

Implement adaptive batch sizing using EWMA (Exponentially Weighted Moving Average) prediction so each output Block stays near a configurable byte budget (default 8 MB). This replaces the fixed-row-count approach, which causes memory spikes for wide tables and under-utilization for narrow tables.

What problem does this PR solve?

Issue Number: close #xxx

Problem Summary:

The current fixed-row-count batch sizing (default 4096 rows) does not account for row width. For wide tables with hundreds of columns, a single 4096-row block can consume hundreds of megabytes, leading to memory spikes and OOM. For narrow tables, the same row count produces tiny blocks, under-utilizing memory bandwidth and increasing per-block overhead.

This PR introduces dual-dimension control (rows + bytes) across the entire pipeline so that every operator respects both block_max_rows() and block_max_bytes() limits.

Key Changes

1. Core Predictor (adaptive_block_size_predictor.{h,cpp})

  • New AdaptiveBlockSizePredictor class with EWMA smoothing (alpha=0.9 on the old estimate, beta=0.1 on the new sample)
  • predict_next_rows() returns the predicted row count for the next batch given the byte budget
  • update() feeds actual (rows, bytes) observations to refine the EWMA estimate
  • compute_metadata_hints() bootstraps from segment column metadata for accurate first-batch estimation
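The EWMA scheme described above can be sketched as follows. This is an illustrative sketch only: apart from the `predict_next_rows()` / `update()` method names taken from the PR, the class name, members, and clamping details here are hypothetical, not the actual AdaptiveBlockSizePredictor.

```cpp
#include <algorithm>
#include <cstdint>

// Illustrative sketch of the EWMA predictor; the real class carries more
// state (metadata hints, session limits). Names besides predict_next_rows()
// and update() are hypothetical.
class EwmaRowPredictor {
public:
    EwmaRowPredictor(int64_t byte_budget, int64_t max_rows)
            : _byte_budget(byte_budget), _max_rows(max_rows) {}

    // Feed an actual (rows, bytes) observation into the EWMA estimate,
    // weighting the old estimate by 0.9 and the new sample by 0.1.
    void update(int64_t rows, int64_t bytes) {
        if (rows <= 0 || bytes <= 0) {
            return;
        }
        double sample = static_cast<double>(bytes) / static_cast<double>(rows);
        _bytes_per_row = _bytes_per_row <= 0
                                 ? sample  // first observation bootstraps
                                 : 0.9 * _bytes_per_row + 0.1 * sample;
    }

    // Predict how many rows fit in the byte budget, clamped to [1, max_rows].
    int64_t predict_next_rows() const {
        if (_bytes_per_row <= 0) {
            return _max_rows;  // no history yet: fall back to the row limit
        }
        auto rows = static_cast<int64_t>(
                static_cast<double>(_byte_budget) / _bytes_per_row);
        return std::clamp<int64_t>(rows, 1, _max_rows);
    }

private:
    int64_t _byte_budget;
    int64_t _max_rows;
    double _bytes_per_row = 0.0;
};
```

The heavy 0.9 weight on the old estimate smooths out per-batch variance in row width, so a single unusually wide or narrow batch cannot swing the next prediction drastically.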

2. Runtime State (runtime_state.h)

  • block_max_rows() -- upper row limit (returns preferred_block_size_rows when byte budget is active, else batch_size)
  • block_max_bytes() -- byte budget (0 when adaptive sizing is disabled)
  • preferred_block_size_bytes() / preferred_block_size_rows() accessors
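A minimal sketch of the accessor semantics listed above, assuming hypothetical plain-member storage; the real RuntimeState reads these values from TQueryOptions rather than holding them as simple fields.

```cpp
#include <cstdint>

// Hypothetical sketch of the RuntimeState accessors described above.
struct RuntimeStateSketch {
    int64_t batch_size = 4096;                     // legacy fixed row count
    int64_t preferred_block_size_bytes = 8 << 20;  // 0 disables adaptive sizing
    int64_t preferred_block_size_rows = 65535;

    // Upper row limit: preferred rows while a byte budget is active,
    // otherwise the legacy batch_size.
    int64_t block_max_rows() const {
        return preferred_block_size_bytes > 0 ? preferred_block_size_rows
                                              : batch_size;
    }

    // Byte budget; 0 means adaptive sizing is disabled.
    int64_t block_max_bytes() const { return preferred_block_size_bytes; }
};
```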

3. Storage Layer

  • SegmentIterator: Creates predictor from segment footer metadata; predicts rows before each next_batch; updates EWMA on success
  • BlockReader: Byte-budget stop condition in _replace_key_next_block, _unique_key_next_block, _agg_key_next_block
  • VCollectIterator: collected_enough_rows() and estimate_collected_enough() for byte-budget in merge accumulation loops; TopN path returns results in byte-budget-sized chunks
  • BetaRowsetReader: Seeds block_row_max from preferred_block_size_rows when adaptive is active, allowing the predictor to grow beyond legacy batch_size

4. Operator Layer (25 files)
All pipeline operators switch from batch_size() to block_max_rows() / block_max_bytes():

  • Aggregation (source, streaming, distinct streaming): Byte-budget output control; distinct streaming agg split respects byte budget
  • Hash Join Probe: Pre-estimation from both build and probe sides; batch_size_limit parameter to find_batch/iterate_map for thread-safe adaptive control
  • Nested Loop Join: Byte-budget aware output accumulation
  • Sort (full sort, partition sort, merge sort, spill sort): Byte-budget early exit; single-run fast path disabled when byte budget active
  • Set/Union/Exchange/Table Function/Schema Scan: Dual row+byte control
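The dual row+byte control that the operators above adopt can be sketched generically; `next_row_bytes` is a stand-in for whatever produces the next row in a given operator, not a real Doris API.

```cpp
#include <cstdint>

// Generic sketch of the dual limit. next_row_bytes() returns the next row's
// byte size, or a negative value when the upstream is exhausted.
struct BatchLimits {
    int64_t max_rows;
    int64_t max_bytes;
};

template <typename NextRowFn>
int64_t accumulate_block(const BatchLimits& limits, NextRowFn next_row_bytes) {
    int64_t rows = 0;
    int64_t bytes = 0;
    // Stop as soon as either the row limit or the byte budget is reached.
    while (rows < limits.max_rows && bytes < limits.max_bytes) {
        int64_t row_bytes = next_row_bytes();
        if (row_bytes < 0) {
            break;  // upstream exhausted
        }
        ++rows;
        bytes += row_bytes;
    }
    return rows;
}
```

For narrow rows the row limit binds first (legacy behavior); for wide rows the byte budget stops accumulation early, which is the memory-spike fix this PR is after.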

5. Scan/Format Layer

  • FileScanner: EWMA integration for all format readers (CSV, JSON, ORC, Parquet)
  • OlapScanner: Propagates byte budget to storage layer via ReaderParams

6. FE/Thrift

  • Three new session variables:
    • preferred_block_size_bytes (default 8 MB)
    • preferred_block_size_rows (default 65535)
    • preferred_max_column_in_block_size_bytes (default 1 MB)
  • BE config: enable_adaptive_batch_size (mBool, default true)
  • Thrift fields 212/213/214 in TQueryOptions for FE->BE propagation

Release note

Added adaptive batch size feature that dynamically adjusts block row counts to target a byte budget (default 8 MB), improving memory efficiency for wide tables and throughput for narrow tables. Controlled by session variables preferred_block_size_bytes (default 8 MB), preferred_block_size_rows (default 65535), and preferred_max_column_in_block_size_bytes (default 1 MB), plus BE config enable_adaptive_batch_size (default true). Disable with SET preferred_block_size_bytes = 0 or SET enable_adaptive_batch_size = false.

Check List (For Author)

  • Test
    • Unit Test (183+ tests across 12 suites: predictor, sort, hash join, scan, format readers, storage iterators, operator tests)
    • Regression test (adaptive_batch_size.groovy)
  • Behavior changed:
    • Yes. Block sizes are now adaptively controlled by byte budget when enabled. Disable with SET preferred_block_size_bytes = 0.
  • Does this need documentation:
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Not sure what to do next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (ideally including the specific error message) and how it was fixed.
  2. Which behaviors were modified: what the previous behavior was, what it is now, why it was changed, and what impacts it may have.
  3. What features were added and why.
  4. Which code was refactored and why.
  5. Which functions were optimized and what differs before and after the optimization.

@mrhhsg
Member Author

mrhhsg commented Mar 20, 2026

run buildall

@mrhhsg
Member Author

mrhhsg commented Mar 20, 2026

/review

Contributor

@github-actions bot left a comment


Code Review Summary

This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts SegmentIterator chunk row counts to target a preferred block byte size. The design is reasonable and the overall approach is sound. However, there are several issues that need to be addressed.

Critical Checkpoints

Goal and correctness: The feature aims to dynamically adjust batch sizes to target byte-based block sizes instead of fixed row counts. The implementation adds prediction at the SegmentIterator level and byte-budget stops at BlockReader and VCollectIterator. There are correctness bugs in the record_rowids path (see inline comments).

Concurrency: The predictor is single-threaded per instance and used within a single SegmentIterator — no concurrency concern.

Lifecycle / SIOF: No static initialization order issues. The predictor is created as a unique_ptr member.

Config items: enable_adaptive_batch_size is dynamic (mutable), which is appropriate. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are correctly propagated via Thrift.

Incompatible changes: New Thrift field IDs (210, 211) have defaults matching the session variable defaults. Backward compatible.

Parallel code paths: The byte-budget check is added to _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and VCollectIterator::_merge_next. The _direct_next_block path (DUP KEY) relies on the SegmentIterator-level predictor limiting block_row_max, which is correct by design.

Test coverage: Regression tests cover DUP, AGG, and UNIQUE key table types. Unit tests are thorough for the predictor class itself. However, no test covers the record_rowids path which has bugs.

Observability: Profile counters AdaptiveBatchPredictMinRows and AdaptiveBatchPredictMaxRows are added. However, a LOG(INFO) debug log is left in the hot path (see inline).

Performance: Block::columns_byte_size() is called per-row in some paths. Each call is O(num_columns) with O(1) per column. Acceptable for typical column counts but could be a concern for very wide tables (hundreds of columns).

Issues Found

  1. [Bug] _replace_key_next_block and _unique_key_next_block: Missing _block_row_locations.resize(target_block_row) when byte-budget triggers break with record_rowids enabled. This will cause the DCHECK at line 604 to fail and potentially return stale/incorrect row locations.

  2. [Bug/Severity:High] LOG(INFO) with ****** prefix on every predict_next_rows() call — this is a debug log accidentally left in production code. It will spam logs on every batch of every segment scan.

  3. [Minor] The _replace_key_next_block function does not call block->set_columns() after mutating columns — this is pre-existing behavior but worth noting for consistency.

  4. [Observation] In VCollectIterator::_merge_next, the byte-budget check uses Block::columns_byte_size(target_columns) before flushing continuous_row_in_block pending rows, so the actual output block can exceed the budget. This is a design choice (conservative), not a correctness bug.

static_cast<size_t>(static_cast<double>(_block_size_bytes) / estimated_bytes_per_row);
LOG(INFO) << "****** predicted rows: " << predicted
<< ", estimated_bytes_per_row: " << estimated_bytes_per_row
<< ", _block_size_bytes: " << _block_size_bytes;
Contributor


[Bug/High] This LOG(INFO) with ****** prefix is called on every predict_next_rows() invocation — i.e., every batch in every segment scan. This will generate massive log spam in production.

This should be changed to VLOG_DEBUG or removed entirely. At minimum, this is clearly a debug statement accidentally left in:

VLOG_DEBUG << "predicted rows: " << predicted
           << ", estimated_bytes_per_row: " << estimated_bytes_per_row
           << ", _block_size_bytes: " << _block_size_bytes;

Block::columns_byte_size(target_columns) >=
_reader_context.preferred_block_size_bytes) {
break;
}
Contributor


[Bug] When record_rowids is enabled, this byte-budget break exits the outer while loop without resizing _block_row_locations to target_block_row. At line 349, _block_row_locations was resized to batch_size(), so after this break it will still have batch_size() entries instead of target_block_row entries.

The EOF path at lines 372-374 correctly does _block_row_locations.resize(target_block_row). This break path needs the same treatment:

if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}

Without this fix, current_block_row_locations() returns a vector with stale entries, and the DCHECK at line 604 (DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count)) will fail.

Block::columns_byte_size(target_columns) >=
_reader_context.preferred_block_size_bytes) {
break;
}
Contributor


[Bug] Same issue as in _replace_key_next_block: when record_rowids is enabled, this byte-budget break exits without resizing _block_row_locations to target_block_row. At line 530, _block_row_locations was resized to _reader_context.batch_size. The EOF path at lines 548-550 correctly resizes.

Fix:

if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}

Note: in _unique_key_next_block, there is also the _delete_sign_available filtering path (starting at line 566) that uses target_block_row and _block_row_locations. An incorrectly-sized _block_row_locations could corrupt the filter logic.

@doris-robot

TPC-H: Total hot run time: 30963 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit daebf3f9ccd3a062258f75fb387de4d89c7ecce6, data reload: false

------ Round 1 ----------------------------------
============================================
q1	17671	4802	4775	4775
q2	q3	10807	879	646	646
q4	4755	454	351	351
q5	8228	1316	1095	1095
q6	258	209	176	176
q7	950	934	776	776
q8	10868	1770	1663	1663
q9	7254	5324	5288	5288
q10	6476	2021	1901	1901
q11	487	320	294	294
q12	839	671	526	526
q13	18097	3025	2203	2203
q14	236	234	218	218
q15	q16	789	777	704	704
q17	935	819	758	758
q18	6446	5588	5519	5519
q19	1231	1428	1089	1089
q20	676	602	476	476
q21	5538	2505	2175	2175
q22	431	355	330	330
Total cold run time: 102972 ms
Total hot run time: 30963 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	5106	5051	5083	5051
q2	q3	4094	4498	4007	4007
q4	1364	1465	1080	1080
q5	4535	4723	4680	4680
q6	244	215	178	178
q7	1910	1680	1541	1541
q8	2860	3070	2966	2966
q9	7980	7939	7658	7658
q10	4027	4178	3874	3874
q11	614	525	477	477
q12	615	700	528	528
q13	3018	3319	2470	2470
q14	304	311	295	295
q15	q16	774	811	747	747
q17	1395	1560	1468	1468
q18	7783	7183	6930	6930
q19	1284	1334	1259	1259
q20	2104	2347	2038	2038
q21	4969	4316	4150	4150
q22	516	445	422	422
Total cold run time: 55496 ms
Total hot run time: 51819 ms

@doris-robot

TPC-DS: Total hot run time: 173281 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit daebf3f9ccd3a062258f75fb387de4d89c7ecce6, data reload: false

query5	4316	650	495	495
query6	335	228	226	226
query7	4249	527	281	281
query8	342	255	234	234
query9	9250	5274	5237	5237
query10	501	420	341	341
query11	7061	5150	4884	4884
query12	188	131	122	122
query13	1289	479	367	367
query14	5878	3947	3605	3605
query14_1	2995	3015	3003	3003
query15	209	200	180	180
query16	996	490	437	437
query17	916	757	662	662
query18	2436	488	357	357
query19	237	235	192	192
query20	136	131	129	129
query21	231	132	117	117
query22	13184	13369	13319	13319
query23	15963	15577	15361	15361
query23_1	15545	15514	15426	15426
query24	7340	1779	1334	1334
query24_1	1316	1310	1329	1310
query25	553	470	428	428
query26	1240	279	171	171
query27	2781	501	324	324
query28	4664	2704	2662	2662
query29	868	599	510	510
query30	312	234	193	193
query31	1053	965	882	882
query32	86	80	75	75
query33	513	346	288	288
query34	930	938	594	594
query35	678	692	606	606
query36	1120	1159	956	956
query37	138	104	86	86
query38	2973	3021	2889	2889
query39	869	830	814	814
query39_1	793	788	803	788
query40	233	153	146	146
query41	69	63	63	63
query42	273	265	271	265
query43	258	264	227	227
query44	
query45	197	196	185	185
query46	967	1071	672	672
query47	2165	2187	2105	2105
query48	345	345	250	250
query49	646	469	395	395
query50	744	288	230	230
query51	4105	4090	4093	4090
query52	269	266	263	263
query53	314	353	309	309
query54	326	276	308	276
query55	92	87	89	87
query56	328	344	309	309
query57	1959	1825	1706	1706
query58	287	275	278	275
query59	2881	3060	2789	2789
query60	353	351	327	327
query61	158	158	186	158
query62	628	598	548	548
query63	323	304	287	287
query64	5107	1301	1007	1007
query65	
query66	1474	486	372	372
query67	24257	24455	24139	24139
query68	
query69	476	333	304	304
query70	937	1051	1003	1003
query71	350	310	306	306
query72	2814	2677	2423	2423
query73	597	601	374	374
query74	9644	9612	9419	9419
query75	3028	2901	2560	2560
query76	2342	1169	855	855
query77	404	400	318	318
query78	11103	11115	10530	10530
query79	1204	824	615	615
query80	729	648	556	556
query81	480	262	227	227
query82	1303	155	131	131
query83	353	274	263	263
query84	299	126	114	114
query85	872	518	480	480
query86	395	304	291	291
query87	3158	3122	3026	3026
query88	4341	3346	3346	3346
query89	460	390	353	353
query90	1969	208	196	196
query91	174	166	147	147
query92	78	80	77	77
query93	1020	933	594	594
query94	461	331	307	307
query95	605	358	340	340
query96	726	564	266	266
query97	2475	2502	2469	2469
query98	259	227	222	222
query99	1027	1034	914	914
Total cold run time: 251402 ms
Total hot run time: 173281 ms

@doris-robot

BE UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.71% (19793/37554)
Line Coverage 36.23% (184854/510166)
Region Coverage 32.50% (143146/440485)
Branch Coverage 33.66% (62584/185927)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.32% (26944/36750)
Line Coverage 56.73% (288414/508437)
Region Coverage 53.94% (239774/444521)
Branch Coverage 55.74% (103911/186407)

@mrhhsg force-pushed the adaptive_batch_size branch from daebf3f to f404b06 on March 20, 2026 07:06
@mrhhsg
Member Author

mrhhsg commented Mar 20, 2026

run buildall

@mrhhsg
Member Author

mrhhsg commented Mar 20, 2026

/review

Contributor

@github-actions bot left a comment


Code Review Summary: feat Implement adaptive batch size for SegmentIterator

This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts chunk row counts so each output Block stays near a configurable byte target. The overall design is sound, with proper plumbing from FE session variables through Thrift to the storage layer. The byte-budget checks in BlockReader and VCollectIterator are placed at safe boundaries.

However, I found several issues ranging from a unit test correctness bug to regression test standard violations.

Critical Checkpoints

1. Goal & Correctness: The PR achieves its stated goal of adaptive batch sizing. The EWMA predictor logic is correct. Byte-budget stops in _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and _merge_next are all placed at safe row boundaries. No data correctness issues found in the core logic.

2. Focused & Minimal: The change is reasonably focused. Adding new session variables, config flags, thrift fields, the predictor, and byte-budget stops across multiple layers is inherent to the feature.

3. Concurrency: The TOCTOU race in _collect_profile_before_close for min/max counter updates is benign (display-only counters) but should be documented or fixed. See inline comment.

4. Lifecycle: No lifecycle issues. The predictor is owned by SegmentIterator and destroyed with it.

5. Configuration: enable_adaptive_batch_size is mutable and appropriately documented. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are properly forwarded.

6. Incompatible changes: Thrift fields 210/211 are optional with correct IDs. No compatibility issues. (Note: PR description incorrectly states fields 204/205.)

7. Parallel code paths: The three BlockReader paths (_replace_key, _agg_key, _unique_key) are all covered. VCollectIterator merge path is covered. Compaction paths are not affected (they don't set preferred_block_size_bytes).

8. Test coverage: Unit tests are comprehensive (19+ test cases) but have a critical mock bug that makes metadata-hint tests read indeterminate memory. Regression tests have standard violations. See inline comments.

9. Observability: Profile counters AdaptiveBatchPredictMinRows/MaxRows are added. The else branch stats overwrite is problematic. See inline comment.

10. Performance: Block::columns_byte_size() is O(num_columns) per call (all byte_size() implementations are O(1)). Called per-row in _unique_key_next_block and _merge_next, this adds measurable but acceptable overhead for wide tables.
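To make the cost above concrete: summing per-column byte sizes is O(num_columns) per call, so calling it per row multiplies that scan by the row count. The sketch below uses hypothetical names, not the actual Block API, and the stride-based check is a common mitigation mentioned for illustration, not something this PR implements.

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical stand-in for Block::columns_byte_size(): O(num_columns) per
// call, with each per-column byte_size() being O(1).
int64_t columns_byte_size(const std::vector<int64_t>& column_bytes) {
    return std::accumulate(column_bytes.begin(), column_bytes.end(),
                           int64_t{0});
}

// One common mitigation (not part of this PR): evaluate the budget only
// every `stride` rows, amortizing the O(num_columns) scan.
bool should_stop(int64_t row_index, int64_t stride,
                 const std::vector<int64_t>& column_bytes, int64_t budget) {
    if (row_index % stride != 0) {
        return false;  // skip the scan between stride boundaries
    }
    return columns_byte_size(column_bytes) >= budget;
}
```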

Issues Found

# Severity File Issue
1 High Unit test MockSegment::num_rows() mock is broken — Segment::num_rows() is non-virtual, so ON_CALL has no effect through Segment& reference. Tests using metadata hints read indeterminate _num_rows.
2 Medium Regression test Uses qt_ prefix instead of order_qt_ (violates regression test standards)
3 Medium Regression test Drops 4 of 5 tables after use (should only drop before, to preserve debug state)
4 Low olap_scanner.cpp TOCTOU race on min/max counter updates (benign for display-only counters)
5 Low segment_iterator.cpp else branch overwrites stats every call when predictor is inactive, producing misleading values
6 Info PR description States thrift fields 204/205 but actual code uses 210/211


// Set up num_rows mock.
ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows));

Contributor


Bug (High): Segment::num_rows() is non-virtual (declared as uint32_t num_rows() const { return _num_rows; } in segment.h:119). The ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows)) configures gmock's virtual mock method, but when predict_next_rows() calls segment.num_rows() through a Segment& reference, the compiler statically dispatches to the base class's non-virtual method, bypassing the mock entirely.

The actual value returned will be whatever _num_rows was left at by the base Segment constructor — which does not initialize _num_rows in its member initializer list (see segment.cpp:168-174). This means tests like PredictNoHistoryMetadataHint are reading indeterminate memory (undefined behavior).

Fix: Since MockSegment is declared as a friend of Segment, you can directly set the member:

seg->_num_rows = num_rows;

instead of (or in addition to) the ON_CALL mock.


if (stats.adaptive_batch_size_predict_max_rows > 0) {
auto cur_min = local_state->_adaptive_batch_predict_min_rows_counter->value();
if (cur_min == 0 || stats.adaptive_batch_size_predict_min_rows < cur_min) {
Contributor


Minor (Low): This read-check-store pattern on shared RuntimeProfile::Counter is a classic TOCTOU race. Multiple scanners call _collect_profile_before_close concurrently on the same local_state counters. Thread A can read a stale cur_min, pass the check, and overwrite thread B's correct value.

Since these are display-only profile counters, this is benign — the worst outcome is a slightly inaccurate min/max in the query profile. But consider documenting this or using COUNTER_UPDATE with atomic min/max if RuntimeProfile::Counter supports it.

def res_enabled = sql "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"

qt_wide "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4 limit 50"

Contributor


Standards violation (Medium): Per regression test standards, use order_qt_ prefix instead of qt_ to ensure deterministic ordered output. This applies to all query tags in this file (qt_wide, qt_narrow, qt_agg, qt_unique, qt_flag).

While some of these queries have explicit ORDER BY or return single rows, the standard convention is to consistently use order_qt_ prefix.

// toward returning close to max_rows (batch is still row-limited).

sql "drop table if exists abs_narrow_table"
sql """
Contributor


Standards violation (Medium): Per regression test standards: "After completing tests, do not drop tables; instead drop tables before using them in tests, to preserve the environment for debugging."

This drop table after use (and similar ones for abs_agg_table, abs_unique_table, abs_flag_table) should be removed. The drop table if exists before CREATE TABLE at the beginning of each test case is the correct pattern (and is already present).

static_cast<int64_t>(predicted));
} else {
_opts.stats->adaptive_batch_size_predict_min_rows = _opts.block_row_max;
_opts.stats->adaptive_batch_size_predict_max_rows = _opts.block_row_max;
Contributor


Minor (Low): When _block_size_predictor is null (feature disabled), this else branch unconditionally overwrites adaptive_batch_size_predict_min_rows and adaptive_batch_size_predict_max_rows with _opts.block_row_max on every next_batch() call. Since OlapReaderStatistics is shared across segment iterators for the same scanner, and adaptive_batch_size_predict_min_rows is initialized to INT64_MAX, the first segment iterator's overwrite clobbers the sentinel.

This means the profile counters will show misleading values when the feature is disabled. Consider only setting these once, or guarding with a check (e.g., only set if currently INT64_MAX / 0).
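The suggested guard could look like the sketch below; the struct and field names mirror this review's description, not the actual OlapReaderStatistics layout.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical stand-in for the relevant OlapReaderStatistics fields:
// min starts at the INT64_MAX sentinel, max at 0.
struct PredictStats {
    int64_t predict_min_rows = std::numeric_limits<int64_t>::max();
    int64_t predict_max_rows = 0;
};

// When the predictor is inactive, only fill the stats while they still hold
// their sentinel values, so repeated calls cannot clobber real observations.
void record_disabled_path(PredictStats& stats, int64_t block_row_max) {
    if (stats.predict_min_rows == std::numeric_limits<int64_t>::max()) {
        stats.predict_min_rows = block_row_max;
    }
    if (stats.predict_max_rows == 0) {
        stats.predict_max_rows = block_row_max;
    }
}
```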

@doris-robot

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.63% (1796/2284)
Line Coverage 64.38% (32274/50130)
Region Coverage 65.27% (16162/24760)
Branch Coverage 55.71% (8611/15456)

@doris-robot

TPC-H: Total hot run time: 27323 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit f404b067c8aa7622413fd7d269235aaccd195121, data reload: false

------ Round 1 ----------------------------------
============================================
q1	17655	4402	4359	4359
q2	q3	11076	846	550	550
q4	5270	389	259	259
q5	9358	1253	1020	1020
q6	252	179	151	151
q7	833	892	691	691
q8	10989	1566	1438	1438
q9	6834	4874	4905	4874
q10	6372	1930	1703	1703
q11	471	255	242	242
q12	746	586	471	471
q13	18060	2948	2210	2210
q14	231	236	205	205
q15	q16	747	763	684	684
q17	743	876	469	469
q18	6012	5388	5145	5145
q19	1117	997	638	638
q20	543	482	377	377
q21	4650	2002	1555	1555
q22	371	327	282	282
Total cold run time: 102330 ms
Total hot run time: 27323 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	4645	4569	4703	4569
q2	q3	3992	4406	3881	3881
q4	943	1227	847	847
q5	4215	4459	4331	4331
q6	197	191	150	150
q7	1868	1676	1584	1584
q8	2544	2745	2608	2608
q9	7604	7456	7373	7373
q10	3857	4110	3620	3620
q11	564	515	484	484
q12	508	607	501	501
q13	2764	3188	2341	2341
q14	300	306	282	282
q15	q16	739	793	832	793
q17	1375	1433	1422	1422
q18	7227	6874	6555	6555
q19	1041	989	989	989
q20	2113	2190	2015	2015
q21	4021	3561	3439	3439
q22	452	426	397	397
Total cold run time: 50969 ms
Total hot run time: 48181 ms

@doris-robot

TPC-DS: Total hot run time: 169237 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit f404b067c8aa7622413fd7d269235aaccd195121, data reload: false

query5	4328	658	511	511
query6	339	234	209	209
query7	4225	503	273	273
query8	357	255	238	238
query9	8698	2777	2790	2777
query10	514	391	349	349
query11	6983	5116	4889	4889
query12	195	133	132	132
query13	1279	462	362	362
query14	5762	3839	3592	3592
query14_1	2908	2900	2917	2900
query15	211	188	176	176
query16	972	413	452	413
query17	879	712	607	607
query18	2439	459	341	341
query19	231	225	194	194
query20	138	127	128	127
query21	214	135	114	114
query22	13266	14164	14916	14164
query23	16166	15714	15584	15584
query23_1	15849	15782	15826	15782
query24	7273	1668	1244	1244
query24_1	1243	1251	1242	1242
query25	562	469	410	410
query26	1255	274	154	154
query27	2768	507	305	305
query28	4477	1897	1859	1859
query29	841	566	499	499
query30	309	216	196	196
query31	1008	948	876	876
query32	88	74	70	70
query33	495	349	287	287
query34	913	903	543	543
query35	644	686	620	620
query36	1087	1108	927	927
query37	137	96	84	84
query38	2946	2916	2867	2867
query39	856	827	804	804
query39_1	786	786	796	786
query40	245	158	141	141
query41	63	60	58	58
query42	263	257	260	257
query43	248	274	257	257
query44	
query45	200	196	187	187
query46	945	1019	633	633
query47	2099	2123	2026	2026
query48	319	322	231	231
query49	625	464	400	400
query50	717	289	224	224
query51	4164	4042	4004	4004
query52	266	266	257	257
query53	307	344	291	291
query54	315	285	273	273
query55	99	90	90	90
query56	320	328	317	317
query57	1906	1855	1658	1658
query58	294	287	272	272
query59	2822	2956	2743	2743
query60	363	344	335	335
query61	152	154	155	154
query62	631	584	529	529
query63	324	296	287	287
query64	5029	1298	1058	1058
query65	
query66	1488	493	398	398
query67	24209	24298	24147	24147
query68	
query69	422	316	297	297
query70	970	988	931	931
query71	366	322	306	306
query72	3170	2883	2363	2363
query73	562	573	332	332
query74	9597	9540	9387	9387
query75	2892	2778	2490	2490
query76	2292	1098	728	728
query77	378	415	330	330
query78	10970	11041	10476	10476
query79	3115	781	580	580
query80	1748	655	548	548
query81	576	268	226	226
query82	978	157	122	122
query83	339	269	255	255
query84	302	118	110	110
query85	948	500	477	477
query86	499	307	302	302
query87	3144	3130	3005	3005
query88	3685	2684	2686	2684
query89	435	386	360	360
query90	2024	187	189	187
query91	181	166	138	138
query92	81	80	71	71
query93	1816	846	505	505
query94	650	332	281	281
query95	594	415	326	326
query96	654	540	233	233
query97	2469	2501	2423	2423
query98	250	221	218	218
query99	1018	1005	931	931
Total cold run time: 253570 ms
Total hot run time: 169237 ms

@hello-stephen
Contributor

BE UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.85% (19877/37610)
Line Coverage 36.36% (185718/510836)
Region Coverage 32.61% (143863/441137)
Branch Coverage 33.79% (62960/186314)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.32% (26985/36806)
Line Coverage 56.83% (289314/509111)
Region Coverage 54.07% (240723/445178)
Branch Coverage 55.85% (104336/186800)

@mrhhsg force-pushed the adaptive_batch_size branch from f404b06 to ae43fe9 on March 20, 2026 14:21
@mrhhsg
Member Author

mrhhsg commented Mar 20, 2026

run buildall

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.63% (1796/2284)
Line Coverage 64.36% (32264/50130)
Region Coverage 65.24% (16154/24760)
Branch Coverage 55.65% (8602/15456)

@doris-robot

BE UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.80% (19860/37616)
Line Coverage 36.28% (185340/510878)
Region Coverage 32.54% (143556/441152)
Branch Coverage 33.73% (62850/186327)

@doris-robot

TPC-H: Total hot run time: 26528 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit ae43fe96d6a20040c16d54f6ee49141576a11c2a, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	16921	4568	4330	4330
q2
q3	10395	787	521	521
q4	4687	356	257	257
q5	7572	1217	995	995
q6	168	172	146	146
q7	785	852	673	673
q8	9335	1455	1313	1313
q9	4828	4768	4620	4620
q10	6236	1918	1617	1617
q11	448	265	243	243
q12	694	582	468	468
q13	18037	2983	2184	2184
q14	234	234	216	216
q15
q16	735	745	689	689
q17	724	844	427	427
q18	5822	5439	5196	5196
q19	1115	981	617	617
q20	528	495	365	365
q21	4411	1855	1413	1413
q22	335	297	238	238
Total cold run time: 94010 ms
Total hot run time: 26528 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4890	4658	4594	4594
q2
q3	3900	4490	3870	3870
q4	905	1212	779	779
q5	4058	4350	4343	4343
q6	193	181	145	145
q7	1775	1660	1651	1651
q8	2575	2705	2561	2561
q9	7498	7300	7357	7300
q10	3737	3983	3606	3606
q11	510	450	449	449
q12	503	635	475	475
q13	2721	3157	2469	2469
q14	295	303	282	282
q15
q16	739	836	742	742
q17	1217	1318	1415	1318
q18	7172	6692	6694	6692
q19	896	876	896	876
q20	2061	2130	2011	2011
q21	3995	3533	3365	3365
q22	494	438	391	391
Total cold run time: 50134 ms
Total hot run time: 47919 ms

@doris-robot

TPC-DS: Total hot run time: 169678 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit ae43fe96d6a20040c16d54f6ee49141576a11c2a, data reload: false

query5	4311	670	499	499
query6	340	232	235	232
query7	4219	492	266	266
query8	334	237	226	226
query9	8742	2742	2713	2713
query10	539	414	374	374
query11	6986	5083	4861	4861
query12	189	131	127	127
query13	1280	475	349	349
query14	5737	3760	3545	3545
query14_1	2939	2858	2883	2858
query15	209	200	176	176
query16	982	472	477	472
query17	924	744	642	642
query18	2448	459	351	351
query19	219	226	195	195
query20	133	127	129	127
query21	215	140	112	112
query22	13437	14426	14713	14426
query23	16117	15651	15646	15646
query23_1	15909	15702	15816	15702
query24	7297	1620	1240	1240
query24_1	1234	1239	1279	1239
query25	559	498	404	404
query26	1237	268	151	151
query27	2774	487	305	305
query28	4345	1861	1865	1861
query29	820	566	488	488
query30	310	224	196	196
query31	996	952	903	903
query32	80	71	71	71
query33	515	346	293	293
query34	893	879	547	547
query35	646	703	592	592
query36	1074	1104	986	986
query37	141	100	82	82
query38	2937	2940	2900	2900
query39	849	822	811	811
query39_1	793	794	809	794
query40	237	157	140	140
query41	63	59	61	59
query42	260	261	255	255
query43	256	248	229	229
query44	
query45	199	193	186	186
query46	875	997	605	605
query47	2495	2130	2072	2072
query48	308	332	234	234
query49	635	471	393	393
query50	693	281	221	221
query51	4027	3958	4048	3958
query52	263	270	260	260
query53	297	348	293	293
query54	311	275	284	275
query55	100	88	87	87
query56	320	336	346	336
query57	1888	1916	1708	1708
query58	285	280	274	274
query59	2783	2967	2773	2773
query60	352	346	328	328
query61	161	161	162	161
query62	638	581	511	511
query63	315	287	294	287
query64	4882	1302	1006	1006
query65	
query66	1467	463	390	390
query67	24220	24234	24122	24122
query68	
query69	418	321	304	304
query70	993	962	946	946
query71	360	316	309	309
query72	3077	2928	2659	2659
query73	556	559	327	327
query74	9567	9619	9383	9383
query75	2884	2801	2468	2468
query76	2155	1052	695	695
query77	373	432	317	317
query78	10875	11020	10521	10521
query79	2691	762	589	589
query80	1792	675	574	574
query81	564	262	233	233
query82	965	152	121	121
query83	338	271	250	250
query84	303	130	103	103
query85	921	506	477	477
query86	419	317	279	279
query87	3141	3175	3028	3028
query88	3631	2686	2677	2677
query89	429	377	348	348
query90	2038	193	188	188
query91	174	165	137	137
query92	87	74	84	74
query93	1367	822	523	523
query94	638	280	292	280
query95	600	398	325	325
query96	648	532	235	235
query97	2442	2487	2398	2398
query98	248	225	217	217
query99	1029	1008	919	919
Total cold run time: 252034 ms
Total hot run time: 169678 ms

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.19% (26943/36811)
Line Coverage 56.69% (288618/509150)
Region Coverage 54.00% (240384/445189)
Branch Coverage 55.67% (103990/186813)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.18% (26940/36811)
Line Coverage 56.68% (288581/509150)
Region Coverage 53.98% (240333/445189)
Branch Coverage 55.66% (103972/186813)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.19% (26941/36811)
Line Coverage 56.68% (288599/509150)
Region Coverage 53.98% (240297/445189)
Branch Coverage 55.66% (103987/186813)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.19% (26941/36811)
Line Coverage 56.68% (288586/509150)
Region Coverage 53.99% (240339/445189)
Branch Coverage 55.66% (103977/186813)

RuntimeProfile::Counter* _variant_doc_value_column_iter_count = nullptr;

RuntimeProfile::Counter* _adaptive_batch_predict_min_rows_counter = nullptr;
RuntimeProfile::Counter* _adaptive_batch_predict_max_rows_counter = nullptr;
Contributor

Use HighWaterMarkCounter

DEFINE_mBool(enable_low_cardinality_optimize, "true");
DEFINE_Bool(enable_low_cardinality_cache_code, "true");

DEFINE_mBool(enable_adaptive_batch_size, "true");
Contributor

Could we use a session variable instead?

auto v = _query_options.preferred_block_size_bytes;
return v > 0 ? static_cast<size_t>(v) : 0;
}
return 8388608UL; // 8MB default
Contributor

Maybe we should not enable this feature by default

}
// Byte-budget check: after the inner loop _next_row is either EOF or the next different
// key, so it is safe to stop accumulating here without repeating any row.
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
Contributor

Extract a common function to do this checking

// Adaptive batch size: target output block byte budget. 0 = disabled.
size_t preferred_block_size_bytes = 0;
// Per-column byte limit for the adaptive predictor. 0 = no column limit.
size_t preferred_max_col_bytes = 1048576;
Contributor

0 by default?

}
}
if (matched_cols > 0 && total_bytes > 0.0) {
_metadata_hint_bytes_per_row = (total_bytes / static_cast<double>(seg_rows)) * 1.2;
Contributor

What does 1.2 mean?

@mrhhsg mrhhsg force-pushed the adaptive_batch_size branch from ae43fe9 to 841c2a7 Compare March 25, 2026 15:50
@doris-robot

TPC-H: Total hot run time: 36837 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit a6852d1b3244bc2b4b0e0ed25aad98b25f80599e, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17760	9386	9498	9386
q2
q3	10667	852	599	599
q4	4672	417	310	310
q5	7589	1348	1156	1156
q6	222	194	168	168
q7	934	976	789	789
q8	9928	1467	1364	1364
q9	5460	5227	5195	5195
q10	6309	2371	2104	2104
q11	484	295	294	294
q12	649	391	245	245
q13	18242	3650	3055	3055
q14	315	328	296	296
q15
q16	1074	1043	989	989
q17	956	1174	752	752
q18	7221	7025	6714	6714
q19	1396	1311	1084	1084
q20	578	446	281	281
q21	4815	2182	1724	1724
q22	462	384	332	332
Total cold run time: 99733 ms
Total hot run time: 36837 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	10217	10202	10009	10009
q2
q3	5971	6426	5729	5729
q4	1618	1995	1303	1303
q5	5738	5869	6253	5869
q6	294	208	167	167
q7	2566	2433	2187	2187
q8	4803	4727	4784	4727
q9	10611	10619	10522	10522
q10	5231	5227	5101	5101
q11	912	682	630	630
q12	680	790	582	582
q13	3580	3914	3219	3219
q14	378	392	357	357
q15
q16	1041	1125	1020	1020
q17	1361	1431	1403	1403
q18	9599	9011	8803	8803
q19	1119	1212	1101	1101
q20	2764	2922	2704	2704
q21	6328	5839	5385	5385
q22	482	450	394	394
Total cold run time: 75293 ms
Total hot run time: 71212 ms

@doris-robot

BE UT Coverage Report

Increment line coverage 57.73% (392/679) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 53.01% (20112/37942)
Line Coverage 36.58% (188929/516450)
Region Coverage 32.86% (146792/446773)
Branch Coverage 33.95% (64221/189142)

@mrhhsg mrhhsg force-pushed the adaptive_batch_size branch from a6852d1 to c8bc656 Compare April 7, 2026 03:24
@mrhhsg mrhhsg changed the title [feat](storage) Implement adaptive batch size for SegmentIterator [feature](be) Implement adaptive batch size with EWMA prediction for byte-budget control Apr 7, 2026
…byte-budget control

Issue Number: close #xxx

Problem Summary:

Fixed-row-count batch sizing causes memory spikes for wide tables and
under-utilization for narrow tables. This PR adds adaptive batch sizing
that dynamically adjusts row counts using EWMA (Exponential Weighted
Moving Average) prediction so that each output block stays near a
configurable byte budget (default 8MB).
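The EWMA scheme described above can be sketched roughly as follows. This is an illustrative standalone class, not the PR's actual `AdaptiveBlockSizePredictor`: the 0.9/0.1 weights follow the PR description, while the seeding and clamping details are assumptions.

```cpp
#include <algorithm>
#include <cstddef>

// Minimal sketch of an EWMA-based row-count predictor (hypothetical class).
// The real AdaptiveBlockSizePredictor also bootstraps its first estimate
// from segment column metadata.
class EwmaRowPredictor {
public:
    EwmaRowPredictor(size_t byte_budget, size_t max_rows)
            : _byte_budget(byte_budget), _max_rows(max_rows) {}

    // Feed an actual (rows, bytes) observation from the last batch.
    void update(size_t rows, size_t bytes) {
        if (rows == 0) return;
        double observed = static_cast<double>(bytes) / static_cast<double>(rows);
        if (_avg_bytes_per_row <= 0.0) {
            _avg_bytes_per_row = observed; // first observation seeds the estimate
        } else {
            // Weight 0.9 on history, 0.1 on the new sample.
            _avg_bytes_per_row = 0.9 * _avg_bytes_per_row + 0.1 * observed;
        }
    }

    // Predict how many rows fit in the byte budget for the next batch.
    size_t predict_next_rows() const {
        if (_avg_bytes_per_row <= 0.0) return _max_rows; // no data yet
        auto rows = static_cast<size_t>(static_cast<double>(_byte_budget) /
                                        _avg_bytes_per_row);
        return std::clamp(rows, static_cast<size_t>(1), _max_rows);
    }

private:
    size_t _byte_budget;
    size_t _max_rows;
    double _avg_bytes_per_row = 0.0;
};
```

With an 8 MB budget, observing 1 KB rows drives the prediction toward 8192 rows per batch, and wider observations shrink it.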

**Key changes:**

1. **Storage layer**: New `AdaptiveBlockSizePredictor` class in
   SegmentIterator uses segment column metadata for initial estimation
   and EWMA for runtime adjustment. BlockReader and VCollectIterator
   gain byte-budget awareness.

2. **Operator layer**: All pipeline operators (aggregation, sort, join,
   set, union, exchange, table function, schema scan) switch from
   `batch_size()` to `block_max_rows()` / `block_max_bytes()` for
   dual row+byte control. Hash join probe adds per-instance
   `batch_size_limit` parameter to `find_batch` for thread-safe
   adaptive control without modifying shared hash table state.

3. **Scan/Format layer**: FileScanner integrates EWMA prediction for
   CSV, JSON, ORC, and Parquet readers. OlapScanner propagates byte
   budget to storage layer.

4. **FE/Thrift**: Three new session variables (`preferred_block_size_bytes`,
   `preferred_block_size_rows`, `preferred_max_column_in_block_size_bytes`)
   with BE config `enable_adaptive_batch_size` (mBool, default true).

Added adaptive batch size feature that dynamically adjusts block row
counts to target a byte budget (default 8MB), improving memory
efficiency for wide tables and throughput for narrow tables. Controlled
by session variables `preferred_block_size_bytes` (default 8MB),
`preferred_block_size_rows` (default 65535), and
`preferred_max_column_in_block_size_bytes` (default 1MB), plus BE
config `enable_adaptive_batch_size` (default true).

- Test: Unit Test (full BE UT pass) / Regression test (adaptive_batch_size suite)
- Behavior changed: Yes - block sizes are now adaptively controlled by byte budget when enabled; disable with SET enable_adaptive_batch_size=false or preferred_block_size_bytes=0
- Does this need documentation: Yes

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mrhhsg mrhhsg force-pushed the adaptive_batch_size branch from c8bc656 to 661e4ee Compare April 7, 2026 04:47
@mrhhsg mrhhsg requested a review from Copilot April 7, 2026 07:49
@mrhhsg
Member Author

mrhhsg commented Apr 7, 2026

/review

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 98 out of 98 changed files in this pull request and generated 6 comments.



Comment on lines +255 to +263
// Return the effective maximum row count for output blocks.
// When adaptive batch sizing is enabled, prefer preferred_block_size_rows;
// otherwise fall back to the traditional batch_size.
size_t batch_max_rows() const {
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_rows > 0) {
return _reader_context.preferred_block_size_rows;
}
return _reader_context.batch_size;
}
Copilot AI Apr 7, 2026

batch_max_rows() switches to preferred_block_size_rows whenever enable_adaptive_batch_size is true, regardless of whether preferred_block_size_bytes is 0. If preferred_block_size_bytes=0 is intended to fully disable adaptive sizing (per PR description), this still changes behavior vs the legacy fixed batch_size row cap. Consider also gating the preferred_block_size_rows branch on preferred_block_size_bytes>0 (or explicitly documenting that rows-only expansion remains enabled when bytes is 0).
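The gating this comment suggests could look like the following free-standing sketch; the struct and field names only mirror the quoted snippet, and the function is a hypothetical simplification, not the PR's code.

```cpp
#include <cstddef>

// Hypothetical reader-context fields mirroring the quoted snippet.
struct ReaderContext {
    size_t preferred_block_size_rows = 0;
    size_t preferred_block_size_bytes = 0;
    size_t batch_size = 4096;
};

// Sketch of batch_max_rows() gated on the byte budget as well, so that
// preferred_block_size_bytes == 0 fully restores the legacy row cap.
size_t batch_max_rows(const ReaderContext& ctx, bool adaptive_enabled) {
    if (adaptive_enabled && ctx.preferred_block_size_bytes > 0 &&
        ctx.preferred_block_size_rows > 0) {
        return ctx.preferred_block_size_rows;
    }
    return ctx.batch_size;
}
```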

assertEquals(res_enabled[i].toString(), res_disabled[i].toString())
}

// sql "drop table abs_wide_table"
Copilot AI Apr 7, 2026

abs_wide_table is created and populated, but the DROP at the end of Test 1 is commented out. Leaving the table behind can pollute the test environment and potentially affect reruns or later suites. Consider dropping abs_wide_table (like the other test tables) during cleanup.

Comment on lines +142 to +146
MOCK_FUNCTION int block_max_rows() const {
return config::enable_adaptive_batch_size
? std::max(_query_options.batch_size, int(preferred_block_size_rows()))
: _query_options.batch_size;
}
Copilot AI Apr 7, 2026

block_max_rows() is currently gated only by config::enable_adaptive_batch_size, not by whether the query’s byte budget is enabled. As a result, preferred_block_size_bytes = 0 (the documented disable switch) still leaves block_max_rows() potentially much larger than batch_size (e.g. 65535), which can change legacy behavior and risk large blocks for wide rows. Consider gating the preferred_block_size_rows path on preferred_block_size_bytes()>0 (or ensuring FE sets preferred_block_size_rows=0 when bytes=0) so “disable” truly restores the fixed-row behavior.

Comment on lines +32 to +45
def set_adaptive = { enabled ->
// preferred_block_size_bytes controls the feature end-to-end.
// Setting it to a very small value (e.g. 1 byte) forces the predictor to
// return row count = 1 every time, which is a good stress test.
if (enabled) {
sql "set preferred_block_size_bytes = 8388608" // 8 MB (default)
sql "set batch_size = 65535"
sql "set preferred_max_column_in_block_size_bytes = 1048576" // 1 MB
} else {
// Setting to 0 disables the byte-limit path at the accumulation layer.
sql "set preferred_block_size_bytes = 0"
sql "set batch_size = 4096"
sql "set preferred_max_column_in_block_size_bytes = 0"
}
Copilot AI Apr 7, 2026

This suite modifies multiple session variables (e.g. batch_size) but only resets preferred_block_size_bytes and preferred_max_column_in_block_size_bytes at the end. Leaving batch_size / preferred_block_size_rows at non-default values can leak into subsequent suites if they share the same session. Suggest resetting all modified vars back to defaults in the cleanup section.

Comment on lines +40 to +45
} else {
// Setting to 0 disables the byte-limit path at the accumulation layer.
sql "set preferred_block_size_bytes = 0"
sql "set batch_size = 4096"
sql "set preferred_max_column_in_block_size_bytes = 0"
}
Copilot AI Apr 7, 2026

The disabled branch only sets preferred_block_size_bytes=0, but does not reset preferred_block_size_rows. Since many operators now use block_max_rows(), leaving preferred_block_size_rows at its default (65535) can still change behavior compared to legacy fixed batch_size=4096. If the intent is “fixed row count only” when disabled, consider also setting preferred_block_size_rows to 0 (or to batch_size) in the disabled branch.

Comment on lines +373 to +381
// Base TabletReader always returns 0 (feature unsupported at base level).
TEST_F(BlockReaderByteBudgetTest, BaseTabletReaderReturnsZero) {
config::enable_adaptive_batch_size = true;
// TabletReader is abstract; use BlockReader as base pointer to test virtual dispatch.
BlockReader concrete;
concrete._reader_context.preferred_block_size_bytes = 99999;
TabletReader* base = &concrete;
// Through the virtual dispatch, BlockReader's override should be called.
EXPECT_EQ(base->preferred_block_size_bytes(), 99999);
Copilot AI Apr 7, 2026

The test name/comment says “Base TabletReader always returns 0”, but the assertion is checking virtual dispatch through a TabletReader* to BlockReader’s override and expects 99999. Suggest updating the comment (or renaming the test) to reflect what is actually being validated.

Contributor

@github-actions github-actions bot left a comment

I found 2 issues in the adaptive batch size implementation.

  1. SET preferred_block_size_bytes = 0 does not actually restore the legacy fixed-row behavior. RuntimeState::block_max_rows() and TabletReader::batch_max_rows() still switch to preferred_block_size_rows whenever the global config is enabled, so many operators now produce up to 65535 rows even though the session-level byte-budget flag is meant to disable the feature. The regression test only checks result equivalence, so it misses this behavioral regression.

  2. The OLAP scan path cannot grow beyond the legacy reader batch size. BetaRowsetReader still seeds StorageReadOptions::block_row_max from read_context->batch_size, and SegmentIterator then clamps every prediction to _initial_block_row_max. For narrow tables the predictor can therefore only shrink batches, never grow toward preferred_block_size_rows, which means the storage layer does not deliver the advertised adaptive growth behavior.

Critical checkpoint conclusions:

  • Goal / tests: Partially met. The PR wires adaptive sizing through many paths and adds broad tests, but the current tests do not prove the documented disable semantics or OLAP narrow-row growth.
  • Small / focused change: Not especially small; the feature spans many operators and the reviewed paths are not fully consistent yet.
  • Concurrency: No new lock-order or thread-safety bug was identified in the reviewed paths.
  • Lifecycle / static initialization: No new lifecycle or SIOF issue found in the reviewed paths.
  • Configuration: New session/config knobs are wired through FE->BE, but the session-level disable path is not honored consistently.
  • Compatibility: No storage-format or FE/BE compatibility break was identified from the reviewed diff.
  • Parallel code paths: Not all equivalent paths behave the same yet; OLAP scan still keeps the old hard cap while downstream operators use the new cap.
  • Special conditions: The new row-cap clamp in SegmentIterator defeats the intended adaptive growth behavior.
  • Test coverage: Good breadth, but missing assertions for the two regressions above.
  • Observability: Added FileScanner counters help there; nothing additional blocking found.
  • Transaction / persistence / data-write correctness: Not applicable for the reviewed changes; no issue found there.
  • Performance: The OLAP scan clamp leaves narrow-table performance gains unrealized.
  • Other issues: None beyond the 2 findings above from the reviewed paths.

void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }

MOCK_FUNCTION int block_max_rows() const {
return config::enable_adaptive_batch_size
Contributor

preferred_block_size_bytes = 0 is documented as the session-level switch to disable adaptive sizing, but block_max_rows() still returns max(batch_size, preferred_block_size_rows()) whenever the global config is on. After this PR, many operators use state->block_max_rows(), so SET preferred_block_size_bytes = 0 still inflates the row cap from the legacy batch_size to 65535 by default instead of restoring the old fixed-row behavior. The same issue exists in TabletReader::batch_max_rows(), so the disable path is inconsistent across BE.

_init_virtual_columns(block);
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({
// Adaptive batch size: predict how many rows this batch should read.
Contributor

This clamp prevents the storage-side predictor from ever growing past the legacy reader cap. _initial_block_row_max is captured from _opts.block_row_max, and that value still comes from read_context->batch_size in BetaRowsetReader. For narrow OLAP scans, predict_next_rows() may return a much larger value, but std::min(predicted, _initial_block_row_max) forces it back down to the old fixed batch size, so this path can only shrink batches, never grow toward preferred_block_size_rows().

mrhhsg and others added 3 commits April 7, 2026 18:05
### What problem does this PR solve?

Issue Number: close #N/A

Related PR: apache#61535

Problem Summary: Address code review findings on the adaptive batch size feature:

1. `RuntimeState::block_max_rows()` now gates on `preferred_block_size_bytes() > 0`,
   so `SET preferred_block_size_bytes = 0` correctly restores legacy fixed-row behavior.
   Also fixes the `std::max` semantics that turned `preferred_block_size_rows` into a
   lower bound instead of upper bound, and fixes `size_t` to `int` cast overflow risk.

2. `TabletReader::batch_max_rows()` now also gates on `preferred_block_size_bytes > 0`.

3. `BetaRowsetReader` now seeds `block_row_max` from `preferred_block_size_rows` when
   adaptive sizing is active, so the OLAP scan path can grow beyond the legacy
   `batch_size` (1024) for narrow tables. `SegmentIterator::_initial_block_row_max`
   then correctly reflects the larger cap.

4. `VSortedRunMerger` single-run fast path now falls through to the general merge
   path when byte budget is active, preventing oversized output blocks.

5. Regression test: renamed `qt_` to `order_qt_` prefix per standard, removed
   post-test `drop table` statements (keep drop-before-create pattern per standard).

6. Unit tests updated to reflect the new gating semantics, including a new test
   case for the `preferred_block_size_bytes=0` fallback.

### Release note

None

### Check List (For Author)

- Test: Unit Test (71+110 tests passed including all adaptive batch size, sort merger,
  block reader, collect iterator, and operator tests)
- Behavior changed: Yes - `preferred_block_size_bytes = 0` now correctly disables
  adaptive row cap expansion (previously still used preferred_block_size_rows=65535)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:
Follow-up fixes for remaining review comments on PR apache#61535:
1. Removed stale commented-out DROP TABLE in regression test
2. Reset all modified session variables (batch_size, preferred_block_size_rows)
   in regression test cleanup to prevent leaking into subsequent suites
3. Renamed misleading test "BaseTabletReaderReturnsZero" to
   "VirtualDispatchThroughTabletReaderPtr" with accurate comment
4. Applied clang-format to beta_rowset_reader.cpp

### Release note

None

### Check List (For Author)

- Test: Unit Test (BlockReaderBatchMaxRows, BlockReaderByteBudget, EstimateCollectedEnough — 27 tests passed)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:
Two code paths lacked byte-budget enforcement for the adaptive batch size feature:

1. **Distinct Streaming Aggregation**: The split logic in
   _distinct_pre_agg_with_serialized_key only enforced row-count cap
   (batch_size). For wide grouping keys, this could produce blocks
   exceeding preferred_block_size_bytes within a single push().
   Fix: Added byte-budget-aware split — estimates remaining byte budget
   using average row size and caps max_rows_to_add accordingly. Excess
   rows go to _cache_block for the next iteration.

2. **TopN storage path** (_topn_next): The one-shot topN computation
   returned the entire result in a single block without consulting
   byte budget. For large LIMIT values on wide tables, this could
   produce blocks far exceeding preferred_block_size_bytes.
   Fix: After computing the full topN result, if it exceeds the byte
   budget, the result is stored and returned in chunks. Each chunk
   respects both batch_max_rows and the byte budget. When byte budget
   is disabled, behavior is unchanged (zero overhead).

### Release note

None

### Check List (For Author)

- Test: Unit Test (78 + 105 = 183 tests passed including DistinctStreamingAgg, Sorter, Operator, and adaptive batch size tests)
- Behavior changed: No (feature is gated on preferred_block_size_bytes > 0)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mrhhsg
Member Author

mrhhsg commented Apr 7, 2026

/review

Contributor

@github-actions github-actions bot left a comment

Findings:

  1. be/src/exec/operator/nested_loop_join_probe_operator.cpp: the byte budget is only translated into effective_max_rows after the first process append, so the first build or probe chunk can still overshoot badly on wide rows.
  2. be/src/exec/operator/distinct_streaming_aggregation_operator.cpp: the new split logic uses the average size of rows already buffered in out_block, which does not bound the bytes contributed by the candidate distinct rows being appended.
  3. be/src/storage/iterator/vcollect_iterator.cpp: _topn_next_chunk() slices by the full result average row size, so a chunk that starts with a few very wide rows can still exceed preferred_block_size_bytes.

Critical checkpoints:

  • Goal / correctness: Partially met. Adaptive sizing is broadly wired through and tested, but the three paths above still violate the advertised byte-budget guarantee on skewed-width data. I did not see regression coverage for those cases.
  • Minimal / focused: Reasonably focused for a cross-cutting feature.
  • Concurrency: Reviewed the touched concurrent paths; no new lock-order or shared-state bug found in the modified code.
  • Lifecycle / static initialization: No issue found.
  • Configuration: FE/BE/thrift propagation for the new knobs looks complete, and preferred_block_size_bytes = 0 correctly gates the feature off.
  • Compatibility: New thrift fields are appended with new IDs; no compatibility issue found in the touched protocol changes.
  • Parallel code paths: Not fully consistent yet; the three findings are remaining byte-budget gaps in parallel paths.
  • Special conditions / comments: The new average-based checks are not strong enough to justify the claimed byte-budget enforcement in these paths.
  • Test coverage: Broad positive coverage exists, but adversarial skewed-row-size tests are still missing for these code paths.
  • Observability: Added scanner counters are useful; no blocker found.
  • Transaction / persistence: Not applicable.
  • Data writes / atomicity: Not applicable.
  • FE-BE variable passing: Complete in the touched code.
  • Performance: Overall reasonable; the current blockers are correctness gaps, not overhead.
  • Other issues: None beyond the findings above.

while (_join_block.rows() + add_rows() <= state->batch_size()) {
const auto block_max_bytes = state->block_max_bytes();
size_t effective_max_rows = state->block_max_rows();
bool bytes_estimated = false;
Contributor

effective_max_rows only becomes byte-aware after the first process_probe_block() or process_build_block() append. That means the first iteration can still materialize a full build or probe chunk based only on row count and blow far past preferred_block_size_bytes on wide rows. The same late adjustment pattern is duplicated in _generate_block_base_build(), so this is still not a real byte-budget bound for nested-loop join output.

} else {
size_t avg_row_bytes = current_bytes / out_block->rows();
if (avg_row_bytes > 0) {
size_t rows_for_budget = (block_max_bytes - current_bytes) / avg_row_bytes;
Contributor

Using out_block->bytes() / out_block->rows() here does not actually bound the bytes added by the rows in _distinct_row. If the buffered rows are narrow but the next distinct key is wide (for example, large STRING, JSON, or VARIANT values), rows_for_budget can still be at least 1, and this push emits a block far above preferred_block_size_bytes. To preserve the new contract, this split needs to account for the candidate rows' own bytes, not just the historical average of the already-buffered rows.
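One approximate improvement along these lines is to size the remaining budget from the candidate rows' own average width (taken from their source block) rather than from the already-buffered output. The sketch below is illustrative only, with made-up parameter names, and still uses an average, so exact enforcement would need per-row byte accounting.

```cpp
#include <algorithm>
#include <cstddef>

// Sketch: cap how many candidate rows may be appended so the output block
// stays near the byte budget, using the *candidate* rows' average width
// (from their source block) instead of the buffered rows' average.
size_t rows_within_budget(size_t out_bytes,        // bytes already in out_block
                          size_t budget_bytes,     // preferred_block_size_bytes
                          size_t src_bytes,        // bytes of the source block
                          size_t src_rows,         // rows in the source block
                          size_t candidate_rows) { // rows we want to append
    if (budget_bytes == 0 || src_rows == 0) return candidate_rows; // budget off
    if (out_bytes >= budget_bytes) return 0;                       // already full
    size_t avg_candidate_bytes = std::max<size_t>(1, src_bytes / src_rows);
    size_t fit = (budget_bytes - out_bytes) / avg_candidate_bytes;
    // Admit at least one row so the operator always makes progress.
    return std::min(candidate_rows, std::max<size_t>(1, fit));
}
```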

// Cap by batch_max_rows.
chunk_rows = std::min(chunk_rows, (size_t)_reader->batch_max_rows());

// Cap by byte budget.
Contributor

This still does not guarantee the chunk respects preferred_block_size_bytes. chunk_rows is derived from the full result average row size, so a skewed TopN result, for example a few very wide rows followed by many narrow rows, can produce a first chunk whose real bytes are far above the budget even though the average says it is safe. If this path is supposed to enforce the byte cap, it needs a byte-aware cut over the actual rows being copied, not a global average.
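A byte-aware cut over the actual rows, as suggested above, could be sketched like this. The function is a standalone illustration (names hypothetical); in the real code the per-row sizes would come from the materialized result columns:

```cpp
#include <cstddef>
#include <vector>

// Illustrative sketch: instead of sizing the chunk from the global average
// row width, walk the actual rows to be copied and cut where the running
// byte total would exceed the budget. O(rows) per chunk, but exact even
// for skewed TopN results.
size_t byte_aware_cut(const std::vector<size_t>& row_bytes, size_t start,
                      size_t max_rows, size_t max_bytes) {
    size_t rows = 0;
    size_t bytes = 0;
    while (rows < max_rows && start + rows < row_bytes.size()) {
        size_t next = bytes + row_bytes[start + rows];
        if (rows > 0 && next > max_bytes) break; // keep at least one row
        bytes = next;
        ++rows;
    }
    return rows;
}
```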

Copilot AI left a comment

Pull request overview

Copilot reviewed 99 out of 99 changed files in this pull request and generated 3 comments.



Comment on lines +1010 to +1013
// Byte-budget check: _merge_next() has already advanced _ref to the next unread row,
// so it is safe to stop here without duplicating any data.
if (collected_enough_rows(target_columns, continuous_row_in_block)) {
if (continuous_row_in_block > 0) {
Copilot AI Apr 7, 2026

In Level1Iterator::_merge_next(), collected_enough_rows() is evaluated on every loop iteration. That call recomputes Block::columns_byte_size(columns) by summing byte_size() across all columns each time, which makes the merge loop O(rows * columns) and can be a noticeable CPU regression on wide schemas. Consider checking the byte budget only every N rows (similar to BlockReader’s interval checks), or maintaining an incremental byte counter updated when ranges are inserted, so you avoid rescanning all columns per row.
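The interval-check alternative suggested here can be sketched as follows; `merge_rows`, `kCheckInterval`, and the callback are illustrative stand-ins, not the real iterator code:

```cpp
#include <cstddef>
#include <functional>

// Sketch: amortize the O(columns) byte scan by only invoking the expensive
// columns_byte_size() every kCheckInterval merged rows, mirroring the
// interval-style checks mentioned above for BlockReader.
constexpr size_t kCheckInterval = 256;

// Returns the number of rows merged before the budget check fired.
size_t merge_rows(size_t total_rows, size_t max_bytes,
                  const std::function<size_t(size_t)>& columns_byte_size) {
    size_t rows = 0;
    while (rows < total_rows) {
        ++rows; // stand-in for merging one row into the target columns
        if (rows % kCheckInterval == 0 &&
            columns_byte_size(rows) >= max_bytes) {
            break; // budget reached; stop without duplicating any data
        }
    }
    return rows;
}
```

An incremental byte counter updated on each range insert would avoid even this periodic rescan, at the cost of threading the counter through the insert paths.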

Comment on lines +66 to +71
// Insert 1000 rows with ~3 KB data each.
def wide_rows = (1..1000).collect { i ->
"(${i}, '${('a' * 1000)}', '${('b' * 1000)}', '${('c' * 1000)}')"
}
sql "insert into abs_wide_table values ${wide_rows.join(',')}"

Copilot AI Apr 7, 2026

The wide-table data load builds a single INSERT statement with 1000 rows of ~3KB string literals (wide_rows.join(',')), which can produce a multi‑MB SQL string. This risks exceeding max query length / max_packet limits in some CI environments and can make the regression suite flaky/slow. Prefer generating the data on the BE side (e.g., INSERT…SELECT from numbers() with repeat()/lpad(), or inserting in smaller batches) to keep the statement size bounded.

Comment on lines +155 to +161
MOCK_FUNCTION size_t preferred_block_size_bytes() const {
if (_query_options.__isset.preferred_block_size_bytes) {
auto v = _query_options.preferred_block_size_bytes;
return v > 0 ? static_cast<size_t>(v) : 0;
}
return 0;
}
Copilot AI Apr 7, 2026

RuntimeState::preferred_block_size_bytes() returns 0 when the query option is not explicitly set (__isset is false). This makes adaptive batch sizing effectively disabled by default, which is inconsistent with the stated default of 8MB (and also inconsistent with preferred_block_size_rows()/preferred_max_column_in_block_size_bytes(), which do return defaults when unset). Consider returning 8MB when __isset is false (similar to spill_buffer_size_bytes()), so the BE-side default behavior matches the documented defaults even if an older FE doesn’t forward the new field.
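The suggested fix can be sketched as below. The struct is a simplified stand-in for the Thrift-generated query options (the real code checks `_query_options.__isset.preferred_block_size_bytes`); the constant name is hypothetical:

```cpp
#include <cstddef>
#include <cstdint>

// Simplified stand-in for the Thrift-generated TQueryOptions __isset flag.
struct QueryOptions {
    bool preferred_block_size_bytes_set = false;
    int64_t preferred_block_size_bytes = 0;
};

constexpr size_t kDefaultPreferredBlockBytes = 8ULL << 20; // documented 8 MB

// Sketch of the suggested behavior: fall back to the documented default
// when the FE did not forward the option, instead of disabling the budget.
size_t preferred_block_size_bytes(const QueryOptions& opts) {
    if (opts.preferred_block_size_bytes_set) {
        auto v = opts.preferred_block_size_bytes;
        return v > 0 ? static_cast<size_t>(v) : 0; // explicit 0 disables
    }
    return kDefaultPreferredBlockBytes; // unset: match documented default
}
```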


mrhhsg commented Apr 7, 2026

run buildall

…control

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: apache#61535

Problem Summary: Multiple operators (aggregation_source, streaming_aggregation,
set_source, distinct_streaming_aggregation, union_source, table_function,
nested_loop_join_probe, schema_scan) had identical copy-pasted logic for
computing effective max rows from byte budgets, checking within-budget
conditions, and computing remaining row capacity. This duplication increases
maintenance burden and inconsistency risk.

Introduce `BlockBudget` struct in `be/src/common/block_budget.h` that
centralizes the 4 common patterns:
- `effective_max_rows(estimated_row_bytes)` — Pattern A (5 call sites)
- `within_budget(rows, bytes)` — Pattern B (6 call sites)
- `exceeded(rows, bytes)` — break conditions
- `remaining_rows(rows, bytes)` — Pattern C (2 call sites)
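
The interface described above could be sketched as follows. This is inferred from the commit summary; the real struct lives in be/src/common/block_budget.h and may differ in detail:

```cpp
#include <algorithm>
#include <cstddef>

// Sketch of the described BlockBudget helper centralizing the four
// copy-pasted patterns (interface inferred, not the actual header).
struct BlockBudget {
    size_t max_rows = 4096;
    size_t max_bytes = 0; // 0 = byte budget disabled

    // Pattern A: row limit implied by the byte budget and a row-width estimate.
    size_t effective_max_rows(size_t estimated_row_bytes) const {
        if (max_bytes == 0 || estimated_row_bytes == 0) return max_rows;
        return std::clamp(max_bytes / estimated_row_bytes, size_t(1), max_rows);
    }
    // Pattern B: can the operator keep appending?
    bool within_budget(size_t rows, size_t bytes) const {
        return rows < max_rows && (max_bytes == 0 || bytes < max_bytes);
    }
    // Break condition: either dimension exhausted.
    bool exceeded(size_t rows, size_t bytes) const {
        return !within_budget(rows, bytes);
    }
    // Pattern C: how many more rows fit, given current usage.
    size_t remaining_rows(size_t rows, size_t bytes) const {
        if (exceeded(rows, bytes)) return 0;
        return max_rows - rows;
    }
};
```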

Net effect: -33 lines across 8 operator files, replaced with concise
single-line calls. All 59 related unit tests pass.

### Release note

None

### Check List (For Author)

- Test: Unit Test (59 tests including 14 new BlockBudget tests)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@hello-stephen (Contributor)

FE UT Coverage Report

Increment line coverage 100.00% (6/6) 🎉
Increment coverage report
Complete coverage report

@mrhhsg mrhhsg force-pushed the adaptive_batch_size branch from 784300c to fcabf4c on April 7, 2026 at 17:43
Problem Summary: The BlockBudget refactoring lacked operator-level byte-budget tests. This commit adds targeted unit tests for aggregation, streaming aggregation, distinct streaming aggregation, set (except), union (const-expr path), and table function operators to verify that the byte budget correctly limits output rows/blocks.

None

- Test: Unit Test (all 104 operator tests pass including 21 ByteBudget-specific tests)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mrhhsg mrhhsg force-pushed the adaptive_batch_size branch from fcabf4c to d8bccbf on April 8, 2026 at 10:19

mrhhsg commented Apr 8, 2026

run buildall

@hello-stephen (Contributor)

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.48% (1798/2291)
Line Coverage 64.15% (32276/50312)
Region Coverage 65.07% (16237/24954)
Branch Coverage 55.61% (8680/15610)

@hello-stephen (Contributor)

FE UT Coverage Report

Increment line coverage 100.00% (6/6) 🎉
Increment coverage report
Complete coverage report

@hello-stephen (Contributor)

BE Regression && UT Coverage Report

Increment line coverage 84.10% (709/843) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 63.20% (23517/37213)
Line Coverage 46.73% (241293/516369)
Region Coverage 43.73% (197549/451784)
Branch Coverage 45.06% (85723/190225)

…tive batch size

### What problem does this PR solve?

Issue Number: close #xxx

Problem Summary: When the adaptive batch size predictor reduces `block_row_max`
before the first `_lazy_init` call, columns are reserved for the reduced prediction
(e.g., 256 rows). On subsequent batches, if the predictor increases `block_row_max`
(e.g., to 4096), the segment iterator tries to read more rows than the column
capacity, causing a heap-buffer-overflow in `ColumnDictI32::insert_many_dict_data`.

The fix uses `_initial_block_row_max` (the original unreduced ceiling) for column
reservation in `_lazy_init`, ensuring columns always have enough capacity for the
maximum possible batch size.
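
The fix described above can be illustrated with a minimal sketch (the column type and function names here are simplified stand-ins for the real segment-iterator and ColumnDictI32 code):

```cpp
#include <cstddef>
#include <vector>

// Stand-in for a column with a fixed reserved capacity.
struct ColumnSketch {
    std::vector<int> data;
    void reserve_rows(size_t n) { data.reserve(n); }
    size_t capacity() const { return data.capacity(); }
};

// Sketch of the fix: reserve for the original, unreduced ceiling in lazy
// init, not the (possibly reduced) adaptive prediction for this batch, so
// later batches that grow back to the full prediction never outrun capacity.
void lazy_init(ColumnSketch& col, size_t initial_block_row_max,
               size_t /*current_block_row_max: ignored for reservation*/) {
    col.reserve_rows(initial_block_row_max);
}

// Helper for demonstration: capacity observed after lazy init.
size_t reserved_capacity_after_init(size_t initial_max, size_t current_max) {
    ColumnSketch col;
    lazy_init(col, initial_max, current_max);
    return col.capacity();
}
```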

### Release note

None

### Check List (For Author)

- Test: Unit Test (38 AdaptiveBlockSizePredictor + 29 BlockBudget/ByteBudget tests pass)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
6 participants