Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,18 +699,20 @@ Status PipelineFragmentContext::_create_tree_helper(
int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
bool current_require_bucket_distribution = require_bucket_distribution;
// TODO: Create CacheOperator is confused now
OperatorPtr op = nullptr;
OperatorPtr cache_op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_operator,
current_require_bucket_distribution));
current_require_bucket_distribution, cache_op));
// Initialization must be done here. For example, group by expressions in agg will be used to
// decide if a local shuffle should be planed, so it must be initialized here.
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
// add to parent's child(s)
RETURN_IF_ERROR(parent->set_child(op));
RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
} else {
*root = op;
}
Expand Down Expand Up @@ -1277,7 +1279,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_operator,
const bool require_bucket_distribution) {
const bool require_bucket_distribution,
OperatorPtr& cache_op) {
std::vector<DataSinkOperatorPtr> sink_ops;
Defer defer = Defer([&]() {
if (op) {
Expand Down Expand Up @@ -1376,7 +1379,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(new_pipe->id());

DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
next_sink_operator_id(), cache_source_id, op->operator_id()));
next_sink_operator_id(), op->node_id(), op->operator_id()));
RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
return Status::OK();
};
Expand All @@ -1402,6 +1405,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

cache_op = op;
op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
tnode, descs);
RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
Expand All @@ -1416,7 +1420,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

cache_op = op;
op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
descs);
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
Expand All @@ -1432,6 +1436,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
if (need_create_cache_op) {
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
cache_op = op;
}

if (enable_spill) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx,
const bool followed_by_shuffled_join,
const bool require_bucket_distribution);
const bool require_bucket_distribution, OperatorPtr& cache_op);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, OperatorPtr& op,
Expand Down
Loading