Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
private final AtomicInteger lastProtocolVersion = new AtomicInteger(0);

private volatile long lastRequestTimestamp = 0;
private static final double ALPHA = 0.1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be configurable as an experimental option, as we don't know what alpha value makes sense for mainnet.

private volatile double averageLatencyMs = -1.0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the initial value the worst case may mean that some peers never get selected and therefore never get sampled.

private volatile double averageThroughputBytesPerSecond = -1.0;

private final Map<String, Map<Integer, RequestManager>> requestManagers;

Expand Down Expand Up @@ -224,6 +227,33 @@ public void recordUsefulResponse() {
reputation.recordUsefulResponse();
}

/**
 * Folds a new latency sample into this peer's exponentially weighted moving average.
 *
 * <p>The first accepted sample seeds the average. Every later sample is blended in with weight
 * {@code ALPHA}, the previous average keeping weight {@code 1 - ALPHA}.
 *
 * <p>Samples that are negative, NaN or infinite are ignored. A negative average is the marker
 * for "no sample recorded yet", so storing a negative sample would silently reset the average,
 * and a single NaN sample would make every later average NaN.
 *
 * @param latencyMs the observed latency in milliseconds
 */
public void recordLatency(final double latencyMs) {
  if (!Double.isFinite(latencyMs) || latencyMs < 0) {
    return;
  }
  // Read the volatile field once so the "unset" check and the blend use the same value.
  final double current = averageLatencyMs;
  averageLatencyMs =
      current < 0 ? latencyMs : (ALPHA * latencyMs) + ((1.0 - ALPHA) * current);
}

/**
 * Folds a new throughput sample into this peer's exponentially weighted moving average.
 *
 * <p>The sample is {@code bytes} scaled to bytes per second over {@code durationMs}. The first
 * accepted sample seeds the average. Every later sample is blended in with weight {@code ALPHA},
 * the previous average keeping weight {@code 1 - ALPHA}.
 *
 * <p>The call is ignored when the duration is zero, negative, NaN or infinite, or when the byte
 * count is negative. A negative average is the marker for "no sample recorded yet", so such
 * input must never reach the stored value, and a NaN sample would make every later average NaN.
 *
 * @param bytes the number of bytes received
 * @param durationMs the time taken to receive them, in milliseconds
 */
public void recordThroughput(final long bytes, final double durationMs) {
  if (bytes < 0 || !Double.isFinite(durationMs) || durationMs <= 0) {
    return;
  }
  final double throughput = (bytes * 1000.0) / durationMs;
  // Read the volatile field once so the "unset" check and the blend use the same value.
  final double current = averageThroughputBytesPerSecond;
  averageThroughputBytesPerSecond =
      current < 0 ? throughput : (ALPHA * throughput) + ((1.0 - ALPHA) * current);
}

/**
 * Returns the moving average of the latencies passed to {@link #recordLatency(double)}.
 *
 * @return the average latency in milliseconds; the field starts at {@code -1.0}, which stands
 *     for "nothing recorded yet"
 */
public double getAverageLatencyMs() {
  return this.averageLatencyMs;
}

/**
 * Returns the moving average of the throughput samples passed to {@link
 * #recordThroughput(long, double)}.
 *
 * @return the average throughput in bytes per second; the field starts at {@code -1.0}, which
 *     stands for "nothing recorded yet"
 */
public double getAverageThroughputBytesPerSecond() {
  return this.averageThroughputBytesPerSecond;
}

/**
 * Disconnects this peer's connection, forwarding the given reason to it.
 *
 * @param reason the reason handed to the connection's disconnect call
 */
public void disconnect(final DisconnectReason reason) {
  this.connection.disconnect(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public record EthPeerImmutableAttributes(
boolean isServingSnap,
boolean hasAvailableRequestCapacity,
boolean isInboundInitiated,
double averageLatencyMs,
double averageThroughputBytesPerSecond,
EthPeer ethPeer) {

public static EthPeerImmutableAttributes from(final EthPeer peer) {
Expand All @@ -43,6 +45,8 @@ public static EthPeerImmutableAttributes from(final EthPeer peer) {
peer.isServingSnap(),
peer.hasAvailableRequestCapacity(),
peer.getConnection().inboundInitiated(),
peer.getAverageLatencyMs(),
peer.getAverageThroughputBytesPerSecond(),
peer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ public class EthPeers implements PeerSelector {
/**
 * Orders peers by their {@code outstandingRequests} value in natural order and breaks ties with
 * their {@code lastRequestTimestamp} value, also in natural order.
 */
public static final Comparator<EthPeerImmutableAttributes> LEAST_TO_MOST_BUSY =
Comparator.comparing(EthPeerImmutableAttributes::outstandingRequests)
.thenComparing(EthPeerImmutableAttributes::lastRequestTimestamp);

/**
 * Ranks peers so that a lower average latency compares as greater.
 *
 * <p>A negative average latency (nothing recorded yet) is treated as {@code Double.MAX_VALUE},
 * which places such a peer below every peer with a smaller recorded latency.
 */
public static final Comparator<EthPeerImmutableAttributes> BY_LOW_LATENCY =
    Comparator.<EthPeerImmutableAttributes>comparingDouble(
            attributes -> {
              final double latencyMs = attributes.averageLatencyMs();
              return latencyMs < 0 ? Double.MAX_VALUE : latencyMs;
            })
        .reversed();

/**
 * Ranks peers so that a higher average throughput compares as greater. The raw average is
 * compared as-is, so a peer whose value is still negative ranks below any non-negative value.
 */
public static final Comparator<EthPeerImmutableAttributes> BY_HIGH_THROUGHPUT =
    Comparator.comparingDouble(EthPeerImmutableAttributes::averageThroughputBytesPerSecond);

public static final int NODE_ID_LENGTH = 64;
public static final int USEFULL_PEER_SCORE_THRESHOLD = 102;

Expand Down Expand Up @@ -373,9 +383,14 @@ public Stream<EthPeerImmutableAttributes> streamAvailablePeers() {
}

/**
 * Streams the best peers using the comparator returned by {@code getBestPeerComparator()}.
 *
 * @return the result of {@link #streamBestPeers(Comparator)} for that comparator
 */
public Stream<EthPeerImmutableAttributes> streamBestPeers() {
  final Comparator<EthPeerImmutableAttributes> defaultComparator = getBestPeerComparator();
  return streamBestPeers(defaultComparator);
}

public Stream<EthPeerImmutableAttributes> streamBestPeers(
final Comparator<EthPeerImmutableAttributes> comparator) {
return streamAvailablePeers()
.filter(EthPeerImmutableAttributes::isFullyValidated)
.sorted(getBestPeerComparator().reversed());
.sorted(comparator.reversed());
}

public Optional<EthPeer> bestPeer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
private final String protocolName;
private final int requestCode;
private volatile PendingPeerRequest responseStream;
// System.nanoTime() value captured inside the responseStream.then(...) callback in executeTask;
// handleMessage subtracts it from the current nanoTime to get the request duration.
// NOTE(review): unlike responseStream this field is not volatile although it is written in a
// callback — confirm the write and the read are ordered or happen on the same thread.
private long startTimeNanos;

protected AbstractPeerRequestTask(
final EthContext ethContext,
Expand All @@ -64,6 +65,7 @@ protected final void executeTask() {
responseStream = sendRequest();
responseStream.then(
stream -> {
startTimeNanos = System.nanoTime();
// Start the timeout now that the request has actually been sent
ethContext.getScheduler().failAfterTimeout(promise, timeout);

Expand Down Expand Up @@ -108,17 +110,28 @@ private void handleMessage(
final Optional<R> result = processResponse(streamClosed, message, peer);
result.ifPresent(
r -> {
final double durationMs = (System.nanoTime() - startTimeNanos) / 1_000_000.0;
if (peer != null) {
peer.recordLatency(durationMs);
if (message != null) {
peer.recordThroughput(message.getSize(), durationMs);
}
peer.recordUsefulResponse();
}
promise.complete(r);
peer.recordUsefulResponse();
});
} catch (final RLPException e) {
// Peer sent us malformed data - disconnect
LOG.debug(
"Disconnecting with BREACH_OF_PROTOCOL due to malformed message: {}",
peer.getLoggableId(),
"Disconnecting with BREACH_OF_PROTOCOL due to malformed message from peer: {}",
(peer == null) ? "unknown" : peer.getLoggableId(),
e);
LOG.trace("Peer {} Malformed message data: {}", peer, message.getData());
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
if (message != null) {
LOG.trace("Peer {} Malformed message data: {}", peer, message.getData());
}
if (peer != null) {
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
}
promise.completeExceptionally(new PeerBreachedProtocolException());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand All @@ -39,6 +40,7 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry

private final Set<EthPeer> triedPeers = new HashSet<>();
private final Set<EthPeer> failedPeers = new HashSet<>();
private Optional<Comparator<EthPeerImmutableAttributes>> peerComparator = Optional.empty();

protected AbstractRetryingSwitchingPeerTask(
final EthContext ethContext,
Expand Down Expand Up @@ -100,6 +102,10 @@ protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedP
});
}

/**
 * Sets the comparator this task passes to {@code streamBestPeers} when picking the next peer.
 * While none has been set, the comparator from {@code getBestPeerComparator()} is used.
 *
 * @param peerComparator the comparator to rank candidate peers with; must not be null, because
 *     it is wrapped with {@code Optional.of}
 */
public void setPeerComparator(final Comparator<EthPeerImmutableAttributes> peerComparator) {
  final Optional<Comparator<EthPeerImmutableAttributes>> override = Optional.of(peerComparator);
  this.peerComparator = override;
}

@Override
protected void handleTaskError(final Throwable error) {
if (isPeerFailure(error)) {
Expand Down Expand Up @@ -129,7 +135,8 @@ private Optional<EthPeer> selectNextPeer() {
protected Optional<EthPeer> nextPeerToTry() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.streamBestPeers(
peerComparator.orElse(getEthContext().getEthPeers().getBestPeerComparator()))
.filter((peer) -> isSuitablePeer(peer) && !triedPeers.contains(peer.ethPeer()))
.map(EthPeerImmutableAttributes::ethPeer)
.findFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetAccountRangeFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetBytecodeFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.snap.RetryingGetStorageRangeFromPeerTask;
Expand Down Expand Up @@ -94,7 +95,10 @@ public CompletableFuture<Task<SnapDataRequest>> requestAccount(
accountDataRequest.getEndKeyHash(),
blockHeader,
metricsSystem);
((RetryingGetAccountRangeFromPeerTask) getAccountTask)
.setPeerComparator(EthPeers.BY_HIGH_THROUGHPUT);
downloadState.addOutstandingTask(getAccountTask);

return getAccountTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
Expand Down Expand Up @@ -142,6 +146,8 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestStorage(
final EthTask<StorageRangeMessage.SlotRangeData> getStorageRangeTask =
RetryingGetStorageRangeFromPeerTask.forStorageRange(
ethContext, accountHashesAsBytes32, minRange, maxRange, blockHeader, metricsSystem);
((RetryingGetStorageRangeFromPeerTask) getStorageRangeTask)
.setPeerComparator(EthPeers.BY_HIGH_THROUGHPUT);
downloadState.addOutstandingTask(getStorageRangeTask);
return getStorageRangeTask
.run()
Expand Down Expand Up @@ -203,6 +209,7 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestCode(
final EthTask<Map<Bytes32, Bytes>> getByteCodeTask =
RetryingGetBytecodeFromPeerTask.forByteCode(
ethContext, codeHashes, blockHeader, metricsSystem);
((RetryingGetBytecodeFromPeerTask) getByteCodeTask).setPeerComparator(EthPeers.BY_LOW_LATENCY);
downloadState.addOutstandingTask(getByteCodeTask);
return getByteCodeTask
.run()
Expand Down Expand Up @@ -249,6 +256,8 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestTrieNodeByPath(
final EthTask<Map<Bytes, Bytes>> getTrieNodeFromPeerTask =
RetryingGetTrieNodeFromPeerTask.forTrieNodes(
ethContext, message, blockHeader, metricsSystem);
((RetryingGetTrieNodeFromPeerTask) getTrieNodeFromPeerTask)
.setPeerComparator(EthPeers.BY_LOW_LATENCY);
downloadState.addOutstandingTask(getTrieNodeFromPeerTask);
return getTrieNodeFromPeerTask
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,39 @@ public void recordUsefulResponse() {
assertThat(peer.getReputation().compareTo(peer2.getReputation())).isGreaterThan(0);
}

@Test
public void shouldRecordLatencyAndThroughput() {
  final EthPeer ethPeer = createPeer();

  // A fresh peer reports -1.0 for both averages.
  assertThat(ethPeer.getAverageLatencyMs()).isEqualTo(-1.0);
  assertThat(ethPeer.getAverageThroughputBytesPerSecond()).isEqualTo(-1.0);

  // Latency: the first sample is taken over as-is.
  ethPeer.recordLatency(100.0);
  assertThat(ethPeer.getAverageLatencyMs()).isEqualTo(100.0);

  // Latency: the second sample is blended in, 0.1 * 200 + 0.9 * 100 = 110.
  ethPeer.recordLatency(200.0);
  assertThat(ethPeer.getAverageLatencyMs()).isEqualTo(110.0);

  // Throughput: the first sample is taken over as-is, 1000 bytes in 100 ms = 10,000 bytes/s.
  ethPeer.recordThroughput(1000, 100.0);
  assertThat(ethPeer.getAverageThroughputBytesPerSecond()).isEqualTo(10000.0);

  // Throughput: 2000 bytes in 100 ms = 20,000 bytes/s, blended as
  // 0.1 * 20000 + 0.9 * 10000 = 11000.
  ethPeer.recordThroughput(2000, 100.0);
  assertThat(ethPeer.getAverageThroughputBytesPerSecond()).isEqualTo(11000.0);
}

@Test
public void shouldHandleZeroDurationInThroughput() {
  final EthPeer ethPeer = createPeer();

  // A zero duration must not be recorded, so the average keeps its initial -1.0.
  ethPeer.recordThroughput(1000, 0.0);

  assertThat(ethPeer.getAverageThroughputBytesPerSecond()).isEqualTo(-1.0);
}

private void messageStream(
final ResponseStreamSupplier getStream,
final MessageData targetMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,63 @@ public void comparesPeersWithTdAndNoHeight() {
.isEmpty();
}

@Test
public void comparesPeersByLatency() {
  final EthPeer fastPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();
  final EthPeer slowPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();

  fastPeer.recordLatency(100.0);
  slowPeer.recordLatency(200.0);

  final EthPeerImmutableAttributes fast = EthPeerImmutableAttributes.from(fastPeer);
  final EthPeerImmutableAttributes slow = EthPeerImmutableAttributes.from(slowPeer);

  // BY_LOW_LATENCY ranks the peer with the smaller latency as greater.
  final int fastVersusSlow = EthPeers.BY_LOW_LATENCY.compare(fast, slow);
  final int slowVersusFast = EthPeers.BY_LOW_LATENCY.compare(slow, fast);
  assertThat(fastVersusSlow).isGreaterThan(0);
  assertThat(slowVersusFast).isLessThan(0);

  // A peer with no recorded latency ranks below a peer with a recorded one.
  final EthPeer unmeasuredPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();
  final EthPeerImmutableAttributes unmeasured = EthPeerImmutableAttributes.from(unmeasuredPeer);
  assertThat(EthPeers.BY_LOW_LATENCY.compare(fast, unmeasured)).isGreaterThan(0);
}

@Test
public void comparesPeersByThroughput() {
  final EthPeer fasterPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();
  final EthPeer slowerPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();

  // 2000 bytes over 1000 ms is 2000 bytes/s; 1000 bytes over 1000 ms is 1000 bytes/s.
  fasterPeer.recordThroughput(2000, 1000.0);
  slowerPeer.recordThroughput(1000, 1000.0);

  final EthPeerImmutableAttributes faster = EthPeerImmutableAttributes.from(fasterPeer);
  final EthPeerImmutableAttributes slower = EthPeerImmutableAttributes.from(slowerPeer);

  // BY_HIGH_THROUGHPUT ranks the peer with the larger throughput as greater.
  final int fasterVersusSlower = EthPeers.BY_HIGH_THROUGHPUT.compare(faster, slower);
  final int slowerVersusFaster = EthPeers.BY_HIGH_THROUGHPUT.compare(slower, faster);
  assertThat(fasterVersusSlower).isGreaterThan(0);
  assertThat(slowerVersusFaster).isLessThan(0);
}

@Test
public void shouldStreamBestPeersWithCustomComparator() {
  final EthPeer slowPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();
  final EthPeer fastPeer =
      EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000).getEthPeer();

  slowPeer.recordLatency(200.0);
  fastPeer.recordLatency(100.0);

  // streamBestPeers sorts by the reversed comparator, so the lowest-latency peer comes first.
  assertThat(ethPeers.streamBestPeers(EthPeers.BY_LOW_LATENCY).findFirst())
      .contains(EthPeerImmutableAttributes.from(fastPeer));
}

@Test
public void shouldExecutePeerRequestImmediatelyWhenPeerIsAvailable() throws Exception {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
Expand Down