Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 52 additions & 23 deletions src/main/java/io/lettuce/core/failover/DatabaseCommandTracker.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package io.lettuce.core.failover;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.failover.api.RedisCircuitBreakerException;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.CommandWrapper;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
* Tracks command results and records them to the circuit breaker.
Expand Down Expand Up @@ -86,6 +88,13 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
return command;
}

@SuppressWarnings("unchecked")
CircuitBreakerAwareCommand<K, V, T> circuitBreakerAwareCommand = CommandWrapper.unwrap(command,
CircuitBreakerAwareCommand.class);
if (circuitBreakerAwareCommand == null) {
command = new CircuitBreakerAwareCommand<>(command, circuitBreaker.getGeneration());
}

RedisCommand<K, V, T> result;
try {
// Delegate to parent
Expand All @@ -95,9 +104,6 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
throw e;
}

CircuitBreakerGeneration generation = circuitBreaker.getGeneration();
// Attach completion callback to track success/failure
attachRecorder(generation, result);
return result;
}

Expand All @@ -109,36 +115,59 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
commands.forEach(c -> c.completeExceptionally(RedisCircuitBreakerException.INSTANCE));
return (Collection) commands;
}

Collection<RedisCommand<K, V, ?>> circuitBreakerAwareCommands = new ArrayList<>(commands.size());
for (RedisCommand<K, V, ?> command : commands) {
@SuppressWarnings("unchecked")
CircuitBreakerAwareCommand<K, V, ?> circuitBreakerAwareCommand = CommandWrapper.unwrap(command,
CircuitBreakerAwareCommand.class);
if (circuitBreakerAwareCommand == null) {
command = new CircuitBreakerAwareCommand<>(command, circuitBreaker.getGeneration());
}
circuitBreakerAwareCommands.add(command);
}

commands = circuitBreakerAwareCommands;

Collection<RedisCommand<K, V, ?>> result;
try {
// Delegate to parent
result = commandWriter.writeMany(commands);
} catch (Exception e) {
// TODO: here not sure we should record exception for each command or just once for the batch
circuitBreaker.getGeneration().recordResult(e);
commands.forEach(c -> circuitBreaker.getGeneration().recordResult(e));
throw e;
Comment on lines 136 to 138
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical bug: Potential triple recording of failures for batch write exceptions. When writeMany throws an exception, line 137 records the exception once for each command. Then, each command will also be completed exceptionally (presumably), which will trigger doBeforeError (line 160-161) recording the exception again. Additionally, if the write promise fails, operationComplete (line 165-168) will record it a third time. This results in each batch write failure being recorded 2-3 times per command.

Copilot uses AI. Check for mistakes.
}

// Attach completion callbacks to track success/failure for each command
CircuitBreakerGeneration generation = circuitBreaker.getGeneration();
for (RedisCommand<K, V, ?> command : result) {
attachRecorder(generation, command);
}
return result;
}

private <K, V> void attachRecorder(CircuitBreakerGeneration generation, RedisCommand<K, V, ?> command) {
if (command instanceof CompleteableCommand) {
@SuppressWarnings("unchecked")
CompleteableCommand<Object> completeable = (CompleteableCommand<Object>) command;
completeable.onComplete((o, e) -> {
// Only record timeout exceptions
// Other exceptions are tracked inside the pipeline vioa MultiDbOutboundHandler
if (e instanceof RedisCommandTimeoutException) {
generation.recordResult(e);
}
});
static class CircuitBreakerAwareCommand<K, V, T> extends CommandWrapper<K, V, T>
implements GenericFutureListener<Future<Void>> {

private final CircuitBreakerGeneration generation;

public CircuitBreakerAwareCommand(RedisCommand<K, V, T> command, CircuitBreakerGeneration generation) {
super(command);
this.generation = generation;
}

@Override
protected void doBeforeComplete() {
generation.recordResult(null);
}

@Override
protected void doBeforeError(Throwable throwable) {
generation.recordResult(throwable);
}
Comment on lines +154 to 162
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical issue: The new implementation changes the tracking behavior in a way that conflicts with existing tests. The CircuitBreakerAwareCommand records ALL successes (line 156) and ALL errors (line 161), but the existing tests expect that only timeout exceptions are recorded by DatabaseCommandTracker while successes and non-timeout exceptions are recorded by MultiDbOutboundHandler. This change appears to cause double-recording when combined with MultiDbOutboundHandler, or the tests need to be updated to reflect the new behavior. The PR description does not mention this behavioral change, and the tests are not included in the diff, suggesting they may not have been updated or verified to pass with these changes.

Copilot uses AI. Check for mistakes.

@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
generation.recordResult(future.cause());
}
}

Comment on lines +164 to +170
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical bug: This implementation can lead to double recording of failures. When a write operation fails, both operationComplete() (line 165-168) and doBeforeError() (line 160-161) will record the same failure to the circuit breaker generation. The operationComplete() method should be removed, or the doBeforeError() should check if the error is being recorded due to a write failure to avoid duplicate recording.

Suggested change
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
generation.recordResult(future.cause());
}
}

Copilot uses AI. Check for mistakes.
}

}
52 changes: 12 additions & 40 deletions src/main/java/io/lettuce/core/failover/MultiDbOutboundHandler.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package io.lettuce.core.failover;

import java.util.List;
import java.util.Collection;

import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.failover.DatabaseCommandTracker.CircuitBreakerAwareCommand;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
* Channel handler for tracking command results and recording them to the circuit breaker.
Expand Down Expand Up @@ -39,15 +35,18 @@ public MultiDbOutboundHandler(CircuitBreaker circuitBreaker) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (circuitBreaker != null) {

if (msg instanceof RedisCommand<?, ?, ?>) {
promise.addListener(recorder(circuitBreaker.getGeneration(), (RedisCommand<?, ?, ?>) msg));
if (msg instanceof CircuitBreakerAwareCommand<?, ?, ?>) {
promise.addListener(((CircuitBreakerAwareCommand<?, ?, ?>) msg));
}

if (msg instanceof List) {
List<?> list = (List<?>) msg;
for (Object o : list) {
if (o instanceof RedisCommand<?, ?, ?>) {
promise.addListener(recorder(circuitBreaker.getGeneration(), (RedisCommand<?, ?, ?>) o));
if (msg instanceof Collection) {
Collection<?> collection = (Collection<?>) msg;
for (Object cmd : collection) {
if (cmd instanceof CircuitBreakerAwareCommand<?, ?, ?>) {
// we want to use the generation that is active at the time of write.
// Using a lamda which acces directly to circuitBreaker.getGeneration() here would delay capturing the
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling error: "acces" should be "access", and "lamda" should be "lambda".

Suggested change
// Using a lamda which acces directly to circuitBreaker.getGeneration() here would delay capturing the
// Using a lambda which access directly to circuitBreaker.getGeneration() here would delay capturing the

Copilot uses AI. Check for mistakes.
// generation and use a potentially wrong generation, a later one.
promise.addListener(((CircuitBreakerAwareCommand<?, ?, ?>) cmd));
}
}
}
Expand All @@ -57,31 +56,4 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

}

private GenericFutureListener<? extends Future<? super Void>> recorder(CircuitBreakerGeneration generation,
RedisCommand<?, ?, ?> command) {
return future -> {
// Record failure if write failed
if (!future.isSuccess()) {
generation.recordResult(future.cause());
} else {
// Track command completion if write succeeded
recordOnCommandComplete(generation, command);
}
};
}

private void recordOnCommandComplete(CircuitBreakerGeneration generation, RedisCommand<?, ?, ?> command) {
// Attach completion callback to track success/failure
if (command instanceof CompleteableCommand) {
CompleteableCommand<?> completeable = (CompleteableCommand<?>) command;
completeable.onComplete((o, e) -> {
// Record failures except for RedisCommandTimeoutException
// Timeouts are recorded by DatabaseCommandTracker
if (!(e instanceof RedisCommandTimeoutException)) {
generation.recordResult(e);
}
});
}
}

}
26 changes: 25 additions & 1 deletion src/main/java/io/lettuce/core/protocol/CommandWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,24 @@ public void complete() {

if (consumers != COMPLETE && ONCOMPLETE.compareAndSet(this, consumers, COMPLETE)) {

doBeforeComplete();

command.complete();

doOnComplete();
notifyConsumers(consumers);
}
}

/**
* Callback method called after successful completion and before notifying downstream consumers.
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDoc comment is incorrect. It says "after successful completion and before notifying downstream consumers", but the method is called BEFORE the wrapped command is completed (line 69), not after. The comment should say "before successful completion of the wrapped command and before notifying downstream consumers".

Suggested change
* Callback method called after successful completion and before notifying downstream consumers.
* Callback method called before successful completion of the wrapped command and before notifying downstream
* consumers.

Copilot uses AI. Check for mistakes.
*
* @since 7.4
*/
protected void doBeforeComplete() {

}

/**
* Callback method called after successful completion and before notifying downstream consumers.
*
Expand Down Expand Up @@ -111,9 +122,11 @@ public void cancel() {
Object[] consumers = ONCOMPLETE.get(this);

if (consumers != COMPLETE && ONCOMPLETE.compareAndSet(this, consumers, COMPLETE)) {
CancellationException exception = new CancellationException();

doBeforeError(exception);

command.cancel();
CancellationException exception = new CancellationException();
doOnError(exception);
notifyBiConsumer(consumers, exception);
}
Expand All @@ -128,6 +141,7 @@ public boolean completeExceptionally(Throwable throwable) {
boolean result = false;
if (consumers != COMPLETE && ONCOMPLETE.compareAndSet(this, consumers, COMPLETE)) {

doBeforeError(throwable);
result = command.completeExceptionally(throwable);
doOnError(throwable);
notifyBiConsumer(consumers, throwable);
Expand All @@ -136,6 +150,16 @@ public boolean completeExceptionally(Throwable throwable) {
return result;
}

/**
* Callback method called after error completion and before notifying downstream consumers.
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDoc comment is incorrect. It says "after error completion and before notifying downstream consumers", but the method is called BEFORE the wrapped command is completed exceptionally (line 145 for completeExceptionally, line 129 for cancel), not after. The comment should say "before error completion of the wrapped command and before notifying downstream consumers".

Suggested change
* Callback method called after error completion and before notifying downstream consumers.
* Callback method called before error completion of the wrapped command and before notifying downstream consumers.

Copilot uses AI. Check for mistakes.
*
* @param throwable
* @since 7.4
*/
protected void doBeforeError(Throwable throwable) {

}

/**
* Callback method called after error completion and before notifying downstream consumers.
*
Expand Down
Loading