Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.lettuce.core.protocol.RedisHandshakeHandler;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.lettuce.core.resource.Transports;
import io.lettuce.core.resource.Transports.NativeTransports;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
Expand Down Expand Up @@ -256,22 +255,6 @@ protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, Conn
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}

protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {

LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");

boolean domainSocket = LettuceStrings.isNotEmpty(connectionPoint.getSocket());
connectionBuilder.bootstrap().group(getEventLoopGroup(
domainSocket ? NativeTransports.eventLoopGroupClass(true) : Transports.eventLoopGroupClass()));

if (connectionPoint.getSocket() != null) {
NativeTransports.assertDomainSocketAvailable();
connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
} else {
connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
}
}

private EventLoopGroup getEventLoopGroup(Class<? extends EventLoopGroup> eventLoopGroupClass) {

for (;;) {
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.lettuce.core.protocol.RedisHandshakeHandler;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.EpollProvider;
import io.lettuce.core.resource.EventLoopResources;
import io.lettuce.core.resource.IOUringProvider;
import io.lettuce.core.resource.KqueueProvider;
import io.lettuce.core.resource.Transports;
Expand Down Expand Up @@ -263,16 +264,16 @@ public void configureBootstrap(boolean domainSocket,
LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set");
LettuceAssert.assertState(clientOptions != null, "ClientOptions must be set");

Class<? extends EventLoopGroup> eventLoopGroupClass = Transports.eventLoopGroupClass();

Class<? extends Channel> channelClass = Transports.socketChannelClass();
EventLoopResources resources = Transports.eventLoopResources();
Class<? extends EventLoopGroup> eventLoopGroupClass = resources.eventLoopGroupClass();
Class<? extends Channel> channelClass;

if (domainSocket) {

Transports.NativeTransports.assertDomainSocketAvailable();
eventLoopGroupClass = Transports.NativeTransports.eventLoopGroupClass(true);
channelClass = Transports.NativeTransports.domainSocketChannelClass();
channelClass = resources.domainSocketChannelClass();
} else {
channelClass = resources.socketChannelClass();
bootstrap.resolver(clientResources.addressResolverGroup());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public class DefaultClientResources implements ClientResources {
* Default {@link AddressResolverGroup}.
*/
public static final AddressResolverGroup<?> DEFAULT_ADDRESS_RESOLVER_GROUP = new DnsAddressResolverGroup(
new DnsNameResolverBuilder().datagramChannelType(Transports.datagramChannelClass())
.socketChannelType(Transports.socketChannelClass().asSubclass(SocketChannel.class))
new DnsNameResolverBuilder().datagramChannelType(Transports.eventLoopResources().datagramChannelClass())
.socketChannelType(Transports.eventLoopResources().socketChannelClass().asSubclass(SocketChannel.class))
.cnameCache(new DefaultDnsCnameCache()).resolveCache(new DefaultDnsCache()));

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,43 +220,14 @@ static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Cl
factoryProvider.getThreadFactory("lettuce-eventExecutorLoop"));
}

// Check native transports FIRST (priority order: Epoll > Kqueue > IOUring)
// This ensures native transports are used when available, even though all providers
// now match MultiThreadIoEventLoopGroup.class in Netty 4.2
if (EpollProvider.isAvailable()) {

EventLoopResources resources = EpollProvider.getResources();

if (resources.matches(type)) {
return resources.newEventLoopGroup(numberOfThreads, factoryProvider.getThreadFactory("lettuce-epollEventLoop"));
}
}

if (KqueueProvider.isAvailable()) {

EventLoopResources resources = KqueueProvider.getResources();

if (resources.matches(type)) {
return resources.newEventLoopGroup(numberOfThreads,
factoryProvider.getThreadFactory("lettuce-kqueueEventLoop"));
}
}

if (IOUringProvider.isAvailable()) {

EventLoopResources resources = IOUringProvider.getResources();

if (resources.matches(type)) {
return resources.newEventLoopGroup(numberOfThreads,
factoryProvider.getThreadFactory("lettuce-io_uringEventLoop"));
}
}

// NIO as fallback (checked LAST)
EventLoopResources nioResources = NioProvider.getResources();

if (nioResources.matches(type)) {
return nioResources.newEventLoopGroup(numberOfThreads, factoryProvider.getThreadFactory("lettuce-nioEventLoop"));
// Use Transports.eventLoopResources() to get the best available transport
// This automatically handles the priority order: Epoll > Kqueue > IOUring > NIO
EventLoopResources resources = Transports.eventLoopResources();

// Verify the requested type matches the selected resources
// Support both the new MultiThreadIoEventLoopGroup and deprecated NioEventLoopGroup for backward compatibility
if (resources.eventLoopGroupClass().equals(type) || resources.matches(type)) {
return resources.newEventLoopGroup(numberOfThreads, factoryProvider.getThreadFactory(resources.threadNamePrefix()));
}

throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/resource/EpollProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public SocketAddress newSocketAddress(String socketPath) {
return new DomainSocketAddress(socketPath);
}

@Override
public String threadNamePrefix() {
return "lettuce-epollEventLoop";
}

}

}
15 changes: 14 additions & 1 deletion src/main/java/io/lettuce/core/resource/EventLoopResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ public interface EventLoopResources {
*
* @param type must not be {@code null}.
* @return {@code true} if {@code type} is a {@link EventExecutorGroup} of the underlying loop resources.
* @deprecated since 7.3.0, use {@link Transports#eventLoopResources()} to obtain the appropriate {@link EventLoopResources}
* instead of checking type compatibility. This method will be removed in 8.0.
*/
boolean matches(Class<? extends EventExecutorGroup> type);
@Deprecated
default boolean matches(Class<? extends EventExecutorGroup> type) {
return eventLoopGroupClass().equals(type);
}

/**
* @return the {@link EventLoopGroup} class.
Expand Down Expand Up @@ -80,4 +85,12 @@ public interface EventLoopResources {
*/
SocketAddress newSocketAddress(String socketPath);

/**
* Returns the thread name prefix for event loop threads created by this provider.
*
* @return the thread name prefix (e.g., "lettuce-nioEventLoop", "lettuce-epollEventLoop")
* @since 7.3.0
*/
String threadNamePrefix();

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,10 @@ public SocketAddress newSocketAddress(String socketPath) {
return delegate.newSocketAddress(socketPath);
}

@Override
public String threadNamePrefix() {
verifier.run();
return delegate.threadNamePrefix();
}

}
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/resource/IOUringProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public SocketAddress newSocketAddress(String socketPath) {
return new DomainSocketAddress(socketPath);
}

@Override
public String threadNamePrefix() {
return "lettuce-io_uringEventLoop";
}

}

}
12 changes: 12 additions & 0 deletions src/main/java/io/lettuce/core/resource/KqueueProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ public Class<? extends DatagramChannel> datagramChannelClass() {
return null;
}

@Override
public String threadNamePrefix() {

checkForKqueueLibrary();
return null;
}

}

/**
Expand Down Expand Up @@ -238,6 +245,11 @@ public SocketAddress newSocketAddress(String socketPath) {
return new DomainSocketAddress(socketPath);
}

@Override
public String threadNamePrefix() {
return "lettuce-kqueueEventLoop";
}

}

}
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/resource/NioProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public SocketAddress newSocketAddress(String socketPath) {
throw new UnsupportedOperationException("Domain sockets are not supported with NIO transport");
}

@Override
public String threadNamePrefix() {
return "lettuce-nioEventLoop";
}

}

}
84 changes: 55 additions & 29 deletions src/main/java/io/lettuce/core/resource/Transports.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import io.lettuce.core.internal.LettuceAssert;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -22,38 +19,53 @@ public class Transports {

/**
* @return the default {@link EventLoopGroup} for socket transport that is compatible with {@link #socketChannelClass()}.
* @deprecated since 7.3.0, use {@link #eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#eventLoopGroupClass()}. This method will be removed in 8.0.
*/
@Deprecated
public static Class<? extends EventLoopGroup> eventLoopGroupClass() {

if (NativeTransports.isAvailable()) {
return NativeTransports.eventLoopGroupClass();
}

return NioProvider.getResources().eventLoopGroupClass();
return eventLoopResources().eventLoopGroupClass();
}

/**
* @return the default {@link Channel} for socket (network/TCP) transport.
* @deprecated since 7.3.0, use {@link #eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#socketChannelClass()}. This method will be removed in 8.0.
*/
@Deprecated
public static Class<? extends Channel> socketChannelClass() {

if (NativeTransports.isAvailable()) {
return NativeTransports.socketChannelClass();
}

return NioSocketChannel.class;
return eventLoopResources().socketChannelClass();
}

/**
* @return the default {@link DatagramChannel} for socket (network/UDP) transport.
* @deprecated since 7.3.0, use {@link #eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#datagramChannelClass()}. This method will be removed in 8.0.
*/
@Deprecated
public static Class<? extends DatagramChannel> datagramChannelClass() {
return eventLoopResources().datagramChannelClass();
}

/**
* Returns the best available {@link EventLoopResources} based on the runtime environment. This method selects native
* transports (Epoll, Kqueue, IOUring) when available, falling back to NIO otherwise.
* <p>
* Priority order: Epoll &gt; Kqueue &gt; IOUring &gt; NIO
* <p>
* The returned {@link EventLoopResources} automatically handles domain socket support. When both Epoll and IOUring are
* available, Epoll is selected (as it supports domain sockets while IOUring does not).
*
* @return the best available {@link EventLoopResources}
* @since 7.3.0
*/
public static EventLoopResources eventLoopResources() {

if (NativeTransports.isAvailable()) {
return NativeTransports.datagramChannelClass();
return NativeTransports.RESOURCES;
}

return NioDatagramChannel.class;
return NioProvider.getResources();
}

/**
Expand All @@ -63,8 +75,8 @@ public static class NativeTransports {

private static final InternalLogger transportsLogger = InternalLoggerFactory.getInstance(Transports.class);

static EventLoopResources RESOURCES = KqueueProvider.isAvailable() ? KqueueProvider.getResources()
: IOUringProvider.isAvailable() ? IOUringProvider.getResources() : EpollProvider.getResources();
static EventLoopResources RESOURCES = EpollProvider.isAvailable() ? EpollProvider.getResources()
: KqueueProvider.isAvailable() ? KqueueProvider.getResources() : IOUringProvider.getResources();

/**
* @return {@code true} if a native transport is available.
Expand All @@ -85,46 +97,60 @@ public static boolean isDomainSocketSupported() {

/**
* @return the native transport socket {@link Channel} class.
* @deprecated since 7.3.0, use {@link Transports#eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#socketChannelClass()}. This method will be removed in 8.0.
*/
@Deprecated
static Class<? extends Channel> socketChannelClass() {
return RESOURCES.socketChannelClass();
}

/**
* @return the native transport socket {@link DatagramChannel} class.
* @deprecated since 7.3.0, use {@link Transports#eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#datagramChannelClass()}. This method will be removed in 8.0.
*/
@Deprecated
static Class<? extends DatagramChannel> datagramChannelClass() {
return RESOURCES.datagramChannelClass();
}

/**
* @return the native transport domain socket {@link Channel} class.
* @deprecated since 7.3.0, use {@link Transports#eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#domainSocketChannelClass()}. With the corrected transport priority order (Epoll
* &gt; Kqueue &gt; IOUring), the returned resources automatically handle domain socket support. This method
* will be removed in 8.0.
*/
@Deprecated
public static Class<? extends Channel> domainSocketChannelClass() {
assertDomainSocketAvailable();
return EpollProvider.isAvailable() && IOUringProvider.isAvailable()
? EpollProvider.getResources().domainSocketChannelClass()
: RESOURCES.domainSocketChannelClass();
return RESOURCES.domainSocketChannelClass();
}

/**
* @return the native transport {@link EventLoopGroup} class. Defaults to TCP sockets. See
* {@link #eventLoopGroupClass(boolean)} to request a specific EventLoopGroup for Domain Socket usage.
* @return the native transport {@link EventLoopGroup} class.
* @deprecated since 7.3.0, use {@link Transports#eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#eventLoopGroupClass()}. This method will be removed in 8.0.
*/
@Deprecated
public static Class<? extends EventLoopGroup> eventLoopGroupClass() {
return eventLoopGroupClass(false);
return RESOURCES.eventLoopGroupClass();
}

/**
* @return the native transport {@link EventLoopGroup} class.
* @param domainSocket {@code true} to indicate Unix Domain Socket usage, {@code false} otherwise.
* @deprecated since 7.3.0, the {@code domainSocket} parameter is no longer needed. Use
* {@link Transports#eventLoopResources()} to obtain {@link EventLoopResources} and call
* {@link EventLoopResources#eventLoopGroupClass()}. With the corrected transport priority order (Epoll &gt;
* Kqueue &gt; IOUring), the returned resources automatically handle domain socket support. This method will
* be removed in 8.0.
* @since 6.3.3
*/
@Deprecated
public static Class<? extends EventLoopGroup> eventLoopGroupClass(boolean domainSocket) {

return domainSocket && EpollProvider.isAvailable() && IOUringProvider.isAvailable()
? EpollProvider.getResources().eventLoopGroupClass()
: RESOURCES.eventLoopGroupClass();
return RESOURCES.eventLoopGroupClass();
}

public static void assertDomainSocketAvailable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public SocketAddress newSocketAddress(String socketPath) {
return null;
}

@Override
public String threadNamePrefix() {
return null;
}

}
Loading