AIP-9757: Skip SQS Poll when Jetstream Eventbus is full #11

Merged
dhanashritidke11 merged 7 commits into feature/zg from dhanashrit/AIP-9757/skip-sqs-poll-when-jetstream-eventbus-full
Nov 6, 2025
@dhanashritidke11
Collaborator

Description

Implements capacity-based polling control for AWS SQS EventSource with backpressure handling to prevent message loss when the JetStream event bus is at capacity.

Added optional skipPollingWhenEventBusFull configuration that checks event bus capacity before each SQS poll. When enabled:

  • Event bus capacity is checked before polling SQS
  • If event bus is full, polling is skipped and messages remain in SQS
  • Polling automatically resumes when capacity becomes available

Configuration

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: aws-sqs
spec:
  sqs:
    example:
      queue: "my-queue"
      region: "us-west-2"
      accessKey:
        name: aws-secret
        key: accesskey
      secretKey:
        name: aws-secret
        key: secretkey
      waitTimeSeconds: 20

      # Skip polling SQS when event bus is at capacity
      # Recommended: true for production use with JetStream
      skipPollingWhenEventBusFull: true

Backward Compatibility

✅ Fully backward compatible

  • The skipPollingWhenEventBusFull field is optional and defaults to false
  • Existing SQS EventSource configurations continue to work without any changes
  • No breaking changes to the API or existing behavior
  • When disabled (default), SQS polling behaves exactly as before
  • No migration required for existing deployments
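
To illustrate why an optional boolean field is backward compatible, here is a small sketch. The `sqsSettings` struct is a hypothetical, trimmed-down stand-in and not the actual EventSource type; the spec fragment is shown as JSON for simplicity. A config that omits the field decodes to Go's zero value, `false`, so existing manifests keep the old behavior.

```go
package main

import "encoding/json"

// sqsSettings is a hypothetical stand-in for the SQS EventSource spec;
// only the fields needed for this illustration are kept.
type sqsSettings struct {
	Queue  string `json:"queue"`
	Region string `json:"region"`
	// Optional: absent in existing configs, so it decodes to false.
	SkipPollingWhenEventBusFull bool `json:"skipPollingWhenEventBusFull,omitempty"`
}

// parseSQSSettings decodes a JSON-encoded spec fragment.
func parseSQSSettings(raw []byte) (sqsSettings, error) {
	var s sqsSettings
	err := json.Unmarshal(raw, &s)
	return s, err
}
```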

Test Results

  1. Capacity Saturation Test
  • Setup: Configure JetStream event bus with low MaxMsgs limit, enable skipPollingWhenEventBusFull: true
  • Expected: SQS polling stops when MaxMsgs reached, logs indicate capacity full
  • Verified: No SQS ReceiveMessage API calls while event bus is full
2025-10-27T16:59:21.719-07:00 {"level":"info","ts":1761609561.7191703,"logger":"argo-events.eventsource","caller":"awssqs/start.go:94","msg":"EventBus at capacity - MaxMsgs limit reached","eventSourceName":"aip-test","eventSourceType":"sqs","eventName":"example","eventSource":"aip-test","eventName":"example","currentMsgs":1,"maxMsgs":1}
  2. Capacity Recovery Test
  • Setup: Fill event bus to capacity, then consume messages to free space
  • Expected: Polling automatically resumes, log shows "EventBus capacity available again, resuming SQS polling"
  • Verified: SQS messages are processed after capacity becomes available
{"level":"info","ts":1761611036.2187438,"logger":"argo-events.eventsource","caller":"awssqs/start.go:186","msg":"EventBus capacity available again, resuming SQS polling","eventSourceName":"aip-test","eventSourceType":"sqs","eventName":"example","eventSource":"aip-test","eventName":"example","wasFull":1460}
  3. Message Integrity Test
  • Setup: Saturate event bus, publish messages to SQS, wait for capacity to free
  • Expected: All SQS messages processed after capacity recovery, none moved to DLQ
  • Verified: Message count matches, no messages in DLQ (Attached below)
  4. Backward Compatibility Test
  • Setup: Existing SQS EventSource without skipPollingWhenEventBusFull field
  • Expected: SQS polling continues normally regardless of event bus capacity
  • Verified: Default behavior unchanged
  5. JetStream Unavailability Test
  • Setup: Disconnect or crash JetStream, enable skipPollingWhenEventBusFull: true
  • Expected: SQS polling stops (fail-safe), logs show "JetStream context not available"
  • Verified: Messages remain in SQS, not lost (Attached below)
  6. Extended Saturation Logging Test
  • Setup: Keep event bus full for >10 minutes
  • Expected: Warning logs every 10 minutes with duration tracking
  • Verified: Log format: "EventBus has been full for extended period, duration: 10m0s"
2025-10-27T17:09:11.929-07:00 {"level":"warn","ts":1761610151.9296772,"logger":"argo-events.eventsource","caller":"awssqs/start.go:169","msg":"EventBus has been full for extended period","eventSourceName":"aip-test","eventSourceType":"sqs","eventName":"example","eventSource":"aip-test","eventName":"example","duration":600,"consecutiveChecks":60}
[Screenshot: 2025-10-27 at 6:03:22 PM] [Screenshot: 2025-10-27 at 6:02:37 PM]

@dhanashritidke11 dhanashritidke11 self-assigned this Oct 28, 2025
@dhanashritidke11 dhanashritidke11 marked this pull request as ready for review October 28, 2025 18:05
zap.String("eventName", el.GetEventName()))
}

time.Sleep(10 * time.Second) // Wait before checking capacity again

Can this be a configurable setting with a default?

Collaborator Author

@dhanashritidke11 dhanashritidke11 Oct 28, 2025

I did consider making this setting configurable; however, I decided otherwise in favor of the following:

  1. Simplicity
  2. Not exposing too much control over this field to customers, since we might need to add guardrails, e.g. an allowable upper limit
  3. Consistency with other places where it is hardcoded - example here

return true
}

streamInfo, err := el.JSContext.StreamInfo("default")

Is there a name to use? Could the stream be named something other than default?
Like here

Collaborator Author

common.JetStreamStreamName evaluates to default, as defined here:

JetStreamStreamName = "default"

However, it would be better to use the constant instead of hardcoding it. Thanks for catching it!

Collaborator Author

Updated!
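
As a rough sketch of the capacity check discussed in this thread, the comparison could look like the following. Everything here is illustrative and not the PR's code: `streamStats` and `lookup` are hypothetical stand-ins for the stream-info call, `streamName` is redeclared locally to mirror the role of common.JetStreamStreamName, and treating a non-positive limit as "no limit" is an assumption about how the stream is configured.

```go
package main

// streamName mirrors the role of common.JetStreamStreamName ("default" per
// the thread above); it is redeclared only to keep the sketch standalone.
const streamName = "default"

// streamStats is a hypothetical stand-in for the fields a stream-info
// lookup would provide.
type streamStats struct {
	Msgs    uint64 // messages currently stored in the stream
	MaxMsgs int64  // configured limit; assumed: <= 0 means "no limit"
}

// lookup is any function that returns stats for a named stream.
type lookup func(stream string) (streamStats, error)

// atCapacity reports whether the stream has reached its message limit.
func atCapacity(s streamStats) bool {
	if s.MaxMsgs <= 0 {
		return false
	}
	return s.Msgs >= uint64(s.MaxMsgs)
}

// isFull queries the stream by the named constant rather than a string
// literal. A lookup error is treated as "full" so polling pauses (fail-safe).
func isFull(get lookup) bool {
	s, err := get(streamName)
	if err != nil {
		return true
	}
	return atCapacity(s)
}
```

Referencing the constant keeps the check aligned with the stream name if that name ever changes in one place.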

}

// isEventBusFull checks if the event bus is at capacity or unavailable
func (el *EventListener) isEventBusFull(log *zap.SugaredLogger) bool {

Should the isEventBusFull logic live outside of the awssqs event source logic?
What happens if RMX switches from SQS to Kafka upstream?

Collaborator Author

@dhanashritidke11 dhanashritidke11 Oct 28, 2025

The scope of this solution is limited to:

  1. SQS EventSource
  2. Jetstream Eventbus

If RMX switches to Kafka (which I believe they don't plan to do, at least in the near future, as they shared in our recent sync), we can revisit this as a fast follow.
The scope is defined in the design doc.

Collaborator

I want to warn that pausing a Kafka upstream is more complicated than pausing SQS polling, because multiple consumers in a single consumer group read from multiple partitions. A generic solution would involve creating an interface with methods such as a capacity checker.

Also, I am pretty sure RMX wouldn't switch to Kafka without letting us know, and last time we checked it wasn't on their roadmap for the medium term.

Collaborator Author

I agree we should have a process in place where we are consulted before RMX decides to update an EventSource, as it would be platform-level functionality that we'd need to support.
To close the loop on this thread, I also reached out to Will, and he confirmed that there are no plans to move to Kafka:

Between the python migration efforts, some data contract workstreams within rich media, and the ongoing science efforts for UWT and UFO, we aren't currently prioritizing that investigation or making those changes

@talebzeghmi talebzeghmi Oct 31, 2025

What guardrails do we have in place if RMX changes to Kafka in the future without notifying us? (This is a human process that could break.) [1]

Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?

[1] Even if we are notified, we could have forgotten that throttling only works on SQS, or other SMEs could be working in the area.

Collaborator Author

@dhanashritidke11 dhanashritidke11 Oct 31, 2025

Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?

Hey Taleb, I understand your concern about RMX changing the EventSource to Kafka and potentially running into EventSource errors because the EventBus is full. I believe we should have all the relevant details in the logs (such as the EventSource errors and the EventBus config that the EventSource relies on), and we can use that information to add an alert with additional recommendations (e.g. recommend updating the EventBus config based on the EventSource).

As an additional precaution, I will add a NOTE to our eventing documentation with these recommendations so this information is easy to find. (Jira ticket to track)

As a last resort, I believe RMX can raise a support request with us, and from our documentation we or an SME should be able to guide them.

Do you have any other ideas?

Collaborator Author

Created this ticket (link) to add a guardrail in CI/CD for non-SQS EventSources.

continue
}

// Set JetStream context for SQS event sources to enable capacity checking

Why set it only for SQS?
What happens if there is no SQS event listener?

Collaborator Author

Currently, the scope of this solution is limited to the SQS EventSource.
So for any EventSource other than SQS, it would be a no-op.

In the future, if we want to expand this solution to additional EventSources (webhook, Kafka, etc.), we can check for other sources and set the context as needed.

Is it possible to architect this pattern around all sources?
What happens if another upstream source is selected without our knowledge?

Collaborator Author

@dhanashritidke11 dhanashritidke11 Oct 28, 2025

I agree we should abstract this out to account for all sources once we have at least 2-3 use cases, which is why I view this as a v2/fast follow.
For now, keeping this feature specific to SQS and JetStream helps avoid complexity.

What happens if another upstream source is selected without our knowledge?

The other sources will continue to behave as they do today, since backward compatibility is maintained. As an additional safeguard, we can add a check to set the context only when skipPollingWhenEventBusFull is enabled; however, setting the context itself won't change any behavior.

log.Warnw("JetStream context not available, treating as unavailable to prevent message loss",
zap.String("eventSource", el.GetEventSourceName()),
zap.String("eventName", el.GetEventName()))
return true

Collaborator

If JSContext is missing, it means the EventBus isn't using JetStream underneath; it's using something else, like Kafka. In that case, shouldn't we fall back to legacy behavior even if skipPollingWhenEventBusFull is set to true? By legacy behavior I mean no blocking at all. Currently, it seems that if JSContext is nil (assuming the EventBus is Kafka) and skipPollingWhenEventBusFull is enabled, the EventSource will block permanently.

Collaborator Author

This particular condition was intended to capture the case where the JetStream connection itself is not available. The intent was to use SkipPollingWhenEventBusFull only with a JetStream EventBus. While we have recommended this in the docs, I see how it would help to add defensive checks that call isEventBusFull only for a JetStream EventBus and fall back to legacy behavior for others like Kafka, as you pointed out. Let me add this check before calling isEventBusFull.

Collaborator

looks good
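
The decision agreed on in this thread can be sketched as a small guard function. This is an illustration under assumptions, not the merged code: `busKind`, `shouldSkipPoll`, and the `connected` flag are hypothetical names introduced here.

```go
package main

// busKind identifies the event bus implementation (hypothetical type).
type busKind string

const (
	busJetStream busKind = "jetstream"
	busKafka     busKind = "kafka"
)

// shouldSkipPoll decides whether the next SQS poll should be skipped:
//   - feature disabled, or bus is not JetStream: never skip (legacy behavior)
//   - JetStream but no usable connection: skip (fail-safe, messages stay in SQS)
//   - otherwise: skip only if the capacity check reports the bus as full
func shouldSkipPoll(enabled bool, kind busKind, connected bool, isFull func() bool) bool {
	if !enabled || kind != busJetStream {
		return false
	}
	if !connected {
		return true
	}
	return isFull()
}
```

Checking the bus type before the capacity check is what avoids the permanent block described above when the feature is enabled on a non-JetStream bus.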

… Rely on eventbus connection to account for getting accurate eventbus capacity to account for reconnects

@talebzeghmi talebzeghmi left a comment

Thank you for doing this and addressing feedback!

logger.Debugw("Set EventBus connection for SQS event source",
zap.String("eventSource", server.GetEventSourceName()),
zap.String("eventName", server.GetEventName()))
}

style: the tabbing of this code is off.

Collaborator Author

Fixed it, thanks for catching!

zap.String("eventName", el.GetEventName()),
zap.Duration("wasFull", time.Duration(consecutiveFullChecks*10)*time.Second))
consecutiveFullChecks = 0
}

These logs could be replaced with a single log and a Prometheus metric, which would make it easy to build dashboards and alerts.

Collaborator Author

Updated to add a Prometheus metric and verified it in sandbox (metric)

zap.String("eventName", el.GetEventName()))
}

time.Sleep(10 * time.Second) // Wait before checking capacity again

This could be configurable, as some downstream sensors may need a sleep on the order of milliseconds, e.g. 100ms.
Not in our use case yet, but if it's easy to make configurable, that'd be helpful.

Collaborator Author

Updated to be configurable

@dhanashritidke11 dhanashritidke11 merged commit edfe119 into feature/zg Nov 6, 2025
1 check passed
dhanashritidke11 added a commit that referenced this pull request Nov 14, 2025
…when-jetstream-eventbus-full

AIP-9757: Skip SQS Poll when Jetstream Eventbus is full