Refactor streaming agg query phase planning #20471
📝 Walkthrough
This PR refactors the streaming aggregation cost estimation architecture from a runtime collector-tree evaluation model to an early-stage aggregation-factory estimation model, introducing a new …
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
|
❌ Gradle check result for bb004fb: ABORTED Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
bb004fb to
012735b
Compare
|
❌ Gradle check result for 012735b: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
012735b to
13f2252
Compare
|
❌ Gradle check result for 13f2252: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
13f2252 to
ddf0f03
Compare
|
❌ Gradle check result for 889a7c0: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
889a7c0 to
e03943d
Compare
…stant across segments, not per aggregator Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for b217ab9: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
…ator Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for 6d8c16c: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
- segmentTopN for term agg Signed-off-by: bowenlan-amzn <[email protected]>
- rest layer filter streaming Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for 597b0cc: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java (1)
125-137: Make `segmentTopN` request-scoped to avoid cross-search leakage.
`segmentTopN` is static and set during `estimateStreamingCost`, then reused by supplier builds. Concurrent searches with different `shard_size`/`required_size` can overwrite it, leading to wrong top-N selection (potentially missing buckets) or inconsistent performance. Compute it per request instead of storing globally.
🐛 Suggested fix (request-scoped segmentTopN)
```diff
- if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+ if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+     int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, context);
      return createStreamStringTermsAggregator(
          name,
          factories,
          valuesSource,
          order,
          format,
          bucketCountThresholds,
          context,
          parent,
          showTermDocCountError,
-         segmentTopN,
+         segmentTopN,
          metadata
      );
  }
```
```diff
- if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+ if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+     int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, context);
      return createStreamNumericTermsAggregator(
          name,
          factories,
          numericValuesSource,
          format,
          order,
          bucketCountThresholds,
          context,
          parent,
          longFilter,
          includeExclude,
          showTermDocCountError,
          cardinality,
-         segmentTopN,
+         segmentTopN,
          metadata
      );
  }
```
```diff
- private int computeSegTopN(BucketCountThresholds bucketCountThresholds, BucketOrder order) {
+ private static int computeSegTopN(BucketCountThresholds bucketCountThresholds, BucketOrder order, SearchContext context) {
      int effectiveShardSize = bucketCountThresholds.getShardSize();
      if (InternalOrder.isKeyOrder(order) == false
          && effectiveShardSize == TermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
          effectiveShardSize = BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize());
      }
      // Ensure shardSize is valid (at minimum, use requiredSize)
      if (effectiveShardSize < bucketCountThresholds.getRequiredSize()) {
          effectiveShardSize = bucketCountThresholds.getRequiredSize();
      }
-     int minSegmentSize = queryShardContext.getIndexSettings().getStreamingAggregationMinShardSize();
+     int minSegmentSize = context.indexShard().indexSettings().getStreamingAggregationMinShardSize();
      return Math.max(minSegmentSize, effectiveShardSize);
  }

- private static int segmentTopN;
-
  @Override
  public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) {
      ValuesSource valuesSource = config.getValuesSource();
-     segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order);
+     int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, searchContext);
      // Reject numeric aggregators with key-based ordering
      if (InternalOrder.isKeyOrder(order) && valuesSource instanceof ValuesSource.Numeric) {
          return StreamingCostMetrics.nonStreamable();
      }
      if (valuesSource instanceof WithOrdinals || valuesSource instanceof ValuesSource.Numeric) {
          return new StreamingCostMetrics(true, segmentTopN);
      }
      return StreamingCostMetrics.nonStreamable();
  }
```
Also applies to: 235-250, 600-681, 685-719
🤖 Fix all issues with AI agents
In `@server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java`:
- Around line 137-144: isSubAggregationStreamable currently only checks the
immediate aggregation type and misses nested aggs under a second-level
TermsAggregationBuilder; update isSubAggregationStreamable to, when agg is a
TermsAggregationBuilder, iterate its sub-aggregations and recursively validate
each sub-aggregation (e.g., by calling isSubAggregationStreamable on each child)
so that nested unsupported aggregations (like terms { terms { unsupported_agg
}}) are rejected; ensure the recursion handles only necessary depth (or full
recursion) and uses the
TermsAggregationBuilder.getSubAggregations()/sub-aggregation accessor to locate
children.
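The recursive check requested above could look roughly like the following. This is a minimal sketch with stand-in types: `AggNode`, its type strings, and the `SUPPORTED_METRICS` set are hypothetical placeholders rather than OpenSearch classes. The real method would presumably walk `AggregationBuilder` instances and their sub-aggregations instead.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an aggregation builder: a type name plus its child aggregations. */
record AggNode(String type, List<AggNode> children) {
    static AggNode leaf(String type) {
        return new AggNode(type, List.of());
    }
}

final class SubAggregationCheck {
    // Illustrative set of metric types treated as streamable in this sketch (an assumption).
    private static final Set<String> SUPPORTED_METRICS =
        Set.of("min", "max", "sum", "avg", "value_count", "cardinality");

    /** Returns true only if this node and every aggregation nested beneath it is supported. */
    static boolean isSubAggregationStreamable(AggNode agg) {
        if (SUPPORTED_METRICS.contains(agg.type())) {
            return true;
        }
        if ("terms".equals(agg.type())) {
            // Recurse so that terms { terms { unsupported_agg } } is rejected at any depth.
            return agg.children().stream().allMatch(SubAggregationCheck::isSubAggregationStreamable);
        }
        return false;
    }
}
```

With full recursion there is no depth limit to maintain, at the cost of visiting every node in the request's aggregation tree once.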
In
`@server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java`:
- Around line 405-414: createAggregatorFactory returns a FactoryAndContext that
holds a SearchContext which isn't closed, causing resource leaks; make
FactoryAndContext implement AutoCloseable and add a close() that closes the
contained SearchContext (call searchContext.close()), then update tests to use
try-with-resources (try (FactoryAndContext fac = createAggregatorFactory(...)) {
... }) so the SearchContext is always closed; alternatively, if you prefer
test-suite level cleanup, collect returned SearchContext instances and close
them in an `@After` method, but the preferred change is to implement AutoCloseable
on FactoryAndContext and switch tests to try-with-resources.
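A minimal sketch of the preferred option follows. `FakeSearchContext` is a hypothetical stand-in for `SearchContext` that only records whether `close()` ran, and the `factory` field is typed as `Object` for illustration. The actual test helper would hold the real factory and context types.

```java
/** Hypothetical stand-in for SearchContext; it only tracks whether close() was called. */
final class FakeSearchContext implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Pairs a factory with the context it was built from and closes that context when done. */
final class FactoryAndContext implements AutoCloseable {
    final Object factory; // placeholder for the AggregatorFactory under test
    final FakeSearchContext searchContext;

    FactoryAndContext(Object factory, FakeSearchContext searchContext) {
        this.factory = factory;
        this.searchContext = searchContext;
    }

    @Override
    public void close() {
        searchContext.close();
    }
}

final class FactoryAndContextDemo {
    /** Mirrors how a test body would use the holder; returns the context so callers can inspect it. */
    static FakeSearchContext runTestBody() {
        FakeSearchContext context = new FakeSearchContext();
        try (FactoryAndContext fac = new FactoryAndContext(new Object(), context)) {
            // Assertions against fac.factory would go here; the context is still open at this point.
            if (fac.searchContext.isClosed()) {
                throw new IllegalStateException("context must stay open inside the try block");
            }
        }
        return context; // closed by try-with-resources, even if the body had thrown
    }
}
```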
🧹 Nitpick comments (5)
CHANGELOG.md (1)
36-36: LGTM! Changelog entry is correctly formatted and placed.
The entry follows the established changelog format and is appropriately placed in the "Changed" section for a refactoring effort. The description matches the PR title and the link is correct.
💡 Optional: Consider a more detailed description
The AI summary indicates this is a significant architectural refactor (moving from runtime collector-tree evaluation to aggregation-factory estimation, introducing the `StreamingCostEstimable` interface, removing the `Streamable` interface and `AggregatorTreeEvaluator`). If you want to provide more context for future readers, you could expand the description slightly:
```diff
-- Refactor streaming agg query phase planning ([`#20471`](https://github.com/opensearch-project/OpenSearch/pull/20471))
+- Refactor streaming agg query phase planning to use factory-based cost estimation ([`#20471`](https://github.com/opensearch-project/OpenSearch/pull/20471))
```
However, the current concise format is consistent with other entries in this changelog, so this is purely optional.
server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java (1)
118-121: Consider removing redundant override.
This `collectDebugInfo` override only calls `super.collectDebugInfo(add)` without adding any streaming-specific debug information. Since the method simply delegates to the parent class, it can be removed entirely.
♻️ Suggested removal
```diff
- @Override
- public void collectDebugInfo(BiConsumer<String, Object> add) {
-     super.collectDebugInfo(add);
- }
```
server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (1)
21-24: Consider using `long` for `topNSize` to align with `FlushModeResolver` settings.
The `topNSize` field is declared as `int`, but `FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT` is a `long` setting with default 100,000. While current values fit comfortably in an `int`, larger bucket counts could overflow in nested aggregation scenarios (e.g., multiplying two large `topNSize` values exceeds `Integer.MAX_VALUE`). Using `long` would provide consistency with the settings API and improve future-proofing.
server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java (1)
320-327: Consider using stream API for consistency.
The loop to detect non-streamable sub-factories could be simplified using streams, aligning with patterns used elsewhere in the codebase.
Suggested refactor
```diff
- boolean hasNonStreamable = false;
- for (AggregatorFactory subFactory : subFactoryArray) {
-     if (!(subFactory instanceof StreamingCostEstimable)) {
-         hasNonStreamable = true;
-         break;
-     }
- }
- assertTrue("Should have non-streamable sub-factory (TopHitsAggregatorFactory)", hasNonStreamable);
+ boolean hasNonStreamable = Arrays.stream(subFactoryArray)
+     .anyMatch(f -> !(f instanceof StreamingCostEstimable));
+ assertTrue("Should have non-streamable sub-factory (TopHitsAggregatorFactory)", hasNonStreamable);
```
server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java (1)
359-370: Assert-only guard for non-empty factories array.
The `assert factories.length > 0` on line 360 protects against empty arrays, but asserts are disabled in production. If somehow called with an empty array (despite the caller's guard at line 315), `combined` would remain `null` and be returned.
The current code is safe because the caller explicitly handles `factories.length == 0` before calling this method (lines 315-316). Consider adding a brief comment noting this invariant for future maintainers.
Optional: Add clarifying comment
```diff
  private static StreamingCostMetrics estimateStreamingCostFromFactories(AggregatorFactory[] factories, SearchContext searchContext) {
+     // Caller (createTopLevelAggregators) handles empty case separately
      assert factories.length > 0 : "factories array must be non-empty";
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (34)
- CHANGELOG.md
- plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java
- server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
- server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
- server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java
- server/src/main/java/org/opensearch/search/aggregations/AggregatorTreeEvaluator.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java
- server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java
- server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java
- server/src/main/java/org/opensearch/search/streaming/Streamable.java
- server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
- server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java
- server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java
- server/src/test/java/org/opensearch/search/aggregations/AggregatorTreeEvaluatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregatorTests.java
- server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java
- server/src/test/java/org/opensearch/search/streaming/StreamingCostMetricsTests.java
- test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
💤 Files with no reviewable changes (8)
- server/src/main/java/org/opensearch/search/streaming/Streamable.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
- server/src/main/java/org/opensearch/search/aggregations/AggregatorTreeEvaluator.java
- server/src/test/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/AggregatorTreeEvaluatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-02T19:23:29.698Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: The gRPC search API in OpenSearch is marked as "experimental" in official documentation, so changes to proto schemas that remove previously unsupported fields (those throwing UnsupportedOperationException) are not considered breaking changes.
Applied to files:
- CHANGELOG.md
- server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.
Applied to files:
- server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java
- server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java
- server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java
- server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java
- server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
- server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
- server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java
🧬 Code graph analysis (3)
server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java (1)
server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java (1)
NumberFieldMapper(102-2185)
server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java (1)
server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java (1)
RestSearchAction(90-468)
server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (1)
server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java (1)
ExperimentalApi(29-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: gradle-check
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: Analyze (java)
🔇 Additional comments (35)
server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java (1)
74-74: LGTM! Removal of `Streamable` interface aligns with the factory-driven streaming model.
The class now only implements `StarTreePreComputeCollector`, consistent with the PR's architectural shift to handle streaming cost estimation at the aggregator factory level. `MinAggregatorFactory` correctly implements `StreamingCostEstimable`, and no remaining `Streamable` implementations exist in the metrics aggregators.
plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java (2)
495-504: LGTM!
The comment update accurately reflects the refactored behavior where factory-level estimation determines streaming eligibility. The test correctly validates that aggregation results remain accurate regardless of whether streaming is used.
522-587: Well-structured test with proper resource management.
The test correctly:
- Documents why the setting adjustment is needed (combined topN from terms shardSize * 2^precision).
- Uses try-finally to ensure settings are restored even if assertions fail.
- Validates both profile structure (StreamCardinalityAggregator presence) and result correctness (cardinality values).
- Uses low precision threshold intentionally to keep the test within bounds.
server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java (1)
71-77: The concern about double invocation of `preCollection()` is not valid.
`preCollection()` is called once in `createCollector()` (line 76) and no caller or wrapper invokes it again. The returned collector is passed directly to `searcher.search()` and `searchLeaf()`, which do not call `preCollection()`. Subsequent `postCollection()` processing in `BucketCollectorProcessor` only invokes `postCollection()`, never `preCollection()`. The refactor correctly consolidates the initialization into the collector creation method with no lifecycle issues.
test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java (1)
336-338: LGTM!
The explicit mock setup for `getFlushMode()` returning `FlushMode.PER_SEGMENT` is appropriate for test infrastructure. This correctly bypasses the cost-estimation decision logic in the factory, ensuring tests can directly exercise the streaming aggregator path without depending on cost-based decisions.
server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java (1)
75-75: LGTM!
The removal of `Streamable` from the class declaration aligns with the architectural refactor that shifts streaming cost estimation from per-aggregator interfaces to factory-level `StreamingCostEstimable`. The aggregator retains its core functionality while streaming decisions are now made at the factory level.
server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java (1)
51-51: LGTM!
The removal of `Streamable` from `ProfilingAggregator` is consistent with the broader refactor. As a decorator wrapping other aggregators, it correctly delegates to the underlying aggregator for all core operations. Streaming decisions are now handled at the factory level, so the profiling wrapper no longer needs to participate in per-collector streaming metrics.
server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java (1)
14-24: LGTM!
Clean interface design following the single-responsibility principle. The `@ExperimentalApi` annotation appropriately marks this as experimental, and the Javadoc clearly describes the contract for implementors. The `SearchContext` parameter provides the necessary context for cost estimation based on field metadata and cardinality.
server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java (3)
28-42: LGTM!
The removal of `Streamable` from the class declaration aligns with the refactor. The constructor properly delegates to the parent `CardinalityAggregator` with all necessary parameters.
44-79: LGTM!
The `getLeafCollector` implementation correctly manages the streaming collector lifecycle:
- Cleans up the previous collector before creating a new one
- Handles null values source appropriately
- Validates that only ordinal value sources are supported for streaming
- Creates the appropriate collector type based on ordinal count
81-95: LGTM!
The `doReset` implementation correctly handles batch transitions by:
- Calling super to reset parent state
- Closing the stream collector
- Recreating `HyperLogLogPlusPlus` counts (necessary since HLL lacks a public reset method)
server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (3)
35-46: LGTM!
The `neutral()` factory method is well-designed for metric aggregations (max, min, avg, etc.) that produce single-value outputs. A `topNSize` of 1 correctly represents that these aggregations don't contribute to bucket proliferation while remaining compatible with streaming.
58-70: LGTM!
The `combineWithSubAggregation` correctly models nested aggregation cost with multiplicative effects. Using `Math.multiplyExact` for overflow detection and falling back to `nonStreamable()` is a safe approach that prevents invalid metric propagation.
82-94: LGTM!
The `combineWithSibling` correctly models parallel aggregation cost with additive effects. Using `Math.addExact` for overflow detection mirrors the approach in `combineWithSubAggregation` for consistent handling.
server/src/test/java/org/opensearch/search/streaming/StreamingCostMetricsTests.java (3)
15-31: LGTM! Tests cover the simplified constructor and neutral factory method.
The tests properly verify the new 2-parameter constructor and the `neutral()` factory method. The assertions correctly validate both `streamable()` and `topNSize()` return values.
33-50: LGTM! Overflow handling tests are well-designed.
The combination tests correctly verify multiplication semantics for sub-aggregations (100 * 50 = 5000) and that integer overflow correctly triggers a non-streamable result.
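The combination semantics these tests exercise can be sketched as below. `CostSketch` is a hypothetical, simplified stand-in for `StreamingCostMetrics`; the value `0` used for the non-streamable `topNSize` is an assumption of this sketch, not something the review states.

```java
/** Simplified cost record: whether streaming is possible and the estimated buckets per segment. */
record CostSketch(boolean streamable, int topNSize) {

    static CostSketch nonStreamable() {
        return new CostSketch(false, 0); // topNSize of 0 is an arbitrary choice for this sketch
    }

    /** Metric aggregations produce one value per bucket, so they neither add nor multiply buckets. */
    static CostSketch neutral() {
        return new CostSketch(true, 1);
    }

    /** Nested aggregations multiply: every parent bucket holds a full set of child buckets. */
    CostSketch combineWithSubAggregation(CostSketch sub) {
        if (!streamable || !sub.streamable) {
            return nonStreamable();
        }
        try {
            return new CostSketch(true, Math.multiplyExact(topNSize, sub.topNSize));
        } catch (ArithmeticException overflow) {
            return nonStreamable(); // an estimate too large for int is treated as not streamable
        }
    }

    /** Sibling aggregations run side by side, so their bucket estimates add up. */
    CostSketch combineWithSibling(CostSketch sibling) {
        if (!streamable || !sibling.streamable) {
            return nonStreamable();
        }
        try {
            return new CostSketch(true, Math.addExact(topNSize, sibling.topNSize));
        } catch (ArithmeticException overflow) {
            return nonStreamable();
        }
    }
}
```

`Math.multiplyExact` and `Math.addExact` throw `ArithmeticException` instead of silently wrapping around, which is what lets the overflow case be mapped to a non-streamable result.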
52-78: LGTM! Sibling combination and non-streamable propagation tests are correct.
Tests properly verify addition semantics for siblings (100 + 200 = 300), overflow detection, and that combining with a non-streamable metric correctly propagates the non-streamable state.
server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java (3)
server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java (3)
85-94: LGTM! Decision logic is clean and straightforward.
The `decideFlushMode` method correctly short-circuits on non-streamable metrics and uses a simple threshold check for bucket count.
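A minimal sketch of that decision shape, under stated assumptions: `FlushModeSketch` and `FlushDecision` are hypothetical names, and treating an estimate exactly equal to the maximum as still streamable is an assumption of this sketch (the review only says the boundary is tested, not which way it resolves).

```java
/** Hypothetical two-value stand-in for the flush modes discussed in this PR. */
enum FlushModeSketch {
    PER_SEGMENT,
    PER_SHARD
}

final class FlushDecision {
    /**
     * Chooses per-segment streaming only when the aggregation tree is streamable
     * and its estimated bucket count stays within the configured maximum.
     */
    static FlushModeSketch decide(boolean streamable, long estimatedTopN, long maxBucketCount) {
        if (!streamable) {
            return FlushModeSketch.PER_SHARD; // short-circuit: nothing else needs checking
        }
        // Assumption: the boundary is inclusive.
        return estimatedTopN <= maxBucketCount ? FlushModeSketch.PER_SEGMENT : FlushModeSketch.PER_SHARD;
    }
}
```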
108-120: LGTM! Eligibility check properly validates top-level aggregations.
The null/empty check and iteration over top-level aggregations is correct.
48-75: Settings are not applied in the `decideFlushMode` decision logic.
`STREAMING_MIN_CARDINALITY_RATIO` and `STREAMING_MIN_ESTIMATED_BUCKET_COUNT` are defined and exposed through `DefaultSearchContext` getters, but they are not currently used within the `decideFlushMode` method. The method only considers `maxBucketCount` in its decision criteria. If these thresholds are intended to gate streaming decisions (as their documentation suggests), they should be integrated into the decision logic.
server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java (2)
147-158: LGTM! Streaming cost estimation correctly limits to ordinal-based sources.
The estimation logic properly restricts streaming support to `Bytes.WithOrdinals` value sources, and the HLL register count calculation (`1 << precision()`) is safe within int bounds for the expected precision range.
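To make the register-count arithmetic concrete, here is a small sketch. The class name and the precision bounds of 4 and 18 are assumptions for illustration; the review only says the shift is safe "for the expected precision range".

```java
final class HllCostSketch {
    // Assumed precision bounds for this sketch; check the actual HyperLogLog++ limits before relying on them.
    static final int MIN_PRECISION = 4;
    static final int MAX_PRECISION = 18;

    /** Number of HLL registers for a given precision: 2^precision. */
    static int registerCount(int precision) {
        if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
            throw new IllegalArgumentException("precision out of range: " + precision);
        }
        return 1 << precision;
    }

    /** Estimated cost of a cardinality aggregation nested under a terms aggregation. */
    static long combinedTopN(int termsTopN, int precision) {
        return Math.multiplyExact((long) termsTopN, (long) registerCount(precision));
    }
}
```

For example, a terms estimate of 100 combined with precision 14 gives 100 * 16,384 = 1,638,400, well above the 100,000 default mentioned for the maximum bucket count, which is consistent with the integration test needing to adjust that setting.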
113-119: LGTM! FlushMode checks are appropriately tightened.
Both `createUnmapped` and `doCreateInternal` now explicitly require `FlushMode.PER_SEGMENT` rather than allowing null FlushMode, reducing ambiguity in the streaming path.
Also applies to: 139-145
server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java (1)
57-57: LGTM! Neutral streaming cost is appropriate for sum aggregation.
Sum aggregation produces a single value per bucket without additional overhead, so returning `neutral()` correctly represents zero additional streaming cost.
Also applies to: 106-109
server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java (1)
462-467: LGTM! Streaming eligibility check is properly delegated.
The refactored `canUseStreamSearch` correctly handles the null source case and delegates aggregation validation to `FlushModeResolver.isEligibleForStreaming`, improving separation of concerns.
server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java (1)
56-56: LGTM! Neutral streaming cost is appropriate for value_count aggregation.
Value count aggregation produces a single count per bucket without additional overhead, so returning `neutral()` correctly represents zero additional streaming cost.
Also applies to: 99-103
server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java (1)
45-46: LGTM!
The implementation of `StreamingCostEstimable` is clean and appropriate. Returning `StreamingCostMetrics.neutral()` is correct for metric aggregations like `avg` since they don't create additional buckets and are inherently streamable.
Also applies to: 57-57, 106-109
server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java (1)
45-46: LGTM!
Consistent implementation with other metric aggregator factories. Returning neutral metrics is appropriate for `max` aggregation.
Also applies to: 57-57, 106-109
server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java (1)
45-46: LGTM!
Consistent implementation with other metric aggregator factories (`AvgAggregatorFactory`, `MaxAggregatorFactory`). The neutral metrics approach is appropriate for `min` aggregation.
Also applies to: 57-57, 106-109
server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java (1)
25-64: LGTM!
Excellent test coverage for `FlushModeResolver.decideFlushMode` logic including:
- Non-streamable metrics
- Streamable metrics with various topN values
- Boundary condition (topN exactly at max)
- Neutral metrics behavior
The tests are well-documented with inline comments explaining the expected behavior.
server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java (2)
313-330: Well-structured streaming decision logic.
The streaming cost estimation workflow is properly integrated:
- Correctly guards with `isStreamSearch()` and `flushMode == null` check
- Handles the edge case of empty factories by defaulting to `PER_SHARD`
- Uses `setFlushModeIfAbsent` to avoid overwriting any previously set mode
- Good debug logging for observability
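The factory-tree estimate that feeds this decision can be sketched as follows. `FactoryNode` and `FactoryTreeEstimate` are hypothetical stand-ins: a negative `ownTopN` marks a factory that cannot stream, and `-1` is the sketch's own sentinel for a non-streamable tree. The rules applied are the ones the review describes elsewhere: siblings add, nesting multiplies, and a non-streamable node anywhere poisons the whole estimate.

```java
import java.util.List;

/** Hypothetical factory node: its own bucket estimate (negative means "cannot stream") and its children. */
record FactoryNode(long ownTopN, List<FactoryNode> subFactories) {}

final class FactoryTreeEstimate {
    /** Sentinel used by this sketch for "not streamable". */
    static final long NON_STREAMABLE = -1L;

    /** Sibling factories add up. Callers are expected to handle the empty list separately. */
    static long estimateAll(List<FactoryNode> factories) {
        long total = 0;
        for (FactoryNode factory : factories) {
            long cost = estimate(factory);
            if (cost == NON_STREAMABLE) {
                return NON_STREAMABLE;
            }
            try {
                total = Math.addExact(total, cost);
            } catch (ArithmeticException overflow) {
                return NON_STREAMABLE;
            }
        }
        return total;
    }

    /** A factory's cost is its own estimate multiplied by the combined cost of its sub-factories. */
    static long estimate(FactoryNode factory) {
        if (factory.ownTopN() < 0) {
            return NON_STREAMABLE;
        }
        if (factory.subFactories().isEmpty()) {
            return factory.ownTopN();
        }
        long subCost = estimateAll(factory.subFactories());
        if (subCost == NON_STREAMABLE) {
            return NON_STREAMABLE;
        }
        try {
            return Math.multiplyExact(factory.ownTopN(), subCost);
        } catch (ArithmeticException overflow) {
            return NON_STREAMABLE;
        }
    }
}
```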
383-404: LGTM!
The `estimateFromFactory` method cleanly handles the recursive cost estimation:
- Returns early for non-`StreamingCostEstimable` factories
- Properly propagates non-streamable status from both factory and sub-factories
- Uses appropriate combination methods (`combineWithSubAggregation` vs `combineWithSibling`)
server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java (1)
146-218: Good expanded coverage for streaming eligibility.
The new tests cover multiple-terms top-level and supported/unsupported sub-agg combinations, which should guard the updated eligibility logic well.
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java (1)
48-82: `segmentTopN` wiring looks consistent.
Constructor injection and top-bucket selection are aligned.
Also applies to: 242-243
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java (1)
55-74: `StreamingCostEstimable` integration looks clean.
The interface adoption and related imports fit well with the new estimation flow.
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java (1)
52-83: `segmentTopN` usage is consistent in numeric streaming flow.
Constructor injection and selection usage line up cleanly.
Also applies to: 170-174
|
❌ Gradle check result for 839df89: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
…w, if not supported, will remove Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for 43fddbc: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for 213a6ab: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for ab931dd: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
60c253d to
6ddb9e1
Compare
Signed-off-by: bowenlan-amzn <[email protected]>
|
❌ Gradle check result for dbe69b6: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: bowenlan-amzn <[email protected]>
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## main #20471 +/- ##
============================================
+ Coverage 73.19% 73.35% +0.15%
- Complexity 71975 72112 +137
============================================
Files 5796 5795 -1
Lines 329539 329462 -77
Branches 47465 47458 -7
============================================
+ Hits 241220 241675 +455
+ Misses 69005 68452 -553
- Partials 19314 19335 +21
☔ View full report in Codecov by Sentry.
|
Description
Streaming aggregation feature supports an agg defined with
Multiple aggs are also supported, provided each of them fits the above constraints.
The segment streaming topN heuristic is set to `shard_size`.
Users can also provide the segment topN size through an index-level setting.
The final topN is the larger of these two values.
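As a rough illustration of that rule, the sketch below takes the larger of the request's effective shard size and the index-level minimum. The class, method, and parameter names are hypothetical; the clamp to `requiredSize` follows the `computeSegTopN` snippet quoted in the review, and the doubling that snippet applies afterwards is deliberately left out here.

```java
final class SegmentTopNSketch {
    /**
     * Picks the per-segment topN: the request's shard size (never below the
     * requested size), or the index-level minimum if that is larger.
     */
    static int segmentTopN(int requiredSize, int shardSize, int indexLevelMinSegmentSize) {
        int effectiveShardSize = Math.max(shardSize, requiredSize);
        return Math.max(indexLevelMinSegmentSize, effectiveShardSize);
    }
}
```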
The topN is compared with `search.aggregations.streaming.max_estimated_bucket_count` to decide whether streaming should be used. If the topN exceeds that limit, streaming would send back too many buckets per segment, so we fall back to the default aggregation path, which sends one response per shard.
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.