Skip to content

Commit 157a35e

Browse files
authored
[opt](explode) Optimize explode_outer and posexplode (#62069)
### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Improvement of #60352: optimizes explode_outer, posexplode and posexplode_outer. The approach is a single-pass algorithm: walk through child rows, accumulating contiguous segments into the output; when hitting a null/empty row or reaching the end, flush the segment using bulk operations. For outer-null rows, insert a NULL and copy the non-table-function columns directly. This naturally handles both outer and non-outer modes, since non-outer mode simply never produces any null outputs. For posexplode, position indices are generated alongside this. Performance test results: we create and populate a large test table with 100,000,000 base rows, each with an array of 10 INT elements, producing 1,000,000,000 total exploded output rows. | Function | slow path | optimized | Speed-up | | --------- |---------- |-----------|----------| | explode | 9734 ms | 5105 ms | 90% | | explode_outer | 9685 ms | 5064 ms | 91% | | posexplode | 11021 ms | 5963 ms | 84% | | posexplode_outer | 14088 ms | 5903 ms | 138% | ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merges this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent a8c8905 commit 157a35e

18 files changed

+820
-78
lines changed

be/src/exec/operator/operator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,6 @@ class OperatorXBase : public OperatorBase {
796796
_resource_profile(tnode.resource_profile),
797797
_limit(tnode.limit) {
798798
if (tnode.__isset.output_tuple_id) {
799-
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}));
800799
_output_row_descriptor =
801800
std::make_unique<RowDescriptor>(descs, std::vector {tnode.output_tuple_id});
802801
}

be/src/exec/operator/table_function_operator.cpp

Lines changed: 159 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
#include "core/block/block.h"
2929
#include "core/block/column_numbers.h"
3030
#include "core/column/column_nullable.h"
31+
#include "core/column/column_struct.h"
32+
#include "core/column/column_vector.h"
3133
#include "core/custom_allocator.h"
3234
#include "exec/operator/operator.h"
3335
#include "exprs/table_function/table_function_factory.h"
@@ -264,30 +266,156 @@ Status TableFunctionLocalState::_get_expanded_block_block_fast_path(
264266
const auto& offsets = *_block_fast_path_ctx.offsets_ptr;
265267
const auto child_rows = cast_set<int64_t>(offsets.size());
266268

267-
std::vector<uint32_t> row_ids;
268-
row_ids.reserve(remaining_capacity);
269-
uint64_t first_nested_idx = 0;
270-
uint64_t expected_next_nested_idx = 0;
271-
bool found_nested_range = false;
272-
273269
int64_t child_row = _block_fast_path_row;
274270
uint64_t in_row_offset = _block_fast_path_in_row_offset;
275271
int produced_rows = 0;
276272

277-
while (produced_rows < remaining_capacity && child_row < child_rows) {
278-
if (_block_fast_path_ctx.array_nullmap_data &&
279-
_block_fast_path_ctx.array_nullmap_data[child_row]) {
280-
// NULL array row: skip it here. Slow path will handle output semantics if needed.
281-
child_row++;
282-
in_row_offset = 0;
283-
continue;
273+
const bool is_outer = _fns[0]->is_outer();
274+
const bool is_posexplode = _block_fast_path_ctx.generate_row_index;
275+
auto& out_col = columns[p._child_slots.size()];
276+
277+
// Decompose posexplode struct output column if needed
278+
ColumnStruct* struct_col_ptr = nullptr;
279+
ColumnUInt8* outer_struct_nullmap_ptr = nullptr;
280+
IColumn* value_col_ptr = nullptr;
281+
ColumnInt32* pos_col_ptr = nullptr;
282+
if (is_posexplode) {
283+
if (out_col->is_nullable()) {
284+
auto* nullable = assert_cast<ColumnNullable*>(out_col.get());
285+
struct_col_ptr = assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
286+
outer_struct_nullmap_ptr =
287+
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get());
288+
} else {
289+
struct_col_ptr = assert_cast<ColumnStruct*>(out_col.get());
290+
}
291+
pos_col_ptr = assert_cast<ColumnInt32*>(&struct_col_ptr->get_column(0));
292+
value_col_ptr = &struct_col_ptr->get_column(1);
293+
}
294+
// Segment tracking: accumulate contiguous nested ranges, flush on boundaries.
295+
// Array column offsets are monotonically non-decreasing, so nested data across child rows
296+
// is always contiguous (even with NULL/empty rows that contribute zero elements).
297+
struct ExpandSegmentContext {
298+
std::vector<uint32_t>
299+
seg_row_ids; // row ids of non table-function columns to replicate for this segment
300+
std::vector<int32_t>
301+
seg_positions; // for posexplode, the position values to write for this segment
302+
int64_t seg_nested_start = -1; // start offset in the nested column of this segment
303+
int seg_nested_count =
304+
0; // number of nested rows in this segment (can be > child row count due to multiple elements per row)
305+
};
306+
ExpandSegmentContext segment_ctx;
307+
segment_ctx.seg_row_ids.reserve(remaining_capacity);
308+
if (is_posexplode) {
309+
segment_ctx.seg_positions.reserve(remaining_capacity);
310+
}
311+
312+
auto reset_expand_segment_ctx = [&segment_ctx, is_posexplode]() {
313+
segment_ctx.seg_nested_start = -1;
314+
segment_ctx.seg_nested_count = 0;
315+
segment_ctx.seg_row_ids.clear();
316+
if (is_posexplode) {
317+
segment_ctx.seg_positions.clear();
318+
}
319+
};
320+
321+
// Flush accumulated contiguous segment to output columns
322+
auto flush_segment = [&]() {
323+
if (segment_ctx.seg_nested_count == 0) {
324+
return;
325+
}
326+
327+
// Non-TF columns: replicate each child row for every output element
328+
for (auto index : p._output_slot_indexs) {
329+
auto src_column = _child_block->get_by_position(index).column;
330+
columns[index]->insert_indices_from(
331+
*src_column, segment_ctx.seg_row_ids.data(),
332+
segment_ctx.seg_row_ids.data() + segment_ctx.seg_row_ids.size());
333+
}
334+
335+
if (is_posexplode) {
336+
// Write positions
337+
pos_col_ptr->insert_many_raw_data(
338+
reinterpret_cast<const char*>(segment_ctx.seg_positions.data()),
339+
segment_ctx.seg_positions.size());
340+
// Write nested values to the struct's value sub-column
341+
DCHECK(value_col_ptr->is_nullable())
342+
<< "posexplode fast path requires nullable value column";
343+
auto* val_nullable = assert_cast<ColumnNullable*>(value_col_ptr);
344+
val_nullable->get_nested_column_ptr()->insert_range_from(
345+
*_block_fast_path_ctx.nested_col, segment_ctx.seg_nested_start,
346+
segment_ctx.seg_nested_count);
347+
auto* val_nullmap =
348+
assert_cast<ColumnUInt8*>(val_nullable->get_null_map_column_ptr().get());
349+
auto& val_nullmap_data = val_nullmap->get_data();
350+
const size_t old_size = val_nullmap_data.size();
351+
val_nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
352+
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
353+
memcpy(val_nullmap_data.data() + old_size,
354+
_block_fast_path_ctx.nested_nullmap_data + segment_ctx.seg_nested_start,
355+
segment_ctx.seg_nested_count * sizeof(UInt8));
356+
} else {
357+
memset(val_nullmap_data.data() + old_size, 0,
358+
segment_ctx.seg_nested_count * sizeof(UInt8));
359+
}
360+
// Struct-level null map: these rows are not null
361+
if (outer_struct_nullmap_ptr) {
362+
outer_struct_nullmap_ptr->insert_many_defaults(segment_ctx.seg_nested_count);
363+
}
364+
} else if (out_col->is_nullable()) {
365+
auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
366+
out_nullable->get_nested_column_ptr()->insert_range_from(
367+
*_block_fast_path_ctx.nested_col, segment_ctx.seg_nested_start,
368+
segment_ctx.seg_nested_count);
369+
auto* nullmap_column =
370+
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
371+
auto& nullmap_data = nullmap_column->get_data();
372+
const size_t old_size = nullmap_data.size();
373+
nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
374+
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
375+
memcpy(nullmap_data.data() + old_size,
376+
_block_fast_path_ctx.nested_nullmap_data + segment_ctx.seg_nested_start,
377+
segment_ctx.seg_nested_count * sizeof(UInt8));
378+
} else {
379+
memset(nullmap_data.data() + old_size, 0,
380+
segment_ctx.seg_nested_count * sizeof(UInt8));
381+
}
382+
} else {
383+
out_col->insert_range_from(*_block_fast_path_ctx.nested_col,
384+
segment_ctx.seg_nested_start, segment_ctx.seg_nested_count);
284385
}
386+
reset_expand_segment_ctx();
387+
};
388+
389+
// Emit one NULL output row for an outer-null/empty child row
390+
auto emit_outer_null = [&](int64_t cr) {
391+
for (auto index : p._output_slot_indexs) {
392+
auto src_column = _child_block->get_by_position(index).column;
393+
columns[index]->insert_from(*src_column, cr);
394+
}
395+
out_col->insert_default();
396+
};
397+
// Walk through child rows, accumulating contiguous segments into the output,
398+
// then when hitting a null/empty row or reaching the end,
399+
// flush the segment using bulk operations.
400+
// For outer-null rows, insert a NULL and copy the non-table-function columns directly.
401+
// This naturally handles both outer and non-outer modes since non-outer mode
402+
// just won't produce any null outputs.
403+
// For posexplode, generate position indices alongside this.
404+
while (produced_rows < remaining_capacity && child_row < child_rows) {
405+
const bool is_null_row = _block_fast_path_ctx.array_nullmap_data &&
406+
_block_fast_path_ctx.array_nullmap_data[child_row];
285407

286408
const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1];
287-
const uint64_t cur_off = offsets[child_row];
409+
const uint64_t cur_off = is_null_row ? prev_off : offsets[child_row];
288410
const uint64_t nested_len = cur_off - prev_off;
289411

290-
if (in_row_offset >= nested_len) {
412+
if (is_null_row || in_row_offset >= nested_len) {
413+
// for outer functions, emit null row for NULL or empty array rows
414+
if (is_outer && in_row_offset == 0 && (is_null_row || nested_len == 0)) {
415+
flush_segment();
416+
emit_outer_null(child_row);
417+
produced_rows++;
418+
}
291419
child_row++;
292420
in_row_offset = 0;
293421
continue;
@@ -301,57 +429,37 @@ Status TableFunctionLocalState::_get_expanded_block_block_fast_path(
301429
DCHECK_LE(nested_start + take_count, cur_off);
302430
DCHECK_LE(nested_start + take_count, _block_fast_path_ctx.nested_col->size());
303431

304-
if (!found_nested_range) {
305-
found_nested_range = true;
306-
first_nested_idx = nested_start;
307-
expected_next_nested_idx = nested_start;
432+
if (segment_ctx.seg_nested_count == 0) {
433+
segment_ctx.seg_nested_start = nested_start;
434+
} else {
435+
// Nested data from an array column is always contiguous: offsets are monotonically
436+
// non-decreasing, so skipping NULL/empty rows doesn't create gaps.
437+
DCHECK_EQ(static_cast<uint64_t>(segment_ctx.seg_nested_start +
438+
segment_ctx.seg_nested_count),
439+
nested_start)
440+
<< "nested data must be contiguous across child rows";
308441
}
309-
DCHECK_EQ(nested_start, expected_next_nested_idx);
310442

311443
// Map each produced output row back to its source child row for copying non-table-function
312444
// columns via insert_indices_from().
313445
for (int j = 0; j < take_count; ++j) {
314-
row_ids.push_back(cast_set<uint32_t>(child_row));
446+
segment_ctx.seg_row_ids.push_back(cast_set<uint32_t>(child_row));
447+
if (is_posexplode) {
448+
segment_ctx.seg_positions.push_back(cast_set<int32_t>(in_row_offset + j));
449+
}
315450
}
316451

452+
segment_ctx.seg_nested_count += take_count;
317453
produced_rows += take_count;
318-
expected_next_nested_idx += take_count;
319454
in_row_offset += take_count;
320455
if (in_row_offset >= nested_len) {
321456
child_row++;
322457
in_row_offset = 0;
323458
}
324459
}
325460

326-
if (produced_rows > 0) {
327-
for (auto index : p._output_slot_indexs) {
328-
auto src_column = _child_block->get_by_position(index).column;
329-
columns[index]->insert_indices_from(*src_column, row_ids.data(),
330-
row_ids.data() + produced_rows);
331-
}
332-
333-
auto& out_col = columns[p._child_slots.size()];
334-
if (out_col->is_nullable()) {
335-
auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
336-
out_nullable->get_nested_column_ptr()->insert_range_from(
337-
*_block_fast_path_ctx.nested_col, first_nested_idx, produced_rows);
338-
auto* nullmap_column =
339-
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
340-
auto& nullmap_data = nullmap_column->get_data();
341-
const size_t old_size = nullmap_data.size();
342-
nullmap_data.resize(old_size + produced_rows);
343-
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
344-
memcpy(nullmap_data.data() + old_size,
345-
_block_fast_path_ctx.nested_nullmap_data + first_nested_idx,
346-
produced_rows * sizeof(UInt8));
347-
} else {
348-
memset(nullmap_data.data() + old_size, 0, produced_rows * sizeof(UInt8));
349-
}
350-
} else {
351-
out_col->insert_range_from(*_block_fast_path_ctx.nested_col, first_nested_idx,
352-
produced_rows);
353-
}
354-
}
461+
// Flush any remaining segment
462+
flush_segment();
355463

356464
_block_fast_path_row = child_row;
357465
_block_fast_path_in_row_offset = in_row_offset;

be/src/exprs/table_function/table_function.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class TableFunction {
4040
const IColumn::Offsets64* offsets_ptr = nullptr;
4141
ColumnPtr nested_col = nullptr;
4242
const UInt8* nested_nullmap_data = nullptr;
43+
bool generate_row_index = false;
4344
};
4445

4546
virtual Status prepare() { return Status::OK(); }

be/src/exprs/table_function/vexplode.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
9494
}
9595

9696
bool VExplodeTableFunction::support_block_fast_path() const {
97-
return !_is_outer;
97+
return true;
9898
}
9999

100100
Status VExplodeTableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/,

be/src/exprs/table_function/vexplode_v2.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Status VExplodeV2TableFunction::process_init(Block* block, RuntimeState* state)
109109
}
110110

111111
bool VExplodeV2TableFunction::support_block_fast_path() const {
112-
return !_is_outer && !_generate_row_index && _multi_detail.size() == 1;
112+
return _multi_detail.size() == 1;
113113
}
114114

115115
Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/,
@@ -123,6 +123,7 @@ Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, Runtim
123123
ctx->offsets_ptr = detail.offsets_ptr;
124124
ctx->nested_col = detail.nested_col;
125125
ctx->nested_nullmap_data = detail.nested_nullmap_data;
126+
ctx->generate_row_index = _generate_row_index;
126127
return Status::OK();
127128
}
128129

be/test/exec/operator/analytic_sink_operator_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
5656
sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
5757
source = std::make_unique<AnalyticSourceOperatorX>();
5858
state = std::make_shared<MockRuntimeState>();
59-
state->batsh_size = batch_size;
59+
state->_batch_size = batch_size;
6060
std::cout << "AnalyticSinkOperatorTest::SetUp() batch_size: " << batch_size << std::endl;
6161
_child_op = std::make_unique<MockAnalyticSinkOperator>();
6262
for (int i = 0; i < batch_size; i++) {

be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct DistinctStreamingAggOperatorTest : public ::testing::Test {
3434
op = std::make_unique<DistinctStreamingAggOperatorX>();
3535
mock_op = std::make_shared<MockOperatorX>();
3636
state = std::make_shared<MockRuntimeState>();
37-
state->batsh_size = 10;
37+
state->_batch_size = 10;
3838
op->_child = mock_op;
3939
}
4040

be/test/exec/operator/exchange_source_operator_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct MockExchangeSourceLocalState : public ExchangeLocalState {
6868
struct ExchangeSourceOperatorXTest : public ::testing::Test {
6969
void SetUp() override {
7070
state = std::make_shared<MockRuntimeState>();
71-
state->batsh_size = 10;
71+
state->_batch_size = 10;
7272
}
7373

7474
void create_op(int num_senders, bool is_merging, int offset, int limit) {

be/test/exec/operator/partition_sort_sink_operator_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class PartitionSortOperatorMockOperator : public OperatorXBase {
5151
struct PartitionSortOperatorTest : public ::testing::Test {
5252
void SetUp() override {
5353
state = std::make_shared<MockRuntimeState>();
54-
state->batsh_size = 10;
54+
state->_batch_size = 10;
5555
_child_op = std::make_unique<PartitionSortOperatorMockOperator>();
5656
}
5757

be/test/exec/operator/query_cache_operator_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class QueryCacheMockChildOperator : public OperatorXBase {
5050
struct QueryCacheOperatorTest : public ::testing::Test {
5151
void SetUp() override {
5252
state = std::make_shared<MockRuntimeState>();
53-
state->batsh_size = 10;
53+
state->_batch_size = 10;
5454
child_op = std::make_unique<QueryCacheMockChildOperator>();
5555
query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 * 1024));
5656
query_cache = query_cache_uptr.get();

0 commit comments

Comments
 (0)