Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
d8a9424
Add streaming search with configurable scoring modes
atris Aug 25, 2025
3fe4f52
add javadocs
atris Aug 28, 2025
0b4d6a2
Fix changelog
atris Aug 28, 2025
f46f224
Fix forbidden APIs
atris Aug 28, 2025
9251736
Intermediate commit
atris Aug 30, 2025
ca8f639
Working intermediate commit
atris Aug 30, 2025
b8be3f6
Get streaming infra working
atris Aug 30, 2025
9cbdcbc
Phase 2
atris Aug 30, 2025
e967492
Intermediate commit
atris Aug 31, 2025
357848d
working commit
atris Sep 1, 2025
a30df2a
Working commit 2
atris Sep 2, 2025
c0d2a06
Cleanup
atris Sep 6, 2025
0e298e0
More cleanup
atris Sep 7, 2025
5061f09
Add streaming search with scoring using Hoeffding bounds
atris Sep 22, 2025
ac28752
Cleanup
atris Sep 24, 2025
3f78994
Add spotless output
atris Sep 24, 2025
9973de3
more cleanup
atris Sep 24, 2025
5edfe3c
Update per comments
atris Sep 27, 2025
6a4d92e
More cleanup
atris Sep 27, 2025
df7ad7b
Fix forbidden API issue
atris Sep 27, 2025
c084b56
Merge branch 'main' into streaming-scoring-clean
atris Oct 9, 2025
ad9c30d
Fix build issues
atris Oct 9, 2025
b4b16b0
More shenanigans
atris Oct 10, 2025
cbf228d
Remove confidence based streaming
atris Oct 11, 2025
dfcbbed
More cleanup
Oct 17, 2025
3d90216
Make spotless changes
Oct 17, 2025
dc5e1e8
Intermittent commit
atris Nov 4, 2025
938951f
4 to go
atris Nov 5, 2025
9233200
use global ordinals; fix per-leaf reset; enable under concurrent sear…
atris Nov 11, 2025
14f81b1
Fix more tests
atris Nov 11, 2025
a6a21b0
More tests fixes and cleanup
atris Nov 11, 2025
d751f73
Fix reindexing tests
atris Nov 20, 2025
cd0f276
Some tests pass
atris Jan 11, 2026
169bc33
Fix FlushModeResolver tests
atris Jan 11, 2026
1da8e37
Yet more fixes
atris Jan 12, 2026
b202193
Cleanup
atris Jan 12, 2026
4ce9a97
Merge remote-tracking branch 'origin/main' into streaming-scoring-clean
atris Jan 12, 2026
b4ef9e8
Fix compilation errors and merge conflicts after upstream merge
atris Jan 12, 2026
f851389
Spotless clean up
atris Jan 13, 2026
ae1b22f
Remove forbidden APIs
atris Jan 13, 2026
c967cbb
Sigh more test fixes
atris Jan 16, 2026
aceb756
More fixes
atris Jan 16, 2026
774588a
Spotless fixes
atris Jan 16, 2026
ae6c911
Yet more fixes
atris Jan 19, 2026
d0510cd
More cleanup
atris Jan 19, 2026
5352427
Miscellaneous refactoring
atris Jan 20, 2026
201110a
More refactor
atris Jan 21, 2026
3e1079a
Explicitly set partial to true
atris Jan 21, 2026
2fbd384
Revert silent drop of partial packets
atris Jan 21, 2026
edeabe5
Spotless changes
atris Jan 22, 2026
c204273
Merge branch 'main' into streaming-scoring-clean
atris Jan 22, 2026
5c095cf
Cleanup
atris Jan 22, 2026
6af9270
Fix serialization issue
atris Jan 22, 2026
87a3712
Streaming multiple partial results at coordinator and test fixes
Jan 28, 2026
b2f1903
Merge branch 'main' into streaming-scoring-clean
atris Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -383,6 +384,63 @@ public void testMissingSources() {
assertThat(response, matcher().deleted(0).slices(hasSize(0)));
}

/**
 * Regression test to ensure delete-by-query works correctly even when streaming search is enabled.
 * Since delete-by-query uses scroll searches, it should automatically opt out of the streaming pipeline.
 */
public void testDeleteByQueryWithStreamingEnabled() throws Exception {
    // Enable streaming search globally so the streaming pipeline is active for this test
    Settings streamingSettings = Settings.builder().put(STREAM_SEARCH_ENABLED.getKey(), true).build();
    assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(streamingSettings).get());

    try {
        // Index some test documents (refresh=true so they are visible to search)
        indexRandom(
            true,
            client().prepareIndex("test").setId("1").setSource("foo", "delete_me"),
            client().prepareIndex("test").setId("2").setSource("foo", "delete_me"),
            client().prepareIndex("test").setId("3").setSource("foo", "keep_me")
        );

        assertHitCount(client().prepareSearch("test").setSize(0).get(), 3);

        // Perform delete-by-query - this should work without "topDocs already consumed" errors
        BulkByScrollResponse response = deleteByQuery().source("test").filter(termQuery("foo", "delete_me")).refresh(true).get();

        // Verify the delete operation succeeded
        assertThat(response, matcher().deleted(2));
        assertHitCount(client().prepareSearch("test").setSize(0).get(), 1);

        // Verify only the expected document remains
        assertHitCount(client().prepareSearch("test").setQuery(termQuery("foo", "keep_me")).get(), 1);
    } finally {
        // Comprehensive cleanup: reset both transient and persistent streaming settings
        Settings disableStreamingSettings = Settings.builder().putNull(STREAM_SEARCH_ENABLED.getKey()).build();
        try {
            // Clear transient settings
            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(disableStreamingSettings).get());
            // Clear persistent settings (in case they were set)
            assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(disableStreamingSettings).get());

            // Assert the setting is cleared in BOTH scopes to prevent test pollution;
            // previously only the transient scope was verified even though both were cleared.
            assertFalse(
                "Stream search should be disabled after cleanup (transient)",
                client().admin()
                    .cluster()
                    .prepareState()
                    .get()
                    .getState()
                    .getMetadata()
                    .transientSettings()
                    .getAsBoolean(STREAM_SEARCH_ENABLED.getKey(), false)
            );
            assertFalse(
                "Stream search should be disabled after cleanup (persistent)",
                client().admin()
                    .cluster()
                    .prepareState()
                    .get()
                    .getState()
                    .getMetadata()
                    .persistentSettings()
                    .getAsBoolean(STREAM_SEARCH_ENABLED.getKey(), false)
            );
        } catch (Exception cleanupException) {
            // Best-effort cleanup: log but do not mask the primary test failure.
            // NOTE(review): the asserts above throw AssertionError (an Error), so they
            // are NOT swallowed by this catch (Exception) block.
            logger.warn("Failed to clean up streaming settings", cleanupException);
        }
    }
}

/** Enables or disables the cluster disk allocation decider **/
private void setDiskAllocationDeciderEnabled(boolean value) {
Settings settings = value
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.opensearch.arrow.flight.transport;

import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -40,10 +42,13 @@

/**
* Outbound handler for Arrow Flight streaming responses.
It must invoke messageListener and relay any exception back to the caller and not suppress them
It must invoke messageListener and relay any exception back to the caller and
not suppress them
*
* @opensearch.internal
*/
class FlightOutboundHandler extends ProtocolOutboundHandler {
private static final Logger logger = LogManager.getLogger(FlightOutboundHandler.class);
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
private final String nodeName;
private final Version version;
Expand Down Expand Up @@ -157,6 +162,19 @@ private void processBatchTask(BatchTask task) {
try {
try (VectorStreamOutput out = new VectorStreamOutput(flightChannel.getAllocator(), flightChannel.getRoot())) {
task.response().writeTo(out);
if (task.response() instanceof org.opensearch.search.query.QuerySearchResult) {
logger.info(
"QuerySearchResult hasAggs: {}",
((org.opensearch.search.query.QuerySearchResult) task.response()).hasAggs()
);
}
logger.info(
"Sending batch for requestId [{}], action [{}], items [{}], rows [{}]",
task.requestId(),
task.action(),
task.response(),
out.getRoot().getRowCount()
);
flightChannel.sendBatch(getHeaderBuffer(task.requestId(), task.nodeVersion(), task.features()), out);
messageListener.onResponseSent(task.requestId(), task.action(), task.response());
}
Expand Down Expand Up @@ -292,7 +310,8 @@ public void setMessageListener(TransportMessageListener listener) {
}

private ByteBuffer getHeaderBuffer(long requestId, Version nodeVersion, Set<String> features) throws IOException {
// A simple (possibly inefficient) way to serialize the header so we can reuse the existing logic in
// A simple (possibly inefficient) way to serialize the header so we can reuse the
// existing logic in
// NativeOutboundMessage.Response#writeVariableHeader()
NativeOutboundMessage.Response headerMessage = new NativeOutboundMessage.Response(
threadPool.getThreadContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ public Map<String, Supplier<Transport>> getTransports(
return Collections.emptyMap();
}

@Override
public Map<String, Supplier<Transport>> getStreamTransports(
    Settings settings,
    ThreadPool threadPool,
    PageCacheRecycler pageCacheRecycler,
    CircuitBreakerService circuitBreakerService,
    NamedWriteableRegistry namedWriteableRegistry,
    NetworkService networkService,
    Tracer tracer
) {
    // Intentionally empty: the FLIGHT transport is already registered by
    // getTransports(), and NetworkModule locates it via
    // getStreamTransportSupplier(), which consults transportFactories.
    return Map.of();
}

/**
* Gets the auxiliary transports for the FlightStream plugin.
* @param settings The settings for the plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.CORRELATION_ID_KEY;

/**
 * Streaming transport response implementation backed by Arrow Flight.
 *
 * <p>
 * Handles streaming responses from Arrow Flight servers with lazy batch
 * processing: headers are extracted when first accessed and responses are
 * deserialized on demand. Manages the Flight stream lifecycle with lazy
 * initialization and prefetching support.
 */
Expand Down Expand Up @@ -167,6 +175,79 @@ public void close() {
} catch (IllegalStateException ignore) {} catch (Exception e) {
throw new StreamException(StreamErrorCode.INTERNAL, "Error closing flight stream", e);
}
flightStream.close();
} catch (IllegalStateException ignore) {
// this is fine if the allocator is already closed
} catch (Exception e) {
throw new StreamException(StreamErrorCode.INTERNAL, "Error while closing flight stream", e);
} finally {
isClosed = true;
}
}

/**
 * Returns the response handler used to deserialize batches from this stream.
 *
 * @return the transport response handler bound to this streaming response
 */
public TransportResponseHandler<T> getHandler() {
    return handler;
}

/**
 * Initializes the stream by fetching the first batch to extract headers.
 * Safe to call repeatedly: it no-ops once the stream is initialized, exhausted,
 * or has failed. Synchronized so concurrent callers cannot advance the
 * underlying FlightStream twice.
 */
private synchronized void initializeStreamIfNeeded() {
    // Already initialized, or nothing more will arrive — nothing to do.
    if (streamInitialized || streamExhausted) {
        return;
    }
    long startTime = System.currentTimeMillis();
    try {
        if (flightStream.next()) {
            // First batch arrived: retain the root and header for lazy consumption.
            currentRoot = flightStream.getRoot();
            currentHeader = headerContext.getHeader(correlationId);
            // Capture the batch size before deserialization
            currentBatchSize = FlightUtils.calculateVectorSchemaRootSize(currentRoot);
            streamInitialized = true;
        } else {
            // The server produced no batches at all.
            streamExhausted = true;
        }
    } catch (FlightRuntimeException e) {
        // TODO maybe add a check - handshake and validate if node is connected
        // Try to get headers even if stream failed
        currentHeader = headerContext.getHeader(correlationId);
        streamExhausted = true;
        // Map the Flight-specific error to a StreamException; it is stored here
        // and surfaced to the caller later, not thrown from this method.
        initializationException = FlightErrorMapper.fromFlightException(e);
        logger.warn("Stream initialization failed", e);
    } catch (Exception e) {
        // Try to get headers even if stream failed
        currentHeader = headerContext.getHeader(correlationId);
        streamExhausted = true;
        initializationException = new StreamException(StreamErrorCode.INTERNAL, "Stream initialization failed", e);
        logger.warn("Stream initialization failed", e);
    } finally {
        // Warn if fetching the first batch exceeded the slow-log threshold.
        logSlowOperation(startTime);
    }
}

/**
 * Deserializes the current Arrow batch into a transport response using the
 * handler's reader.
 *
 * <p>Note: leftover debug instrumentation that logged every QuerySearchResult
 * at INFO level on this hot path has been removed.
 *
 * @return the response deserialized from {@code currentRoot}
 * @throws StreamException with {@code StreamErrorCode.INTERNAL} wrapping any
 *         {@link IOException} raised during deserialization
 */
private T deserializeResponse() {
    // try-with-resources ensures the VectorStreamInput (and its view of the
    // Arrow root) is released even when read() fails.
    try (VectorStreamInput input = new VectorStreamInput(currentRoot, namedWriteableRegistry)) {
        return handler.read(input);
    } catch (IOException e) {
        throw new StreamException(StreamErrorCode.INTERNAL, "Failed to deserialize response", e);
    }
}

/**
 * Guards operations against use after close.
 *
 * @throws StreamException with {@code StreamErrorCode.UNAVAILABLE} if this
 *         stream has already been closed
 */
private void ensureOpen() {
    if (isClosed) {
        throw new StreamException(StreamErrorCode.UNAVAILABLE, "Stream is closed");
    }
}

/**
 * Emits a warning when the elapsed wall-clock time since {@code startTime}
 * exceeds the configured slow-log threshold.
 *
 * @param startTime operation start, in epoch milliseconds
 */
private void logSlowOperation(long startTime) {
    final long elapsedMs = System.currentTimeMillis() - startTime;
    final long limitMs = config.getSlowLogThreshold().millis();
    if (elapsedMs <= limitMs) {
        return;
    }
    logger.warn("Flight stream next() took [{}ms], exceeding threshold [{}ms]", elapsedMs, limitMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.action.search.StreamTransportSearchAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.TransportStreamSearchAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -741,9 +741,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
}
actions.register(StreamSearchAction.INSTANCE, TransportStreamSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

/**
 * Marker interface for ActionListeners that can receive the first partial response
 * in streaming preview-first mode. This is a temporary interface for OSB benchmarking.
 *
 * <p>Annotated {@link FunctionalInterface} since it declares exactly one abstract
 * method, so implementations may be supplied as lambdas; the annotation also makes
 * the compiler reject accidental addition of a second abstract method.
 *
 * @opensearch.internal
 */
@FunctionalInterface
public interface PreviewFirstPartialReceiver {

    /**
     * Called when the first partial response is available.
     * Implementation should send the response and cancel the search task.
     *
     * @param partial the first partial search response
     */
    void onPartialResponse(SearchResponse partial);
}
Loading
Loading