Skip to content

Commit 92fc2b2

Browse files
committed
fix unstable case
1 parent 5a5b4a0 commit 92fc2b2

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,17 @@ private SplitReadResult prepareStreamSplit(
351351
}
352352
Tuple2<SourceSplitBase, Boolean> splitFlag = createStreamSplit(offsetMeta, baseReq);
353353
this.streamSplit = splitFlag.f0.asStreamSplit();
354+
355+
// Close previous stream reader to release resources (e.g. PG replication slot)
356+
// before creating a new one. This prevents connection leaks when a cancelled
357+
// task's reader is still active while a new task arrives.
358+
if (this.streamReader != null) {
359+
LOG.info("Closing previous stream reader before creating new one for job {}",
360+
baseReq.getJobId());
361+
closeReaderInternal(this.streamReader);
362+
this.streamReader = null;
363+
}
364+
354365
this.streamReader = getBinlogSplitReader(baseReq);
355366

356367
LOG.info("Prepare stream split: {}", this.streamSplit.toString());

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,17 @@ private SplitReadResult prepareBinlogSplit(
337337
tryLoadTableSchemasFromRequest(baseReq);
338338
Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, baseReq);
339339
this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
340+
341+
// Close previous binlog reader to release resources before creating a new one.
342+
// This prevents connection leaks when a cancelled task's reader is still active
343+
// while a new task arrives.
344+
if (this.binlogReader != null) {
345+
LOG.info("Closing previous binlog reader before creating new one for job {}",
346+
baseReq.getJobId());
347+
this.binlogReader.close();
348+
this.binlogReader = null;
349+
}
350+
340351
this.binlogReader = getBinlogSplitReader(baseReq);
341352

342353
LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());

0 commit comments

Comments
 (0)