Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.opensearch.transport.grpc.spi;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;

import java.util.List;
Expand All @@ -20,6 +21,12 @@
*/
public interface GrpcInterceptorProvider {

/**
* Provide visibility into node settings.
* @param settings for use in interceptors.
*/
default void initNodeSettings(Settings settings) {}

/**
* Returns a list of ordered gRPC interceptors with access to ThreadContext.
* Each interceptor must have a unique order value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public Collection<Object> createComponents(
// Then add plugin-provided interceptors
if (!interceptorProviders.isEmpty()) {
for (GrpcInterceptorProvider provider : interceptorProviders) {
provider.initNodeSettings(environment.settings());
orderedList.addAll(provider.getOrderedGrpcInterceptors(threadPool.getThreadContext()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.env.Environment;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.SecureAuxTransportSettingsProvider;
import org.opensearch.protobufs.QueryContainer;
Expand Down Expand Up @@ -83,6 +84,9 @@ public class GrpcPluginTests extends OpenSearchTestCase {
@Mock
private Client client;

@Mock
private Environment environment;

private NetworkService networkService;

private ClusterSettings clusterSettings;
Expand All @@ -102,8 +106,9 @@ public void setup() {
// Create a real ClusterSettings instance with the plugin's settings
plugin = new GrpcPlugin();

// Mock ThreadPool and ThreadContext
// Mock ThreadPool/ThreadContext/Environment
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(environment.settings()).thenReturn(Settings.EMPTY);

// Set the client in the plugin
plugin.createComponents(
Expand All @@ -113,7 +118,7 @@ public void setup() {
null, // ResourceWatcherService
null, // ScriptService
null, // NamedXContentRegistry
null, // Environment
environment, // Environment
null, // NodeEnvironment
null, // NamedWriteableRegistry
null, // IndexNameExpressionResolver
Expand Down Expand Up @@ -448,12 +453,11 @@ public void testLoadExtensionsWithDuplicateGrpcInterceptorOrder() {

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, null, null, null, null, null)
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, environment, null, null, null, null)
);

String errorMessage = exception.getMessage();
assertTrue(errorMessage.contains("Multiple gRPC interceptors have the same order value [1]"));
assertTrue(errorMessage.contains("ServerInterceptor")); // Mock class name will contain this
assertTrue(errorMessage.contains("Each interceptor must have a unique order value"));
}

Expand All @@ -470,12 +474,11 @@ public void testLoadExtensionsWithMultipleProvidersAndDuplicateOrder() {

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, null, null, null, null, null)
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, environment, null, null, null, null)
);

String errorMessage = exception.getMessage();
assertTrue(errorMessage.contains("Multiple gRPC interceptors have the same order value [5]"));
assertTrue(errorMessage.contains("ServerInterceptor"));
assertTrue(errorMessage.contains("Each interceptor must have a unique order value"));
}

Expand All @@ -498,12 +501,11 @@ public void testLoadExtensionsWithSameExplicitOrderInterceptors() {

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, null, null, null, null, null)
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, environment, null, null, null, null)
);

String errorMessage = exception.getMessage();
assertTrue(errorMessage.contains("Multiple gRPC interceptors have the same order value [5]"));
assertTrue(errorMessage.contains("ServerInterceptor"));
assertTrue(errorMessage.contains("Each interceptor must have a unique order value"));
}

Expand Down Expand Up @@ -742,13 +744,31 @@ private void testRequestProcessingWithException(
}

/**
* Creates a mock interceptor with given order
* Creates a no-op interceptor with the specified order.
*/
private OrderedGrpcInterceptor createMockInterceptor(int order) {
OrderedGrpcInterceptor mock = Mockito.mock(OrderedGrpcInterceptor.class);
when(mock.order()).thenReturn(order);
when(mock.getInterceptor()).thenReturn(Mockito.mock(ServerInterceptor.class));
return mock;
return new OrderedGrpcInterceptor() {

@Override
public int order() {
return order;
}

@Override
public ServerInterceptor getInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
// no-op interceptor
return next.startCall(call, headers);
}
};
}
};
}

private void assertDoesNotThrow(Runnable runnable) {
Expand Down Expand Up @@ -901,7 +921,9 @@ public void testGrpcInterceptorChainIntegrationWithPlugin() {
ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class);
when(mockThreadPool.getThreadContext()).thenReturn(new org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY));

assertDoesNotThrow(() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, null, null, null, null, null));
assertDoesNotThrow(
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, environment, null, null, null, null)
);
}

public void testGrpcInterceptorChainWithDuplicateOrders() {
Expand Down Expand Up @@ -929,7 +951,7 @@ public void testGrpcInterceptorChainWithDuplicateOrders() {
// Should throw exception due to duplicate orders during createComponents
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, null, null, null, null, null)
() -> plugin.createComponents(client, null, mockThreadPool, null, null, null, environment, null, null, null, null)
);

// Verify error message includes order value and interceptor class names
Expand Down Expand Up @@ -984,6 +1006,55 @@ private void testGrpcInterceptorChain(List<OrderedGrpcInterceptor> interceptors,
}
}

public void testGrpcInterceptorProviderSettingsInitialization() {
// Mock extension loading for GrpcPlugin
TestSettingsAwareInterceptorProvider provider = new TestSettingsAwareInterceptorProvider();
ExtensiblePlugin.ExtensionLoader mockLoader = Mockito.mock(ExtensiblePlugin.ExtensionLoader.class);
when(mockLoader.loadExtensions(QueryBuilderProtoConverter.class)).thenReturn(null);
when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(List.of(provider));
GrpcPlugin plugin = new GrpcPlugin();
plugin.loadExtensions(mockLoader);

// Mock Environments
Settings validSetting = Settings.builder().put("test-setting", true).build();
Environment validEnv = Mockito.mock(Environment.class);
when(validEnv.settings()).thenReturn(validSetting);

Settings invalidSetting = Settings.builder().put("test-setting", false).build();
Environment invalidEnv = Mockito.mock(Environment.class);
when(invalidEnv.settings()).thenReturn(invalidSetting);

Settings emptySetting = Settings.builder().build();
Environment emptyEnv = Mockito.mock(Environment.class);
when(emptyEnv.settings()).thenReturn(emptySetting);

// createComponents initializes interceptor with the correct setting
assertDoesNotThrow(() -> plugin.createComponents(client, null, threadPool, null, null, null, validEnv, null, null, null, null));

// createComponents throws exception with the incorrect setting
try {
plugin.createComponents(client, null, threadPool, null, null, null, invalidEnv, null, null, null, null);
fail("Expect test interceptor with wrong settings throws exception.");
} catch (RuntimeException e) {
assertEquals("test-setting not found or not set to true", e.getMessage());
}

// createComponents throws exception with empty setting
try {
plugin.createComponents(client, null, threadPool, null, null, null, emptyEnv, null, null, null, null);
fail("Expect test interceptor with empty settings throws exception.");
} catch (RuntimeException e) {
assertEquals("test-setting not found or not set to true", e.getMessage());
}
}

public void testGrpcInterceptorProviderEmpty() {
GrpcInterceptorProvider prov = threadContext -> List.of();
assertDoesNotThrow(() -> prov.initNodeSettings(Settings.EMPTY));
List<OrderedGrpcInterceptor> interceptors = prov.getOrderedGrpcInterceptors(new ThreadContext(Settings.EMPTY));
assertTrue(interceptors.isEmpty());
}

/**
* Creates a test interceptor that can succeed or fail
*/
Expand Down Expand Up @@ -1040,4 +1111,45 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
};
}

/**
* Test interceptor provider that validates GrpcInterceptorProvider's access to settings.
* TestSettingsAwareInterceptorProvider will throw an exception if setting "test-setting" is not true.
*/
private static class TestSettingsAwareInterceptorProvider implements GrpcInterceptorProvider {
private Settings settings;

@Override
public void initNodeSettings(Settings settings) {
this.settings = settings;
}

@Override
public List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
if (settings == null || !settings.getAsBoolean("test-setting", false)) {
throw new RuntimeException("test-setting not found or not set to true");
}

return List.of(new OrderedGrpcInterceptor() {
@Override
public int order() {
return 100;
}

@Override
public ServerInterceptor getInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
// No-op interceptor - just pass through
return next.startCall(call, headers);
}
};
}
});
}
}
}
Loading