AIP-9757: Skip SQS Poll when Jetstream Eventbus is full#11
Conversation
eventsources/sources/awssqs/start.go
Outdated
| zap.String("eventName", el.GetEventName())) | ||
| } | ||
|
|
||
| time.Sleep(10 * time.Second) // Wait before checking capacity again |
There was a problem hiding this comment.
can this be a configurable setting with a default?
There was a problem hiding this comment.
I did consider making this setting configurable, however I decided other wise in favor of the following:
- Simplicity
- Not expose too much control over this field to customers as we might need to consider adding guardrails: e.g. an allowable upper limit
- Consistent with other places where it is hardcoded - example here
eventsources/sources/awssqs/start.go
Outdated
| return true | ||
| } | ||
|
|
||
| streamInfo, err := el.JSContext.StreamInfo("default") |
There was a problem hiding this comment.
is there a name to use? could it the stream be named other than default ?
like here
There was a problem hiding this comment.
common.JetStreamStreamName gets evaluated to default defined here:
Line 91 in 111404f
However, it would be better to use the constant instead of hardcoding it. Thanks for catching it!
| } | ||
|
|
||
| // isEventBusFull checks if the event bus is at capacity or unavailable | ||
| func (el *EventListener) isEventBusFull(log *zap.SugaredLogger) bool { |
There was a problem hiding this comment.
Should isEventBusFull logic be outside of awssqs event source logic?
what happens if RMX switch from SQS to Kafka upstream?
There was a problem hiding this comment.
The scope of this solution is limited to:
- SQS EventSource
- Jetstream Eventbus
If RMX switches to Kafka (which I believe they don't plan to at least in the near future as they shared in our recent sync) we can revisit as a fast follow.
Scope for design defined in the design doc
There was a problem hiding this comment.
I want to warn that pausing kakfa upstream is more complicated than pausing polling sqs; because of the multiple consumers involved in a single consumer group reading from multiple partitions. having a generic solution would involve creating an interface with methods like capacity checker.
Also; i am pretty sure RMX wouldnt switch to kafka without letting us know and last time we checked it wasnt in their road map for medium term.
There was a problem hiding this comment.
I agree we should ensure that we have a process in place where we are consulted before RMX decides to update an EventSource as it would be a platform level functionality that we'd need to support.
To close the loop on this thread, I wanted to share that I also reached out to Will and he confirmed as well that there are no plans to move to Kafka:
Between the python migration efforts, some data contract workstreams within rich media, and the ongoing science efforts for UWT and UFO, we aren't currently prioritizing that investigation or making those changes
There was a problem hiding this comment.
What guardrails do we have in place if RMX in the future changes to Kafka without notifying us (this is a human process that could break) [1].
Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?
[1] where even if we are notified, we could have forgotten that throttling only works on SQS or we have other SMEs working in the area.
There was a problem hiding this comment.
Would the EventSource raise errors and alerts because it is trying to connect to the EventBus and receives an error because it is full, with the DiscardNew setting?
Hey Taleb, I understand your concern about RMX changing the EventSource to Kafka and potentially run into EventSource errors because the Eventbus is full. I believe we should have all the relevant details in logs (like the EventSource errors and the EventBus config that the Eventsource relies on) and can leverage that info to add an alert with additional recommendations (e.g.recommend updating the Eventbus config based on EventSource)
As an additional precaution, I will ensure to add a NOTE in our eventing documentation with these recommendations so this information can be easily gleaned. (Jira ticket to track)
As a final resort, I believe RMX can raise a support request with us and from our documentation we/SME should be able to guide them.
Do you have any other ideas?
There was a problem hiding this comment.
Created this ticket (link) to add guardrail in CICD for non SQS eventsources
eventsources/eventing.go
Outdated
| continue | ||
| } | ||
|
|
||
| // Set JetStream context for SQS event sources to enable capacity checking |
There was a problem hiding this comment.
Why set only for SQS?
What happens of there is no sqs event listener?
There was a problem hiding this comment.
Currently the scope of this solution is limited to SQS EventSource.
So for any EventSource other than SQS, it would be a No-op.
In the future, If we want to expand this solution to allow additional eventsources (webhook, Kafka, etc) we can check for other sources and set context as needed.
There was a problem hiding this comment.
Is it possible to architect this pattern around all sources?
What happens if another upstream source is selected without our knowledge?
There was a problem hiding this comment.
I agree we should abstract this out to account for all sources once we have at least 2-3 use cases which is why I view this as a v2/fast follow.
Currently maintaining this feature specifically for SQS and Jetstream helps avoid complexity.
What happens if another upstream source is selected without our knowledge?
The other sources will continue to behave as they do today as we ensure backward compatibility is maintained. We can add an additional check to set context only when skipPollingWhenEventBusFull is enabled for an additional layer of security however setting the context itself won't change any behavior.
| log.Warnw("JetStream context not available, treating as unavailable to prevent message loss", | ||
| zap.String("eventSource", el.GetEventSourceName()), | ||
| zap.String("eventName", el.GetEventName())) | ||
| return true |
There was a problem hiding this comment.
if JSContext is missing; it means the eventBus isnt using jetStream underlying; its using something else like kafka. In that case; shouldnt we fall back to legacy behavior even if skipPollingWhenEventBusFull is set to true. by legacy behavior i mean, no blocking at all. Currently; it seems if JSContext is nil (assuming eventBus is kafka) and skipPollingWhenEventBusFull is enabled; the eventSource will permanently block.
There was a problem hiding this comment.
This particular condition was intended to capture the case where the jetstream connection itself is not available. The intent was to use SkipPollingWhenEventBusFull only for Jetstream eventbus. While we have recommended this in the docs, I see how it will be helpful to add additional defensive checks to only call isEventBusFull for a jetstream eventbus and fallback to legacy behavior for others like Kafka as you pointed. Let me add this check before calling isEventBusFull.
| } | ||
|
|
||
| // isEventBusFull checks if the event bus is at capacity or unavailable | ||
| func (el *EventListener) isEventBusFull(log *zap.SugaredLogger) bool { |
There was a problem hiding this comment.
I want to warn that pausing kakfa upstream is more complicated than pausing polling sqs; because of the multiple consumers involved in a single consumer group reading from multiple partitions. having a generic solution would involve creating an interface with methods like capacity checker.
Also; i am pretty sure RMX wouldnt switch to kafka without letting us know and last time we checked it wasnt in their road map for medium term.
… Rely on eventbus connection to account for getting accurate eventbus capacity to account for reconnects
talebzeghmi
left a comment
There was a problem hiding this comment.
Thank you for doing this and addressing feedback!
eventsources/eventing.go
Outdated
| logger.Debugw("Set EventBus connection for SQS event source", | ||
| zap.String("eventSource", server.GetEventSourceName()), | ||
| zap.String("eventName", server.GetEventName())) | ||
| } |
There was a problem hiding this comment.
style: the tabbing of this code is off.
There was a problem hiding this comment.
Fixed it thanks for catching!
| zap.String("eventName", el.GetEventName()), | ||
| zap.Duration("wasFull", time.Duration(consecutiveFullChecks*10)*time.Second)) | ||
| consecutiveFullChecks = 0 | ||
| } |
There was a problem hiding this comment.
These logs could be replaced with a single log and prometheus metric to easily dashboard and create alerts for
There was a problem hiding this comment.
Updated to add prometheus metric and verified in sandbox (metric)
eventsources/sources/awssqs/start.go
Outdated
| zap.String("eventName", el.GetEventName())) | ||
| } | ||
|
|
||
| time.Sleep(10 * time.Second) // Wait before checking capacity again |
There was a problem hiding this comment.
This could be configurable, as some downstream sensor may need sleep in the order of milliseconds or 100ms..
Not in our use case yet, but if it's easy to be configurable that'd be helpful
There was a problem hiding this comment.
Updated to be configurable
…when-jetstream-eventbus-full AIP-9757: Skip SQS Poll when Jetstream Eventbus is full
Description
Implements capacity-based polling control for AWS SQS EventSource with backpressure handling to prevent message loss when the JetStream event bus is at capacity.
Added optional
skipPollingWhenEventBusFullconfiguration that checks event bus capacity before each SQS poll. When enabled:Configuration
Backward Compatibility
✅ Fully backward compatible
Test Results