
fix(dsm): add kafka_cluster_id to confluent-kafka offset/backlog tracking #16616

Open

johannbotha wants to merge 3 commits into main from jj.botha/kafka-dsm-cluster-id-offsets

Conversation

@johannbotha

Summary

  • Adds kafka_cluster_id to Kafka offset/backlog tracking for confluent-kafka in Data Streams Monitoring
  • Checkpoints already included cluster_id in edge tags, but offset commit and produce offset backlog paths did not
  • This prevented DSM from correctly attributing backlog data to specific Kafka clusters when customers run multiple clusters that share topic names
  • Addresses item #3 (Python confluent-kafka offsets) from the cross-tracer Kafka DSM cluster ID audit

Changes

ddtrace/internal/datastreams/processor.py:

  • Added cluster_id field to PartitionKey and ConsumerPartitionKey NamedTuples (with "" default for backward compat)
  • Added cluster_id="" parameter to track_kafka_produce() and track_kafka_commit()
  • Updated _serialize_buckets() to conditionally emit kafka_cluster_id tag in backlog entries
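As a minimal sketch of the key and serialization change described above (the field names other than `cluster_id` are illustrative, not the tracer's exact definitions, and `backlog_tags` stands in for the relevant part of `_serialize_buckets()`):

```python
from typing import List, NamedTuple


class PartitionKey(NamedTuple):
    # Illustrative sketch of the described change; the real NamedTuple's
    # other fields may differ. The "" default keeps existing callers
    # (which don't pass cluster_id) working unchanged.
    topic: str
    partition: int
    cluster_id: str = ""


def backlog_tags(key: PartitionKey) -> List[str]:
    # Mirrors the conditional emission: the kafka_cluster_id tag is
    # included only when a cluster_id is actually known.
    tags = [f"partition:{key.partition}", f"topic:{key.topic}"]
    if key.cluster_id:
        tags.append(f"kafka_cluster_id:{key.cluster_id}")
    return tags
```

With this shape, old call sites that build `PartitionKey(topic, partition)` keep compiling, and serialized backlogs only grow the extra tag when the integration supplied a non-empty cluster ID.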

ddtrace/contrib/internal/kafka/patch.py:

  • Updated traced_commit() to fetch cluster_id (using instance cache first, then extracting topic from args) and set it via core.set_item()
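The cache-first lookup might look roughly like the following sketch; `_dd_cluster_id` appears in the quoted diff below, but the metadata-lookup helper and its shape are assumptions for illustration:

```python
def _lookup_cluster_id_from_metadata(consumer, topic):
    # Stand-in for the real broker-metadata lookup; illustrative only.
    meta = getattr(consumer, "cluster_metadata", None)
    return meta.get("cluster_id") if meta else None


def fetch_cluster_id(consumer, topic):
    # Cache first: reuse a cluster_id resolved on an earlier call.
    cluster_id = getattr(consumer, "_dd_cluster_id", None)
    if cluster_id is None:
        # Any topic suffices here: cluster_id is per-cluster, not per-topic.
        cluster_id = _lookup_cluster_id_from_metadata(consumer, topic)
    # Normalize None to "" before handing off (e.g. via core.set_item()),
    # so downstream code never sees None.
    return cluster_id or ""
```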

ddtrace/internal/datastreams/kafka.py:

  • Updated dsm_kafka_message_produce() callback to pass cluster_id to track_kafka_produce()
  • Updated auto-commit path in dsm_kafka_message_consume() to pass cluster_id to track_kafka_commit()
  • Updated dsm_kafka_message_commit() to read cluster_id from core and pass to track_kafka_commit()

Tests:

  • Updated PartitionKey/ConsumerPartitionKey references in all test files
  • Added test_kafka_offset_monitoring_with_cluster_id — verifies serialized backlogs include kafka_cluster_id tag
  • Added test_kafka_offset_monitoring_without_cluster_id_omits_tag — verifies tag is omitted when cluster_id is empty
  • Added test_data_streams_kafka_offset_backlog_has_cluster_id — integration test against real broker

Design Decisions

  • cluster_id is part of the key (not metadata): Two Kafka clusters with the same topic name have independent offset spaces. If cluster_id were metadata, last-write-wins would corrupt backlog calculations.
  • Default cluster_id="": Maintains backward compatibility with aiokafka (which doesn't fetch cluster_id) and any other callers.
  • Tag format matches Java tracer: kafka_cluster_id:<value> is consistent with Java's DataStreamsTags.createWithPartition().
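The first design decision can be shown with a toy example (keys and values are illustrative): two clusters sharing a topic name have independent offset spaces, so `cluster_id` must be part of the backlog key.

```python
backlog = {}


def record_offset(topic, partition, cluster_id, offset):
    # cluster_id is part of the key, so clusters cannot clobber each other.
    backlog[(topic, partition, cluster_id)] = offset


record_offset("orders", 0, "cluster-a", 100)
record_offset("orders", 0, "cluster-b", 7)
# Both entries survive. Had cluster_id been metadata on a
# (topic, partition) key, the second write would have overwritten
# cluster-a's offset, corrupting backlog calculations.
```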

Not in scope

  • aiokafka cluster_id: aiokafka does not expose cluster_id on its ClusterMetadata class. Needs upstream PR or monkey-patch — separate effort.

Test plan

  • Processor unit tests pass (test_kafka_offset_monitoring, test_kafka_offset_monitoring_with_cluster_id, test_kafka_offset_monitoring_without_cluster_id_omits_tag)
  • confluent-kafka integration tests pass (offset monitoring with messages, offsets, auto-commit, backlog cluster_id)
  • aiokafka tests pass with empty cluster_id (backward compat)
  • Existing checkpoint tests still pass (no regression)

🤖 Generated with Claude Code

fix(dsm): add kafka_cluster_id to confluent-kafka offset/backlog tracking

Checkpoints already included cluster_id in edge tags, but the offset
commit and produce offset backlog paths did not. This caused DSM to be
unable to correctly attribute backlog data to specific Kafka clusters
when multiple clusters share topic names.

Changes:
- Add cluster_id field to PartitionKey and ConsumerPartitionKey
- Pass cluster_id through traced_commit, dsm_kafka_message_commit,
  and the auto-commit path in dsm_kafka_message_consume
- Include kafka_cluster_id tag in serialized backlog entries
- Update all tests for the new key structure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@cit-pr-commenter-54b7da

cit-pr-commenter-54b7da bot commented Feb 20, 2026

Codeowners resolved as

ddtrace/internal/datastreams/aiokafka.py                                @DataDog/data-streams-monitoring
tests/datastreams/test_processor.py                                     @DataDog/data-streams-monitoring

@datadog-datadog-prod-us1
Contributor

datadog-datadog-prod-us1 bot commented Feb 20, 2026

⚠️ Tests


⚠️ Warnings

❄️ 1 New flaky test detected

test_data_streams_kafka_offset_monitoring_auto_commit[py3.12] from test_kafka_dsm.py (Datadog)
assert 6 == 5

ℹ️ Info

🧪 All tests passed

🔗 Commit SHA: c242eb6

- Normalize cluster_id to "" before setting on core (prevents None propagation)
- Use pytest.skip() instead of silent if-guard in backlog cluster_id test
- Add comment explaining why any topic suffices for cluster_id lookup
- Add TODO comment in aiokafka noting cluster_id gap for follow-up

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@johannbotha
Author

johannbotha commented Feb 20, 2026

Test setup (screenshot)

Before (screenshot)

After (screenshot)

@johannbotha johannbotha marked this pull request as ready for review February 20, 2026 19:34
@johannbotha johannbotha requested review from a team as code owners February 20, 2026 19:34
if record_metadata is not None:
reported_offset = record_metadata.offset if isinstance(record_metadata.offset, int) else -1
# TODO: aiokafka does not expose cluster_id on its ClusterMetadata class.
# cluster_id support requires an upstream PR or monkey-patch. See audit doc item #4.
Contributor
Can remove this line

Flaky tests: the serialization tests used a shared module-level
processor whose _serialize_buckets() clears buckets as a side effect.
When test execution order varied across Python versions, buckets were
already drained. Fix by using a fresh DataStreamsProcessor per test.

Also remove the TODO comment in aiokafka.py per Rob's review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
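The flaky-test cause described in that commit message can be reproduced in miniature; `FakeProcessor` here is a stand-in for `DataStreamsProcessor`, not its real implementation:

```python
class FakeProcessor:
    # Stand-in for DataStreamsProcessor: _serialize_buckets() drains
    # its buckets as a side effect, as the commit message describes.
    def __init__(self):
        self._buckets = {"bucket": ["entry"]}

    def _serialize_buckets(self):
        drained, self._buckets = self._buckets, {}
        return drained


# Shared module-level processor: whichever test serializes first drains
# the buckets, so a later test sees nothing -- order-dependent flakiness.
shared = FakeProcessor()
first = shared._serialize_buckets()
second = shared._serialize_buckets()  # already drained, comes back empty

# Fix: each test constructs its own fresh processor instead.
fresh = FakeProcessor()
```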
return func(*args, **kwargs)

# Fetch cluster_id for offset tracking -- try cache first, then extract topic from args.
# Any topic suffices for _get_cluster_id because cluster_id is per-cluster, not per-topic.
Contributor
[nit] I don't think these comments are too useful


# Fetch cluster_id for offset tracking -- try cache first, then extract topic from args.
# Any topic suffices for _get_cluster_id because cluster_id is per-cluster, not per-topic.
cluster_id = getattr(instance, "_dd_cluster_id", None)
Contributor
If this defaults to "" rather than None, I think we can remove a number of the cluster_id or "" usages

