Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static Throwable unwrapThrowable(Throwable t) {
return unwrapped;
}
}
return t;
return JavaUtils.unwrapCompletionException(t);
}

static IOException unwrapException(StatusRuntimeException se) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString;

/**
* A new log appender implementation using grpc bi-directional stream API.
*/
Expand Down Expand Up @@ -301,8 +303,8 @@ private void mayWait() {
getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
LOG.warn(this + ": Wait interrupted by " + ie);
Thread.currentThread().interrupt();
LOG.warn("{} is interrupted: {}", this, ie.toString());
}
}

Expand Down Expand Up @@ -616,11 +618,11 @@ void removePending(InstallSnapshotReplyProto reply) {
if (isNotificationOnly) {
Preconditions.assertSame(InstallSnapshotReplyBodyCase.SNAPSHOTINDEX,
reply.getInstallSnapshotReplyBodyCase(), "reply case");
Preconditions.assertSame(INSTALL_SNAPSHOT_NOTIFICATION_INDEX, (int) index, "poll index");
Preconditions.assertSame(INSTALL_SNAPSHOT_NOTIFICATION_INDEX, index, "poll index");
} else {
Preconditions.assertSame(InstallSnapshotReplyBodyCase.REQUESTINDEX,
reply.getInstallSnapshotReplyBodyCase(), "reply case");
Preconditions.assertSame(reply.getRequestIndex(), (int) index, "poll index");
Preconditions.assertSame(reply.getRequestIndex(), index, "poll index");
}
}
}
Expand Down Expand Up @@ -889,13 +891,9 @@ boolean isHeartbeat() {

@Override
public String toString() {
final String entries = entriesCount == 0? ""
: entriesCount == 1? ",entry=" + firstEntry
: ",entries=" + firstEntry + "..." + lastEntry;
return JavaUtils.getClassSimpleName(getClass())
+ ":cid=" + callId
+ ",entriesCount=" + entriesCount
+ entries;
+ ":" + toLogEntryTermIndexString(entriesCount, firstEntry, lastEntry);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ private void runImpl() {
}
synchronized (server) {
if (roleChangeChecking(electionTimeout)) {
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
this, lastRpcTime.elapsedTime(), electionTimeout);
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}ms, electionTimeout:{}",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we add ms for lastRpcElapsedTime but not electionTimeout

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change

  • electionTimeout is a TimeDuration in ms
  • elapsedTime() returns a TimeDuration in ns.

So, I changed elapsedTime() to elapsedTimeMs() and added ms.

this, lastRpcTime.elapsedTimeMs(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
// election timeout, should become a candidate
server.changeToCandidate(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto;
import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto;
import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
Expand Down Expand Up @@ -239,31 +240,25 @@ public long[] getFollowerMatchIndices() {
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
private final NavigableIndices appendLogTermIndices;

private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this);
private final LeaderElectionMetrics leaderElectionMetrics;
private final RaftServerMetricsImpl raftServerMetrics;
private final CountDownLatch closeFinishedLatch = new CountDownLatch(1);

// To avoid append entry before complete start() method
// For example, if thread1 start(), but before thread1 startAsFollower(), thread2 receive append entry
// request, and change state to RUNNING by lifeCycle.compareAndTransition(STARTING, RUNNING),
// then thread1 execute lifeCycle.transition(RUNNING) in startAsFollower(),
// So happens IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING,
private final AtomicBoolean startComplete;
// Disallow appendEntries before start() complete; otherwise, it could fail with illegal lifeCycle transition
private final AtomicBoolean startComplete = new AtomicBoolean(false);
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final CountDownLatch closeFinishedLatch = new CountDownLatch(1);

private final TransferLeadership transferLeadership;
private final SnapshotManagementRequestHandler snapshotRequestHandler;
private final SnapshotInstallationHandler snapshotInstallationHandler;

private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;

private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final ThreadGroup threadGroup;

private final NavigableIndices appendLogTermIndices;

RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
throws IOException {
final RaftPeerId id = proxy.getId();
Expand Down Expand Up @@ -292,9 +287,6 @@ public long[] getFollowerMatchIndices() {
this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
getMemberId(), this::getCommitIndex, retryCache::getStatistics);

this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());

this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
Expand All @@ -309,6 +301,7 @@ public long[] getFollowerMatchIndices() {
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
id + "-client");
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
}

private long getCommitIndex(RaftPeerId id) {
Expand Down Expand Up @@ -1703,6 +1696,11 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
return appendFuture.whenCompleteAsync((r, t) -> {
if (t != null) {
LOG.warn("{}: appendEntries* failed: {}", getMemberId(), toLogEntryTermIndexString(entries), t);
} else if (LOG.isDebugEnabled()) {
LOG.debug("{}: appendEntries* succeeded: {}", getMemberId(), toLogEntryTermIndexString(entries));
}
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
Expand Down Expand Up @@ -1753,7 +1751,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
&& !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
&& !state.containsTermIndex(previous)) {
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
LOG.info("{}: Failed appendEntries, previous log entry {} not found", getMemberId(), previous);
return replyNextIndex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,23 @@ public static String toLogEntriesString(List<LogEntryProto> entries) {

public static String toLogEntriesShortString(List<LogEntryProto> entries,
Function<StateMachineLogEntryProto, String> stateMachineToString) {
return entries == null ? null
: entries.isEmpty()? "<empty>"
: "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0), stateMachineToString);
if (entries == null) {
return null;
}
return toLogEntryTermIndexString(entries)
+ (entries.isEmpty() ? "" : ", first=" + toLogEntryString(entries.get(0), stateMachineToString));
}

public static String toLogEntryTermIndexString(List<LogEntryProto> entries) {
final int n = entries.size();
return n == 0 ? toLogEntryTermIndexString(n, null, null)
: toLogEntryTermIndexString(n, TermIndex.valueOf(entries.get(0)), TermIndex.valueOf(entries.get(n - 1)));
}

public static String toLogEntryTermIndexString(int n, TermIndex first, TermIndex last) {
return n == 0 ? "HEARTBEAT"
: n == 1 ? "entry=" + first
: n + " entries=" + first + "..." + last;
}

public static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ProtoUtils;

import java.util.List;
import java.util.function.Function;

import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntriesShortString;

/**
* This class provides convenient utilities for converting Protocol Buffers messages to strings.
* The output strings are for information purpose only.
Expand All @@ -50,14 +49,12 @@ public static String toAppendEntriesRequestString(AppendEntriesRequestProto requ
if (request == null) {
return null;
}
final List<LogEntryProto> entries = request.getEntriesList();
return ProtoUtils.toString(request.getServerRequest())
+ "-t" + request.getLeaderTerm()
+ ",previous=" + TermIndex.valueOf(request.getPreviousLog())
+ ",leaderCommit=" + request.getLeaderCommit()
+ ",initializing? " + request.getInitializing()
+ "," + (entries.isEmpty()? "HEARTBEAT" : "entries: " +
LogProtoUtils.toLogEntriesShortString(entries, stateMachineToString));
+ "," + toLogEntriesShortString(request.getEntriesList(), stateMachineToString);
}

public static String toAppendEntriesReplyString(AppendEntriesReplyProto reply) {
Expand Down Expand Up @@ -87,7 +84,7 @@ public static String toInstallSnapshotRequestString(InstallSnapshotRequestProto
s = "notify:" + TermIndex.valueOf(notification.getFirstAvailableTermIndex());
break;
default:
throw new IllegalStateException("Unexpected body case in " + request);
throw new IllegalStateException("Unexpected InstallSnapshotRequestBodyCase in " + request);
}
return ProtoUtils.toString(request.getServerRequest())
+ "-t" + request.getLeaderTerm()
Expand Down Expand Up @@ -122,11 +119,7 @@ public static String toRequestVoteReplyString(RequestVoteReplyProto proto) {
+ "-last:" + TermIndex.valueOf(proto.getLastEntry());
}

/**
* Used to generate the necessary unified name in the submodules under
* {@link org.apache.ratis.server.impl.RaftServerImpl}, which consists
* of {@link org.apache.ratis.server.impl.ServerState#memberId} and the specific class.
*/
/** Generate the unified name for the given member and class. */
public static String generateUnifiedName(RaftGroupMemberId memberId, Class<?> clazz) {
return memberId + "-" + JavaUtils.getClassSimpleName(clazz);
}
Expand Down
Loading