-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[automatic failover] Refactor Circuit Breaker Command Tracking #3670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,17 +1,19 @@ | ||||||||||||||
| package io.lettuce.core.failover; | ||||||||||||||
|
|
||||||||||||||
| import java.util.ArrayList; | ||||||||||||||
| import java.util.Collection; | ||||||||||||||
| import java.util.concurrent.atomic.AtomicReference; | ||||||||||||||
|
|
||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||
|
|
||||||||||||||
| import io.lettuce.core.RedisCommandTimeoutException; | ||||||||||||||
| import io.lettuce.core.failover.api.RedisCircuitBreakerException; | ||||||||||||||
| import io.lettuce.core.protocol.CommandHandler; | ||||||||||||||
| import io.lettuce.core.protocol.CompleteableCommand; | ||||||||||||||
| import io.lettuce.core.protocol.CommandWrapper; | ||||||||||||||
| import io.lettuce.core.protocol.RedisCommand; | ||||||||||||||
| import io.netty.channel.Channel; | ||||||||||||||
| import io.netty.util.concurrent.Future; | ||||||||||||||
| import io.netty.util.concurrent.GenericFutureListener; | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Tracks command results and records them to the circuit breaker. | ||||||||||||||
|
|
@@ -86,6 +88,13 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { | |||||||||||||
| return command; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @SuppressWarnings("unchecked") | ||||||||||||||
| CircuitBreakerAwareCommand<K, V, T> circuitBreakerAwareCommand = CommandWrapper.unwrap(command, | ||||||||||||||
| CircuitBreakerAwareCommand.class); | ||||||||||||||
| if (circuitBreakerAwareCommand == null) { | ||||||||||||||
| command = new CircuitBreakerAwareCommand<>(command, circuitBreaker.getGeneration()); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| RedisCommand<K, V, T> result; | ||||||||||||||
| try { | ||||||||||||||
| // Delegate to parent | ||||||||||||||
|
|
@@ -95,9 +104,6 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { | |||||||||||||
| throw e; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| CircuitBreakerGeneration generation = circuitBreaker.getGeneration(); | ||||||||||||||
| // Attach completion callback to track success/failure | ||||||||||||||
| attachRecorder(generation, result); | ||||||||||||||
| return result; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -109,36 +115,59 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { | |||||||||||||
| commands.forEach(c -> c.completeExceptionally(RedisCircuitBreakerException.INSTANCE)); | ||||||||||||||
| return (Collection) commands; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Collection<RedisCommand<K, V, ?>> circuitBreakerAwareCommands = new ArrayList<>(commands.size()); | ||||||||||||||
| for (RedisCommand<K, V, ?> command : commands) { | ||||||||||||||
| @SuppressWarnings("unchecked") | ||||||||||||||
| CircuitBreakerAwareCommand<K, V, ?> circuitBreakerAwareCommand = CommandWrapper.unwrap(command, | ||||||||||||||
| CircuitBreakerAwareCommand.class); | ||||||||||||||
| if (circuitBreakerAwareCommand == null) { | ||||||||||||||
| command = new CircuitBreakerAwareCommand<>(command, circuitBreaker.getGeneration()); | ||||||||||||||
| } | ||||||||||||||
| circuitBreakerAwareCommands.add(command); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| commands = circuitBreakerAwareCommands; | ||||||||||||||
|
|
||||||||||||||
| Collection<RedisCommand<K, V, ?>> result; | ||||||||||||||
| try { | ||||||||||||||
| // Delegate to parent | ||||||||||||||
| result = commandWriter.writeMany(commands); | ||||||||||||||
| } catch (Exception e) { | ||||||||||||||
| // TODO: here not sure we should record exception for each command or just once for the batch | ||||||||||||||
| circuitBreaker.getGeneration().recordResult(e); | ||||||||||||||
| commands.forEach(c -> circuitBreaker.getGeneration().recordResult(e)); | ||||||||||||||
| throw e; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Attach completion callbacks to track success/failure for each command | ||||||||||||||
| CircuitBreakerGeneration generation = circuitBreaker.getGeneration(); | ||||||||||||||
| for (RedisCommand<K, V, ?> command : result) { | ||||||||||||||
| attachRecorder(generation, command); | ||||||||||||||
| } | ||||||||||||||
| return result; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| private <K, V> void attachRecorder(CircuitBreakerGeneration generation, RedisCommand<K, V, ?> command) { | ||||||||||||||
| if (command instanceof CompleteableCommand) { | ||||||||||||||
| @SuppressWarnings("unchecked") | ||||||||||||||
| CompleteableCommand<Object> completeable = (CompleteableCommand<Object>) command; | ||||||||||||||
| completeable.onComplete((o, e) -> { | ||||||||||||||
| // Only record timeout exceptions | ||||||||||||||
| // Other exceptions are tracked inside the pipeline vioa MultiDbOutboundHandler | ||||||||||||||
| if (e instanceof RedisCommandTimeoutException) { | ||||||||||||||
| generation.recordResult(e); | ||||||||||||||
| } | ||||||||||||||
| }); | ||||||||||||||
| static class CircuitBreakerAwareCommand<K, V, T> extends CommandWrapper<K, V, T> | ||||||||||||||
| implements GenericFutureListener<Future<Void>> { | ||||||||||||||
|
|
||||||||||||||
| private final CircuitBreakerGeneration generation; | ||||||||||||||
|
|
||||||||||||||
| public CircuitBreakerAwareCommand(RedisCommand<K, V, T> command, CircuitBreakerGeneration generation) { | ||||||||||||||
| super(command); | ||||||||||||||
| this.generation = generation; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| protected void doBeforeComplete() { | ||||||||||||||
| generation.recordResult(null); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| protected void doBeforeError(Throwable throwable) { | ||||||||||||||
| generation.recordResult(throwable); | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+154
to
162
|
||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public void operationComplete(Future<Void> future) throws Exception { | ||||||||||||||
| if (!future.isSuccess()) { | ||||||||||||||
| generation.recordResult(future.cause()); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
Comment on lines
+164
to
+170
|
||||||||||||||
| @Override | |
| public void operationComplete(Future<Void> future) throws Exception { | |
| if (!future.isSuccess()) { | |
| generation.recordResult(future.cause()); | |
| } | |
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,15 +1,11 @@ | ||||||
| package io.lettuce.core.failover; | ||||||
|
|
||||||
| import java.util.List; | ||||||
| import java.util.Collection; | ||||||
|
|
||||||
| import io.lettuce.core.RedisCommandTimeoutException; | ||||||
| import io.lettuce.core.protocol.CompleteableCommand; | ||||||
| import io.lettuce.core.protocol.RedisCommand; | ||||||
| import io.lettuce.core.failover.DatabaseCommandTracker.CircuitBreakerAwareCommand; | ||||||
| import io.netty.channel.ChannelHandlerContext; | ||||||
| import io.netty.channel.ChannelOutboundHandlerAdapter; | ||||||
| import io.netty.channel.ChannelPromise; | ||||||
| import io.netty.util.concurrent.Future; | ||||||
| import io.netty.util.concurrent.GenericFutureListener; | ||||||
|
|
||||||
| /** | ||||||
| * Channel handler for tracking command results and recording them to the circuit breaker. | ||||||
|
|
@@ -39,15 +35,18 @@ public MultiDbOutboundHandler(CircuitBreaker circuitBreaker) { | |||||
| public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | ||||||
| if (circuitBreaker != null) { | ||||||
|
|
||||||
| if (msg instanceof RedisCommand<?, ?, ?>) { | ||||||
| promise.addListener(recorder(circuitBreaker.getGeneration(), (RedisCommand<?, ?, ?>) msg)); | ||||||
| if (msg instanceof CircuitBreakerAwareCommand<?, ?, ?>) { | ||||||
| promise.addListener(((CircuitBreakerAwareCommand<?, ?, ?>) msg)); | ||||||
| } | ||||||
|
|
||||||
| if (msg instanceof List) { | ||||||
| List<?> list = (List<?>) msg; | ||||||
| for (Object o : list) { | ||||||
| if (o instanceof RedisCommand<?, ?, ?>) { | ||||||
| promise.addListener(recorder(circuitBreaker.getGeneration(), (RedisCommand<?, ?, ?>) o)); | ||||||
| if (msg instanceof Collection) { | ||||||
| Collection<?> collection = (Collection<?>) msg; | ||||||
| for (Object cmd : collection) { | ||||||
| if (cmd instanceof CircuitBreakerAwareCommand<?, ?, ?>) { | ||||||
| // we want to use the generation that is active at the time of write. | ||||||
| // Using a lamda which acces directly to circuitBreaker.getGeneration() here would delay capturing the | ||||||
|
||||||
| // Using a lamda which acces directly to circuitBreaker.getGeneration() here would delay capturing the | |
| // Using a lambda which access directly to circuitBreaker.getGeneration() here would delay capturing the |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -64,13 +64,24 @@ public void complete() { | |||||||
|
|
||||||||
| if (consumers != COMPLETE && ONCOMPLETE.compareAndSet(this, consumers, COMPLETE)) { | ||||||||
|
|
||||||||
| doBeforeComplete(); | ||||||||
|
|
||||||||
| command.complete(); | ||||||||
|
|
||||||||
| doOnComplete(); | ||||||||
| notifyConsumers(consumers); | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * Callback method called after successful completion and before notifying downstream consumers. | ||||||||
|
||||||||
| * Callback method called after successful completion and before notifying downstream consumers. | |
| * Callback method called before successful completion of the wrapped command and before notifying downstream | |
| * consumers. |
Copilot
AI
Feb 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The JavaDoc comment is incorrect. It says "after error completion and before notifying downstream consumers", but the method is called BEFORE the wrapped command is completed exceptionally (line 145 for completeExceptionally, line 129 for cancel), not after. The comment should say "before error completion of the wrapped command and before notifying downstream consumers".
| * Callback method called after error completion and before notifying downstream consumers. | |
| * Callback method called before error completion of the wrapped command and before notifying downstream consumers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical bug: Potential triple recording of failures for batch write exceptions. When writeMany throws an exception, line 137 records the exception once for each command. Then, each command will also be completed exceptionally (presumably), which will trigger doBeforeError (line 160-161) recording the exception again. Additionally, if the write promise fails, operationComplete (line 165-168) will record it a third time. This results in each batch write failure being recorded 2-3 times per command.