[GRPC] Add gRPC support for Min, Max, and Terms aggregations #21205
[GRPC] Add gRPC support for Min, Max, and Terms aggregations #21205lucy66hw wants to merge 7 commits intoopensearch-project:mainfrom
Conversation
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit 6c7e5b4.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
| // TODO: Nested aggregations not yet supported in proto definition | ||
| // if (container.getAggregationsCount() > 0) { | ||
| // AggregatorFactories.Builder subFactories = new AggregatorFactories.Builder(); | ||
| // | ||
| // for (Map.Entry<String, AggregationContainer> entry : container.getAggregationsMap().entrySet()) { | ||
| // String subAggName = entry.getKey(); | ||
| // AggregationContainer subAggContainer = entry.getValue(); | ||
| // | ||
| // logger.debug("Parsing subaggregation '{}' for parent '{}'", subAggName, name); | ||
| // AggregationBuilder subAgg = fromProto(subAggName, subAggContainer); | ||
| // subFactories.addAggregator(subAgg); | ||
| // } | ||
| // | ||
| // builder.subAggregations(subFactories); | ||
| // logger.debug("Added {} subaggregation(s) to aggregation '{}'", container.getAggregationsCount(), name); | ||
| // } |
There was a problem hiding this comment.
Can we remove this comment as subaggregations are supported already in TermsAggregationBuilderConverter?
| // Future: Register bucket aggregation converters here | ||
| // Example: delegate.registerConverter(new TermsAggregationBuilderProtoConverter()); |
There was a problem hiding this comment.
Can we remove these 2 comments on L50-51 as it seems this is already handled by delegate.setRegistryOnAllConverters(this);?
| */ | ||
| public class AggregateProtoUtils { | ||
|
|
||
| private static AggregateProtoConverterRegistry registry = new AggregateProtoConverterRegistryImpl(); |
There was a problem hiding this comment.
This follows a different pattern than how queryUtils/aggregationResgistry are handled on the request-side, i.e. The request-side uses constructor injection (registry passed through SearchServiceImpl -> SearchRequestProtoUtils -> SearchSourceBuilderProtoUtils), but the response-side uses this static mutable singleton - this is not safe if an external plugin wants tocall setRegistry - it would replace the entire global registry
Shall we remove this static registry, and add a AggregateProtoConverterRegistry to AggregationsProtoUtils.toProto and SearchResponseSectionsProtoUtils.toProto, then thread the registry from SearchServiceImpl (which already has it from GrpcPlugin) down through the response conversion chain , to match the request side?
There was a problem hiding this comment.
Addressed — AggregateProtoUtils no longer holds a static registry. The registry is now passed as a parameter through toProto(aggregation, registry) and SearchServiceImpl through the response chain.
| */ | ||
| private static IncludeExclude convertInclude(TermsInclude include) { | ||
| return switch (include.getTermsIncludeCase()) { | ||
| case TERMS -> { |
There was a problem hiding this comment.
Are we missing the regex option like the REST side https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/IncludeExclude.java#L126-L127 ? eg REST-side you can pass something like "include": "foo.*"
But termsInclude protos seems only to have terms and partition, no regex option.
message TermsInclude {
oneof terms_include {
StringArray terms = 1;
TermsPartition partition = 2;
}
}
Should we explicitly add a regex to the protos for clarity?
message TermsInclude {
oneof terms_include {
string regex = 1; // single regex pattern string
StringArray terms = 2; // array of exact terms
TermsPartition partition = 3;
}
}
There was a problem hiding this comment.
Actually I see the spec has this regexp field: https://github.com/opensearch-project/opensearch-api-specification/blob/7a42f5b107a498caf78cb7ae9aca3d5177bb41d6/spec/schemas/_common.aggregations.yaml#L2266
Wonder why it didnt carry over to the protos?
There was a problem hiding this comment.
good catch. It was caused by a protobuf tooling rule that collapses fields when array of items and item have the same type under oneof.
the tooling had been fixed and added regex handling.
| } | ||
|
|
||
| /** | ||
| * Converts protobuf MinAggregation to {@link org.opensearch.search.aggregations.metrics.MinAggregationBuilder}. |
There was a problem hiding this comment.
nit can we just import this class?
| termsBuilder.addBuckets(convertBucket(bucket)); | ||
| } | ||
|
|
||
| AggregateProtoUtils.addMetadata(termsBuilder::setMeta, doubleTerms); |
There was a problem hiding this comment.
Should we move all AggregateProtoUtils.addMetadata() calls to the very top of the toProto methods, as the first field? Since InternalAggregation.toXContent sets it first, before any of the doXContent() from the concrete aggregate types set their fields
3f875fd to
b393fa1
Compare
| private UnsignedLongTermsBucket convertBucket(UnsignedLongTerms.Bucket bucket) throws IOException { | ||
| UnsignedLongTermsBucket.Builder builder = UnsignedLongTermsBucket.newBuilder(); | ||
|
|
||
| builder.setKey(((Number) bucket.getKey()).longValue()); |
There was a problem hiding this comment.
It's not obvious how this works, can we add a comment here?
Looks like this happens to work only because BigInterger.longValue() just happens to return the low 64 bits and proto's uint64 stores them with the same bit pattern, which is not obvious, and also mismatches the REST side (which uses BigInteger directly).
|
|
||
| for (Aggregation subAgg : bucket.getAggregations()) { | ||
| if (subAgg instanceof InternalAggregation internalAgg) { | ||
| builder.getMutableAggregate().put(subAgg.getName(), AggregateProtoUtils.toProto(internalAgg, registry)); |
There was a problem hiding this comment.
getMutableAggregate() is an internal protobuf method,
can we just use the public API: builder.putAggregate(subAgg.getName(),AggregateProtoUtils.toProto(internalAgg, registry));?
Same for all other files where this is used
There was a problem hiding this comment.
meke sense. replaced to builder.putAggregate() to all other terms aggregate converter.
| guava = "33.2.1-jre" | ||
| gson = "2.13.2" | ||
| opensearchprotobufs = "1.3.0" | ||
| opensearchprotobufs = "1.4.0-SNAPSHOT" |
There was a problem hiding this comment.
To be updated with a non-snapshot version before merging
This commit adds complete gRPC support for Min and Max metric aggregations, including request parsing, response conversion, and integration with the search pipeline. **Request Conversion (OpenSearch ← gRPC):** - AggregationContainerProtoUtils: Central dispatcher routing aggregation types to specific converters, validates aggregation names - ValuesSourceAggregationProtoUtils: Shared utilities for parsing common ValuesSource fields (field, missing, value_type, format, script) - MinAggregationProtoUtils: Converts proto MinAggregation → MinAggregationBuilder - MaxAggregationProtoUtils: Converts proto MaxAggregation → MaxAggregationBuilder - Updated SearchSourceBuilderProtoUtils to parse aggregations map from proto SearchRequestBody and add to SearchSourceBuilder **Response Conversion (gRPC ← OpenSearch):** - SearchResponseSectionsProtoUtils: **CRITICAL FIX** - Added aggregation response conversion to search response pipeline, enabling aggregation results to be returned via gRPC. This connects the Min/Max aggregate converters to the search response builder. - AggregateProtoUtils: Central dispatcher for converting InternalAggregation to proto Aggregate, with metadata and sub-aggregation helpers - MinAggregateProtoUtils: Converts InternalMin → proto MinAggregate - MaxAggregateProtoUtils: Converts InternalMax → proto MaxAggregate - Handles special values (infinity, NaN), formatting, and metadata **OpenSearch Core Changes:** - InternalNumericMetricsAggregation: Added getFormat() getter to expose format information for gRPC converters **Request Conversion Tests:** - AggregationContainerProtoUtilsTests: 11 tests - MinAggregationProtoUtilsTests: 108 tests - MaxAggregationProtoUtilsTests: 108 tests - ValuesSourceAggregationProtoUtilsTests: 40+ tests - SearchSourceBuilderProtoUtilsTests: 2 new aggregation tests **Response Conversion Tests:** - AggregateProtoUtilsTests: 10 tests - MinAggregateProtoUtilsTests: 190 tests (values, infinity, NaN, formatting) - MaxAggregateProtoUtilsTests: 190 tests (values, infinity, NaN, formatting) - SearchResponseSectionsProtoUtilsTests: 2 new aggregation tests (null/present) - InternalMinTests: 32 tests - InternalMaxTests: 32 tests **Total: ~680 test cases** **Design Principles:** - Mirrors REST API patterns for consistency - Maintains behavioral parity with REST layer - Comprehensive error handling and validation - Special value support (infinity, NaN) matching REST behavior - Metadata handling consistent across all aggregations **Implementation Summary:** - **New Implementation Files**: 11 (converters + infrastructure) - **New Test Files**: 8 (comprehensive test coverage) - **Modified Files**: 4 (SearchSourceBuilder + SearchResponseSections + server changes) - **Package Documentation**: 5 package-info.java files - **Total Lines**: ~1,850 (implementation + tests) **Testing & Validation:** - Comprehensive integration testing via REST and gRPC - Verified behavioral parity (see min_max_aggregation_comparison_report.md) - All core functionality confirmed working correctly - Values match exactly between REST and gRPC responses **API Differences (By Design):** - REST uses simple JSON values; gRPC uses typed protobuf structures - Parameter formats differ (e.g., missing parameter requires FieldValue in gRPC) - Field naming follows protobuf conventions (camelCase vs snake_case) Co-Authored-By: Claude (claude-sonnet-4-5) <noreply@anthropic.com> Signed-off-by: Yiyu Pan <yypan14@gmail.com>
Signed-off-by: Patrick Zhai <pzhai@uber.com>
Signed-off-by: xil <fridalu66@gmail.com>
Signed-off-by: xil <fridalu66@gmail.com>
Signed-off-by: xil <fridalu66@gmail.com>
Signed-off-by: xil <fridalu66@gmail.com>
96de6bd to
aff6d31
Compare
Signed-off-by: xil <fridalu66@gmail.com>
Following this to skip the diff analyzer as the flagged change is manually verified to be fine diff_report:
The table above displays the top 10 most important findings. |
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
|
❌ Gradle check result for 6c7e5b4: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Description
Based on #20676, #20970
TEST
Index Mapping
{ "properties": { "status": { "type": "keyword" }, "category_id": { "type": "long" }, "price": { "type": "double" }, "big_id": { "type": "unsigned_long" } } }Document count: 5
Test 1 — Keyword field (
status)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "status_terms": { "terms_aggregation": { "terms": { "field": "status" } } } } } }REST Response (
aggregationsonly){ "status_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "active", "doc_count": 3 }, { "key": "inactive", "doc_count": 2 } ] } }gRPC Response (
aggregationsonly){ "status_terms": { "sterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0", "buckets": [ { "key": "active", "docCount": "3" }, { "key": "inactive", "docCount": "2" } ] } } }Test 2 — Long field (
category_id)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "category_terms": { "terms_aggregation": { "terms": { "field": "category_id" } } } } } }REST Response (
aggregationsonly){ "category_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": 1, "doc_count": 2 }, { "key": 2, "doc_count": 2 }, { "key": 3, "doc_count": 1 } ] } }gRPC Response (
aggregationsonly){ "category_terms": { "lterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0", "buckets": [ { "key": { "signed": "1" }, "docCount": "2" }, { "key": { "signed": "2" }, "docCount": "2" }, { "key": { "signed": "3" }, "docCount": "1" } ] } } }Test 3 — Double field (
price)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "price_terms": { "terms_aggregation": { "terms": { "field": "price" } } } } } }REST Response (
aggregationsonly){ "price_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": 5.2, "doc_count": 1 }, { "key": 10.5, "doc_count": 1 }, { "key": 15.0, "doc_count": 1 }, { "key": 25.0, "doc_count": 1 }, { "key": 30.0, "doc_count": 1 } ] } }gRPC Response (
aggregationsonly){ "price_terms": { "dterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0", "buckets": [ { "key": 5.2, "docCount": "1" }, { "key": 10.5, "docCount": "1" }, { "key": 15, "docCount": "1" }, { "key": 25, "docCount": "1" }, { "key": 30, "docCount": "1" } ] } } }Test 4 — Unsigned Long field (
big_id)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "bigid_terms": { "terms_aggregation": { "terms": { "field": "big_id" } } } } } }REST Response (
aggregationsonly){ "bigid_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": 9223372036854775808, "doc_count": 2 }, { "key": 9223372036854775809, "doc_count": 2 }, { "key": 9223372036854775810, "doc_count": 1 } ] } }gRPC Response (
aggregationsonly){ "bigid_terms": { "ulterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0", "buckets": [ { "key": "9223372036854775808", "docCount": "2" }, { "key": "9223372036854775809", "docCount": "2" }, { "key": "9223372036854775810", "docCount": "1" } ] } } }Test 5 — Unmapped field (
does_not_exist)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "unmapped_terms": { "terms_aggregation": { "terms": { "field": "does_not_exist" } } } } } }REST Response (
aggregationsonly){ "unmapped_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [] } }gRPC Response (
aggregationsonly){ "unmapped_terms": { "umterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0" } } }Test 6 — Keyword field with sub-aggregation (
status+max_price)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "status_terms": { "terms_aggregation": { "terms": { "field": "status" }, "aggregations": { "max_price": { "max": { "field": "price" } } } } } } } }REST Response (
aggregationsonly){ "status_terms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "active", "doc_count": 3, "max_price": { "value": 25.0 } }, { "key": "inactive", "doc_count": 2, "max_price": { "value": 30.0 } } ] } }gRPC Response (
aggregationsonly){ "status_terms": { "sterms": { "doc_count_error_upper_bound": "0", "sum_other_doc_count": "0", "buckets": [ { "key": "active", "docCount": "3", "aggregate": { "max_price": { "max": { "value": { "double": 25 } } } } }, { "key": "inactive", "docCount": "2", "aggregate": { "max_price": { "max": { "value": { "double": 30 } } } } } ] } } }Test 7 — Min aggregation (
price,category_id,big_id)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "min_price": { "min": { "field": "price" } }, "min_catid": { "min": { "field": "category_id" } }, "min_bigid": { "min": { "field": "big_id" } } } } }REST Response (
aggregationsonly){ "min_price": { "value": 5.2 }, "min_catid": { "value": 1.0 }, "min_bigid": { "value": 9.223372036854776e+18, "value_as_string": "9.223372036854776E18" } }gRPC Response (
aggregationsonly){ "min_price": { "min": { "value": { "double": 5.2 } } }, "min_catid": { "min": { "value": { "double": 1 } } }, "min_bigid": { "min": { "value": { "double": 9.223372036854776e+18 }, "valueAsString": "9.223372036854776E18" } } }Test 8 — Max aggregation (
price,category_id,big_id)gRPC Request
{ "index": ["test-terms-agg"], "search_request_body": { "size": 0, "aggregations": { "max_price": { "max": { "field": "price" } }, "max_catid": { "max": { "field": "category_id" } }, "max_bigid": { "max": { "field": "big_id" } } } } }REST Response (
aggregationsonly){ "max_price": { "value": 30.0 }, "max_catid": { "value": 3.0 }, "max_bigid": { "value": 9.223372036854776e+18, "value_as_string": "9.223372036854776E18" } }gRPC Response (
aggregationsonly){ "max_price": { "max": { "value": { "double": 30 } } }, "max_catid": { "max": { "value": { "double": 3 } } }, "max_bigid": { "max": { "value": { "double": 9.223372036854776e+18 }, "valueAsString": "9.223372036854776E18" } } }Test 9 — Min/Max on unmapped field (
does_not_exist)REST Response (
aggregationsonly){ "min_missing": { "value": null }, "max_missing": { "value": null } }gRPC Response (
aggregationsonly){ "min_missing": { "min": { "value": { "nullValue": "NULL_VALUE_NULL" } } }, "max_missing": { "max": { "value": { "nullValue": "NULL_VALUE_NULL" } } } }Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.