Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
150172b
Initial commit intra segment
prudhvigodithi Oct 21, 2025
0aa7695
intra segment
prudhvigodithi Oct 21, 2025
1056140
intra segment, update default values
prudhvigodithi Oct 21, 2025
73fba5f
intra segment, update default values
prudhvigodithi Oct 21, 2025
5ffd097
comment info logger
prudhvigodithi Oct 27, 2025
019470b
Fix SubSearchContext
prudhvigodithi Oct 27, 2025
49fd872
Default enable test
prudhvigodithi Oct 27, 2025
5b03b2d
Test auto partition logic
prudhvigodithi Oct 28, 2025
ad9c5c6
Update LPT logic
prudhvigodithi Nov 11, 2025
9b1d15f
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Nov 18, 2025
5769681
Intra segment
prudhvigodithi Dec 4, 2025
a0254a3
disable logs
prudhvigodithi Dec 5, 2025
77f633e
Intra segment, code cleanup
prudhvigodithi Dec 5, 2025
4677272
Update cluster settings
prudhvigodithi Dec 8, 2025
aa60299
Update cluster settings
prudhvigodithi Dec 8, 2025
9e88032
Update cluster settings
prudhvigodithi Dec 8, 2025
6c013cd
Update cluster settings
prudhvigodithi Dec 8, 2025
0e9cd66
Update cluster settings
prudhvigodithi Dec 8, 2025
6c01ab8
code cleanup
prudhvigodithi Dec 9, 2025
c61e97d
Onboard intra segment decider and queries
prudhvigodithi Dec 17, 2025
d27e21d
Onboard intra segment decider and queries
prudhvigodithi Dec 17, 2025
50293b8
Add java docs
prudhvigodithi Dec 18, 2025
bd06f77
Fix tests
prudhvigodithi Dec 18, 2025
d084d4a
Fix tests
prudhvigodithi Dec 18, 2025
ff87e7d
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Dec 18, 2025
3f2917c
Default Enable
prudhvigodithi Dec 18, 2025
9274fa3
Default Enable
prudhvigodithi Dec 18, 2025
c929afb
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Dec 23, 2025
d208056
Support global agg
prudhvigodithi Dec 24, 2025
d6c0682
disable for start tree
prudhvigodithi Jan 12, 2026
455ed13
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 12, 2026
9492d3d
Spotless fix
prudhvigodithi Jan 13, 2026
db432ca
Add partition strategy
prudhvigodithi Jan 21, 2026
049be6b
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 21, 2026
6827e91
Upstream fetch
prudhvigodithi Jan 21, 2026
67b20f0
code cleanup
prudhvigodithi Jan 22, 2026
57b41fb
code cleanup
prudhvigodithi Jan 22, 2026
e3f16a2
Fix tests
prudhvigodithi Jan 23, 2026
f2e4734
Initial code for enabling intra
prudhvigodithi Jan 23, 2026
9346e19
Initial code for enabling intra
prudhvigodithi Jan 23, 2026
ff6bb6b
Initial code for enabling intra
prudhvigodithi Jan 23, 2026
8ff0c81
code refactor
prudhvigodithi Jan 23, 2026
ac940f4
code remove
prudhvigodithi Jan 23, 2026
146909c
code remove
prudhvigodithi Jan 23, 2026
df9e25e
code remove
prudhvigodithi Jan 24, 2026
cd401b1
Add tests
prudhvigodithi Jan 24, 2026
ff1cb77
Update changelog
prudhvigodithi Jan 24, 2026
e749e79
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 24, 2026
63ffd57
Upstream fetch
prudhvigodithi Jan 24, 2026
807c0cc
use balanced as default
prudhvigodithi Jan 26, 2026
231a696
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 26, 2026
81ea6ff
address comments
prudhvigodithi Jan 26, 2026
864da81
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 26, 2026
4b85551
Upstream fetch
prudhvigodithi Jan 26, 2026
e02800d
Change from none to segment
prudhvigodithi Jan 27, 2026
3fd164c
Address PR comments
prudhvigodithi Jan 28, 2026
a8782b6
Address PR comments
prudhvigodithi Jan 28, 2026
90505f3
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 28, 2026
677683c
Fix conflict
prudhvigodithi Jan 28, 2026
5d4dcf6
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 28, 2026
31bdfdd
Fix conflict
prudhvigodithi Jan 28, 2026
6bf2c10
Fix tests
prudhvigodithi Jan 28, 2026
5880b11
Merge remote-tracking branch 'upstream/main' into approx
prudhvigodithi Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update to `almalinux:10` ([#20482](https://github.com/opensearch-project/OpenSearch/pull/20482))
- Add X-Request-Id to uniquely identify a search request ([#19798](https://github.com/opensearch-project/OpenSearch/pull/19798))
- Added TopN selection logic for streaming terms aggregations ([#20481](https://github.com/opensearch-project/OpenSearch/pull/20481))
- Added support for Intra Segment Search ([#19704](https://github.com/opensearch-project/OpenSearch/pull/19704))

### Changed
- Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;

import static org.opensearch.index.query.QueryBuilders.spanNearQuery;
import static org.opensearch.index.query.QueryBuilders.spanTermQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
* Integration tests for span_near queries with concurrent segment search and partition strategies.
*/
public class SpanNearQueryIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public SpanNearQueryIT(Settings staticSettings) {
super(staticSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "segment").build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "force")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() },
new Object[] {
Settings.builder()
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(), "balanced")
.put(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(), 1000)
.build() }
);
}

public void testSpanNearQuery() throws Exception {
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
int totalDocs = 2500;
for (int i = 0; i < totalDocs; i++) {
String content = (i % 100 == 0) ? "alpha beta" : (i % 50 == 0) ? "alpha gamma beta" : "other words " + i;
client().prepareIndex("test").setId(String.valueOf(i)).setSource("field", content).get();
}
refresh();
forceMerge(1);
indexRandomForConcurrentSearch("test");
SearchResponse response = client().prepareSearch("test")
.setQuery(spanNearQuery(spanTermQuery("field", "alpha"), 0).addClause(spanTermQuery("field", "beta")).inOrder(true))
.get();
assertHitCount(response, 25L);
response = client().prepareSearch("test")
.setQuery(spanNearQuery(spanTermQuery("field", "alpha"), 1).addClause(spanTermQuery("field", "beta")).inOrder(true))
.get();
assertHitCount(response, 50L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE,
SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY,
SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
Expand Down
30 changes: 30 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,36 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

// Partition strategy constants
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT = "segment";
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_BALANCED = "balanced";
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_FORCE = "force";

public static final Setting<String> INDEX_CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY = Setting.simpleString(
"index.search.concurrent_segment_search.partition_strategy",
CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT,
value -> {
switch (value) {
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT:
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_BALANCED:
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_FORCE:
break;
default:
throw new IllegalArgumentException("Setting value must be one of [segment, balanced, force]");
}
},
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Integer> INDEX_CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE = Setting.intSetting(
"index.search.concurrent_segment_search.partition_min_segment_size",
500_000,
1000,
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting(
"index.optimize_doc_id_lookup.fuzzy_set.enabled",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,13 @@ default void visit(QueryBuilderVisitor visitor) {
visitor.accept(this);
};

/**
* Indicates whether this query benefits from intra-segment search.
* Override to return {@code true} for compute-heavy queries that parallelize well
* Default is {@code false} - queries must explicitly opt-in.
*/
default boolean supportsIntraSegmentSearch() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ public void visit(QueryBuilderVisitor visitor) {
}
}

@Override
public boolean supportsIntraSegmentSearch() {
return true;
}

/**
* SpanGapQueryBuilder enables gaps in a SpanNearQuery.
* Since, SpanGapQuery is private to SpanNearQuery, SpanGapQueryBuilder cannot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,9 @@ public static SpanTermQueryBuilder fromXContent(XContentParser parser) throws IO
public String getWriteableName() {
return NAME;
}

@Override
public boolean supportsIntraSegmentSearch() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
import org.opensearch.search.deciders.IntraSegmentSearchDecider;
import org.opensearch.search.deciders.IntraSegmentSearchVisitor;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSearchResult;
Expand All @@ -106,6 +108,7 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.streaming.FlushMode;
import org.opensearch.search.suggest.SuggestionSearchContext;

Expand All @@ -132,6 +135,9 @@
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_AUTO;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT;
import static org.opensearch.search.SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
import static org.opensearch.search.streaming.FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT;
Expand Down Expand Up @@ -220,6 +226,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final String concurrentSearchMode;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final SetOnce<Boolean> requestShouldUseIntraSegmentSearch = new SetOnce<>();
private final int maxAggRewriteFilters;
private final int filterRewriteSegmentThreshold;
private final int cardinalityAggregationPruningThreshold;
Expand Down Expand Up @@ -1036,13 +1043,23 @@ private boolean evaluateAutoMode() {
logger.debug("request has supported aggregations, using concurrent search");
}
return true;

} else {
if (logger.isDebugEnabled()) {
logger.debug("request does not have aggregations, not using concurrent search");
} else if (CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT.equals(getPartitionStrategy()) == false
&& request().source() != null
&& request().source().query() != null
&& request().source().query().supportsIntraSegmentSearch()) {
if (logger.isDebugEnabled()) {
logger.debug(
"request query supports intra-segment search, using concurrent search with partition strategy: {}",
getPartitionStrategy()
);
}
return true;
} else {
if (logger.isDebugEnabled()) {
logger.debug("request does not have aggregations, not using concurrent search");
}
return false;
}
return false;
}

} else {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -1332,4 +1349,71 @@ public double getStreamingMinCardinalityRatio() {
public long getStreamingMinEstimatedBucketCount() {
return clusterService.getClusterSettings().get(STREAMING_MIN_ESTIMATED_BUCKET_COUNT);
}

/**
* Returns the partition strategy for this search context.
*/
@Override
public String getPartitionStrategy() {
return indexService.getIndexSettings()
.getSettings()
.get(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY.getKey(),
clusterService.getClusterSettings().get(CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY)
);
}

/**
* Returns the minimum segment size required for balanced partitioning.
*/
@Override
public int getPartitionMinSegmentSize() {
return indexService.getIndexSettings()
.getSettings()
.getAsInt(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE.getKey(),
clusterService.getClusterSettings().get(CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE)
);
}

/**
* Returns intra-segment search status for the search context.
*/
@Override
public boolean shouldUseIntraSegmentSearch() {
return Boolean.TRUE.equals(requestShouldUseIntraSegmentSearch.get());
}

/**
* Evaluate if request should use intra-segment search based on partition strategy and query/aggregation analysis.
*/
public void evaluateRequestShouldUseIntraSegmentSearch() {
String partitionStrategy = getPartitionStrategy();
if (CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT.equals(partitionStrategy) || shouldUseConcurrentSearch() == false) {
requestShouldUseIntraSegmentSearch.set(false);
return;
}
// StarTree precomputes aggregations at index time - intra-segment adds no benefit
if (aggregations() != null && StarTreeQueryHelper.getSupportedStarTree(getQueryShardContext()) != null) {
logger.debug("partition strategy decision: StarTree detected, disabling intra-segment");
requestShouldUseIntraSegmentSearch.set(false);
return;
}
IntraSegmentSearchDecider decider = new IntraSegmentSearchDecider();
if (request().source() != null && request().source().query() != null) {
IntraSegmentSearchVisitor visitor = new IntraSegmentSearchVisitor(decider);
request().source().query().visit(visitor);
}
if (aggregations() != null && aggregations().factories() != null) {
decider.evaluateForAggregations(aggregations().factories());
}
boolean result = decider.shouldUseIntraSegmentSearch();
logger.debug(
"partition strategy decision: strategy={}, useIntraSegment={}, reason={}",
partitionStrategy,
result,
decider.getReason()
);
requestShouldUseIntraSegmentSearch.set(result);
}
}
35 changes: 35 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,39 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic,
Property.NodeScope
);

// Partition strategy constants
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT = "segment";
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_BALANCED = "balanced";
public static final String CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_FORCE = "force";

// Partition strategy setting
public static final Setting<String> CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY = Setting.simpleString(
"search.concurrent_segment_search.partition_strategy",
CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_BALANCED,
value -> {
switch (value) {
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_SEGMENT:
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_BALANCED:
case CONCURRENT_SEGMENT_SEARCH_PARTITION_STRATEGY_FORCE:
break;
default:
throw new IllegalArgumentException("Setting value must be one of [segment, balanced, force]");
}
},
Property.Dynamic,
Property.NodeScope
);

// Minimum segment size for balanced partitioning (only applies when partition_strategy = balanced)
public static final Setting<Integer> CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE = Setting.intSetting(
"search.concurrent_segment_search.partition_min_segment_size",
500_000,
1000,
Property.Dynamic,
Property.NodeScope
);

// value 0 means rewrite filters optimization in aggregations will be disabled
@ExperimentalApi
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
Expand Down Expand Up @@ -1532,6 +1565,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
// nothing to parse...
if (source == null) {
context.evaluateRequestShouldUseConcurrentSearch();
context.evaluateRequestShouldUseIntraSegmentSearch();
return;
}

Expand Down Expand Up @@ -1723,6 +1757,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.collapse(collapseContext);
}
context.evaluateRequestShouldUseConcurrentSearch();
context.evaluateRequestShouldUseIntraSegmentSearch();
if (source.profile()) {
final Function<Query, Collection<Supplier<ProfileMetric>>> pluginProfileMetricsSupplier = (query) -> pluginProfilers.stream()
.flatMap(p -> p.getQueryProfileMetrics(context, query).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ public boolean allFactoriesSupportConcurrentSearch() {
return true;
}

public boolean allFactoriesSupportIntraSegmentSearch() {
for (AggregatorFactory factory : factories) {
if (factory.supportsIntraSegmentSearch() == false || factory.evaluateChildFactoriesForIntraSegment() == false) {
return false;
}
}
return true;
}

/**
* Create all aggregators so that they can be consumed with multiple
* buckets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,22 @@ protected boolean supportsConcurrentSegmentSearch() {
return false;
}

/**
* Implementation should override this method and return true if the Aggregator benefits from intra-segment search
* Default is false - aggregations must explicitly opt-in
*/
protected boolean supportsIntraSegmentSearch() {
return false;
}

public boolean evaluateChildFactories() {
return factories.allFactoriesSupportConcurrentSearch();
}

public boolean evaluateChildFactoriesForIntraSegment() {
return factories.allFactoriesSupportIntraSegmentSearch();
}

public AggregatorFactories getSubFactories() {
return factories;
}
Expand Down
Loading
Loading