Native Arrow transport path with zero-copy transfer (For reference purpose only) #21253

Draft
rishabhmaurya wants to merge 1 commit into opensearch-project:main from rishabhmaurya:native-arrow-transport-path

@rishabhmaurya
Contributor

Context

Alternative approach to #21240. Adds a zero-serialization path for Arrow data in the Flight transport without core server changes.

Summary

Send side

  • ArrowBatchResponse — abstract base for native Arrow responses. writeTo() is a final no-op. Framework detects this and does zero-copy transferTo() — moving buffer pointers from producer vectors into the channel's shared root on the executor thread.

Receive side

  • VectorStreamInput.getRoot(): ArrowBatchResponse(StreamInput in) calls this to access typed vectors directly. No factory selection is needed; the handler decides which methods to call.

Allocator access

  • ArrowFlightChannel — public interface with getAllocator() and from(channel) unwrap utility. Implemented by FlightTransportChannel and FlightServerChannel.

Buffer management

Java owns all memory via the channel's allocator. Producers create VectorSchemaRoot from this allocator. The framework creates a shared root bound to Flight via start(). Each batch is zero-copy transferred into the shared root before putNext(). Supports pipelined production — each batch has independent buffers, executor drains serially.

Key design decisions

  • No core server changes — everything within arrow-flight-rpc plugin
  • VectorStreamOutput refactored to abstract with ByteSerialized (unchanged byte path) and NativeArrow (no-op writes) implementations
  • Zero-copy via TransferPair.transfer() — buffer pointer swap, no memcpy
  • Pipelining safe — producers can queue batches ahead, each with independent buffers
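The "buffer pointer swap, no memcpy" idea can be illustrated with a toy model in plain Java. This is only a sketch of the ownership-move semantics: `ToyVector` and `ZeroCopyTransferSketch` are hypothetical names invented for illustration, and the code does not use Arrow's TransferPair or allocator accounting.

```java
import java.nio.ByteBuffer;

// Toy stand-in for an Arrow vector: it owns at most one buffer reference.
final class ToyVector {
    private ByteBuffer buffer; // null means the vector is empty

    ToyVector(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    // Moves the buffer reference into the target and empties this vector.
    // Only a reference changes hands; no bytes are copied.
    void transferTo(ToyVector target) {
        target.buffer = this.buffer;
        this.buffer = null;
    }

    ByteBuffer buffer() {
        return buffer;
    }

    boolean isEmpty() {
        return buffer == null;
    }
}

final class ZeroCopyTransferSketch {
    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.allocate(1024);
        ToyVector producer = new ToyVector(data);
        ToyVector shared = new ToyVector(null);

        producer.transferTo(shared);

        System.out.println(shared.buffer() == data); // true: same buffer object
        System.out.println(producer.isEmpty());      // true: producer gave it up
    }
}
```

Because each producer batch holds its own buffer until the move, batches queued ahead of the executor do not alias each other in this model, which is the property the pipelining bullet relies on.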

Test plan

  • NativeArrowTransportIT — single batch, serial multi-batch, and parallel 100-batch (5 producer threads) with data integrity verification
  • NativeArrowStreamTransportExampleIT — example plugin demonstrating the API
  • Existing byte-path tests unaffected

Design doc

See plugins/arrow-flight-rpc/docs/native-arrow-transport-design.md

@github-actions
Contributor

github-actions bot commented Apr 17, 2026

PR Reviewer Guide 🔍

(Review updated until commit ac83ec6)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Native Arrow zero-copy transport core implementation and tests

Relevant files:

  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowBatchResponse.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowFlightChannel.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamInput.java
  • plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/NativeArrowTransportIT.java
  • plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java
  • plugins/arrow-flight-rpc/docs/native-arrow-transport-design.md

Sub-PR theme: Example plugin demonstrating native Arrow stream transport API

Relevant files:

  • plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/NativeArrowStreamDataAction.java
  • plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/NativeArrowStreamDataRequest.java
  • plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/NativeArrowStreamDataResponse.java
  • plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportNativeArrowStreamDataAction.java
  • plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java
  • plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/NativeArrowStreamTransportExampleIT.java

⚡ Recommended focus areas for review

Memory Leak

The producerRoot obtained in the send-side constructor is never closed after transferTo() completes. After transfer, the producer root's buffers are moved but the root itself (and its allocator) may still hold resources. There is no close() or lifecycle management for producerRoot in this class.

protected ArrowBatchResponse(VectorSchemaRoot producerRoot) {
    this.producerRoot = producerRoot;
}

/**
 * Deserializes a response from a StreamInput (receive side).
 * @param in the stream input containing the Arrow root
 * @throws IOException if deserialization fails
 */
protected ArrowBatchResponse(StreamInput in) throws IOException {
    super(in);
    this.producerRoot = ((VectorStreamInput) in).getRoot();
}

/**
 * Returns the producer's root. On the send side, this is the root populated
 * by the producer. On the receive side, this is the root from the Flight stream.
 */
public VectorSchemaRoot getRoot() {
    return producerRoot;
}

/**
 * Zero-copy transfers the producer's vectors into the target root.
 * Called by the framework on the executor thread before {@code putNext()}.
 * After transfer, the producer's buffers are moved to the target — the producer
 * root becomes empty.
 *
 * @param target the channel's shared root (bound to the Flight stream via start())
 */
void transferTo(VectorSchemaRoot target) {
    List<FieldVector> sourceVectors = producerRoot.getFieldVectors();
    List<FieldVector> targetVectors = target.getFieldVectors();
    for (int i = 0; i < sourceVectors.size(); i++) {
        TransferPair transfer = sourceVectors.get(i).makeTransferPair(targetVectors.get(i));
        transfer.transfer();
    }
    target.setRowCount(producerRoot.getRowCount());
}

Shared Root Leak

When the first batch arrives, a new VectorSchemaRoot is created via VectorSchemaRoot.create(...) and assigned to sharedRoot, but this root is never stored back into flightChannel (there is no flightChannel.setRoot(sharedRoot) call visible). If flightChannel.getRoot() returns null on every call, a new root is created for every batch, leaking memory. The logic for persisting the newly created shared root needs to be verified.

if (task.response() instanceof ArrowBatchResponse arrowResponse) {
    // Native Arrow path: zero-copy transfer producer's vectors into shared root
    VectorSchemaRoot sharedRoot = flightChannel.getRoot();
    if (sharedRoot == null) {
        // First batch: create the shared root with the same schema
        sharedRoot = VectorSchemaRoot.create(arrowResponse.getRoot().getSchema(), flightChannel.getAllocator());
    }
    arrowResponse.transferTo(sharedRoot);
    out = VectorStreamOutput.forNativeArrow(sharedRoot);
} else {
    out = VectorStreamOutput.create(flightChannel.getAllocator(), flightChannel.getRoot());
    task.response().writeTo(out);
}
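
One way to rule this out is to make creation and persistence a single step, so the first batch creates the root and every later batch reuses it. The sketch below is a toy model in plain Java under that assumption: `ToyChannel`, `getOrCreateRoot`, and the string used as a stand-in for a root are hypothetical names for illustration, not part of the plugin or of Arrow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the channel that owns the shared root.
final class ToyChannel<R> {
    private R root; // null until the first batch arrives

    // Returns the existing root, or creates and stores one on first use.
    // Not thread-safe: assumes a single executor thread drains batches serially.
    R getOrCreateRoot(Supplier<R> factory) {
        if (root == null) {
            root = factory.get();
        }
        return root;
    }
}

final class SharedRootSketch {
    // Sends `batches` batches through one channel and reports how many roots were created.
    static int rootsCreatedFor(int batches) {
        AtomicInteger created = new AtomicInteger();
        ToyChannel<String> channel = new ToyChannel<>();
        for (int i = 0; i < batches; i++) {
            channel.getOrCreateRoot(() -> "root-" + created.incrementAndGet());
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(rootsCreatedFor(3)); // prints 1
    }
}
```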

Unsafe Cast

getAllocator() unconditionally casts getChannel() to FlightServerChannel. If the underlying TcpChannel is not a FlightServerChannel (e.g., in tests or future refactors), this will throw a ClassCastException at runtime with no descriptive error. A type check with a meaningful exception should be added.

@Override
public BufferAllocator getAllocator() {
    return ((FlightServerChannel) getChannel()).getAllocator();
}

Resource Leak

In handleStreamRequest, the parallel production path creates an ExecutorService via Executors.newFixedThreadPool() but only calls producers.shutdown() after producersDone.await(). If an exception is thrown before shutdown() (e.g., timeout on queue.poll), the thread pool is never shut down, leaking threads. A try-finally block should ensure shutdown.

    ExecutorService producers = Executors.newFixedThreadPool(request.parallelism);

    for (int batch = 0; batch < request.batchCount; batch++) {
        final int batchIndex = batch;
        producers.submit(() -> {
            try {
                VectorSchemaRoot root = createBatch(channelAllocator, batchIndex, request.rowsPerBatch);
                queue.put(new TestArrowResponse(root));
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                producersDone.countDown();
            }
        });
    }

    // Drain: send batches as they become available
    int sent = 0;
    while (sent < request.batchCount) {
        TestArrowResponse response = queue.poll(10, TimeUnit.SECONDS);
        if (response == null) throw new IOException("Timed out waiting for producer");
        channel.sendResponseBatch(response);
        sent++;
    }

    producersDone.await(30, TimeUnit.SECONDS);
    producers.shutdown();
}
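
A minimal sketch of the try-finally shape, written against plain `java.util.concurrent` so it runs on its own. The names `ExecutorCleanupSketch` and `drain`, the integer batches, and the parameters are assumptions for illustration; the real handler sends `TestArrowResponse` objects through the channel instead. The point is that the `finally` covers the drain loop as well as the final wait, so a poll timeout still shuts the pool down.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ExecutorCleanupSketch {
    // Submits `toProduce` batches and drains until `expected` have arrived.
    // The pool is shut down on every exit path, including the timeout.
    static int drain(ExecutorService producers, int toProduce, int expected, long pollMillis)
            throws IOException, InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try {
            for (int i = 0; i < toProduce; i++) {
                final int batchIndex = i;
                producers.execute(() -> queue.add(batchIndex));
            }
            int sent = 0;
            while (sent < expected) {
                Integer batch = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (batch == null) {
                    throw new IOException("Timed out waiting for producer");
                }
                sent++;
            }
            return sent;
        } finally {
            producers.shutdown(); // runs whether the loop finished or threw
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            drain(pool, 1, 2, 100); // one batch produced, two expected
        } catch (IOException e) {
            System.out.println(e.getMessage() + "; pool shut down: " + pool.isShutdown());
        }
    }
}
```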

Null Vector Risk

The vector field is assigned by casting root.getVector("0") to VarBinaryVector. For the native Arrow path, the root may not contain a column named "0", causing getVector("0") to return null. The null check added in close() mitigates a crash there, but readByte() and other read methods will still NPE if called on a native Arrow VectorStreamInput.

    vector = (VarBinaryVector) root.getVector("0");
    this.registry = registry;
}
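
One possible mitigation is to have the byte-oriented read methods fail with a descriptive error when the byte column is absent. The sketch below models this in plain Java; `ToyStreamInput`, its map of columns, and the error message are hypothetical stand-ins for illustration, not the plugin's VectorStreamInput.

```java
import java.util.Map;

// Hypothetical stand-in for a stream input that may or may not carry a byte column "0".
final class ToyStreamInput {
    private final byte[] byteColumn; // null on the native Arrow path
    private int position;

    ToyStreamInput(Map<String, byte[]> columns) {
        this.byteColumn = columns.get("0"); // returns null when the column is absent
    }

    boolean hasByteColumn() {
        return byteColumn != null;
    }

    // Reads the next byte, or fails with a clear message instead of an NPE.
    byte readByte() {
        if (byteColumn == null) {
            throw new IllegalStateException(
                "readByte() is unsupported: input has no byte column \"0\"; read the typed root instead"
            );
        }
        return byteColumn[position++];
    }

    public static void main(String[] args) {
        ToyStreamInput bytePath = new ToyStreamInput(Map.of("0", new byte[] { 7 }));
        System.out.println(bytePath.readByte()); // prints 7

        ToyStreamInput nativePath = new ToyStreamInput(Map.of("ts", new byte[0]));
        System.out.println(nativePath.hasByteColumn()); // prints false
    }
}
```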

@rishabhmaurya force-pushed the native-arrow-transport-path branch from 28617d9 to 7d506e8 on April 17, 2026 01:45
@github-actions
Contributor

github-actions bot commented Apr 17, 2026

PR Code Suggestions ✨

Latest suggestions up to ac83ec6

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Close producer root after zero-copy transfer

After transferTo completes, producerRoot is no longer needed (its buffers have been
moved to target), but it is never closed. The empty producerRoot and its allocator
will leak unless explicitly closed. The method should close producerRoot after the
transfer.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowBatchResponse.java [87-95]

 void transferTo(VectorSchemaRoot target) {
     List<FieldVector> sourceVectors = producerRoot.getFieldVectors();
     List<FieldVector> targetVectors = target.getFieldVectors();
     for (int i = 0; i < sourceVectors.size(); i++) {
         TransferPair transfer = sourceVectors.get(i).makeTransferPair(targetVectors.get(i));
         transfer.transfer();
     }
     target.setRowCount(producerRoot.getRowCount());
+    producerRoot.close();
 }
Suggestion importance[1-10]: 8

Why: After transfer(), the producerRoot's buffers are moved to target but the empty producerRoot container and its allocator are never closed, causing a memory leak. Closing producerRoot after transfer is a critical resource management fix.

Medium
Persist newly created shared root to avoid memory leaks

The newly created sharedRoot is never stored back into flightChannel, so on every
subsequent batch flightChannel.getRoot() will still return null and a new root will
be created each time, leaking memory. The created root should be set on
flightChannel (e.g., via a setRoot(VectorSchemaRoot) method) so it is reused across
batches and properly closed when the channel is done.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [158-164]

 VectorSchemaRoot sharedRoot = flightChannel.getRoot();
 if (sharedRoot == null) {
-    // First batch: create the shared root with the same schema
+    // First batch: create the shared root with the same schema and store it
     sharedRoot = VectorSchemaRoot.create(arrowResponse.getRoot().getSchema(), flightChannel.getAllocator());
+    flightChannel.setRoot(sharedRoot);
 }
 arrowResponse.transferTo(sharedRoot);
 out = VectorStreamOutput.forNativeArrow(sharedRoot);
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that the newly created sharedRoot is never stored back into flightChannel, meaning a new root would be created for every batch. However, it's unclear whether FlightServerChannel already has a setRoot method or if this is a new API that needs to be added, making the improved_code potentially incomplete.

Medium
Propagate producer thread errors during drain

If a producer thread throws an exception, producersDone.countDown() is still called
(via finally), but the failed batch is never added to the queue. The drain loop will
then block until timeout waiting for a batch that will never arrive, causing a
misleading IOException("Timed out waiting for producer") instead of propagating the
real error. An AtomicReference should be used to capture producer failures and
checked in the drain loop.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/NativeArrowTransportIT.java [295-321]

 ExecutorService producers = Executors.newFixedThreadPool(request.parallelism);
+AtomicReference<Exception> producerError = new AtomicReference<>();
 
 for (int batch = 0; batch < request.batchCount; batch++) {
     final int batchIndex = batch;
     producers.submit(() -> {
         try {
             VectorSchemaRoot root = createBatch(channelAllocator, batchIndex, request.rowsPerBatch);
             queue.put(new TestArrowResponse(root));
         } catch (Exception e) {
+            producerError.compareAndSet(null, e);
             throw new RuntimeException(e);
         } finally {
             producersDone.countDown();
         }
     });
 }
 
 // Drain: send batches as they become available
 int sent = 0;
 while (sent < request.batchCount) {
     TestArrowResponse response = queue.poll(10, TimeUnit.SECONDS);
-    if (response == null) throw new IOException("Timed out waiting for producer");
+    if (response == null) {
+        Exception err = producerError.get();
+        throw new IOException("Timed out waiting for producer" + (err != null ? ": " + err.getMessage() : ""), err);
+    }
     channel.sendResponseBatch(response);
     sent++;
 }
 
 producersDone.await(30, TimeUnit.SECONDS);
 producers.shutdown();
Suggestion importance[1-10]: 5

Why: This is a valid improvement for test code — producer failures would cause a misleading timeout error rather than the actual exception. However, since this is test/integration code rather than production code, the impact is limited to test debuggability.
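
An alternative to the AtomicReference approach, sketched here with the standard `ExecutorCompletionService`, surfaces a producer failure as soon as that producer finishes rather than after the poll timeout. This swaps in a different technique from the suggestion above; `ProducerErrorSketch`, `drain`, and the integer batches are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ProducerErrorSketch {
    // Runs every producer task and collects results in completion order.
    // A failed producer surfaces as ExecutionException carrying the real cause.
    static List<Integer> drain(List<Callable<Integer>> tasks, int parallelism)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService producers = Executors.newFixedThreadPool(parallelism);
        try {
            CompletionService<Integer> done = new ExecutorCompletionService<>(producers);
            for (Callable<Integer> task : tasks) {
                done.submit(task);
            }
            List<Integer> sent = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                Future<Integer> next = done.poll(10, TimeUnit.SECONDS);
                if (next == null) {
                    throw new TimeoutException("Timed out waiting for producer");
                }
                sent.add(next.get()); // rethrows the producer's exception, wrapped
            }
            return sent;
        } finally {
            producers.shutdownNow(); // always release the pool threads
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 0);
        tasks.add(() -> { throw new IllegalStateException("batch 1 failed"); });
        try {
            drain(tasks, 2);
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage()); // prints "batch 1 failed"
        }
    }
}
```

Failed and successful producers both complete a Future, so the drain loop never waits on a batch that will not arrive.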

Low
General
Guard unsafe cast with explicit type check

The cast to FlightServerChannel will throw a ClassCastException at runtime if
getChannel() returns a different TcpChannel implementation. A guard check should be
added to provide a clear error message instead of an opaque cast failure.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java [153-156]

 @Override
 public BufferAllocator getAllocator() {
-    return ((FlightServerChannel) getChannel()).getAllocator();
+    TcpChannel tcpChannel = getChannel();
+    if (!(tcpChannel instanceof FlightServerChannel)) {
+        throw new IllegalStateException("Expected FlightServerChannel but got: " + tcpChannel.getClass().getName());
+    }
+    return ((FlightServerChannel) tcpChannel).getAllocator();
 }
Suggestion importance[1-10]: 4

Why: The suggestion is valid — an unguarded cast to FlightServerChannel could throw an opaque ClassCastException. Adding an explicit check improves error clarity, though in practice FlightTransportChannel is always backed by a FlightServerChannel by design.

Low

Previous suggestions

Suggestions up to commit 7d506e8
Category | Suggestion | Impact
Possible issue
Persist newly created shared root to avoid leaks

The newly created sharedRoot is never stored back into flightChannel, so on every
subsequent batch flightChannel.getRoot() will still return null and a new root will
be created each time — leaking the previous one. The created root must be set on
flightChannel (e.g., via a setRoot(VectorSchemaRoot) method) so it is reused and
properly closed across batches.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [158-164]

 VectorSchemaRoot sharedRoot = flightChannel.getRoot();
 if (sharedRoot == null) {
     // First batch: create the shared root with the same schema
     sharedRoot = VectorSchemaRoot.create(arrowResponse.getRoot().getSchema(), flightChannel.getAllocator());
+    flightChannel.setRoot(sharedRoot);
 }
 arrowResponse.transferTo(sharedRoot);
 out = VectorStreamOutput.forNativeArrow(sharedRoot);
Suggestion importance[1-10]: 8

Why: The sharedRoot is created on the first batch but never stored back into flightChannel, so every subsequent batch will create a new root and leak the previous one. This is a real memory leak bug that would affect multi-batch streams.

Medium
Close producer root after zero-copy transfer

After transferTo completes, producerRoot is left open (its vectors are now empty but
the root itself is not closed). This leaks the VectorSchemaRoot and its allocator.
The producer root should be closed after the transfer to release its resources.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowBatchResponse.java [87-95]

 void transferTo(VectorSchemaRoot target) {
     List<FieldVector> sourceVectors = producerRoot.getFieldVectors();
     List<FieldVector> targetVectors = target.getFieldVectors();
     for (int i = 0; i < sourceVectors.size(); i++) {
         TransferPair transfer = sourceVectors.get(i).makeTransferPair(targetVectors.get(i));
         transfer.transfer();
     }
     target.setRowCount(producerRoot.getRowCount());
+    producerRoot.close();
 }
Suggestion importance[1-10]: 7

Why: After transferTo, the producerRoot is not closed, which leaks the VectorSchemaRoot and its allocator resources. Closing it after transfer is important for correct Arrow memory management, especially in high-throughput streaming scenarios.

Medium
Propagate producer errors during parallel batch drain

If a producer task throws an exception, producersDone.countDown() is still called
(via finally), but the failed batch is never added to the queue. The drain loop will
then block until timeout waiting for a batch that will never arrive, causing a
misleading IOException("Timed out waiting for producer") instead of propagating the
real error. Track producer failures with an AtomicReference and check it in the
drain loop.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/NativeArrowTransportIT.java [295-321]

+AtomicReference<Exception> producerError = new AtomicReference<>();
 ExecutorService producers = Executors.newFixedThreadPool(request.parallelism);
 
 for (int batch = 0; batch < request.batchCount; batch++) {
     final int batchIndex = batch;
     producers.submit(() -> {
         try {
             VectorSchemaRoot root = createBatch(channelAllocator, batchIndex, request.rowsPerBatch);
             queue.put(new TestArrowResponse(root));
         } catch (Exception e) {
+            producerError.compareAndSet(null, e);
             throw new RuntimeException(e);
         } finally {
             producersDone.countDown();
         }
     });
 }
 
 // Drain: send batches as they become available
 int sent = 0;
 while (sent < request.batchCount) {
     TestArrowResponse response = queue.poll(10, TimeUnit.SECONDS);
-    if (response == null) throw new IOException("Timed out waiting for producer");
+    if (response == null) {
+        Exception err = producerError.get();
+        throw new IOException("Timed out waiting for producer" + (err != null ? ": " + err : ""), err);
+    }
     channel.sendResponseBatch(response);
     sent++;
 }
 
 producersDone.await(30, TimeUnit.SECONDS);
 producers.shutdown();
Suggestion importance[1-10]: 5

Why: This is a valid improvement for the test code — when a producer fails, the drain loop will timeout with a misleading error. However, since this is test code (not production), the impact is limited to test diagnostics rather than correctness.

Low
General
Guard unsafe cast with descriptive error

The cast to FlightServerChannel is unchecked and will throw a ClassCastException at
runtime if the underlying TcpChannel is not a FlightServerChannel. A guard with a
descriptive error message would make failures easier to diagnose.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java [153-156]

 @Override
 public BufferAllocator getAllocator() {
-    return ((FlightServerChannel) getChannel()).getAllocator();
+    TcpChannel tcpChannel = getChannel();
+    if (!(tcpChannel instanceof FlightServerChannel)) {
+        throw new IllegalStateException(
+            "Expected FlightServerChannel but got: " + tcpChannel.getClass().getName()
+        );
+    }
+    return ((FlightServerChannel) tcpChannel).getAllocator();
 }
Suggestion importance[1-10]: 4

Why: The unchecked cast to FlightServerChannel could throw a ClassCastException at runtime, but since FlightTransportChannel is always backed by a FlightServerChannel in the current architecture, this is a defensive improvement rather than a critical fix.

Low
Suggestions up to commit 28617d9
Category | Suggestion | Impact
Possible issue
Persist shared root to avoid leaks and repeated creation

The newly created sharedRoot is never stored back into flightChannel, so every
subsequent batch will create a new root, leaking the previous one and breaking the
Flight start()/putNext() contract that requires a single bound root. The shared root
must be persisted on the channel after creation (e.g., via a setRoot() method on
FlightServerChannel) so subsequent batches reuse it.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [158-164]

 VectorSchemaRoot sharedRoot = flightChannel.getRoot();
 if (sharedRoot == null) {
-    // First batch: create the shared root with the same schema
+    // First batch: create the shared root with the same schema and persist it
     sharedRoot = VectorSchemaRoot.create(arrowResponse.getRoot().getSchema(), flightChannel.getAllocator());
+    flightChannel.setRoot(sharedRoot);
 }
 arrowResponse.transferTo(sharedRoot);
 out = VectorStreamOutput.forNativeArrow(sharedRoot);
Suggestion importance[1-10]: 8

Why: The sharedRoot is created on first batch but never stored back to flightChannel, so every subsequent batch would create a new root, leaking memory and violating the Flight start()/putNext() contract. This is a real bug that would cause memory leaks and incorrect behavior in multi-batch scenarios.

Medium
General
Ensure executor shutdown in finally block

producers.shutdown() is called after producersDone.await(), but if the await times
out or an exception is thrown earlier, the executor service is never shut down,
leaking threads. The shutdown should be placed in a finally block to guarantee
cleanup.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/NativeArrowTransportIT.java [320-321]

-producersDone.await(30, TimeUnit.SECONDS);
-producers.shutdown();
+try {
+    producersDone.await(30, TimeUnit.SECONDS);
+} finally {
+    producers.shutdown();
+}
Suggestion importance[1-10]: 5

Why: If producersDone.await() times out or an exception is thrown, producers.shutdown() is never called, leaking threads. Moving it to a finally block is a valid improvement for test reliability, though the impact is limited to test code.

Low
Guard against unexpected channel type cast

The cast to FlightServerChannel is unchecked and will throw a ClassCastException at
runtime if the underlying TcpChannel is not a FlightServerChannel. A guard or a more
descriptive exception should be added to fail fast with a clear error message.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java [153-156]

 @Override
 public BufferAllocator getAllocator() {
-    return ((FlightServerChannel) getChannel()).getAllocator();
+    TcpChannel tcpChannel = getChannel();
+    if (!(tcpChannel instanceof FlightServerChannel)) {
+        throw new IllegalStateException("Expected FlightServerChannel but got: " + tcpChannel.getClass().getName());
+    }
+    return ((FlightServerChannel) tcpChannel).getAllocator();
 }
Suggestion importance[1-10]: 5

Why: The unchecked cast to FlightServerChannel could throw a ClassCastException with an unhelpful message if the underlying channel is not a FlightServerChannel. Adding a guard with a descriptive error improves debuggability, though in practice this channel type is always used with FlightServerChannel.

Low
Validate schema compatibility before vector transfer

There is no validation that sourceVectors and targetVectors have the same size
before iterating. If the producer root's schema differs from the target root's
schema (e.g., due to a bug or schema mismatch), this will throw an
IndexOutOfBoundsException or silently transfer mismatched vectors. A size check
should be added to fail fast with a clear error.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowBatchResponse.java [87-95]

 void transferTo(VectorSchemaRoot target) {
     List<FieldVector> sourceVectors = producerRoot.getFieldVectors();
     List<FieldVector> targetVectors = target.getFieldVectors();
+    if (sourceVectors.size() != targetVectors.size()) {
+        throw new IllegalArgumentException(
+            "Schema mismatch: source has " + sourceVectors.size() + " vectors, target has " + targetVectors.size()
+        );
+    }
     for (int i = 0; i < sourceVectors.size(); i++) {
         TransferPair transfer = sourceVectors.get(i).makeTransferPair(targetVectors.get(i));
         transfer.transfer();
     }
     target.setRowCount(producerRoot.getRowCount());
 }
Suggestion importance[1-10]: 4

Why: Adding a size check before iterating over vectors would provide a clearer error message on schema mismatch, but in practice the shared root is created from the producer's schema so mismatches are unlikely. This is a minor defensive improvement.

Low

@github-actions
Contributor

Persistent review updated to latest commit 7d506e8

@rishabhmaurya changed the title from "Native Arrow transport path with zero-copy transfer" to "Native Arrow transport path with zero-copy transfer (For reference purpose only)" on Apr 17, 2026
Add zero-serialization path for Arrow data in the Flight transport.
When a response extends ArrowBatchResponse, the framework does zero-copy
transfer of typed Arrow vectors via the Flight stream - no byte
serialization. Java owns all buffer management through the channel
allocator. Supports pipelined batch production.

Signed-off-by: Rishabh Maurya <rishma@amazon.com>
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
@rishabhmaurya force-pushed the native-arrow-transport-path branch from 7d506e8 to ac83ec6 on April 17, 2026 02:00
@github-actions
Contributor

Persistent review updated to latest commit ac83ec6

@github-actions
Contributor

❌ Gradle check result for ac83ec6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
