Add pre-fetch quota check for JetStream Sensor backpressure #13

Merged
abdulazillow merged 9 commits into feature/zg from abdula/AIP-9985-backpressure-prefetch
Jan 15, 2026

@abdulazillow abdulazillow commented Jan 7, 2026

Add pre-fetch quota check for JetStream Sensor backpressure

Description

Implements pre-fetch quota check for JetStream Sensor to prevent message loss during resource quota exhaustion. When downstream workflow quota is near capacity, the sensor blocks before fetching messages from JetStream, keeping them safely in durable storage.

Problem: When ResourceQuota is full for extended periods, messages held in sensor memory during retries can be lost if the NATS connection drops (orphaned ACK bug).

Solution: Check ResourceQuota capacity before fetching messages from JetStream. Messages stay in durable storage until there's capacity to process them.
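
The check-before-fetch idea can be sketched in a few lines of Go. This is a toy model, not the PR's code: `fakeQuota`, `drain`, and the slice standing in for the JetStream consumer are hypothetical, and the real sensor blocks and polls rather than returning when the quota is full.

```go
package main

import "fmt"

// fakeQuota is a hypothetical stand-in for the ResourceQuota lookup.
type fakeQuota struct{ used, limit int }

// HasCapacity reports whether one more workflow fits under the limit.
func (q *fakeQuota) HasCapacity() bool { return q.used < q.limit }

// drain takes messages from the stream only while the quota has room.
// It returns the processed messages and those left untouched in the stream.
func drain(q *fakeQuota, stream []string) (processed, remaining []string) {
	for len(stream) > 0 {
		if !q.HasCapacity() {
			// Nothing has been fetched at this point, so nothing is
			// held in memory that a dropped connection could lose.
			break
		}
		msg := stream[0]
		stream = stream[1:]
		q.used++ // toy model: each message starts one workflow
		processed = append(processed, msg)
	}
	return processed, stream
}

func main() {
	q := &fakeQuota{used: 1, limit: 3}
	done, left := drain(q, []string{"a", "b", "c", "d"})
	fmt.Println(done, left) // prints "[a b] [c d]"
}
```

The point of the ordering is that the capacity check happens before a message leaves durable storage, so a full quota never leaves messages waiting in sensor memory.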

Configuration

env:
  - name: BACKPRESSURE_QUOTA_NAME # Required
    value: "workflow-quota"
  - name: BACKPRESSURE_CAPACITY_RATIO # Optional, default: 0.95 (5% buffer)
    value: "0.95"
  - name: BACKPRESSURE_POLL_INTERVAL # Optional, default: 30 (seconds)
    value: "30"

New Metric

argo_events_sensor_quota_blocked - Gauge: 1 when blocked, 0 when processing normally

Testing

Unit Tests: Added backpressure_test.go covering:

  • Default config values and validation
  • HasCapacity() threshold calculations
  • WaitForCapacity() blocking and context cancellation
  • Various quota/ratio combinations (5, 100, 150 workflows)

Integration Testing:

  • ResourceQuota: 75, CapacityRatio: 0.95 (threshold: 71)
  • Sent 80 messages in burst
  • Result: 0 quota errors - sensor blocked at threshold, resumed when capacity freed
  • Verified sensor_quota_blocked metric toggles correctly
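
The threshold of 71 in the integration setup is consistent with rounding 75 × 0.95 = 71.25 down. The sketch below assumes that rounding rule and a strict `used < threshold` comparison; both are inferences from the numbers above rather than confirmed details of `HasCapacity()`, and the function names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// threshold returns the workflow count at which fetching would stop.
// Rounding down is an assumption inferred from 75 * 0.95 -> 71.
func threshold(hardLimit int, ratio float64) int {
	return int(math.Floor(float64(hardLimit) * ratio))
}

// hasCapacity reports whether another fetch is allowed.
// The strict comparison (used < threshold) is also an assumption.
func hasCapacity(used, hardLimit int, ratio float64) bool {
	return used < threshold(hardLimit, ratio)
}

func main() {
	fmt.Println(threshold(75, 0.95))       // 71
	fmt.Println(hasCapacity(70, 75, 0.95)) // true
	fmt.Println(hasCapacity(71, 75, 0.95)) // false
}
```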

Long-Running Test:

  • 8 forced EventBus disconnections over ~2 hours (~15 min intervals)
  • 970 messages processed
  • Result: 0 quota errors, 0 stuck sensors

Additional Fixes

  • Reconnection handling: Added setupBackpressure() helper called from both initial connection and reconnection paths
  • Close signal check: Added closeCh check after WaitForCapacity() to handle connection drops during blocking

Backward Compatibility

✅ Fully backward compatible - Backpressure is disabled by default (requires BACKPRESSURE_QUOTA_NAME env var)

- Add BackpressureWaiter to check ResourceQuota before fetching from JetStream
- Integrate into trigger_conn.go pullSubscribe loop
- Add sensor_quota_blocked metric
- Add env var configuration (BACKPRESSURE_QUOTA_NAME, etc.)
- Add implementation documentation

This prevents message loss when NATS connection is lost during
extended backpressure periods (quota full for hours).

TODO: Remove [BACKPRESSURE-TEST] logs after verification complete

- Add setupBackpressure() helper called from both initial and reconnect paths
- Add closeCh check after WaitForCapacity to catch close signals during poll

@dhanashritidke11

Thanks, was able to verify that the fix is working.

@dhanashritidke11 dhanashritidke11 left a comment

LGTM, Approved

@abdulazillow abdulazillow merged commit c4c9000 into feature/zg Jan 15, 2026
3 checks passed