Skip to content

Commit 1ec3092

Browse files
committed
[fix](editlog) Fix concurrency bugs and robustness issues in editlog subsystem
1. EditLog.txId data race: Convert `txId` from plain `long` to `AtomicLong`. The flusher thread and DDL caller threads both read/write `txId` without synchronization, causing torn reads on 64-bit values.
2. EditLog.logEdit(op, List<T>) missing roll check: This bulk-write path incremented txId but never checked against `edit_log_roll_num`, so the edit log would never roll when this path was used.
3. BDBEnvironment.close() not thread-safe: `close()` iterated `openedDatabases` without holding `lock.writeLock()`, while `openDatabase()` concurrently modifies the same list under the write lock. This can cause a ConcurrentModificationException when a RollbackException triggers close() on the replayer thread while HTTP/heartbeat/RPC threads call openDatabase() via getMaxJournalId().
4. EditLog.flushEditLog() silent hang on fatal error: When `System.exit(-1)` is called after a write failure, producer threads blocked on `req.lock.wait()` are never notified. If `System.exit` is delayed by shutdown hooks, those threads hang indefinitely.
5. BDBJEJournal.write() burns journal IDs on failure: Both single and batch write methods called `nextJournalId.getAndIncrement/getAndAdd` before the actual write, so a failed write permanently skips journal IDs. Now IDs are only advanced after successful commit.
6. BDBEnvironment.removeDatabase() index corruption: Manual index tracking with `openedDatabases.remove(index)` computed wrong index when the target was not the first element. Replaced with `iterator.remove()`.
7. JournalObservable.upperBound() off-by-one: The binary search used `right = size - 1` and `right = middle - 1`, which could miss notifying observers whose targetJournalVersion exactly equals the journalId. Fixed to standard upper_bound pattern.
8. Remove dead `editStream` field and `getEditLogSize()` method from EditLog. The field was never assigned (always null), so `getEditLogSize()` would always NPE.
1 parent 9ecfd40 commit 1ec3092

File tree

4 files changed

+67
-62
lines changed

4 files changed

+67
-62
lines changed

fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -312,21 +312,15 @@ public Database openDatabase(String dbName) {
312312
public void removeDatabase(String dbName) {
313313
lock.writeLock().lock();
314314
try {
315-
String targetDbName = null;
316-
int index = 0;
317-
for (Database db : openedDatabases) {
315+
for (java.util.Iterator<Database> iter = openedDatabases.iterator(); iter.hasNext();) {
316+
Database db = iter.next();
318317
String name = db.getDatabaseName();
319318
if (dbName.equals(name)) {
320319
db.close();
321320
LOG.info("database {} has been closed", name);
322-
targetDbName = name;
321+
iter.remove();
323322
break;
324323
}
325-
index++;
326-
}
327-
if (targetDbName != null) {
328-
LOG.info("begin to remove database {} from openedDatabases", targetDbName);
329-
openedDatabases.remove(index);
330324
}
331325
try {
332326
LOG.info("begin to remove database {} from replicatedEnvironment", dbName);
@@ -481,32 +475,37 @@ public List<Long> getDatabaseNames() {
481475

482476
// Close the store and environment
483477
public void close() {
484-
for (Database db : openedDatabases) {
485-
try {
486-
db.close();
487-
} catch (DatabaseException exception) {
488-
LOG.error("Error closing db {} will exit", db.getDatabaseName(), exception);
478+
lock.writeLock().lock();
479+
try {
480+
for (Database db : openedDatabases) {
481+
try {
482+
db.close();
483+
} catch (DatabaseException exception) {
484+
LOG.error("Error closing db {} will exit", db.getDatabaseName(), exception);
485+
}
489486
}
490-
}
491-
openedDatabases.clear();
487+
openedDatabases.clear();
492488

493-
if (epochDB != null) {
494-
try {
495-
epochDB.close();
496-
epochDB = null;
497-
} catch (DatabaseException exception) {
498-
LOG.error("Error closing db {} will exit", epochDB.getDatabaseName(), exception);
489+
if (epochDB != null) {
490+
try {
491+
epochDB.close();
492+
epochDB = null;
493+
} catch (DatabaseException exception) {
494+
LOG.error("Error closing db {} will exit", epochDB.getDatabaseName(), exception);
495+
}
499496
}
500-
}
501497

502-
if (replicatedEnvironment != null) {
503-
try {
504-
// Finally, close the store and environment.
505-
replicatedEnvironment.close();
506-
replicatedEnvironment = null;
507-
} catch (DatabaseException exception) {
508-
LOG.error("Error closing replicatedEnvironment", exception);
498+
if (replicatedEnvironment != null) {
499+
try {
500+
// Finally, close the store and environment.
501+
replicatedEnvironment.close();
502+
replicatedEnvironment = null;
503+
} catch (DatabaseException exception) {
504+
LOG.error("Error closing replicatedEnvironment", exception);
505+
}
509506
}
507+
} finally {
508+
lock.writeLock().unlock();
510509
}
511510
}
512511

fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ public synchronized long write(JournalBatch batch) throws IOException {
131131
List<JournalBatch.Entity> entities = batch.getJournalEntities();
132132
int entitySize = entities.size();
133133
long dataSize = 0;
134-
long firstId = nextJournalId.getAndAdd(entitySize);
134+
// Reserve IDs only after successful commit to avoid burning IDs on write failure.
135+
long firstId = nextJournalId.get();
135136

136137
// Write the journals to bdb.
137138
for (int i = 0; i < RETRY_TIME; i++) {
@@ -155,6 +156,7 @@ public synchronized long write(JournalBatch batch) throws IOException {
155156

156157
txn.commit();
157158
txn = null;
159+
nextJournalId.addAndGet(entitySize);
158160

159161
if (MetricRepo.isInit) {
160162
MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
@@ -237,8 +239,9 @@ public synchronized long write(short op, Writable writable) throws IOException {
237239
entity.setOpCode(op);
238240
entity.setData(writable);
239241

240-
// id is the key
241-
long id = nextJournalId.getAndIncrement();
242+
// id is the key. Reserve ID only after successful write to avoid burning IDs on failure.
243+
// This is safe because the method is synchronized.
244+
long id = nextJournalId.get();
242245
DatabaseEntry theKey = idToKey(id);
243246

244247
// entity is the value
@@ -273,6 +276,7 @@ public synchronized long write(short op, Writable writable) throws IOException {
273276
// Parameter null means auto commit
274277
if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
275278
writeSucceed = true;
279+
nextJournalId.incrementAndGet();
276280
if (LOG.isDebugEnabled()) {
277281
LOG.debug("master write journal {} finished. db name {}, current time {}",
278282
id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,7 @@ public long await() {
167167
private final BlockingQueue<EditLogItem> logEditQueue = new LinkedBlockingQueue<>();
168168
private final Thread flushThread;
169169

170-
private EditLogOutputStream editStream = null;
171-
172-
private long txId = 0;
170+
private final AtomicLong txId = new AtomicLong(0);
173171

174172

175173
private AtomicLong numTransactions = new AtomicLong(0);
@@ -255,6 +253,14 @@ private void flushEditLog() {
255253
}
256254
}
257255
} catch (Throwable t) {
256+
// Notify all pending items so producer threads don't hang if System.exit is delayed
257+
for (EditLogItem req : batch) {
258+
synchronized (req.lock) {
259+
req.finished = true;
260+
req.logId = -1;
261+
req.lock.notifyAll();
262+
}
263+
}
258264
// Throwable contains all Exception and Error, such as IOException and
259265
// OutOfMemoryError
260266
if (journal instanceof BDBJEJournal) {
@@ -264,13 +270,13 @@ private void flushEditLog() {
264270
System.exit(-1);
265271
}
266272

267-
txId += batch.size();
273+
txId.addAndGet(batch.size());
268274
// update statistics, etc. (optional, can be added as needed)
269-
if (txId >= Config.edit_log_roll_num) {
270-
LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId,
275+
if (txId.get() >= Config.edit_log_roll_num) {
276+
LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId.get(),
271277
Config.edit_log_roll_num);
272278
rollEditLog();
273-
txId = 0;
279+
txId.set(0);
274280
}
275281
if (MetricRepo.isInit) {
276282
MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(Long.valueOf(batch.size()));
@@ -1557,7 +1563,13 @@ private <T extends Writable> void logEdit(short op, List<T> entries) throws IOEx
15571563
if (!batch.getJournalEntities().isEmpty()) {
15581564
journal.write(batch);
15591565
}
1560-
txId += entries.size();
1566+
long newTxId = txId.addAndGet(entries.size());
1567+
if (newTxId >= Config.edit_log_roll_num) {
1568+
LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.",
1569+
newTxId, Config.edit_log_roll_num);
1570+
rollEditLog();
1571+
txId.set(0);
1572+
}
15611573
}
15621574

15631575
/**
@@ -1654,13 +1666,13 @@ private synchronized long logEditDirectly(short op, Writable writable) {
16541666
}
16551667

16561668
// get a new transactionId
1657-
txId++;
1669+
long newTxId = txId.incrementAndGet();
16581670

1659-
if (txId >= Config.edit_log_roll_num) {
1660-
LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId,
1671+
if (newTxId >= Config.edit_log_roll_num) {
1672+
LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", newTxId,
16611673
Config.edit_log_roll_num);
16621674
rollEditLog();
1663-
txId = 0;
1675+
txId.set(0);
16641676
}
16651677

16661678
return logId;
@@ -1709,19 +1721,12 @@ private long logEdit(short op, Writable writable) {
17091721

17101722
if (LOG.isDebugEnabled()) {
17111723
LOG.debug("nextId = {}, numTransactions = {}, totalTimeTransactions = {}, op = {} delta = {}",
1712-
txId, numTransactions, totalTimeTransactions, op, end - start);
1724+
txId.get(), numTransactions, totalTimeTransactions, op, end - start);
17131725
}
17141726

17151727
return logId;
17161728
}
17171729

1718-
/**
1719-
* Return the size of the current EditLog
1720-
*/
1721-
public synchronized long getEditLogSize() throws IOException {
1722-
return editStream.length();
1723-
}
1724-
17251730
/**
17261731
* Return the number of the current EditLog
17271732
*/

fe/fe-core/src/main/java/org/apache/doris/qe/JournalObservable.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,23 +63,20 @@ public void waitOn(Long expectedJournalVersion, int timeoutMs) throws DdlExcepti
6363
}
6464
}
6565

66-
// return min pos which is bigger than value
66+
// Return the number of elements whose targetJournalVersion <= value.
67+
// This is the standard upper_bound pattern: returns the index of the first element > value.
6768
public static int upperBound(Object[] array, int size, Long value) {
6869
int left = 0;
69-
int right = size - 1;
70+
int right = size;
7071
while (left < right) {
7172
int middle = left + ((right - left) >> 1);
72-
if (value >= ((JournalObserver) array[middle]).getTargetJournalVersion()) {
73+
if (((JournalObserver) array[middle]).getTargetJournalVersion() <= value) {
7374
left = middle + 1;
7475
} else {
75-
right = middle - 1;
76+
right = middle;
7677
}
7778
}
78-
if (right == -1) {
79-
return 0;
80-
}
81-
Long rightValue = ((JournalObserver) array[right]).getTargetJournalVersion();
82-
return value >= rightValue ? right + 1 : right;
79+
return left;
8380
}
8481

8582
public void notifyObservers(Long journalId) {

0 commit comments

Comments
 (0)