fix(dsm): add kafka_cluster_id to confluent-kafka offset/backlog tracking #16616

Open

johannbotha wants to merge 3 commits into main from
Conversation
…king

Checkpoints already included cluster_id in edge tags, but the offset commit and produce offset backlog paths did not. This caused DSM to be unable to correctly attribute backlog data to specific Kafka clusters when multiple clusters share topic names.

Changes:
- Add cluster_id field to PartitionKey and ConsumerPartitionKey
- Pass cluster_id through traced_commit, dsm_kafka_message_commit, and the auto-commit path in dsm_kafka_message_consume
- Include kafka_cluster_id tag in serialized backlog entries
- Update all tests for the new key structure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
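The new key shape can be sketched as a pair of NamedTuples. This is a minimal illustration, not the exact dd-trace-py definitions: the field names and order are assumptions, and the `backlog` dict stands in for the processor's internal bucket state.

```python
from typing import NamedTuple


# Hypothetical sketch of the key change: cluster_id becomes part of the
# partition key itself, so backlogs from different clusters never collide.
class PartitionKey(NamedTuple):
    topic: str
    partition: int
    cluster_id: str = ""  # "" default keeps existing call sites working


class ConsumerPartitionKey(NamedTuple):
    group: str
    topic: str
    partition: int
    cluster_id: str = ""


# Two clusters sharing a topic name now map to distinct backlog entries
# instead of overwriting each other.
backlog = {
    PartitionKey("orders", 0, "cluster-A"): 100,
    PartitionKey("orders", 0, "cluster-B"): 5,
}
assert len(backlog) == 2
assert PartitionKey("orders", 0) == PartitionKey("orders", 0, "")  # backward compat
```

The `""` default means existing callers that never pass a cluster id keep constructing the same logical key as before.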
- Normalize cluster_id to "" before setting on core (prevents None propagation)
- Use pytest.skip() instead of silent if-guard in backlog cluster_id test
- Add comment explaining why any topic suffices for cluster_id lookup
- Add TODO comment in aiokafka noting cluster_id gap for follow-up

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```python
if record_metadata is not None:
    reported_offset = record_metadata.offset if isinstance(record_metadata.offset, int) else -1
    # TODO: aiokafka does not expose cluster_id on its ClusterMetadata class.
    # cluster_id support requires an upstream PR or monkey-patch. See audit doc item #4.
```
Contributor
Can remove this line
Flaky tests: the serialization tests used a shared module-level processor whose _serialize_buckets() clears buckets as a side effect. When test execution order varied across Python versions, buckets were already drained. Fix by using a fresh DataStreamsProcessor per test.

Also remove the TODO comment in aiokafka.py per rob's review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
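The order-dependence and its fix can be sketched as follows. `DataStreamsProcessor` here is a minimal stand-in for the real class in ddtrace, kept only to show why a per-test instance removes the flakiness:

```python
class DataStreamsProcessor:
    """Stand-in with the relevant behavior: serialization drains state."""

    def __init__(self):
        self._buckets = {"bucket-0": {"offset": 42}}

    def _serialize_buckets(self):
        # Draining on serialize is the side effect that made a shared
        # module-level processor order-dependent across tests.
        drained, self._buckets = self._buckets, {}
        return drained


# Shared instance: whichever test serializes first drains the buckets,
# so a later test observes an empty payload depending on execution order.
shared = DataStreamsProcessor()
assert shared._serialize_buckets()       # first caller sees data
assert not shared._serialize_buckets()   # second caller sees nothing

# Fresh instance per test (e.g. built inside each test or a pytest fixture):
# every test starts from a full bucket regardless of ordering.
fresh = DataStreamsProcessor()
assert fresh._serialize_buckets()
```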
```python
return func(*args, **kwargs)

# Fetch cluster_id for offset tracking -- try cache first, then extract topic from args.
# Any topic suffices for _get_cluster_id because cluster_id is per-cluster, not per-topic.
```
Contributor
[nit] I don't think these comments are too useful
```python
# Fetch cluster_id for offset tracking -- try cache first, then extract topic from args.
# Any topic suffices for _get_cluster_id because cluster_id is per-cluster, not per-topic.
cluster_id = getattr(instance, "_dd_cluster_id", None)
```
Contributor
If this defaults to "" rather than None, I think we can remove a number of the cluster_id or "" usages
Summary
- Adds kafka_cluster_id to Kafka offset/backlog tracking for confluent-kafka in Data Streams Monitoring
- Checkpoints already included cluster_id in edge tags, but offset commit and produce offset backlog paths did not

Changes
ddtrace/internal/datastreams/processor.py:
- Add cluster_id field to PartitionKey and ConsumerPartitionKey NamedTuples (with "" default for backward compat)
- Add cluster_id="" parameter to track_kafka_produce() and track_kafka_commit()
- Update _serialize_buckets() to conditionally emit the kafka_cluster_id tag in backlog entries

ddtrace/contrib/internal/kafka/patch.py:
- Update traced_commit() to fetch cluster_id (using the instance cache first, then extracting a topic from args) and set it via core.set_item()

ddtrace/internal/datastreams/kafka.py:
- Update the dsm_kafka_message_produce() callback to pass cluster_id to track_kafka_produce()
- Update dsm_kafka_message_consume() to pass cluster_id to track_kafka_commit()
- Update dsm_kafka_message_commit() to read cluster_id from core and pass it to track_kafka_commit()

Tests:
- Update PartitionKey/ConsumerPartitionKey references in all test files
- test_kafka_offset_monitoring_with_cluster_id: verifies serialized backlogs include the kafka_cluster_id tag
- test_kafka_offset_monitoring_without_cluster_id_omits_tag: verifies the tag is omitted when cluster_id is empty
- test_data_streams_kafka_offset_backlog_has_cluster_id: integration test against a real broker

Design Decisions
- cluster_id is part of the key (not metadata): two Kafka clusters with the same topic name have independent offset spaces. If cluster_id were metadata, last-write-wins would corrupt backlog calculations.
- Defaulting cluster_id="" maintains backward compatibility with aiokafka (which doesn't fetch cluster_id) and any other callers.
- The kafka_cluster_id:<value> tag format is consistent with Java's DataStreamsTags.createWithPartition().

Not in scope
- aiokafka does not expose cluster_id on its ClusterMetadata class. Needs an upstream PR or monkey-patch; separate effort.

Test plan
🤖 Generated with Claude Code