@bowenlan-amzn bowenlan-amzn commented Jan 23, 2026

Description

The streaming aggregation feature supports an aggregation that is defined as follows:

  • The top level must be a string terms or numeric terms aggregation.
  • Second-level (sub) aggregations can be numeric terms, cardinality, max, min, or sum (avg and value_count should be added soon).
    Multiple sub-aggregations are fine as long as each one meets these constraints.

The per-segment streaming topN heuristic is set to shard_size.
Users can also provide a segment topN size through an index-level setting.
The final topN is the larger of the two.

The topN is compared with search.aggregations.streaming.max_estimated_bucket_count to decide whether streaming should be used. If the topN exceeds that limit, streaming would send back too many buckets per segment, so we fall back to the default aggregation path, which sends one response per shard.
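
The decision described above can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: the class `SegmentTopNSketch`, its method names, and its parameter names are hypothetical, and the sketch only mirrors the two rules stated in the description (take the larger of shard_size and the index-level setting, then compare the result against the maximum estimated bucket count).

```java
// Hypothetical sketch of the topN heuristic described in the PR description.
// None of these names come from the OpenSearch codebase.
final class SegmentTopNSketch {

    private SegmentTopNSketch() {}

    /** The final per-segment topN is the larger of shard_size and the index-level setting. */
    static long finalTopN(int shardSize, int indexLevelSegmentTopN) {
        return Math.max(shardSize, indexLevelSegmentTopN);
    }

    /**
     * Streaming is used only while the topN does not exceed the limit taken from
     * search.aggregations.streaming.max_estimated_bucket_count; otherwise the
     * default per-shard aggregation path is used.
     */
    static boolean shouldStream(long topN, long maxEstimatedBucketCount) {
        return topN <= maxEstimatedBucketCount;
    }

    public static void main(String[] args) {
        long topN = finalTopN(50, 1_000);
        System.out.println("topN=" + topN + ", stream=" + shouldStream(topN, 100_000L));
    }
}
```

Treating a topN that is exactly equal to the limit as still streamable is an assumption drawn from the word "exceeds"; the actual boundary behaviour may differ.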

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

coderabbitai bot commented Jan 23, 2026

📝 Walkthrough

This PR refactors the streaming aggregation cost estimation architecture from a runtime collector-tree evaluation model to an early-stage aggregation-factory estimation model, introducing a new StreamingCostEstimable interface while removing the Streamable interface and eliminating the AggregatorTreeEvaluator class that previously managed streaming decisions.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Streaming Architecture Refactoring: CHANGELOG.md, server/src/main/java/org/opensearch/search/streaming/Streamable.java, server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java, server/src/main/java/org/opensearch/search/aggregations/AggregatorTreeEvaluator.java, server/src/test/java/org/opensearch/search/aggregations/AggregatorTreeEvaluatorTests.java | Removes Streamable interface and replaces with new StreamingCostEstimable interface; eliminates AggregatorTreeEvaluator that previously evaluated streaming decisions at runtime; shifts cost estimation to factory-level before aggregator creation. |
| FlushModeResolver Refactoring: server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java | Major refactoring from collector-tree analysis to aggregation-builder focused approach; replaces resolve(Collector...) with isEligibleForStreaming(AggregatorFactories.Builder) and adds new settings for streaming thresholds; introduces per-aggregation type streaming validation. |
| StreamingCostMetrics Simplification: server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java, server/src/test/java/org/opensearch/search/streaming/StreamingCostMetricsTests.java, server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java | Reduces record fields from five to two (streamable, topNSize); adds neutral() factory method; updates combination logic to work only with topNSize; refactors tests from integration-style to isolated decision logic. |
| Terms Aggregator Factory Streaming: server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java | Implements StreamingCostEstimable; adds computeSegTopN() helper; requires FlushMode.PER_SEGMENT explicitly for streaming (previously allowed null); propagates segmentTopN parameter through streaming aggregator creation paths. |
| Terms Streaming Aggregators: server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java, server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java | Removes Streamable implementation; introduces new segmentTopN parameter; replaces internal segmentSize logic with segmentTopN for top-bucket selection; removes streaming cost metrics methods. |
| Cardinality Aggregator Streaming: server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java, server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java | Factory now implements StreamingCostEstimable with ordinals-aware estimation; aggregator removes Streamable and adds explicit lifecycle methods (getLeafCollector, doReset, doPostCollection, doClose). |
| Metric Aggregator Factories (Streamable Removal & StreamingCostEstimable Addition): server/src/main/java/org/opensearch/search/aggregations/metrics/{AvgAggregatorFactory,MaxAggregatorFactory,MinAggregatorFactory,SumAggregatorFactory,ValueCountAggregatorFactory}.java | Each now implements StreamingCostEstimable returning neutral cost metrics; removes responsibility for runtime streaming cost estimation. |
| Metric Aggregators (Streamable Removal): server/src/main/java/org/opensearch/search/aggregations/metrics/{MaxAggregator,MinAggregator}.java | Removes Streamable interface implementation and getStreamingCostMetrics() method from both classes. |
| Aggregator Factories & Collection Management: server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java, server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java | AggregatorFactories adds streaming cost estimation workflow for top-level aggregators using new factory-level cost analysis; AggregationCollectorManager removes dynamic evaluation/recreation step in createCollector. |
| Profiling & Search Integration: server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java, server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java | Removes Streamable implementation from ProfilingAggregator; simplifies canUseStreamSearch to delegate to FlushModeResolver.isEligibleForStreaming() instead of checking specific aggregation types. |
| Test Coverage Updates: server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java, server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java, server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java, server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java, server/src/test/java/org/opensearch/search/aggregations/metrics/{MaxAggregatorTests,MinAggregatorTests,StreamCardinalityAggregatorTests}.java | Removes old streaming cost metrics tests; adds comprehensive new factory-level streaming cost estimation tests; updates integration tests to use profile-based validation instead of assertions on Streamable interface. |
| Integration Tests & Test Framework: plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java, test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java | Integration tests refactored to use profile inspection for streaming validation with dynamic settings adjustments; test framework explicitly sets FlushMode.PER_SEGMENT in streaming aggregator creation paths. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

enhancement, Search, Search:Aggregations, Search:Performance

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 22.12% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'Refactor streaming agg query phase planning' directly and concisely describes the main change: a refactoring of streaming aggregation query phase planning logic. |
| Description check | ✅ Passed | The pull request description includes all required sections: a clear description of the feature (streaming aggregation constraints), related issues placeholder, and a completed checklist with testing and documentation confirmations. |

@github-actions

❌ Gradle check result for bb004fb: ABORTED

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@bowenlan-amzn bowenlan-amzn force-pushed the streaming-agg-planning-refactor branch from bb004fb to 012735b Compare January 24, 2026 18:03
@github-actions

❌ Gradle check result for 012735b: FAILURE

@bowenlan-amzn bowenlan-amzn force-pushed the streaming-agg-planning-refactor branch from 012735b to 13f2252 Compare January 24, 2026 19:35
@github-actions

❌ Gradle check result for 13f2252: FAILURE

@bowenlan-amzn bowenlan-amzn force-pushed the streaming-agg-planning-refactor branch from 13f2252 to ddf0f03 Compare January 27, 2026 19:05
@github-actions

❌ Gradle check result for 889a7c0: FAILURE

@bowenlan-amzn bowenlan-amzn force-pushed the streaming-agg-planning-refactor branch from 889a7c0 to e03943d Compare January 27, 2026 23:17
…stant across segments, not per aggregator

Signed-off-by: bowenlan-amzn <[email protected]>
@github-actions

❌ Gradle check result for b217ab9: FAILURE

@github-actions

❌ Gradle check result for 6d8c16c: FAILURE

- segmentTopN for term agg

Signed-off-by: bowenlan-amzn <[email protected]>
- rest layer filter streaming

Signed-off-by: bowenlan-amzn <[email protected]>
@github-actions

❌ Gradle check result for 597b0cc: FAILURE

Signed-off-by: bowenlan-amzn <[email protected]>
@bowenlan-amzn bowenlan-amzn marked this pull request as ready for review January 28, 2026 16:12

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java (1)

125-137: Make segmentTopN request-scoped to avoid cross-search leakage.

segmentTopN is static and set during estimateStreamingCost, then reused by supplier builds. Concurrent searches with different shard_size/required_size can overwrite it, leading to wrong top-N selection (potentially missing buckets) or inconsistent performance. Compute it per request instead of storing globally.

🐛 Suggested fix (request-scoped segmentTopN)
-                    if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+                    if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+                        int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, context);
                         return createStreamStringTermsAggregator(
                             name,
                             factories,
                             valuesSource,
                             order,
                             format,
                             bucketCountThresholds,
                             context,
                             parent,
                             showTermDocCountError,
-                            segmentTopN,
+                            segmentTopN,
                             metadata
                         );
                     }
-                if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+                if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
+                    int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, context);
                     return createStreamNumericTermsAggregator(
                         name,
                         factories,
                         numericValuesSource,
                         format,
                         order,
                         bucketCountThresholds,
                         context,
                         parent,
                         longFilter,
                         includeExclude,
                         showTermDocCountError,
                         cardinality,
-                        segmentTopN,
+                        segmentTopN,
                         metadata
                     );
                 }
-    private int computeSegTopN(BucketCountThresholds bucketCountThresholds, BucketOrder order) {
+    private static int computeSegTopN(BucketCountThresholds bucketCountThresholds, BucketOrder order, SearchContext context) {
         int effectiveShardSize = bucketCountThresholds.getShardSize();
         if (InternalOrder.isKeyOrder(order) == false
             && effectiveShardSize == TermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
             effectiveShardSize = BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize());
         }
         // Ensure shardSize is valid (at minimum, use requiredSize)
         if (effectiveShardSize < bucketCountThresholds.getRequiredSize()) {
             effectiveShardSize = bucketCountThresholds.getRequiredSize();
         }
 
-        int minSegmentSize = queryShardContext.getIndexSettings().getStreamingAggregationMinShardSize();
+        int minSegmentSize = context.indexShard().indexSettings().getStreamingAggregationMinShardSize();
         return Math.max(minSegmentSize, effectiveShardSize);
     }
 
-    private static int segmentTopN;
-
     @Override
     public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) {
         ValuesSource valuesSource = config.getValuesSource();
-        segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order);
+        int segmentTopN = 2 * computeSegTopN(bucketCountThresholds, order, searchContext);
 
         // Reject numeric aggregators with key-based ordering
         if (InternalOrder.isKeyOrder(order) && valuesSource instanceof ValuesSource.Numeric) {
             return StreamingCostMetrics.nonStreamable();
         }
 
         if (valuesSource instanceof WithOrdinals || valuesSource instanceof ValuesSource.Numeric) {
             return new StreamingCostMetrics(true, segmentTopN);
         }
 
         return StreamingCostMetrics.nonStreamable();
     }

Also applies to: 235-250, 600-681, 685-719
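
The hazard described in this comment can be reproduced in a few lines. The sketch below is a simplified model under stated assumptions, not OpenSearch code: `SharedStateFactory`, `RequestScopedFactory`, and `SegmentTopNLeakDemo` are hypothetical names, and the interleaving of two concurrent searches is simulated sequentially so that the outcome is deterministic.

```java
// Hypothetical, simplified model of the static-field hazard described above.
// These classes are illustrative and are not OpenSearch types.
final class SharedStateFactory {
    // Shared by every instance (and therefore every request) in the JVM.
    private static int segmentTopN;

    private final int shardSize;

    SharedStateFactory(int shardSize) {
        this.shardSize = shardSize;
    }

    void estimate() {
        segmentTopN = 2 * shardSize; // writes the shared field
    }

    int build() {
        return segmentTopN; // reads whatever was written last, by any request
    }
}

final class RequestScopedFactory {
    private final int shardSize;

    RequestScopedFactory(int shardSize) {
        this.shardSize = shardSize;
    }

    int build() {
        return 2 * shardSize; // derived from this request's own parameters
    }
}

final class SegmentTopNLeakDemo {
    public static void main(String[] args) {
        SharedStateFactory a = new SharedStateFactory(10);
        SharedStateFactory b = new SharedStateFactory(500);
        a.estimate();
        b.estimate(); // overwrites the value request A was relying on
        System.out.println("shared: A builds with " + a.build());
        System.out.println("scoped: A builds with " + new RequestScopedFactory(10).build());
    }
}
```

With the static field, request A ends up building with request B's value; computing the value from the request's own thresholds removes the shared state altogether, which is the direction the suggested fix takes.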

🤖 Fix all issues with AI agents
In `@server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java`:
- Around line 137-144: isSubAggregationStreamable currently only checks the
immediate aggregation type and misses nested aggs under a second-level
TermsAggregationBuilder; update isSubAggregationStreamable to, when agg is a
TermsAggregationBuilder, iterate its sub-aggregations and recursively validate
each sub-aggregation (e.g., by calling isSubAggregationStreamable on each child)
so that nested unsupported aggregations (like terms { terms { unsupported_agg
}}) are rejected; ensure the recursion handles only necessary depth (or full
recursion) and uses the
TermsAggregationBuilder.getSubAggregations()/sub-aggregation accessor to locate
children.
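
A rough sketch of the recursive check requested above, written against a toy tree rather than the real builders. `AggNode` and `SubAggValidatorSketch` are hypothetical stand-ins, the type names are plain strings, and "terms" is not narrowed to numeric terms here; the supported set simply mirrors the sub-aggregation list in the PR description. The sketch uses full recursion and does not enforce any depth limit.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an aggregation builder tree; not the OpenSearch API.
final class AggNode {
    final String type;
    final List<AggNode> children;

    AggNode(String type, AggNode... children) {
        this.type = type;
        this.children = List.of(children);
    }
}

final class SubAggValidatorSketch {
    // Sub-aggregation types listed as supported in the PR description.
    private static final Set<String> SUPPORTED = Set.of("terms", "cardinality", "max", "min", "sum");

    private SubAggValidatorSketch() {}

    /** Returns true only if this node and every descendant are of a supported type. */
    static boolean isSubAggregationStreamable(AggNode agg) {
        if (!SUPPORTED.contains(agg.type)) {
            return false;
        }
        for (AggNode child : agg.children) {
            if (!isSubAggregationStreamable(child)) {
                return false;
            }
        }
        return true;
    }
}
```

For example, `new AggNode("terms", new AggNode("terms", new AggNode("top_hits")))` is rejected because the unsupported leaf is reached through the nested terms node, which is the case the comment says is currently missed.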

In
`@server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java`:
- Around line 405-414: createAggregatorFactory returns a FactoryAndContext that
holds a SearchContext which isn't closed, causing resource leaks; make
FactoryAndContext implement AutoCloseable and add a close() that closes the
contained SearchContext (call searchContext.close()), then update tests to use
try-with-resources (try (FactoryAndContext fac = createAggregatorFactory(...)) {
... }) so the SearchContext is always closed; alternatively, if you prefer
test-suite level cleanup, collect returned SearchContext instances and close
them in an `@After` method, but the preferred change is to implement AutoCloseable
on FactoryAndContext and switch tests to try-with-resources.
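
The preferred change above could look roughly like this. It is a sketch under assumptions: `FakeSearchContext` stands in for the real SearchContext, `FactoryAndContextSketch` mirrors the test helper named in the comment, and `TryWithResourcesDemo` is a hypothetical driver; none of these are the actual test classes.

```java
// Hypothetical stand-ins: FakeSearchContext plays the role of SearchContext,
// and FactoryAndContextSketch mirrors the test helper named in the comment.
final class FakeSearchContext implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class FactoryAndContextSketch implements AutoCloseable {
    final Object factory;
    final FakeSearchContext searchContext;

    FactoryAndContextSketch(Object factory, FakeSearchContext searchContext) {
        this.factory = factory;
        this.searchContext = searchContext;
    }

    @Override
    public void close() {
        searchContext.close(); // release the context together with the holder
    }
}

final class TryWithResourcesDemo {
    /** Runs a "test body" and returns the context so callers can check it was closed. */
    static FakeSearchContext runTestBody() {
        FakeSearchContext context = new FakeSearchContext();
        try (FactoryAndContextSketch fac = new FactoryAndContextSketch(new Object(), context)) {
            // Assertions against fac.factory would go here.
            if (fac.factory == null) {
                throw new IllegalStateException("factory must be present");
            }
        }
        return context;
    }
}
```

Because the holder is declared in the try header, the context is closed even when an assertion inside the block throws.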
🧹 Nitpick comments (5)
CHANGELOG.md (1)

36-36: LGTM! Changelog entry is correctly formatted and placed.

The entry follows the established changelog format and is appropriately placed in the "Changed" section for a refactoring effort. The description matches the PR title and the link is correct.

💡 Optional: Consider a more detailed description

The AI summary indicates this is a significant architectural refactor (moving from runtime collector-tree evaluation to aggregation-factory estimation, introducing StreamingCostEstimable interface, removing Streamable interface and AggregatorTreeEvaluator). If you want to provide more context for future readers, you could expand the description slightly:

-- Refactor streaming agg query phase planning ([`#20471`](https://github.com/opensearch-project/OpenSearch/pull/20471))
+- Refactor streaming agg query phase planning to use factory-based cost estimation ([`#20471`](https://github.com/opensearch-project/OpenSearch/pull/20471))

However, the current concise format is consistent with other entries in this changelog, so this is purely optional.

server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java (1)

118-121: Consider removing redundant override.

This collectDebugInfo override only calls super.collectDebugInfo(add) without adding any streaming-specific debug information. Since the method simply delegates to the parent class, it can be removed entirely.

♻️ Suggested removal
-    @Override
-    public void collectDebugInfo(BiConsumer<String, Object> add) {
-        super.collectDebugInfo(add);
-    }
server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (1)

21-24: Consider using long for topNSize to align with FlushModeResolver settings.

The topNSize field is declared as int, but FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT is a long setting with default 100,000. While current values fit comfortably in an int, larger bucket counts could overflow in nested aggregation scenarios (e.g., multiplying two large topNSize values exceeds Integer.MAX_VALUE). Using long would provide consistency with the settings API and improve future-proofing.
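
The overflow this comment refers to is easy to demonstrate. The sketch below assumes, as the comment's example does, that nested costs are combined by multiplication; `TopNOverflowSketch` and its methods are hypothetical names and not part of StreamingCostMetrics.

```java
// Hypothetical illustration of int overflow when combining two topN sizes.
final class TopNOverflowSketch {

    private TopNOverflowSketch() {}

    /** int multiplication wraps around silently on overflow. */
    static int combineAsInt(int parentTopN, int childTopN) {
        return parentTopN * childTopN;
    }

    /** Widening to long before multiplying keeps the true product for int-sized inputs. */
    static long combineAsLong(int parentTopN, int childTopN) {
        return (long) parentTopN * childTopN;
    }

    public static void main(String[] args) {
        int parent = 100_000;
        int child = 100_000;
        System.out.println("int:  " + combineAsInt(parent, child));
        System.out.println("long: " + combineAsLong(parent, child));
    }
}
```

If the field has to stay an int for other reasons, `Math.multiplyExact(int, int)` is a standard alternative that throws ArithmeticException instead of wrapping.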

server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java (1)

320-327: Consider using stream API for consistency.

The loop to detect non-streamable sub-factories could be simplified using streams, aligning with patterns used elsewhere in the codebase.

Suggested refactor
-                    boolean hasNonStreamable = false;
-                    for (AggregatorFactory subFactory : subFactoryArray) {
-                        if (!(subFactory instanceof StreamingCostEstimable)) {
-                            hasNonStreamable = true;
-                            break;
-                        }
-                    }
-                    assertTrue("Should have non-streamable sub-factory (TopHitsAggregatorFactory)", hasNonStreamable);
+                    boolean hasNonStreamable = Arrays.stream(subFactoryArray)
+                        .anyMatch(f -> !(f instanceof StreamingCostEstimable));
+                    assertTrue("Should have non-streamable sub-factory (TopHitsAggregatorFactory)", hasNonStreamable);
server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java (1)

359-370: Assert-only guard for non-empty factories array.

The assert factories.length > 0 on line 360 protects against empty arrays, but asserts are disabled in production. If somehow called with an empty array (despite the caller's guard at line 315), combined would remain null and be returned.

The current code is safe because the caller explicitly handles factories.length == 0 before calling this method (lines 315-316). Consider adding a brief comment noting this invariant for future maintainers.

Optional: Add clarifying comment
     private static StreamingCostMetrics estimateStreamingCostFromFactories(AggregatorFactory[] factories, SearchContext searchContext) {
+        // Caller (createTopLevelAggregators) handles empty case separately
         assert factories.length > 0 : "factories array must be non-empty";
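
For context, the difference between an assert-only guard and an explicit check can be sketched as below. The review only asks for a clarifying comment; the explicit check is shown as an alternative that is not proposed in the review. `NonEmptyGuardSketch` is a hypothetical class, and combining costs with `Math.max` is an assumption made for illustration.

```java
// Hypothetical comparison of an assert-only guard and an explicit check.
final class NonEmptyGuardSketch {

    private NonEmptyGuardSketch() {}

    /** Guarded only by assert: with assertions disabled, an empty array yields null. */
    static Integer combineWithAssert(int[] costs) {
        assert costs.length > 0 : "costs must be non-empty";
        Integer combined = null;
        for (int cost : costs) {
            combined = (combined == null) ? cost : Math.max(combined, cost);
        }
        return combined;
    }

    /** Explicit check: behaves the same whether or not the JVM runs with -ea. */
    static int combineWithCheck(int[] costs) {
        if (costs.length == 0) {
            throw new IllegalArgumentException("costs must be non-empty");
        }
        int combined = costs[0];
        for (int i = 1; i < costs.length; i++) {
            combined = Math.max(combined, costs[i]);
        }
        return combined;
    }
}
```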
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between fc33a53 and 839df89.

📒 Files selected for processing (34)
  • CHANGELOG.md
  • plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java
  • server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregatorTreeEvaluator.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java
  • server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java
  • server/src/main/java/org/opensearch/search/streaming/Streamable.java
  • server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
  • server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java
  • server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java
  • server/src/test/java/org/opensearch/search/aggregations/AggregatorTreeEvaluatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java
  • server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregatorTests.java
  • server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java
  • server/src/test/java/org/opensearch/search/streaming/StreamingCostMetricsTests.java
  • test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
💤 Files with no reviewable changes (8)
  • server/src/main/java/org/opensearch/search/streaming/Streamable.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregatorTreeEvaluator.java
  • server/src/test/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/AggregatorTreeEvaluatorTests.java
  • server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-02T19:23:29.698Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: The gRPC search API in OpenSearch is marked as "experimental" in official documentation, so changes to proto schemas that remove previously unsupported fields (those throwing UnsupportedOperationException) are not considered breaking changes.

Applied to files:

  • CHANGELOG.md
  • server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.

Applied to files:

  • server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java
  • server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java
  • server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java
  • server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java
  • server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
  • server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
  • server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
  • server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java
🧬 Code graph analysis (3)
server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java (1)
server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java (1)
  • NumberFieldMapper (102-2185)
server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java (1)
server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java (1)
  • RestSearchAction (90-468)
server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (1)
server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java (1)
  • ExperimentalApi (29-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: Analyze (java)
🔇 Additional comments (35)
server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java (1)

74-74: LGTM! Removal of Streamable interface aligns with the factory-driven streaming model.

The class now only implements StarTreePreComputeCollector, consistent with the PR's architectural shift to handle streaming cost estimation at the aggregator factory level. MinAggregatorFactory correctly implements StreamingCostEstimable, and no remaining Streamable implementations exist in the metrics aggregators.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java (2)

495-504: LGTM!

The comment update accurately reflects the refactored behavior where factory-level estimation determines streaming eligibility. The test correctly validates that aggregation results remain accurate regardless of whether streaming is used.


522-587: Well-structured test with proper resource management.

The test correctly:

  1. Documents why the setting adjustment is needed (combined topN from terms shardSize * 2^precision).
  2. Uses try-finally to ensure settings are restored even if assertions fail.
  3. Validates both profile structure (StreamCardinalityAggregator presence) and result correctness (cardinality values).
  4. Uses low precision threshold intentionally to keep the test within bounds.
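
The try-finally pattern from point 2 can be sketched as follows. This uses a plain in-memory map as a stand-in for cluster settings, so `SettingsRestoreSketch` and `withTemporaryLimit` are hypothetical and do not reflect the client API the integration test uses; which setting the test actually adjusts is also an assumption here.

```java
import java.util.Map;

// Hypothetical in-memory stand-in for cluster settings; not the OpenSearch client API.
final class SettingsRestoreSketch {
    static final String KEY = "search.aggregations.streaming.max_estimated_bucket_count";

    private SettingsRestoreSketch() {}

    /** Temporarily overrides KEY, runs the body, and always restores the previous state. */
    static void withTemporaryLimit(Map<String, Long> settings, long temporary, Runnable body) {
        Long previous = settings.get(KEY);
        settings.put(KEY, temporary);
        try {
            body.run();
        } finally {
            if (previous == null) {
                settings.remove(KEY);
            } else {
                settings.put(KEY, previous);
            }
        }
    }
}
```

The finally block runs even when the body throws, so a failed assertion cannot leave the raised limit in place for later tests.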
server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java (1)

71-77: The concern about double invocation of preCollection() is not valid.

preCollection() is called once in createCollector() (line 76) and no caller or wrapper invokes it again. The returned collector is passed directly to searcher.search() and searchLeaf(), which do not call preCollection(). Subsequent postCollection() processing in BucketCollectorProcessor only invokes postCollection(), never preCollection(). The refactor correctly consolidates the initialization into the collector creation method with no lifecycle issues.

test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java (1)

336-338: LGTM!

The explicit mock setup for getFlushMode() returning FlushMode.PER_SEGMENT is appropriate for test infrastructure. This correctly bypasses the cost-estimation decision logic in the factory, ensuring tests can directly exercise the streaming aggregator path without depending on cost-based decisions.

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java (1)

75-75: LGTM!

The removal of Streamable from the class declaration aligns with the architectural refactor that shifts streaming cost estimation from per-aggregator interfaces to factory-level StreamingCostEstimable. The aggregator retains its core functionality while streaming decisions are now made at the factory level.

server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java (1)

51-51: LGTM!

The removal of Streamable from ProfilingAggregator is consistent with the broader refactor. As a decorator wrapping other aggregators, it correctly delegates to the underlying aggregator for all core operations. Streaming decisions are now handled at the factory level, so the profiling wrapper no longer needs to participate in per-collector streaming metrics.

server/src/main/java/org/opensearch/search/streaming/StreamingCostEstimable.java (1)

14-24: LGTM!

Clean interface design following the single-responsibility principle. The @ExperimentalApi annotation appropriately marks this as experimental, and the Javadoc clearly describes the contract for implementors. The SearchContext parameter provides the necessary context for cost estimation based on field metadata and cardinality.

server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java (3)

28-42: LGTM!

The removal of Streamable from the class declaration aligns with the refactor. The constructor properly delegates to the parent CardinalityAggregator with all necessary parameters.


44-79: LGTM!

The getLeafCollector implementation correctly manages the streaming collector lifecycle:

  • Cleans up the previous collector before creating a new one
  • Handles null values source appropriately
  • Validates that only ordinal value sources are supported for streaming
  • Creates the appropriate collector type based on ordinal count

81-95: LGTM!

The doReset implementation correctly handles batch transitions by:

  • Calling super to reset parent state
  • Closing the stream collector
  • Recreating HyperLogLogPlusPlus counts (necessary since HLL lacks a public reset method)

server/src/main/java/org/opensearch/search/streaming/StreamingCostMetrics.java (3)

35-46: LGTM!

The neutral() factory method is well-designed for metric aggregations (max, min, avg, etc.) that produce single-value outputs. A topNSize of 1 correctly represents that these aggregations don't contribute to bucket proliferation while remaining compatible with streaming.


58-70: LGTM!

The combineWithSubAggregation correctly models nested aggregation cost with multiplicative effects. Using Math.multiplyExact for overflow detection and falling back to nonStreamable() is a safe approach that prevents invalid metric propagation.


82-94: LGTM!

The combineWithSibling correctly models parallel aggregation cost with additive effects. Using Math.addExact for overflow detection mirrors the approach in combineWithSubAggregation for consistent handling.
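The two combination rules described above can be sketched as follows. This is a minimal illustration, not the actual `StreamingCostMetrics` source: the class name `StreamingCostSketch`, the `long` field type, and the constructor shape are assumptions made for the example. Only the method names `neutral()`, `nonStreamable()`, `combineWithSubAggregation`, and `combineWithSibling`, and the use of `Math.multiplyExact` / `Math.addExact`, come from the review text.

```java
// Hypothetical stand-in for StreamingCostMetrics; field types are assumed.
final class StreamingCostSketch {
    final boolean streamable;
    final long topNSize;

    StreamingCostSketch(boolean streamable, long topNSize) {
        this.streamable = streamable;
        this.topNSize = topNSize;
    }

    /** Marks an aggregation tree that cannot be streamed. */
    static StreamingCostSketch nonStreamable() {
        return new StreamingCostSketch(false, 0L);
    }

    /** Single-value metric aggregations: streamable, topNSize of 1. */
    static StreamingCostSketch neutral() {
        return new StreamingCostSketch(true, 1L);
    }

    /** Nested cost is multiplicative: each parent bucket holds its own sub-buckets. */
    StreamingCostSketch combineWithSubAggregation(StreamingCostSketch sub) {
        if (!streamable || !sub.streamable) {
            return nonStreamable();
        }
        try {
            return new StreamingCostSketch(true, Math.multiplyExact(topNSize, sub.topNSize));
        } catch (ArithmeticException overflow) {
            // An estimate too large to represent is treated as not streamable.
            return nonStreamable();
        }
    }

    /** Sibling cost is additive: parallel aggregations each emit their own buckets. */
    StreamingCostSketch combineWithSibling(StreamingCostSketch sibling) {
        if (!streamable || !sibling.streamable) {
            return nonStreamable();
        }
        try {
            return new StreamingCostSketch(true, Math.addExact(topNSize, sibling.topNSize));
        } catch (ArithmeticException overflow) {
            return nonStreamable();
        }
    }
}
```

Under these assumptions, combining a cost of 100 with a sub-aggregation of 50 gives 5000, combining siblings of 100 and 200 gives 300, and combining with `neutral()` as a sub-aggregation leaves the parent estimate unchanged.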

server/src/test/java/org/opensearch/search/streaming/StreamingCostMetricsTests.java (3)

15-31: LGTM! Tests cover the simplified constructor and neutral factory method.

The tests properly verify the new 2-parameter constructor and the neutral() factory method. The assertions correctly validate both streamable() and topNSize() return values.


33-50: LGTM! Overflow handling tests are well-designed.

The combination tests correctly verify multiplication semantics for sub-aggregations (100 * 50 = 5000) and that integer overflow correctly triggers a non-streamable result.


52-78: LGTM! Sibling combination and non-streamable propagation tests are correct.

Tests properly verify addition semantics for siblings (100 + 200 = 300), overflow detection, and that combining with a non-streamable metric correctly propagates the non-streamable state.

server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java (3)

85-94: LGTM! Decision logic is clean and straightforward.

The decideFlushMode method correctly short-circuits on non-streamable metrics and uses a simple threshold check for bucket count.
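As a rough sketch of the decision described above: the class name `FlushDecisionSketch`, the parameter list, and the local `FlushMode` enum are hypothetical. Treating a topN exactly equal to the maximum as still streamable is an assumption, taken from the PR description's wording that the fallback applies when the limit is exceeded.

```java
// Hypothetical sketch of the flush-mode decision; not the FlushModeResolver source.
final class FlushDecisionSketch {
    enum FlushMode { PER_SEGMENT, PER_SHARD }

    static FlushMode decideFlushMode(boolean streamable, long topNSize, long maxBucketCount) {
        // Short-circuit: a non-streamable tree always uses the per-shard path.
        if (!streamable) {
            return FlushMode.PER_SHARD;
        }
        // Assumed boundary: only estimates strictly above the maximum fall back.
        return topNSize > maxBucketCount ? FlushMode.PER_SHARD : FlushMode.PER_SEGMENT;
    }
}
```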


108-120: LGTM! Eligibility check properly validates top-level aggregations.

The null/empty check and iteration over top-level aggregations is correct.


48-75: Settings are not applied in the decideFlushMode decision logic.

STREAMING_MIN_CARDINALITY_RATIO and STREAMING_MIN_ESTIMATED_BUCKET_COUNT are defined and exposed through DefaultSearchContext getters, but they are not currently used within the decideFlushMode method. The method only considers maxBucketCount in its decision criteria. If these thresholds are intended to gate streaming decisions (as their documentation suggests), they should be integrated into the decision logic.

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java (2)

147-158: LGTM! Streaming cost estimation correctly limits to ordinal-based sources.

The estimation logic properly restricts streaming support to Bytes.WithOrdinals value sources, and the HLL register count calculation (1 << precision()) is safe within int bounds for the expected precision range.
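For reference, the register count mentioned above grows as a power of two with precision. The sketch below is illustrative only: `HllRegisterSketch` and its bounds check are hypothetical, and the guard at 30 exists only to keep the shift inside a positive `int`, not to mirror the precision range the factory actually accepts.

```java
// Hypothetical helper showing why 1 << precision stays within int bounds.
final class HllRegisterSketch {
    static int registerCount(int precision) {
        // 1 << 31 would flip the sign bit, so reject anything above 30.
        if (precision < 0 || precision > 30) {
            throw new IllegalArgumentException("precision out of range: " + precision);
        }
        return 1 << precision;
    }
}
```

With a precision of 14 this yields 16384 registers, which is the per-bucket factor a parent terms aggregation's topN would be multiplied by under the estimation model described in this review.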


113-119: LGTM! FlushMode checks are appropriately tightened.

Both createUnmapped and doCreateInternal now explicitly require FlushMode.PER_SEGMENT rather than allowing null FlushMode, reducing ambiguity in the streaming path.

Also applies to: 139-145

server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java (1)

57-57: LGTM! Neutral streaming cost is appropriate for sum aggregation.

Sum aggregation produces a single value per bucket, so returning neutral() (a topNSize of 1) leaves the parent's bucket estimate unchanged when combined as a sub-aggregation.

Also applies to: 106-109

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java (1)

462-467: LGTM! Streaming eligibility check is properly delegated.

The refactored canUseStreamSearch correctly handles the null source case and delegates aggregation validation to FlushModeResolver.isEligibleForStreaming, improving separation of concerns.

server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java (1)

56-56: LGTM! Neutral streaming cost is appropriate for value_count aggregation.

Value count aggregation produces a single count per bucket, so returning neutral() (a topNSize of 1) leaves the parent's bucket estimate unchanged when combined as a sub-aggregation.

Also applies to: 99-103

server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java (1)

45-46: LGTM!

The implementation of StreamingCostEstimable is clean and appropriate. Returning StreamingCostMetrics.neutral() is correct for metric aggregations like avg since they don't create additional buckets and are inherently streamable.

Also applies to: 57-57, 106-109

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java (1)

45-46: LGTM!

Consistent implementation with other metric aggregator factories. Returning neutral metrics is appropriate for max aggregation.

Also applies to: 57-57, 106-109

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java (1)

45-46: LGTM!

Consistent implementation with other metric aggregator factories (AvgAggregatorFactory, MaxAggregatorFactory). The neutral metrics approach is appropriate for min aggregation.

Also applies to: 57-57, 106-109

server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java (1)

25-64: LGTM!

Excellent test coverage for FlushModeResolver.decideFlushMode logic including:

  • Non-streamable metrics
  • Streamable metrics with various topN values
  • Boundary condition (topN exactly at max)
  • Neutral metrics behavior

The tests are well-documented with inline comments explaining the expected behavior.

server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java (2)

313-330: Well-structured streaming decision logic.

The streaming cost estimation workflow is properly integrated:

  • Correctly guards with isStreamSearch() and flushMode == null check
  • Handles the edge case of empty factories by defaulting to PER_SHARD
  • Uses setFlushModeIfAbsent to avoid overwriting any previously set mode
  • Good debug logging for observability
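The set-if-absent behavior noted above can be illustrated with a compare-and-set on a null reference. This is a sketch under assumptions: `FlushModeHolderSketch` is a hypothetical holder, and the review does not say how the real search context stores the mode or whether `setFlushModeIfAbsent` returns a value.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder; only the method name setFlushModeIfAbsent comes from the review.
final class FlushModeHolderSketch {
    enum FlushMode { PER_SEGMENT, PER_SHARD }

    private final AtomicReference<FlushMode> flushMode = new AtomicReference<>();

    /** Sets the mode only when none was chosen yet; returns true if this call set it. */
    boolean setFlushModeIfAbsent(FlushMode mode) {
        return flushMode.compareAndSet(null, mode);
    }

    FlushMode getFlushMode() {
        return flushMode.get();
    }
}
```

A second call with a different mode leaves the first decision in place, which is the property the review highlights.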

383-404: LGTM!

The estimateFromFactory method cleanly handles the recursive cost estimation:

  • Returns early for non-StreamingCostEstimable factories
  • Properly propagates non-streamable status from both factory and sub-factories
  • Uses appropriate combination methods (combineWithSubAggregation vs combineWithSibling)
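The recursive estimation described above might look roughly like the following. Everything here is a simplified assumption: `Factory` is a hypothetical stand-in for an aggregator factory node, a `-1` sentinel replaces the non-streamable metrics object, and the order in which the real `estimateFromFactory` applies the sibling and sub-aggregation combinations is not stated in the review.

```java
import java.util.List;

// Hypothetical model of factory-tree cost estimation; not the AggregatorFactories source.
final class FactoryEstimateSketch {
    /** Sentinel meaning the tree cannot be streamed. */
    static final long NON_STREAMABLE = -1L;

    static final class Factory {
        final boolean estimable;          // stands in for "implements StreamingCostEstimable"
        final long ownTopN;               // this factory's own bucket estimate
        final List<Factory> subFactories; // nested aggregations

        Factory(boolean estimable, long ownTopN, List<Factory> subFactories) {
            this.estimable = estimable;
            this.ownTopN = ownTopN;
            this.subFactories = subFactories;
        }
    }

    static long estimate(Factory factory) {
        // Early return for factories that cannot provide an estimate.
        if (!factory.estimable) {
            return NON_STREAMABLE;
        }
        if (factory.subFactories.isEmpty()) {
            return factory.ownTopN;
        }
        try {
            long siblingsTotal = 0L;
            for (Factory sub : factory.subFactories) {
                long subCost = estimate(sub);
                if (subCost == NON_STREAMABLE) {
                    return NON_STREAMABLE; // propagate from any sub-factory
                }
                siblingsTotal = Math.addExact(siblingsTotal, subCost); // siblings add
            }
            return Math.multiplyExact(factory.ownTopN, siblingsTotal); // nesting multiplies
        } catch (ArithmeticException overflow) {
            return NON_STREAMABLE;
        }
    }
}
```

In this model, a terms factory estimated at 100 buckets with a metric sub-aggregation (cost 1) and a nested terms sub-aggregation (cost 50) comes out at 100 × (1 + 50) = 5100, and any non-estimable sub-factory makes the whole tree non-streamable.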

server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java (1)

146-218: Good expanded coverage for streaming eligibility.

The new tests cover multiple-terms top-level and supported/unsupported sub-agg combinations, which should guard the updated eligibility logic well.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java (1)

48-82: segmentTopN wiring looks consistent.

Constructor injection and top-bucket selection are aligned.

Also applies to: 242-243

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java (1)

55-74: StreamingCostEstimable integration looks clean.

The interface adoption and related imports fit well with the new estimation flow.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java (1)

52-83: segmentTopN usage is consistent in numeric streaming flow.

Constructor injection and selection usage line up cleanly.

Also applies to: 170-174


@github-actions

❌ Gradle check result for 839df89: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
…w, if not supported, will remove

Signed-off-by: bowenlan-amzn <[email protected]>
@github-actions

❌ Gradle check result for 43fddbc: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

❌ Gradle check result for 213a6ab: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: bowenlan-amzn <[email protected]>
@github-actions

❌ Gradle check result for ab931dd: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: bowenlan-amzn <[email protected]>
@bowenlan-amzn bowenlan-amzn force-pushed the streaming-agg-planning-refactor branch from 60c253d to 6ddb9e1 Compare January 28, 2026 18:25
@github-actions

❌ Gradle check result for dbe69b6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

✅ Gradle check result for 99a50a5: SUCCESS

@codecov
codecov bot commented Jan 28, 2026

Codecov Report

❌ Patch coverage is 77.58621% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.35%. Comparing base (fc33a53) to head (99a50a5).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...nsearch/search/streaming/StreamingCostMetrics.java | 55.17% | 3 Missing and 10 partials ⚠️ |
| ...regations/bucket/terms/TermsAggregatorFactory.java | 80.95% | 1 Missing and 3 partials ⚠️ |
| ...earch/search/aggregations/AggregatorFactories.java | 75.00% | 1 Missing and 2 partials ⚠️ |
| ...egations/metrics/CardinalityAggregatorFactory.java | 66.66% | 1 Missing and 1 partial ⚠️ |
| ...rch/search/aggregations/metrics/SumAggregator.java | 60.00% | 0 Missing and 2 partials ⚠️ |
| .../main/java/org/opensearch/index/IndexSettings.java | 75.00% | 1 Missing ⚠️ |
| ...opensearch/search/streaming/FlushModeResolver.java | 96.29% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20471      +/-   ##
============================================
+ Coverage     73.19%   73.35%   +0.15%     
- Complexity    71975    72112     +137     
============================================
  Files          5796     5795       -1     
  Lines        329539   329462      -77     
  Branches      47465    47458       -7     
============================================
+ Hits         241220   241675     +455     
+ Misses        69005    68452     -553     
- Partials      19314    19335      +21     


@rishabhmaurya rishabhmaurya merged commit 0d58b07 into opensearch-project:main Jan 28, 2026
37 checks passed
@bowenlan-amzn bowenlan-amzn added the v3.5.0 Issues and PRs related to version 3.4.0 label Jan 28, 2026