Skip to content

Commit 7bbb961

Browse files
committed
fix[ATK-4637]: add Jackson dependencies and enhance MySQL delay detection logic
1 parent 09e58bb commit 7bbb961

File tree

3 files changed

+53
-12
lines changed

3 files changed

+53
-12
lines changed

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<grpc.version>1.75.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
2727
<netty.version>4.1.124.Final</netty.version>
2828
<log4j2.version>2.18.0</log4j2.version>
29+
<jackson.version>2.15.0</jackson.version>
2930
</properties>
3031
<repositories>
3132
<repository>
@@ -130,6 +131,27 @@
130131
<groupId>de.ruedigermoeller</groupId>
131132
<artifactId>fst</artifactId>
132133
<version>2.57</version>
134+
<exclusions>
135+
<exclusion>
136+
<groupId>com.fasterxml.jackson.core</groupId>
137+
<artifactId>jackson-core</artifactId>
138+
</exclusion>
139+
</exclusions>
140+
</dependency>
141+
<dependency>
142+
<groupId>com.fasterxml.jackson.core</groupId>
143+
<artifactId>jackson-databind</artifactId>
144+
<version>${jackson.version}</version>
145+
</dependency>
146+
<dependency>
147+
<groupId>com.fasterxml.jackson.core</groupId>
148+
<artifactId>jackson-annotations</artifactId>
149+
<version>${jackson.version}</version>
150+
</dependency>
151+
<dependency>
152+
<groupId>com.fasterxml.jackson.core</groupId>
153+
<artifactId>jackson-core</artifactId>
154+
<version>${jackson.version}</version>
133155
</dependency>
134156
<dependency>
135157
<groupId>junit</groupId>

src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,16 @@ private void delayCal(long delay, long delayThreshold) {
5656
heartbeat.setSlaveBehindMaster((int) delayVal);
5757
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
5858
} else {
59-
// master and slave maybe switch
60-
heartbeat.setSlaveBehindMaster(null);
61-
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
59+
if (heartbeat.getStatus() != MySQLHeartbeat.OK_STATUS) {
60+
long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay));
61+
LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName());
62+
heartbeat.setSlaveBehindMaster(0);
63+
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
64+
} else {
65+
// master and slave maybe switch
66+
heartbeat.setSlaveBehindMaster(null);
67+
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
68+
}
6269
}
6370
}
6471
}

src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public MySQLHeartbeat(PhysicalDbInstance dbInstance) {
8484
this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout();
8585
this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection();
8686
if (isDelayDetection) {
87-
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase());
87+
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(),
88+
dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance());
8889
} else {
8990
this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL();
9091
}
@@ -181,12 +182,12 @@ public void heartbeat() {
181182
}
182183
}
183184

184-
private String getDetectorSql(String dbGroupName, String delayDatabase) {
185+
private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) {
185186
String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()};
186187
String sourceName = Joiner.on("_").join(str);
187188
String sqlTableName = delayDatabase + ".u_delay ";
188189
String detectorSql;
189-
if (!source.isReadInstance()) {
190+
if (!readInstance) {
190191
String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)";
191192
detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName));
192193
} else {
@@ -199,9 +200,14 @@ private String getDetectorSql(String dbGroupName, String delayDatabase) {
199200
private String convert(String template, List<String> list) {
200201
StringBuilder sb = new StringBuilder(template);
201202
String replace = "?";
203+
int fromIndex = 0;
202204
for (String str : list) {
203-
int index = sb.indexOf(replace);
204-
sb.replace(index, index + 1, str);
205+
int index = sb.indexOf(replace, fromIndex);
206+
if (index < 0) {
207+
throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list);
208+
}
209+
sb.replace(index, index + replace.length(), str);
210+
fromIndex = index + str.length();
205211
}
206212
return sb.toString();
207213
}
@@ -387,11 +393,17 @@ public long getHeartbeatTimeout() {
387393
}
388394

389395
String getHeartbeatSQL() {
390-
if (isDelayDetection && !source.isReadInstance()) {
391-
return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
392-
} else {
393-
return heartbeatSQL;
396+
if (isDelayDetection) {
397+
boolean readInstance = source.isReadInstance();
398+
String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(),
399+
source.getDbGroupConfig().getDelayDatabase(), readInstance);
400+
if (!readInstance) {
401+
return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()),
402+
String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
403+
}
404+
return detectorSql;
394405
}
406+
return heartbeatSQL;
395407
}
396408

397409
public DbInstanceSyncRecorder getAsyncRecorder() {

0 commit comments

Comments
 (0)