
KAFKA-15529: Fix race condition between isConsumed and position updates in AsyncKafkaConsumer fetch path #21476

Open
majialoong wants to merge 1 commit into apache:trunk from majialoong:KAFKA-15529

Conversation

@majialoong
Contributor

Previously, isConsumed was not volatile and drain() set isConsumed = true before the position was updated. In the window between drain() and the position update, the background thread could race in, observe isConsumed == true but read a stale position, leading to duplicate fetch requests with the old offset.

This PR declares isConsumed as volatile and reorders the operations so that the subscription position is always updated before drain() sets isConsumed = true, ensuring the background thread always sees the updated position when it observes isConsumed == true.
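
For illustration, here is a minimal sketch of the publication order the fix establishes, using simplified stand-ins for CompletedFetch and SubscriptionState (all names below are illustrative, not the actual Kafka internals):

```java
// Minimal sketch of the fixed publication order; names are simplified
// stand-ins, not the real Kafka classes.
final class FetchOrderingSketch {

    // Stand-in for the partition's fetch position (held by SubscriptionState
    // in the real code); a plain field here for illustration.
    private long position = 0L;

    // Stand-in for CompletedFetch.isConsumed. Making it volatile gives the
    // write in drain() release/acquire semantics: a background thread that
    // reads isConsumed == true also sees every write made before it,
    // including the position update.
    private volatile boolean isConsumed = false;

    // Application thread, roughly the path through FetchCollector#fetchRecords:
    void publishConsumedBatch(long nextFetchOffset) {
        position = nextFetchOffset;   // 1. update the position first
        isConsumed = true;            // 2. then drain() marks the batch consumed

        // Before the fix the two writes were effectively in the opposite order
        // and isConsumed was not volatile, so the background thread could see
        // isConsumed == true while still reading the old position and build a
        // duplicate fetch request for the previous offset.
    }

    // Background thread, roughly AbstractFetch#prepareFetchRequests:
    long offsetForNextFetchRequest() {
        return isConsumed ? position : -1L;
    }
}
```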

@github-actions bot added the triage (PRs from the community), consumer, build (Gradle build or GitHub Actions), clients, and small (Small PRs) labels on Feb 13, 2026
@majialoong
Contributor Author

To analyze this problem, I added log statements in the following four places (a sketch of the shared log format follows the list):

  1. AbstractFetch#prepareFetchRequests: records the position used when a fetch request is built.
  2. CompletedFetch#drain: records when isConsumed is set to true.
  3. FetchCollector#fetchRecords: records when the position is updated.
  4. FetchCollector#handleInitializeSuccess: confirms that the client did indeed receive a response for a duplicate offset.
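
The added lines all share roughly the shape below. This is a reconstruction from the output that follows, not the exact diagnostic code; the real change may well have gone through the existing logger instead:

```java
// Hypothetical helper reconstructing the shape of the [RACE-DIAG] lines
// shown below; the message wording differs per call site.
final class RaceDiag {
    static void log(String message) {
        System.out.printf("[RACE-DIAG] %s thread=%s time=%d%n",
                message, Thread.currentThread().getName(), System.nanoTime());
    }

    // Example call from CompletedFetch#drain (illustrative only):
    //   RaceDiag.log("drain() set isConsumed=true for partition " + tp
    //           + " with nextFetchOffset=" + nextFetchOffset);
}
```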

When the flaky failure occurs, the log output shows that two fetch requests were built for offset=2.

The raw log output was interleaved out of order; reordered by timestamp, the entries read as follows:

[RACE-DIAG] fetch request built for partition topicA-0 at offset 0 thread=consumer_background_thread time=3109265203665

[RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with nextFetchOffset=1 thread=Test worker time=3109276422998
[RACE-DIAG] position updated for partition topicA-0 from offset 0 to offset 1 thread=Test worker time=3109276560248
[RACE-DIAG] fetch request built for partition topicA-0 at offset 1 thread=consumer_background_thread time=3109277235165

[RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with nextFetchOffset=2 thread=Test worker time=3109283355457
[RACE-DIAG] position updated for partition topicA-0 from offset 1 to offset 2 thread=Test worker time=3109283482540
[RACE-DIAG] fetch request built for partition topicA-0 at offset 2 thread=consumer_background_thread time=3109284318457

[RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with nextFetchOffset=3 thread=Test worker time=3109297402665
// ---> The fetch request was built before the position update, so the offset is still 2.
[RACE-DIAG] fetch request built for partition topicA-0 at offset 2 thread=consumer_background_thread time=3109297466248
[RACE-DIAG] position updated for partition topicA-0 from offset 2 to offset 3 thread=Test worker time=3109297562290
// ---> This indicates that the client received a data response with an incorrect offset.
[RACE-DIAG] Discarding stale fetch response for partition topicA-0 fetchOffset=2 currentPosition=FetchPosition{offset=3, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:44561 (id: 1 rack: null isFenced: false)], epoch=1}} thread=Test worker time=3109339984332
[RACE-DIAG] fetch request built for partition topicA-0 at offset 3 thread=consumer_background_thread time=3109348331623

[RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with nextFetchOffset=4 thread=Test worker time=3109349705332
[RACE-DIAG] position updated for partition topicA-0 from offset 3 to offset 4 thread=Test worker time=3109349783498
[RACE-DIAG] fetch request built for partition topicA-0 at offset 4 thread=consumer_background_thread time=3109350251540 

The logs indicate that the fetch request was built using the old position and was discarded on the client side.
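
As the "Discarding stale fetch response" line above suggests, the client compares the offset a request was built with against the partition's current position and drops the response if they differ. A simplified illustration of that guard (illustrative names, not the exact Kafka code):

```java
// Simplified illustration of the client-side guard implied by the
// "Discarding stale fetch response" line above; not the exact Kafka code.
final class StaleFetchGuardSketch {
    static boolean shouldDiscard(long fetchOffset, long currentPositionOffset) {
        // The duplicate request was built with fetchOffset=2 while the
        // position had already advanced to 3, so the response is dropped
        // instead of being returned to the application.
        return fetchOffset != currentPositionOffset;
    }
}
```

This is why the symptom is a wasted, duplicate fetch request (and the resulting test flakiness) rather than duplicate records being delivered to the application.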

I ran the test method 500 times locally using the modified code from the current PR, and the flaky issue no longer occurred.
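
For anyone wanting to reproduce this locally, one possible approach (not necessarily how it was run here) is to repeat the test method with JUnit 5:

```java
import org.junit.jupiter.api.RepeatedTest;

// One possible way to hammer a flaky test locally; the class and method
// names are placeholders, not the actual test in this PR.
class FlakyFetchReproSketch {

    @RepeatedTest(500)
    void consumerFetchPositionStaysConsistent() {
        // run the flaky consumer scenario here
    }
}
```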

