fix: enforce disableBufferFullDiscard when qos>=1 #4024

Open
pulkitvats2007-crypto wants to merge 2 commits into lf-edge:master from pulkitvats2007-crypto:fix-qos-buffer-full-discard

@pulkitvats2007-crypto

Description

This PR fixes a critical data loss issue in the streaming pipeline where messages could be silently dropped under backpressure, even when QoS was set to At-Least-Once (1) or Exactly-Once (2).

Previously, when output channels became full, the system would drop the oldest messages by default (disableBufferFullDiscard == false). This behavior was not overridden for higher QoS levels, breaking delivery guarantees and potentially causing permanent data loss.

Fix

Updated SetQos in internal/topo/node/node.go to enforce safe backpressure handling:

func (o *defaultNode) SetQos(qos def.Qos) {
    o.qos = qos
    if qos >= def.AtLeastOnce {
        o.disableBufferFullDiscard = true
    }
}

This ensures that for QoS ≥ 1, the system blocks instead of dropping messages, preserving data integrity.
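The difference between the two buffer-full strategies can be illustrated with a plain buffered channel. This is a minimal sketch, not the project's doBroadcast: the helpers `send` and `run` are hypothetical names, and the "slow sink" is simulated by a consumer that only starts after the burst in discard mode.

```go
package main

import "fmt"

// send delivers v to out (hypothetical helper, not eKuiper code).
// With disableDiscard it blocks until a slot is free; otherwise it
// evicts the oldest buffered item to make room. It returns the number
// of items it dropped.
func send(out chan int, v int, disableDiscard bool) int {
	if disableDiscard {
		out <- v // backpressure: wait for the consumer
		return 0
	}
	dropped := 0
	for {
		select {
		case out <- v:
			return dropped
		default:
			// Buffer full: discard the oldest item, then retry the send.
			select {
			case <-out:
				dropped++
			default:
			}
		}
	}
}

// run pushes 0..n-1 through a channel of the given capacity and
// returns what the consumer received plus the number of drops.
func run(n, capacity int, disableDiscard bool) ([]int, int) {
	out := make(chan int, capacity)
	done := make(chan struct{})
	var received []int
	consume := func() {
		for v := range out {
			received = append(received, v)
		}
		close(done)
	}
	if disableDiscard {
		go consume() // blocking mode needs a live consumer to make progress
	}
	dropped := 0
	for i := 0; i < n; i++ {
		dropped += send(out, i, disableDiscard)
	}
	close(out)
	if !disableDiscard {
		go consume() // the slow sink wakes up only after the burst
	}
	<-done
	return received, dropped
}

func main() {
	r, d := run(5, 2, false)
	fmt.Println("discard mode:", r, "dropped:", d)
	r, d = run(5, 2, true)
	fmt.Println("blocking mode:", r, "dropped:", d)
}
```

With five items and a buffer of two, the discard run keeps only the last two items (3 and 4) and drops three, while the blocking run delivers all five. The blocking path trades throughput for completeness, which is the trade-off QoS ≥ 1 is expected to make.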

Testing

• Added TestSetQos to verify correct behavior of disableBufferFullDiscard for different QoS levels.
• All new tests pass successfully.
• Some unrelated test failures were observed; they predate this change.
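A table-driven check in the spirit of TestSetQos might look like the following. This is only a sketch under stated assumptions: the `node` type and `Qos` constants are local stand-ins for defaultNode and def.Qos, and `checkSetQos` is a hypothetical helper rather than the test added in this PR.

```go
package main

import "fmt"

// Qos mirrors the three levels named in the description (stand-in type).
type Qos int

const (
	AtMostOnce Qos = iota
	AtLeastOnce
	ExactlyOnce
)

// node is a stand-in holding only the two fields SetQos touches.
type node struct {
	qos                      Qos
	disableBufferFullDiscard bool
}

// SetQos follows the logic shown in the Fix section.
func (n *node) SetQos(q Qos) {
	n.qos = q
	if q >= AtLeastOnce {
		n.disableBufferFullDiscard = true
	}
}

// checkSetQos runs each case on a fresh node and reports the first mismatch.
func checkSetQos() error {
	cases := []struct {
		name string
		qos  Qos
		want bool
	}{
		{"at-most-once keeps discard", AtMostOnce, false},
		{"at-least-once disables discard", AtLeastOnce, true},
		{"exactly-once disables discard", ExactlyOnce, true},
	}
	for _, c := range cases {
		n := &node{}
		n.SetQos(c.qos)
		if n.disableBufferFullDiscard != c.want {
			return fmt.Errorf("%s: got %v, want %v", c.name, n.disableBufferFullDiscard, c.want)
		}
	}
	return nil
}

func main() {
	if err := checkSetQos(); err != nil {
		fmt.Println("FAIL:", err)
		return
	}
	fmt.Println("ok")
}
```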

Bug Summary

• Issue: Silent data loss due to buffer overflow handling ignoring QoS guarantees
• Root Cause: Lossy default backpressure strategy not overridden for QoS ≥ 1
• Affected Area: doBroadcast and QoS configuration logic

Impact

• Restores correct At-Least-Once and Exactly-Once semantics
• Prevents silent data loss under high throughput or slow sinks
• Ensures checkpoint integrity and accurate offset commits

Fixes #4023

Copilot AI left a comment

Pull request overview

This PR aims to prevent silent data loss under congestion by ensuring that for QoS ≥ 1 (at-least-once / exactly-once) the pipeline uses backpressure (blocking) instead of dropping buffered messages.

Changes:

  • Changed RuleOption.DisableBufferFullDiscard from bool to *bool to distinguish “unset” from “explicitly false”.
  • Defaulted DisableBufferFullDiscard to true during rule option validation when Qos >= AtLeastOnce, and added a warning for inconsistent configuration.
  • Updated planner/node/test code to handle the pointer semantics (nil vs true/false).
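The pointer-based tri-state described in the list above can be sketched as follows. The struct and the `applyDefaults` function are hypothetical stand-ins for RuleOption and the validation step, and keeping an explicit `false` while only warning is an assumption drawn from the summary, not a confirmed detail of the implementation.

```go
package main

import "fmt"

// Qos is a stand-in for the rule QoS levels.
type Qos int

const (
	AtMostOnce Qos = iota
	AtLeastOnce
	ExactlyOnce
)

// RuleOption models only the two fields relevant here.
type RuleOption struct {
	Qos                      Qos
	DisableBufferFullDiscard *bool // nil means the user did not set it
}

// applyDefaults sets the flag to true when it is unset and QoS >= 1.
// An explicit false is left untouched (assumption) and a warning is returned.
func applyDefaults(opt *RuleOption) (warning string) {
	if opt.Qos < AtLeastOnce {
		return ""
	}
	if opt.DisableBufferFullDiscard == nil {
		t := true
		opt.DisableBufferFullDiscard = &t
		return ""
	}
	if !*opt.DisableBufferFullDiscard {
		return fmt.Sprintf("qos=%d with disableBufferFullDiscard=false may drop data when buffers are full", opt.Qos)
	}
	return ""
}

func main() {
	unset := RuleOption{Qos: AtLeastOnce}
	fmt.Printf("warning=%q effective=%v\n", applyDefaults(&unset), *unset.DisableBufferFullDiscard)

	f := false
	explicit := RuleOption{Qos: ExactlyOnce, DisableBufferFullDiscard: &f}
	fmt.Printf("warning=%q effective=%v\n", applyDefaults(&explicit), *explicit.DisableBufferFullDiscard)
}
```

A plain bool cannot express "not set", so without the pointer the validator could not tell a deliberate `false` apart from an omitted field.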

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| internal/topo/topotest/rule_test.go | Updates test options to pass *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner_test.go | Updates planner test to use *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner.go | Treats DisableBufferFullDiscard == nil as disabled; avoids nil deref. |
| internal/topo/node/node.go | Initializes node discard behavior from *bool rule option. |
| internal/pkg/def/rule.go | Changes DisableBufferFullDiscard type to *bool in RuleOption. |
| internal/conf/conf.go | Enforces default disableBufferFullDiscard=true for QoS≥1 when unset; warns on explicit false. |
Comments suppressed due to low confidence (1)

internal/topo/planner/planner.go:515

  • With DisableBufferFullDiscard now being auto-defaulted for QoS>=1 (ValidateRuleOption), this error can be triggered even when the user did not explicitly enable disableBufferFullDiscard. The message "disableBufferFullDiscard can't be enabled..." may be misleading in that case; consider making the error mention that QoS>=1 requires backpressure/no-discard and that shared streams are therefore incompatible (or otherwise clarify the root cause).
	if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
		return nil
	}
	for _, stream := range streams {
		if stream.stmt.Options.SHARED {
			return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
		}

Comment thread internal/topo/node/node.go
Comment thread internal/conf/conf.go
Comment thread internal/conf/conf.go
Comment thread internal/topo/topotest/rule_test.go Outdated
Comment thread internal/topo/planner/planner_test.go
Comment thread internal/topo/topotest/rule_test.go Outdated
Comment thread internal/topo/topotest/rule_test.go Outdated
pulkitvats2007-crypto force-pushed the fix-qos-buffer-full-discard branch 2 times, most recently from 5cb7238 to 040b5ee, on April 17, 2026 11:21
ngjaying changed the title from "fix: prevent silent data loss by enforcing backpressure for QoS >= 1" to "fix: enforce disableBufferFullDiscard when qos>=1" on Apr 30, 2026
ngjaying added this to the v2.4.0 milestone on May 8, 2026

ngjaying commented May 8, 2026

Collaborator

Hi @pulkitvats2007-crypto,

Thank you for your contribution to addressing this critical issue! The changes look promising and are a great step toward ensuring data integrity under higher QoS levels.

We noticed that the PR has been inactive for a while. Could you kindly let us know if you plan to continue working on this? If there are any blockers or areas where you need assistance, feel free to let us know, and we’d be happy to help.

Looking forward to your updates!

ngjaying force-pushed the fix-qos-buffer-full-discard branch from 040b5ee to a7127c4 on May 13, 2026 07:03
…tage

Signed-off-by: pulkitvats2007-crypto <pulkitvats2007@gmail.com>
ngjaying force-pushed the fix-qos-buffer-full-discard branch 4 times, most recently from 370f466 to 6ea3b5d, on May 13, 2026 07:27
ngjaying requested a review from Copilot on May 13, 2026 07:48

Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Comment thread internal/topo/node/node.go
Comment thread internal/pkg/def/rule.go
ngjaying force-pushed the fix-qos-buffer-full-discard branch 2 times, most recently from 9b062b5 to 9a5b04c, on May 13, 2026 08:05
ngjaying requested a review from Copilot on May 13, 2026 08:11
Signed-off-by: Jiayin Ng <ngjaying@gmail.com>
ngjaying force-pushed the fix-qos-buffer-full-discard branch from 9a5b04c to 9fa5376 on May 13, 2026 08:14

Copilot AI left a comment

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

internal/topo/planner/planner.go:515

  • checkSharedSourceOption now treats DisableBufferFullDiscard=nil as disabled, but for QoS>=AtLeastOnce ValidateRuleOption can auto-set DisableBufferFullDiscard=true. In that case, this error will be triggered even if the user never explicitly enabled the flag, and the message "disableBufferFullDiscard can't be enabled..." becomes misleading. Consider adjusting the error to mention that QoS>=1 requires disableBufferFullDiscard and that QoS>=1 is incompatible with shared streams (or include the effective QoS/option values in the message).
	if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
		return nil
	}
	for _, stream := range streams {
		if stream.stmt.Options.SHARED {
			return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
		}
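
One way to act on the suggestion above is to include the effective QoS in the error text. The sketch below is illustrative only: `ruleOpts`, `streamInfo`, and `checkShared` are hypothetical stand-ins for the planner's types and its shared-source check, and the message wording is a proposal rather than the text used in the PR.

```go
package main

import "fmt"

// ruleOpts models only the fields the check needs (stand-in type).
type ruleOpts struct {
	Qos                      int
	DisableBufferFullDiscard *bool
}

// streamInfo is a stand-in for a planned stream and its SHARED option.
type streamInfo struct {
	Name   string
	Shared bool
}

// checkShared rejects shared streams when discard is disabled and
// reports the effective QoS so an auto-defaulted flag is explainable.
func checkShared(opt ruleOpts, streams []streamInfo) error {
	if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
		return nil
	}
	for _, s := range streams {
		if s.Shared {
			return fmt.Errorf(
				"shared stream %v is incompatible with disableBufferFullDiscard=true (effective qos=%d; qos>=1 enables it by default)",
				s.Name, opt.Qos)
		}
	}
	return nil
}

func main() {
	on := true
	opt := ruleOpts{Qos: 1, DisableBufferFullDiscard: &on}
	streams := []streamInfo{{Name: "demo", Shared: true}}
	fmt.Println(checkShared(opt, streams))
}
```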


codecov Bot commented May 13, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 70.68%. Comparing base (d385e9f) to head (9fa5376).
⚠️ Report is 3 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4024      +/-   ##
==========================================
- Coverage   70.70%   70.68%   -0.02%     
==========================================
  Files         460      460              
  Lines       53454    53581     +127     
==========================================
+ Hits        37792    37873      +81     
- Misses      12728    12764      +36     
- Partials     2934     2944      +10     

Development

Successfully merging this pull request may close these issues.

Silent Data Loss and Broken QoS Guarantees Due To Default Buffer-Full Discard

3 participants