-
Notifications
You must be signed in to change notification settings - Fork 0
AIP-9757: Skip SQS Poll when Jetstream Eventbus is full #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
29d09bd
1f536b6
eda9e83
fbb05a2
5d6d924
2a57d97
4fad7ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,10 @@ import ( | |
| sqslib "github.com/aws/aws-sdk-go/service/sqs" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/argoproj/argo-events/common" | ||
| "github.com/argoproj/argo-events/common/logging" | ||
| eventbuscommon "github.com/argoproj/argo-events/eventbus/common" | ||
| eventsource "github.com/argoproj/argo-events/eventbus/jetstream/eventsource" | ||
| eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" | ||
| awscommon "github.com/argoproj/argo-events/eventsources/common/aws" | ||
| "github.com/argoproj/argo-events/eventsources/sources" | ||
|
|
@@ -44,6 +47,9 @@ type EventListener struct { | |
| EventName string | ||
| SQSEventSource v1alpha1.SQSEventSource | ||
| Metrics *metrics.Metrics | ||
| // EventBus connection for capacity checking (set during initialization) | ||
| // Stores reference to the connection, which is automatically updated on reconnection | ||
| EventBusConn eventbuscommon.EventSourceConnection | ||
| } | ||
|
|
||
| // GetEventSourceName returns name of event source | ||
|
|
@@ -61,6 +67,64 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { | |
| return apicommon.SQSEvent | ||
| } | ||
|
|
||
| // SetEventBusConnection stores conn on the listener as the connection used for event bus capacity checks. | ||
| // Per the original note, the connection reference is automatically updated on reconnection, so capacity checks always use the current active connection. | ||
| func (el *EventListener) SetEventBusConnection(conn eventbuscommon.EventSourceConnection) { | ||
| el.EventBusConn = conn | ||
| } | ||
|
|
||
| // isEventBusFull reports whether the event bus is at capacity or unavailable. | ||
| // It returns true when the connection is nil or closed, when the JetStream context is | ||
| // missing, when stream info cannot be fetched, or when MaxMsgs is set and has been reached. | ||
| // It returns false when capacity is available, and also when the connection is not a | ||
| // JetStream connection, because the capacity check does not apply to other bus types. | ||
| // NOTE(review): MaxBytes is logged but not enforced here; confirm that is intended. | ||
| func (el *EventListener) isEventBusFull(log *zap.SugaredLogger) bool { | ||
| // Check if connection is available and active | ||
| if el.EventBusConn == nil || el.EventBusConn.IsClosed() { | ||
| log.Warnw("EventBus connection not available or closed, treating as unavailable to prevent message loss", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName())) | ||
| return true | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if JSContext is missing; it means the eventBus isnt using jetStream underlying; its using something else like kafka. In that case; shouldnt we fall back to legacy behavior even if
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This particular condition was intended to capture the case where the jetstream connection itself is not available. The intent was to use
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks good |
||
| } | ||
|
|
||
| // Comma-ok assertion: a bare assertion would panic for any non-JetStream event bus. | ||
| jetstreamConn, ok := el.EventBusConn.(*eventsource.JetstreamSourceConn) | ||
| if !ok { | ||
| // Not JetStream (e.g. Kafka): report "not full" so the caller keeps its legacy polling behavior. | ||
| log.Warnw("EventBus connection is not a JetStream connection, skipping capacity check", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName())) | ||
| return false | ||
| } | ||
|
|
||
| // A typed-nil connection is handled like a missing context to avoid a nil dereference. | ||
| if jetstreamConn == nil || jetstreamConn.JSContext == nil { | ||
| log.Warnw("JetStream context not initialized, treating as unavailable to prevent message loss", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName())) | ||
| return true | ||
| } | ||
|
|
||
| // Use the current connection's JSContext (automatically updated on reconnection) | ||
| streamInfo, err := jetstreamConn.JSContext.StreamInfo(common.JetStreamStreamName) | ||
| if err != nil { | ||
| // If we can't check capacity, treat as unavailable to prevent message loss | ||
| log.Warnw("Failed to get stream info, treating as unavailable to prevent message loss", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName()), | ||
| zap.Error(err)) | ||
| return true | ||
| } | ||
|
|
||
| // Check if stream is at capacity based on MaxMsgs (a non-positive MaxMsgs means no message limit is applied here) | ||
| if streamInfo.Config.MaxMsgs > 0 && streamInfo.State.Msgs >= uint64(streamInfo.Config.MaxMsgs) { | ||
| log.Infow("EventBus at capacity - MaxMsgs limit reached", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName()), | ||
| zap.Uint64("currentMsgs", streamInfo.State.Msgs), | ||
| zap.Int64("maxMsgs", streamInfo.Config.MaxMsgs)) | ||
| return true | ||
| } | ||
|
|
||
| // Event bus has available capacity | ||
| log.Debugw("EventBus has available capacity, proceeding with SQS poll", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName()), | ||
| zap.Uint64("currentMsgs", streamInfo.State.Msgs), | ||
| zap.Int64("maxMsgs", streamInfo.Config.MaxMsgs), | ||
| zap.Uint64("currentBytes", streamInfo.State.Bytes), | ||
| zap.Int64("maxBytes", streamInfo.Config.MaxBytes)) | ||
|
|
||
| return false | ||
| } | ||
|
|
||
| // StartListening starts listening events | ||
| func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { | ||
| log := logging.FromContext(ctx). | ||
|
|
@@ -93,14 +157,67 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt | |
| } | ||
|
|
||
| log.Info("listening for messages on the queue...") | ||
|
|
||
| // Log capacity-based polling control status | ||
| if sqsEventSource.SkipPollingWhenEventBusFull { | ||
| log.Info("Capacity-based polling control enabled - will check event bus capacity before each poll") | ||
| } | ||
|
|
||
| var eventBusFullStartTime *time.Time | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| log.Info("exiting SQS event listener...") | ||
| return nil | ||
| default: | ||
| } | ||
| messages, err := fetchMessages(ctx, sqsClient, *queueURL.QueueUrl, 10, sqsEventSource.WaitTimeSeconds) | ||
|
|
||
| // Check event bus capacity before polling if enabled | ||
| if sqsEventSource.SkipPollingWhenEventBusFull { | ||
| // Verify we have a JetStream connection before checking capacity | ||
| if el.EventBusConn != nil && !el.EventBusConn.IsClosed() { | ||
| if _, ok := el.EventBusConn.(*eventsource.JetstreamSourceConn); ok { | ||
| if el.isEventBusFull(log) { | ||
| // Track when EventBus becomes full | ||
| if eventBusFullStartTime == nil { | ||
| now := time.Now() | ||
| eventBusFullStartTime = &now | ||
| el.Metrics.SetEventBusFull(el.GetEventSourceName(), el.GetEventName(), true) | ||
| log.Infow("EventBus is full, skipping SQS poll", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName())) | ||
| } | ||
|
|
||
| waitSeconds := sqsEventSource.EventBusFullWaitSeconds | ||
| if waitSeconds <= 0 { | ||
| waitSeconds = 10 | ||
| } | ||
| time.Sleep(time.Duration(waitSeconds) * time.Second) | ||
| continue | ||
| } | ||
|
|
||
| // EventBus capacity available - record duration if it was full | ||
| if eventBusFullStartTime != nil { | ||
| duration := time.Since(*eventBusFullStartTime) | ||
| el.Metrics.EventBusFullDuration(el.GetEventSourceName(), el.GetEventName(), duration.Seconds()) | ||
| el.Metrics.SetEventBusFull(el.GetEventSourceName(), el.GetEventName(), false) | ||
| log.Infow("EventBus capacity available again, resuming SQS polling", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName()), | ||
| zap.Duration("wasFull", duration)) | ||
| eventBusFullStartTime = nil | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These logs could be replaced with a single log and prometheus metric to easily dashboard and create alerts for
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to add prometheus metric and verified in sandbox (metric) |
||
| } | ||
| } | ||
| } | ||
|
|
||
| batchSize := sqsEventSource.BatchSize | ||
| if batchSize <= 0 || batchSize > 10 { | ||
| batchSize = 10 // SQS maximum is 10 | ||
| } | ||
|
|
||
| messages, err := fetchMessages(ctx, sqsClient, *queueURL.QueueUrl, batchSize, sqsEventSource.WaitTimeSeconds) | ||
| if err != nil { | ||
| log.Errorw("failed to get messages from SQS", zap.Error(err)) | ||
| awsError, ok := err.(awserr.Error) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the isEventBusFull logic live outside of the AWS SQS event source logic? What happens if RMX switches from SQS to Kafka upstream?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scope of this solution is limited to:
If RMX switches to Kafka (which I believe they don't plan to at least in the near future as they shared in our recent sync) we can revisit as a fast follow.
Scope for design defined in the design doc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to warn that pausing a Kafka upstream is more complicated than pausing SQS polling, because multiple consumers in a single consumer group read from multiple partitions. A generic solution would involve creating an interface with methods such as a capacity checker.
Also, I am pretty sure RMX wouldn't switch to Kafka without letting us know, and the last time we checked it wasn't on their roadmap for the medium term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree we should ensure that we have a process in place where we are consulted before RMX decides to update an EventSource as it would be a platform level functionality that we'd need to support.
To close the loop on this thread, I wanted to share that I also reached out to Will and he confirmed as well that there are no plans to move to Kafka:
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What guardrails do we have in place if RMX changes to Kafka in the future without notifying us? (This is a human process that could break.) [1]
Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?
[1] where even if we are notified, we could have forgotten that throttling only works on SQS or we have other SMEs working in the area.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey Taleb, I understand your concern about RMX changing the EventSource to Kafka and potentially running into EventSource errors because the EventBus is full. I believe we should have all the relevant details in logs (such as the EventSource errors and the EventBus config that the EventSource relies on) and can leverage that information to add an alert with additional recommendations (e.g., recommend updating the EventBus config based on the EventSource).
As an additional precaution, I will make sure to add a note in our eventing documentation with these recommendations so this information can be easily found. (Jira ticket to track)
As a final resort, I believe RMX can raise a support request with us and from our documentation we/SME should be able to guide them.
Do you have any other ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created this ticket (link) to add a guardrail in CI/CD for non-SQS event sources.