Merged
53 changes: 52 additions & 1 deletion docs/eventsources/setup/aws-sqs.md
@@ -22,7 +22,7 @@ The structure of an event dispatched by the event-source over the eventbus looks
        // see Amazon SQS Message Attributes
        // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html)
        // in the Amazon Simple Queue Service Developer Guide.
-       "messageAttributes": "message attributes",
+       "messageAttributes": "message attributes",
        "body": "Body is the message data",
    }
}
@@ -66,6 +66,57 @@

1. Once a message is published, an Argo workflow will be triggered. Run `argo list` to find the workflow.

## Configuration Options

### Capacity-Based Polling Control

The `skipPollingWhenEventBusFull` option enables backpressure handling that prevents message loss when the event bus is at capacity.

**How it works:**

- When enabled, the event source checks the JetStream event bus capacity before each SQS poll
- If the event bus is full or unavailable, SQS polling is skipped
- Messages remain in SQS with their visibility timeout intact
- Polling automatically resumes when event bus capacity becomes available
- Prevents SQS messages from being moved to the DLQ due to event bus capacity issues

**Configuration:**

```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: aws-sqs
spec:
  sqs:
    example:
      # ... other configuration ...

      # Skip polling SQS when the event bus is at capacity
      # Recommended: true (default: false)
      skipPollingWhenEventBusFull: true

      # Wait duration (in seconds) between EventBus capacity checks when full
      # Optional; default: 10 seconds. Tune to adjust check frequency for your needs.
      eventBusFullWaitSeconds: 10

      # SQS batch size: number of messages to fetch per ReceiveMessage call
      # Optional; valid values: 1 to 10. Default: 10
      batchSize: 10
```
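
With the settings above, a full event bus is re-checked every `eventBusFullWaitSeconds` (10 s here). As a rough worked example, a bus that stays full for one minute costs six JetStream `StreamInfo` lookups and zero SQS `ReceiveMessage` calls, while the messages simply wait in the queue under their normal visibility rules.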

**Benefits:**

- **Prevents message loss**: Messages stay in SQS when the event bus can't accept them
- **Reduces API calls**: Avoids unnecessary SQS polling when downstream is saturated
- **Self-healing**: Automatically resumes when capacity is available

**Recommendations:**

- Enable this option when using JetStream event bus with limited capacity
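
**Monitoring:**

This change also adds two Prometheus metrics, defined in `metrics/metrics.go` below: `argo_events_eventbus_full` (gauge; 1 while the bus is full) and `argo_events_eventbus_full_duration_seconds` (summary of how long each full period lasted). Below is a minimal alerting-rule sketch built on the gauge; the alert name, `for:` threshold, and severity label are illustrative assumptions, not part of this change:

```yaml
groups:
  - name: argo-events-eventbus
    rules:
      - alert: EventBusFullProlonged
        # argo_events_eventbus_full is 1 while the JetStream stream is at
        # capacity and SQS polling is being skipped.
        expr: argo_events_eventbus_full == 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "EventBus full for 10+ minutes; SQS polling is paused"
```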

## Troubleshoot

Please read the [FAQ](https://argoproj.github.io/argo-events/FAQ/).
10 changes: 10 additions & 0 deletions eventsources/eventing.go
@@ -518,6 +518,16 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
            // Continue starting other event services instead of failing all of them
            continue
        }

        // Set EventBus connection for SQS event sources to enable capacity checking.
        // The connection reference is automatically updated on reconnection.
        if sqsServer, ok := server.(*awssqs.EventListener); ok {
            sqsServer.SetEventBusConnection(e.eventBusConn)
            logger.Debugw("Set EventBus connection for SQS event source",
                zap.String("eventSource", server.GetEventSourceName()),
                zap.String("eventName", server.GetEventName()))
        }

        wg.Add(1)
        go func(s EventingServer) {
            defer wg.Done()
119 changes: 118 additions & 1 deletion eventsources/sources/awssqs/start.go
@@ -28,7 +28,10 @@ import (
    sqslib "github.com/aws/aws-sdk-go/service/sqs"
    "go.uber.org/zap"

    "github.com/argoproj/argo-events/common"
    "github.com/argoproj/argo-events/common/logging"
    eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
    eventsource "github.com/argoproj/argo-events/eventbus/jetstream/eventsource"
    eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
    awscommon "github.com/argoproj/argo-events/eventsources/common/aws"
    "github.com/argoproj/argo-events/eventsources/sources"
@@ -44,6 +47,9 @@ type EventListener struct {
    EventName      string
    SQSEventSource v1alpha1.SQSEventSource
    Metrics        *metrics.Metrics
    // EventBus connection for capacity checking (set during initialization).
    // Stores a reference to the connection, which is automatically updated on reconnection.
    EventBusConn eventbuscommon.EventSourceConnection
}

// GetEventSourceName returns name of event source
@@ -61,6 +67,64 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
    return apicommon.SQSEvent
}

// SetEventBusConnection stores the event bus connection used for capacity checks.
// The connection reference is automatically updated on reconnection,
// ensuring capacity checks always use the current active connection.
func (el *EventListener) SetEventBusConnection(conn eventbuscommon.EventSourceConnection) {
    el.EventBusConn = conn
}

// isEventBusFull checks if the event bus is at capacity or unavailable
func (el *EventListener) isEventBusFull(log *zap.SugaredLogger) bool {

**Inline review thread on `isEventBusFull`:**

**Reviewer:** Should the `isEventBusFull` logic be outside of the awssqs event-source logic? What happens if RMX switches from SQS to Kafka upstream?

**@dhanashritidke11 (Author), Oct 28, 2025:** The scope of this solution is limited to:

1. SQS EventSource
2. JetStream EventBus

If RMX switches to Kafka (which I believe they don't plan to, at least in the near future, as they shared in our recent sync), we can revisit as a fast follow. The scope for the design is defined in the design doc.

**Collaborator:** I want to warn that pausing a Kafka upstream is more complicated than pausing SQS polling, because of the multiple consumers involved in a single consumer group reading from multiple partitions; a generic solution would involve creating an interface with methods like a capacity checker. Also, I am pretty sure RMX wouldn't switch to Kafka without letting us know, and last time we checked it wasn't on their roadmap for the medium term.

**@dhanashritidke11 (Author):** I agree we should ensure that we have a process in place where we are consulted before RMX decides to update an EventSource, as it would be platform-level functionality that we'd need to support. To close the loop on this thread: I also reached out to Will, and he confirmed that there are no plans to move to Kafka:

> Between the python migration efforts, some data contract workstreams within rich media, and the ongoing science efforts for UWT and UFO, we aren't currently prioritizing that investigation or making those changes.

**@talebzeghmi, Oct 31, 2025:** What guardrails do we have in place if RMX changes to Kafka in the future without notifying us? This is a human process that could break [1]. Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?

[1] Even if we are notified, we could have forgotten that throttling only works on SQS, or we may have other SMEs working in the area.

**@dhanashritidke11 (Author), Oct 31, 2025:** Hey Taleb, I understand your concern about RMX changing the EventSource to Kafka and potentially running into EventSource errors because the EventBus is full. I believe we should have all the relevant details in logs (such as the EventSource errors and the EventBus config that the EventSource relies on) and can leverage that info to add an alert with additional recommendations (e.g. recommending an EventBus config update based on the EventSource). As an additional precaution, I will add a NOTE to our eventing documentation with these recommendations so this information can be easily gleaned (Jira ticket to track). As a final resort, RMX can raise a support request with us, and from our documentation we/an SME should be able to guide them. Do you have any other ideas?

**@dhanashritidke11 (Author):** Created this ticket (link) to add a guardrail in CICD for non-SQS event sources.

    // Check if connection is available and active
    if el.EventBusConn == nil || el.EventBusConn.IsClosed() {
        log.Warnw("EventBus connection not available or closed, treating as unavailable to prevent message loss",
            zap.String("eventSource", el.GetEventSourceName()),
            zap.String("eventName", el.GetEventName()))
        return true
**Inline review thread:**

**Collaborator:** If `JSContext` is missing, it means the event bus isn't using JetStream underneath; it's using something else, like Kafka. In that case, shouldn't we fall back to legacy behavior even if `skipPollingWhenEventBusFull` is set to true? By legacy behavior I mean no blocking at all. Currently, it seems that if `JSContext` is nil (assuming the event bus is Kafka) and `skipPollingWhenEventBusFull` is enabled, the event source will block permanently.

**@dhanashritidke11 (Author):** This particular condition was intended to capture the case where the JetStream connection itself is not available. The intent was to use `SkipPollingWhenEventBusFull` only for a JetStream event bus. While we have recommended this in the docs, I see how it would be helpful to add an additional defensive check that only calls `isEventBusFull` for a JetStream event bus and falls back to legacy behavior for others like Kafka, as you pointed out. Let me add this check before calling `isEventBusFull`.

**Collaborator:** Looks good.

    }

    jetstreamConn := el.EventBusConn.(*eventsource.JetstreamSourceConn)

    if jetstreamConn.JSContext == nil {
        log.Warnw("JetStream context not initialized, treating as unavailable to prevent message loss",
            zap.String("eventSource", el.GetEventSourceName()),
            zap.String("eventName", el.GetEventName()))
        return true
    }

    // Use the current connection's JSContext (automatically updated on reconnection)
    streamInfo, err := jetstreamConn.JSContext.StreamInfo(common.JetStreamStreamName)
    if err != nil {
        // If we can't check capacity, treat as unavailable to prevent message loss
        log.Warnw("Failed to get stream info, treating as unavailable to prevent message loss",
            zap.String("eventSource", el.GetEventSourceName()),
            zap.String("eventName", el.GetEventName()),
            zap.Error(err))
        return true
    }

    // Check if stream is at capacity based on MaxMsgs
    if streamInfo.Config.MaxMsgs > 0 && streamInfo.State.Msgs >= uint64(streamInfo.Config.MaxMsgs) {
        log.Infow("EventBus at capacity - MaxMsgs limit reached",
            zap.String("eventSource", el.GetEventSourceName()),
            zap.String("eventName", el.GetEventName()),
            zap.Uint64("currentMsgs", streamInfo.State.Msgs),
            zap.Int64("maxMsgs", streamInfo.Config.MaxMsgs))
        return true
    }

    // Event bus has available capacity
    log.Debugw("EventBus has available capacity, proceeding with SQS poll",
        zap.String("eventSource", el.GetEventSourceName()),
        zap.String("eventName", el.GetEventName()),
        zap.Uint64("currentMsgs", streamInfo.State.Msgs),
        zap.Int64("maxMsgs", streamInfo.Config.MaxMsgs),
        zap.Uint64("currentBytes", streamInfo.State.Bytes),
        zap.Int64("maxBytes", streamInfo.Config.MaxBytes))

    return false
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
    log := logging.FromContext(ctx).

@@ -93,14 +157,67 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
    }

    log.Info("listening for messages on the queue...")

    // Log capacity-based polling control status
    if sqsEventSource.SkipPollingWhenEventBusFull {
        log.Info("Capacity-based polling control enabled - will check event bus capacity before each poll")
    }

    var eventBusFullStartTime *time.Time

    for {
        select {
        case <-ctx.Done():
            log.Info("exiting SQS event listener...")
            return nil
        default:
        }
-       messages, err := fetchMessages(ctx, sqsClient, *queueURL.QueueUrl, 10, sqsEventSource.WaitTimeSeconds)

        // Check event bus capacity before polling if enabled
        if sqsEventSource.SkipPollingWhenEventBusFull {
            // Verify we have a JetStream connection before checking capacity
            if el.EventBusConn != nil && !el.EventBusConn.IsClosed() {
                if _, ok := el.EventBusConn.(*eventsource.JetstreamSourceConn); ok {
                    if el.isEventBusFull(log) {
                        // Track when the EventBus becomes full
                        if eventBusFullStartTime == nil {
                            now := time.Now()
                            eventBusFullStartTime = &now
                            el.Metrics.SetEventBusFull(el.GetEventSourceName(), el.GetEventName(), true)
                            log.Infow("EventBus is full, skipping SQS poll",
                                zap.String("eventSource", el.GetEventSourceName()),
                                zap.String("eventName", el.GetEventName()))
                        }

                        waitSeconds := sqsEventSource.EventBusFullWaitSeconds
                        if waitSeconds <= 0 {
                            waitSeconds = 10
                        }
                        time.Sleep(time.Duration(waitSeconds) * time.Second)
                        continue
                    }

                    // EventBus capacity available - record duration if it was full
                    if eventBusFullStartTime != nil {
                        duration := time.Since(*eventBusFullStartTime)
                        el.Metrics.EventBusFullDuration(el.GetEventSourceName(), el.GetEventName(), duration.Seconds())
                        el.Metrics.SetEventBusFull(el.GetEventSourceName(), el.GetEventName(), false)
                        log.Infow("EventBus capacity available again, resuming SQS polling",
                            zap.String("eventSource", el.GetEventSourceName()),
                            zap.String("eventName", el.GetEventName()),
                            zap.Duration("wasFull", duration))
                        eventBusFullStartTime = nil
                    }

**Inline review thread:**

**Reviewer:** These logs could be replaced with a single log and a Prometheus metric, to easily dashboard and create alerts from.

**@dhanashritidke11 (Author):** Updated to add a Prometheus metric and verified in sandbox (metric).

                }
            }
        }

        batchSize := sqsEventSource.BatchSize
        if batchSize <= 0 || batchSize > 10 {
            batchSize = 10 // SQS maximum is 10
        }

        messages, err := fetchMessages(ctx, sqsClient, *queueURL.QueueUrl, batchSize, sqsEventSource.WaitTimeSeconds)
        if err != nil {
            log.Errorw("failed to get messages from SQS", zap.Error(err))
            awsError, ok := err.(awserr.Error)
12 changes: 12 additions & 0 deletions examples/event-sources/aws-sqs.yaml
@@ -27,6 +27,18 @@ spec:
      # The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
      # MUST BE > 0 AND <= 20
      waitTimeSeconds: 20
      # The number of messages to fetch per SQS ReceiveMessage call (batch size).
      # Valid values: 1 to 10. Defaults to 10 if not specified.
      # +optional
      batchSize: 10

      # CAPACITY-BASED POLLING CONTROL
      # Skip polling SQS when the event bus is at capacity.
      # When enabled, the event source will check event bus capacity before each
      # poll attempt and skip polling when the event bus is full, reducing API
      # calls and preventing message accumulation when downstream processing is slow.
      # +optional
      skipPollingWhenEventBusFull: true

    # example-without-credentials:
    #   # If AWS access credentials are already present on the Pod's IAM role running the EventSource,
34 changes: 34 additions & 0 deletions metrics/metrics.go
@@ -43,6 +43,8 @@ type Metrics struct {
    eventsSentFailed        *prometheus.CounterVec
    eventsProcessingFailed  *prometheus.CounterVec
    eventProcessingDuration *prometheus.SummaryVec
    eventBusFullDuration    *prometheus.SummaryVec
    eventBusFull            *prometheus.GaugeVec
    actionTriggered         *prometheus.CounterVec
    actionFailed            *prometheus.CounterVec
    actionRetriesFailed     *prometheus.CounterVec
@@ -93,6 +95,22 @@ func NewMetrics(namespace string) *Metrics {
                labelNamespace: namespace,
            },
        }, []string{labelEventSourceName, labelEventName}),
        eventBusFullDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{
            Namespace: prefix,
            Name:      "eventbus_full_duration_seconds",
            Help:      "Summary of durations (in seconds) that the EventBus was at capacity and SQS polling was skipped. https://argoproj.github.io/argo-events/metrics/#argo_events_eventbus_full_duration_seconds",
            ConstLabels: prometheus.Labels{
                labelNamespace: namespace,
            },
        }, []string{labelEventSourceName, labelEventName}),
        eventBusFull: prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Namespace: prefix,
            Name:      "eventbus_full",
            Help:      "Current state of EventBus capacity: 1 if full, 0 if available. Used for alerting when EventBus is full for extended periods. https://argoproj.github.io/argo-events/metrics/#argo_events_eventbus_full",
            ConstLabels: prometheus.Labels{
                labelNamespace: namespace,
            },
        }, []string{labelEventSourceName, labelEventName}),
        actionTriggered: prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: prefix,
            Name:      "action_triggered_total",
@@ -134,6 +152,8 @@ func (m *Metrics) Collect(ch chan<- prometheus.Metric) {
    m.eventsSentFailed.Collect(ch)
    m.eventsProcessingFailed.Collect(ch)
    m.eventProcessingDuration.Collect(ch)
    m.eventBusFullDuration.Collect(ch)
    m.eventBusFull.Collect(ch)
    m.actionTriggered.Collect(ch)
    m.actionFailed.Collect(ch)
    m.actionRetriesFailed.Collect(ch)
@@ -146,6 +166,8 @@ func (m *Metrics) Describe(ch chan<- *prometheus.Desc) {
    m.eventsSentFailed.Describe(ch)
    m.eventsProcessingFailed.Describe(ch)
    m.eventProcessingDuration.Describe(ch)
    m.eventBusFullDuration.Describe(ch)
    m.eventBusFull.Describe(ch)
    m.actionTriggered.Describe(ch)
    m.actionFailed.Describe(ch)
    m.actionRetriesFailed.Describe(ch)
@@ -176,6 +198,18 @@ func (m *Metrics) EventProcessingDuration(eventSourceName, eventName string, num
    m.eventProcessingDuration.WithLabelValues(eventSourceName, eventName).Observe(num)
}

func (m *Metrics) EventBusFullDuration(eventSourceName, eventName string, durationSeconds float64) {
    m.eventBusFullDuration.WithLabelValues(eventSourceName, eventName).Observe(durationSeconds)
}

func (m *Metrics) SetEventBusFull(eventSourceName, eventName string, isFull bool) {
    if isFull {
        m.eventBusFull.WithLabelValues(eventSourceName, eventName).Set(1)
    } else {
        m.eventBusFull.WithLabelValues(eventSourceName, eventName).Set(0)
    }
}

func (m *Metrics) ActionTriggered(sensorName, triggerName string) {
    m.actionTriggered.WithLabelValues(sensorName, triggerName).Inc()
}
11 changes: 11 additions & 0 deletions pkg/apis/eventsource/v1alpha1/types.go
@@ -722,6 +722,17 @@ type SQSEventSource struct {
    // SessionToken refers to K8s secret containing AWS temporary credentials(STS) session token
    // +optional
    SessionToken *corev1.SecretKeySelector `json:"sessionToken,omitempty" protobuf:"bytes,13,opt,name=sessionToken"`
    // SkipPollingWhenEventBusFull skips polling SQS when event bus is at capacity
    // +optional
    SkipPollingWhenEventBusFull bool `json:"skipPollingWhenEventBusFull,omitempty" protobuf:"varint,14,opt,name=skipPollingWhenEventBusFull"`
    // EventBusFullWaitSeconds specifies the duration (in seconds) to wait before checking EventBus capacity again when it is full.
    // The default value is 10 seconds.
    // +optional
    EventBusFullWaitSeconds int64 `json:"eventBusFullWaitSeconds,omitempty" protobuf:"varint,15,opt,name=eventBusFullWaitSeconds"`
    // BatchSize is the number of messages to fetch per SQS ReceiveMessage call (SQS batch size).
    // Valid values: 1 to 10. Defaults to 10.
    // +optional
    BatchSize int64 `json:"batchSize,omitempty" protobuf:"varint,16,opt,name=batchSize"`
}

// PubSubEventSource refers to event-source for GCP PubSub related events.