fix: enforce disableBufferFullDiscard when qos>=1 #4024
pulkitvats2007-crypto wants to merge 2 commits into
Pull request overview
This PR aims to prevent silent data loss under congestion by ensuring that for QoS ≥ 1 (at-least-once / exactly-once) the pipeline uses backpressure (blocking) instead of dropping buffered messages.
Changes:
- Changed `RuleOption.DisableBufferFullDiscard` from `bool` to `*bool` to distinguish "unset" from "explicitly false".
- Defaulted `DisableBufferFullDiscard` to `true` during rule option validation when `Qos >= AtLeastOnce`, and added a warning for inconsistent configuration.
- Updated planner/node/test code to handle the pointer semantics (`nil` vs `true`/`false`).
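The tri-state semantics listed above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `RuleOption` struct is a trimmed-down stand-in, the numeric QoS values are assumed, and `applyQosDefault` and `discardDisabled` are hypothetical helper names.

```go
package main

import "fmt"

// Qos levels; the names follow the PR text, the numeric values are assumed.
type Qos int

const (
	AtMostOnce Qos = iota
	AtLeastOnce
	ExactlyOnce
)

// RuleOption is a hypothetical, trimmed-down stand-in for the real struct.
type RuleOption struct {
	Qos                      Qos
	DisableBufferFullDiscard *bool // nil = unset, otherwise explicit true/false
}

// applyQosDefault is a hypothetical helper. For QoS >= AtLeastOnce it fills in
// true when the option is unset, and reports whether an explicit false
// conflicts with the QoS level (the case the PR warns about).
func applyQosDefault(opt *RuleOption) (warn bool) {
	if opt.Qos < AtLeastOnce {
		return false
	}
	if opt.DisableBufferFullDiscard == nil {
		v := true
		opt.DisableBufferFullDiscard = &v
		return false
	}
	return !*opt.DisableBufferFullDiscard
}

// discardDisabled treats nil as "not disabled" and never dereferences nil.
func discardDisabled(opt *RuleOption) bool {
	return opt.DisableBufferFullDiscard != nil && *opt.DisableBufferFullDiscard
}

func main() {
	unset := &RuleOption{Qos: AtLeastOnce}
	fmt.Println("warn:", applyQosDefault(unset), "discard disabled:", discardDisabled(unset))

	f := false
	explicit := &RuleOption{Qos: ExactlyOnce, DisableBufferFullDiscard: &f}
	fmt.Println("warn:", applyQosDefault(explicit), "discard disabled:", discardDisabled(explicit))
}
```

With a plain `bool`, the second case could not be told apart from an option the user never set, which is why the pointer type is needed for the warning.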
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/topo/topotest/rule_test.go | Updates test options to pass *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner_test.go | Updates planner test to use *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner.go | Treats DisableBufferFullDiscard == nil as disabled; avoids nil deref. |
| internal/topo/node/node.go | Initializes node discard behavior from *bool rule option. |
| internal/pkg/def/rule.go | Changes DisableBufferFullDiscard type to *bool in RuleOption. |
| internal/conf/conf.go | Enforces default disableBufferFullDiscard=true for QoS≥1 when unset; warns on explicit false. |
Comments suppressed due to low confidence (1)
internal/topo/planner/planner.go:515
- With DisableBufferFullDiscard now being auto-defaulted for QoS>=1 (ValidateRuleOption), this error can be triggered even when the user did not explicitly enable disableBufferFullDiscard. The message "disableBufferFullDiscard can't be enabled..." may be misleading in that case; consider making the error mention that QoS>=1 requires backpressure/no-discard and that shared streams are therefore incompatible (or otherwise clarify the root cause).
```go
if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
	return nil
}
for _, stream := range streams {
	if stream.stmt.Options.SHARED {
		return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
	}
}
```
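One way to act on this suggestion is to put the effective QoS into the message so that the root cause is visible even when the flag was auto-defaulted. The sketch below is only an illustration: `sharedStreamError` is a hypothetical helper and the wording is an assumption, not text from the PR.

```go
package main

import "fmt"

// sharedStreamError is a hypothetical helper, not part of the PR. It names
// both possible causes (explicit flag or QoS-implied default) and includes
// the effective QoS value in the message.
func sharedStreamError(streamName string, qos int) error {
	return fmt.Errorf(
		"shared stream %v can't be used with disableBufferFullDiscard=true (set explicitly or implied by qos>=1; effective qos=%d)",
		streamName, qos)
}

func main() {
	fmt.Println(sharedStreamError("demo", 1))
}
```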
5cb7238 to 040b5ee
Thank you for your contribution to addressing this critical issue! The changes look promising and are a great step toward ensuring data integrity under higher QoS levels. We noticed that the PR has been inactive for a while. Could you kindly let us know if you plan to continue working on this? If there are any blockers or areas where you need assistance, feel free to let us know, and we’d be happy to help. Looking forward to your updates!
040b5ee to a7127c4
…tage Signed-off-by: pulkitvats2007-crypto <pulkitvats2007@gmail.com>
370f466 to 6ea3b5d
9b062b5 to 9a5b04c
Signed-off-by: Jiayin Ng <ngjaying@gmail.com>
9a5b04c to 9fa5376
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
internal/topo/planner/planner.go:515
- checkSharedSourceOption now treats DisableBufferFullDiscard=nil as disabled, but for QoS>=AtLeastOnce ValidateRuleOption can auto-set DisableBufferFullDiscard=true. In that case, this error will be triggered even if the user never explicitly enabled the flag, and the message "disableBufferFullDiscard can't be enabled..." becomes misleading. Consider adjusting the error to mention that QoS>=1 requires disableBufferFullDiscard and that QoS>=1 is incompatible with shared streams (or include the effective QoS/option values in the message).
```go
if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
	return nil
}
for _, stream := range streams {
	if stream.stmt.Options.SHARED {
		return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
	}
}
```
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
| Coverage Diff | master | #4024 | +/- |
|---|---|---|---|
| Coverage | 70.70% | 70.68% | -0.02% |
| Files | 460 | 460 | |
| Lines | 53454 | 53581 | +127 |
| Hits | 37792 | 37873 | +81 |
| Misses | 12728 | 12764 | +36 |
| Partials | 2934 | 2944 | +10 |
Description
This PR fixes a critical data loss issue in the streaming pipeline where messages could be silently dropped under backpressure, even when QoS was set to At-Least-Once (1) or Exactly-Once (2).
Previously, when output channels became full, the system would drop the oldest messages by default (`disableBufferFullDiscard == false`). This behavior was not overridden for higher QoS levels, breaking delivery guarantees and potentially causing permanent data loss.
Fix
Updated `SetQos` in `internal/topo/node/node.go` to enforce safe backpressure handling.
This ensures that for QoS ≥ 1, the system blocks instead of dropping messages, preserving data integrity.
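The difference between the two buffer-full strategies can be sketched as below. This is a minimal illustration under stated assumptions, not the code in `internal/topo/node/node.go`: the `node` struct, the `SetQos` signature, the integer QoS values (0, 1, 2), and the `broadcast` method are all hypothetical stand-ins.

```go
package main

import "fmt"

// node is a hypothetical, stripped-down operator node; the real type has
// more fields and a different layout.
type node struct {
	qos                      int
	disableBufferFullDiscard bool
	out                      chan int
}

// SetQos records the QoS level and, for QoS >= 1, forces blocking sends.
// Assumption: 0 = at-most-once, 1 = at-least-once, 2 = exactly-once.
func (n *node) SetQos(qos int) {
	n.qos = qos
	if qos >= 1 {
		n.disableBufferFullDiscard = true
	}
}

// broadcast sends v downstream and reports whether an older item was dropped.
func (n *node) broadcast(v int) (dropped bool) {
	if n.disableBufferFullDiscard {
		n.out <- v // blocks until the consumer frees a slot (backpressure)
		return false
	}
	for {
		select {
		case n.out <- v:
			return dropped
		default:
			// Buffer full: discard the oldest buffered item, then retry.
			select {
			case <-n.out:
				dropped = true
			default:
			}
		}
	}
}

func main() {
	lossy := &node{out: make(chan int, 2)}
	lossy.SetQos(0)
	for i := 1; i <= 3; i++ {
		fmt.Println("sent", i, "dropped older item:", lossy.broadcast(i))
	}

	safe := &node{out: make(chan int, 2)}
	safe.SetQos(1)
	fmt.Println("blocking mode enabled:", safe.disableBufferFullDiscard)
}
```

In the lossy mode the third send succeeds only by discarding the first item. In the blocking mode the same send would wait for a consumer, which is the behavior that at-least-once delivery depends on.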
Testing
• Added TestSetQos to verify correct behavior of disableBufferFullDiscard for different QoS levels.
• All new tests pass successfully.
• Some unrelated test failures were observed; they predate this change.
Bug Summary
• Issue: Silent data loss due to buffer overflow handling ignoring QoS guarantees
• Root Cause: Lossy default backpressure strategy not overridden for QoS ≥ 1
• Affected Area: doBroadcast and QoS configuration logic
Impact
• Restores correct At-Least-Once and Exactly-Once semantics
• Prevents silent data loss under high throughput or slow sinks
• Ensures checkpoint integrity and accurate offset commits
Fixes #4023