Skip to content

Commit fc90d91

Browse files
committed
[fix](doris catalog) FragmentMgr should not cancel virtual doris cluster query
1 parent 157a35e commit fc90d91

File tree

4 files changed

+175
-4
lines changed

4 files changed

+175
-4
lines changed

be/src/runtime/fragment_mgr.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -765,9 +765,16 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
765765

766766
// This may be a first fragment request of the query.
767767
// Create the query fragments context.
768-
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
769-
params.coord, params.is_nereids,
770-
params.current_connect_fe, query_source);
768+
// Cross-cluster query: coordinator FE may not belong to local cluster.
769+
// In that case, cancel_worker() should not cancel it based on local FE liveness.
770+
QuerySource actual_query_source = query_source;
771+
if (query_source == QuerySource::INTERNAL_FRONTEND &&
772+
!_exec_env->get_running_frontends().contains(params.coord)) {
773+
actual_query_source = QuerySource::EXTERNAL_FRONTEND;
774+
}
775+
query_ctx = QueryContext::create(
776+
query_id, _exec_env, params.query_options, params.coord,
777+
params.is_nereids, params.current_connect_fe, actual_query_source);
771778
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
772779
RETURN_IF_ERROR(DescriptorTbl::create(
773780
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
@@ -1128,6 +1135,11 @@ void FragmentMgr::_collect_invalid_queries(
11281135
-> Status {
11291136
for (const auto& it : map) {
11301137
if (auto q_ctx = it.second.lock()) {
1138+
// Cross-cluster query: coordinator FE is not in local `running_fes`,
1139+
// we should not cancel it based on local coordinator liveness.
1140+
if (q_ctx->get_query_source() == QuerySource::EXTERNAL_FRONTEND) {
1141+
continue;
1142+
}
11311143
q_contexts.push_back(q_ctx);
11321144
const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
11331145

be/src/runtime/query_context.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ const std::string toString(QuerySource queryType) {
7373
return "ROUTINE_LOAD";
7474
case QuerySource::EXTERNAL_CONNECTOR:
7575
return "EXTERNAL_CONNECTOR";
76+
case QuerySource::EXTERNAL_FRONTEND:
77+
return "EXTERNAL_FRONTEND";
7678
default:
7779
return "UNKNOWN";
7880
}

be/src/runtime/query_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ enum class QuerySource {
7171
STREAM_LOAD,
7272
GROUP_COMMIT_LOAD,
7373
ROUTINE_LOAD,
74-
EXTERNAL_CONNECTOR
74+
EXTERNAL_CONNECTOR,
75+
EXTERNAL_FRONTEND
7576
};
7677

7778
const std::string toString(QuerySource query_source);
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gen_cpp/PaloInternalService_types.h>
19+
#include <gtest/gtest.h>
20+
21+
#include "runtime/descriptor_helper.h"
22+
#include "runtime/exec_env.h"
23+
#include "runtime/fragment_mgr.h"
24+
#include "runtime/workload_group/workload_group_manager.h"
25+
26+
namespace doris {
27+
28+
class FragmentMgrCrossClusterCancelTest : public testing::Test {
29+
public:
30+
void SetUp() override {
31+
// Make frontends list deterministic for this ExecEnv instance.
32+
_exec_env.update_frontends({});
33+
34+
_exec_env._workload_group_manager = new WorkloadGroupMgr();
35+
// Ensure there is a "normal" workload group, otherwise WorkloadGroupMgr::get_group() will throw.
36+
WorkloadGroupInfo normal_wg_info {.id = 1, .name = "normal"};
37+
_exec_env._workload_group_manager->get_or_create_workload_group(normal_wg_info);
38+
39+
_exec_env._fragment_mgr = new FragmentMgr(&_exec_env);
40+
}
41+
42+
void TearDown() override {
43+
if (_exec_env._fragment_mgr != nullptr) {
44+
_exec_env._fragment_mgr->stop();
45+
}
46+
delete _exec_env._fragment_mgr;
47+
_exec_env._fragment_mgr = nullptr;
48+
delete _exec_env._workload_group_manager;
49+
_exec_env._workload_group_manager = nullptr;
50+
_exec_env.update_frontends({});
51+
}
52+
53+
protected:
54+
static TDescriptorTable _make_min_desc_tbl() {
55+
TDescriptorTableBuilder dtb;
56+
TTupleDescriptorBuilder tuple_builder;
57+
tuple_builder.add_slot(TSlotDescriptorBuilder()
58+
.type(TYPE_INT)
59+
.nullable(true)
60+
.column_name("c1")
61+
.column_pos(1)
62+
.build());
63+
tuple_builder.build(&dtb);
64+
return dtb.desc_tbl();
65+
}
66+
67+
static TQueryOptions _make_min_query_options(int64_t fe_process_uuid) {
68+
TQueryOptions query_options;
69+
query_options.__set_query_type(TQueryType::SELECT);
70+
query_options.__set_execution_timeout(60);
71+
query_options.__set_query_timeout(60);
72+
query_options.__set_mem_limit(64L * 1024 * 1024);
73+
query_options.__set_fe_process_uuid(fe_process_uuid);
74+
return query_options;
75+
}
76+
77+
ExecEnv _exec_env;
78+
};
79+
80+
// A query submitted as INTERNAL_FRONTEND whose coordinator address is not in
// the ExecEnv's frontend list (the fixture leaves that list empty) must come
// back with its query source set to EXTERNAL_FRONTEND.
TEST_F(FragmentMgrCrossClusterCancelTest,
       MarkQuerySourceAsExternalFrontendWhenCoordinatorNotLocal) {
    auto* mgr = _exec_env.fragment_mgr();
    ASSERT_NE(mgr, nullptr);

    // Coordinator address that was never registered through update_frontends().
    TNetworkAddress remote_coord;
    remote_coord.hostname = "fe-a";
    remote_coord.port = 9030;

    TUniqueId qid;
    qid.__set_hi(1);
    qid.__set_lo(2);

    // Minimal request for a single-fragment query.
    TPipelineFragmentParams request;
    request.__set_query_id(qid);
    request.__set_is_simplified_param(false);
    request.__set_coord(remote_coord);
    request.__set_is_nereids(false);
    request.__set_current_connect_fe(remote_coord);
    request.__set_fragment_num_on_host(1);
    request.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 123));
    request.__set_desc_tbl(_make_min_desc_tbl());

    std::shared_ptr<QueryContext> ctx;
    TPipelineFragmentParamsList parent_list;
    auto status = mgr->_get_or_create_query_ctx(request, parent_list,
                                                QuerySource::INTERNAL_FRONTEND, ctx);
    ASSERT_TRUE(status.ok()) << status.to_string();
    ASSERT_NE(ctx, nullptr);

    // The source passed in was INTERNAL_FRONTEND; the created context must
    // report EXTERNAL_FRONTEND instead.
    EXPECT_EQ(ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
}
111+
112+
// Creates a query context that ends up tagged EXTERNAL_FRONTEND, then runs
// _collect_invalid_queries() and expects neither the lost-coordinator list nor
// the pipeline-task-leak list to contain anything.
//
// NOTE(review): `running_fes` and `running_queries_on_all_fes` are both empty
// here. If _collect_invalid_queries() already declines to flag queries when no
// frontends are known, the final expectations would hold even without the
// EXTERNAL_FRONTEND skip -- confirm this test fails when the skip is removed.
TEST_F(FragmentMgrCrossClusterCancelTest, CancelWorkerInvalidQueryDetectionSkipsExternalFrontend) {
    auto* mgr = _exec_env.fragment_mgr();
    ASSERT_NE(mgr, nullptr);

    // Coordinator address that was never registered through update_frontends().
    TNetworkAddress remote_coord;
    remote_coord.hostname = "fe-b";
    remote_coord.port = 9030;

    TUniqueId qid;
    qid.__set_hi(3);
    qid.__set_lo(4);

    // Minimal request for a single-fragment query.
    TPipelineFragmentParams request;
    request.__set_query_id(qid);
    request.__set_is_simplified_param(false);
    request.__set_coord(remote_coord);
    request.__set_is_nereids(false);
    request.__set_current_connect_fe(remote_coord);
    request.__set_fragment_num_on_host(1);
    request.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 456));
    request.__set_desc_tbl(_make_min_desc_tbl());

    // Precondition: the context exists and is tagged EXTERNAL_FRONTEND.
    std::shared_ptr<QueryContext> ctx;
    TPipelineFragmentParamsList parent_list;
    auto status = mgr->_get_or_create_query_ctx(request, parent_list,
                                                QuerySource::INTERNAL_FRONTEND, ctx);
    ASSERT_TRUE(status.ok()) << status.to_string();
    ASSERT_NE(ctx, nullptr);
    ASSERT_EQ(ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);

    // Inputs and outputs for the invalid-query scan; everything starts empty
    // and the timestamp is all zeros.
    std::vector<TUniqueId> lost_coordinator_ids;
    std::vector<TUniqueId> task_leak_ids;
    std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
    std::map<TNetworkAddress, FrontendInfo> running_fes;
    timespec zero_ts {};

    mgr->_collect_invalid_queries(lost_coordinator_ids, task_leak_ids,
                                  running_queries_on_all_fes, running_fes, zero_ts);
    EXPECT_TRUE(lost_coordinator_ids.empty());
    EXPECT_TRUE(task_leak_ids.empty());
}
155+
156+
} // namespace doris

0 commit comments

Comments
 (0)