Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions modules/parquet-data-format/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ dependencies {
implementation project(':libs:opensearch-vectorized-exec-spi')

// Apache Arrow dependencies (using stable version with unsafe allocator)
implementation 'org.apache.arrow:arrow-vector:17.0.0'
implementation 'org.apache.arrow:arrow-memory-core:17.0.0'
implementation 'org.apache.arrow:arrow-memory-unsafe:17.0.0'
implementation 'org.apache.arrow:arrow-format:17.0.0'
implementation 'org.apache.arrow:arrow-c-data:17.0.0'
implementation 'org.apache.arrow:arrow-vector:18.3.0'
implementation 'org.apache.arrow:arrow-memory-core:18.3.0'
implementation 'org.apache.arrow:arrow-memory-unsafe:18.3.0'
implementation 'org.apache.arrow:arrow-format:18.3.0'
implementation 'org.apache.arrow:arrow-c-data:18.3.0'

// Checker Framework annotations (required by Arrow)
implementation 'org.checkerframework:checker-qual:3.42.0'
Expand Down
10 changes: 5 additions & 5 deletions plugins/engine-datafusion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ dependencies {
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// Apache Arrow dependencies for memory management
implementation "org.apache.arrow:arrow-memory-core:17.0.0"
implementation "org.apache.arrow:arrow-memory-unsafe:17.0.0"
implementation "org.apache.arrow:arrow-vector:17.0.0"
implementation "org.apache.arrow:arrow-c-data:17.0.0"
implementation "org.apache.arrow:arrow-format:17.0.0"
implementation "org.apache.arrow:arrow-memory-core:18.3.0"
implementation "org.apache.arrow:arrow-memory-unsafe:18.3.0"
implementation "org.apache.arrow:arrow-vector:18.3.0"
implementation "org.apache.arrow:arrow-c-data:18.3.0"
implementation "org.apache.arrow:arrow-format:18.3.0"
// SLF4J API for Arrow logging compatibility
implementation "org.slf4j:slf4j-api:${versions.slf4j}"
// CheckerFramework annotations required by Arrow 18.3.0
Expand Down
45 changes: 43 additions & 2 deletions plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use jni::objects::JLongArray;
use jni::sys::{jboolean, jbyteArray, jint, jlong, jstring};
use jni::{JNIEnv, JavaVM};
use std::sync::{Arc, OnceLock};
use arrow_array::{Array, StructArray};
use arrow_array::{Array, RecordBatch, StructArray};
use arrow_array::ffi::FFI_ArrowArray;
use arrow_schema::ffi::FFI_ArrowSchema;
use datafusion::{
Expand Down Expand Up @@ -351,7 +351,49 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_getVersio
.as_raw()
}

/// Test JNI method to verify FFI boundary handling of sliced arrays.
/// Creates a sliced StringArray (simulating `head X from Y`) and returns FFI pointers.
///
/// The two raw pointers (schema, array) are delivered to the Java `listener`
/// as a `long[2]`. Ownership of the boxed FFI structs transfers to the Java
/// side — NOTE(review): confirm the Java caller releases both native structs,
/// otherwise each call leaks them.
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createTestSlicedArray(
    mut env: JNIEnv,
    _class: JClass,
    offset: jint,
    length: jint,
    listener: JObject,
) {
    use arrow_schema::{Schema, Field, DataType};
    use arrow_array::StringArray;

    // Fixed 5-element source; `slice` produces a view with a non-zero offset,
    // which is precisely the case the FFI export path must handle correctly.
    let original = StringArray::from(vec!["zero", "one", "two", "three", "four"]);
    // NOTE(review): `slice` panics when offset + length exceeds 5, and the
    // `.unwrap()`s below can panic too; a panic would propagate out of this
    // `extern "system"` fn into the JVM — consider validating inputs and
    // reporting failures via the listener's onFailure instead.
    let sliced = original.slice(offset as usize, length as usize);

    // Wrap the slice as the sole column ("data": Utf8, non-nullable) of a batch.
    let schema = Arc::new(Schema::new(vec![Field::new("data", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(sliced)]).unwrap();

    // Flatten the batch into a StructArray — the shape exported over the
    // Arrow C Data Interface and re-imported on the Java side as a VectorSchemaRoot.
    let struct_array: StructArray = batch.into();
    let array_data = struct_array.to_data();

    // Export schema and array as C Data Interface structs. Box::into_raw
    // deliberately leaks them so the addresses remain valid after this
    // function returns; the importer's release callback reclaims the buffers.
    let ffi_schema = FFI_ArrowSchema::try_from(array_data.data_type()).unwrap();
    let schema_ptr = Box::into_raw(Box::new(ffi_schema)) as i64;

    let ffi_array = FFI_ArrowArray::new(&array_data);
    let array_ptr = Box::into_raw(Box::new(ffi_array)) as i64;

    // Pack [schema_ptr, array_ptr] into a Java long[] for the callback.
    let result = env.new_long_array(2).unwrap();
    env.set_long_array_region(&result, 0, &[schema_ptr, array_ptr]).unwrap();

    // Resolve ActionListener.onResponse(Object) on the listener's own class.
    let listener_class = env.get_object_class(&listener).unwrap();
    let on_response = env.get_method_id(&listener_class, "onResponse", "(Ljava/lang/Object;)V").unwrap();

    // SAFETY: `on_response` was looked up on `listener`'s class with the
    // matching `(Ljava/lang/Object;)V` signature, so the unchecked call is
    // invoked with a valid method id and argument type.
    unsafe {
        env.call_method_unchecked(
            &listener,
            on_response,
            jni::signature::ReturnType::Primitive(jni::signature::Primitive::Void),
            &[jni::objects::JValue::Object(&result).as_jni()]
        ).unwrap();
    }
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createDatafusionReader(
Expand Down Expand Up @@ -651,7 +693,6 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_fetchSegm
});
}


#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNext(
mut env: JNIEnv,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ private NativeBridge() {}

// Other methods
public static native String getVersionInfo();

    /**
     * Test-only native hook: builds a sliced {@code StringArray} on the native
     * side and reports the resulting Arrow C Data Interface pointers.
     *
     * @param offset   number of leading elements to skip in the native source array
     * @param length   number of elements to take in the slice
     * @param listener receives {@code [schemaAddress, arrayAddress]} on success;
     *                 NOTE(review): the receiver appears responsible for releasing
     *                 both native structs — confirm against the native implementation
     */
    public static native void createTestSlicedArray(int offset, int length, ActionListener<long[]> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.datafusion;

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.core.action.ActionListener;
import org.opensearch.datafusion.jni.NativeBridge;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* Tests Arrow FFI boundary between Rust and Java.
* Verifies that sliced arrays (from LIMIT OFFSET queries) work correctly
* with Arrow 18.3.0's FFI handling.
*
* Source array: ["zero", "one", "two", "three", "four"]
*/
public class ArrowFFIBoundaryTests extends OpenSearchTestCase {

    // Baseline: no offset, single element.
    public void testSliceOffset0Length1() throws Exception {
        assertSlicedArray(0, 1, new String[] { "zero" });
    }

    // Skip one, take two.
    public void testSliceOffset1Length2() throws Exception {
        assertSlicedArray(1, 2, new String[] { "one", "two" });
    }

    // Middle section of three.
    public void testSliceOffset1Length3() throws Exception {
        assertSlicedArray(1, 3, new String[] { "one", "two", "three" });
    }

    // Larger offset near the tail.
    public void testSliceOffset3Length2() throws Exception {
        assertSlicedArray(3, 2, new String[] { "three", "four" });
    }

    // Final element only.
    public void testSliceOffset4Length1() throws Exception {
        assertSlicedArray(4, 1, new String[] { "four" });
    }

    /**
     * Requests a native slice at {@code offset}/{@code length}, imports the
     * returned C Data Interface pointers, and checks the column contents
     * against {@code expected}.
     */
    private void assertSlicedArray(int offset, int length, String[] expected) throws Exception {
        final CompletableFuture<long[]> pointerFuture = new CompletableFuture<>();

        NativeBridge.createTestSlicedArray(offset, length, new ActionListener<long[]>() {
            @Override
            public void onResponse(long[] ffiPointers) {
                pointerFuture.complete(ffiPointers);
            }

            @Override
            public void onFailure(Exception cause) {
                pointerFuture.completeExceptionally(cause);
            }
        });

        final long[] ffiPointers = pointerFuture.get(10, TimeUnit.SECONDS);

        // Single try-with-resources: resources close in reverse declaration
        // order (root, array, schema, allocator), matching Arrow's expectations.
        try (
            BufferAllocator allocator = new RootAllocator();
            ArrowSchema importedSchema = ArrowSchema.wrap(ffiPointers[0]);
            ArrowArray importedArray = ArrowArray.wrap(ffiPointers[1]);
            VectorSchemaRoot root = Data.importVectorSchemaRoot(allocator, importedArray, importedSchema, null)
        ) {
            assertEquals("Row count mismatch", expected.length, root.getRowCount());

            VarCharVector values = (VarCharVector) root.getVector("data");
            for (int row = 0; row < expected.length; row++) {
                assertEquals("Value mismatch at index " + row, expected[row], new String(values.get(row)));
            }
        }
    }
}
Loading