Fix race condition in discovery manager causing duplicate aggregations#3471
Fix race condition in discovery manager causing duplicate aggregations#3471
Conversation
Use singleflight to ensure only one capability aggregation happens per cache key, even when multiple concurrent requests arrive. This prevents the recursive-looking duplicate 'Starting capability aggregation' logs.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3471 +/- ##
==========================================
- Coverage 65.28% 65.26% -0.03%
==========================================
Files 399 399
Lines 38990 38997 +7
==========================================
- Hits 25456 25451 -5
- Misses 11564 11573 +9
- Partials 1970 1973 +3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Retested this. The code in #3440 works fine with this change in. It does this without it |
#3471 Signed-off-by: nigel brown <nigel@stacklok.com>
jerm-dro
left a comment
There was a problem hiding this comment.
Thanks for splitting this out and the clear PR description. I see how we would need to fix this for the optimizer where startup is slower and more likely to cause repeated aggregation.
| // Check cache first (with read lock) | ||
| if caps := m.getCachedCapabilities(cacheKey); caps != nil { | ||
| logger.Debugf("Cache hit for user %s (key: %s)", identity.Subject, cacheKey) | ||
| return caps, nil | ||
| } |
There was a problem hiding this comment.
Blocker: Based on my reading of singleflight source code, singleflight actually implements a cache as well. If the function has been called for the cache key, it returns whatever results was originally produced.
Unfortunately, it's missing one important bit of functionality: expires behavior. Without this, the cache is unbounded in size and can cause OOMs for long running vMCPs. I don't see a good way to implement this on top of singleflight unfortunately, since the only delete API is Forget.
To fix this, I'd recommend:
Create a generic, time-limited cache:
func NewCacheWithTTL[V any](ttl time.Duration) Cache[V] {...}
type Cache[V any] interface {
Get(key string, loader func() (V, error)) (V, error)
}I haven't thought too deeply about the implementation, so I'll leave that up to you. Factoring it out like this is nice for a few reasons:
- This cache would easily be reusable in different circumstances.
- We can thoroughly unit test the cache's behavior. The current implementation that is coupled to capability aggregation is hard to unit test.
Summary
This PR fixes a race condition in the discovery manager that causes duplicate capability aggregations when multiple concurrent requests arrive simultaneously at startup.
Problem
Without this fix, when multiple requests arrive concurrently:
This is a distinct issue from the race condition fixed in #3450.
Solution
Uses
singleflight.Groupto deduplicate concurrent capability aggregation requests:Changes
singleflight.Groupfield toDefaultManagerDiscover()method to wrap aggregation insingleFlight.Do()Testing
Existing tests pass. The race condition is most visible under high concurrency at startup when multiple clients connect simultaneously.