Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
"--Xsynchronizer-world-state-min-millis-before-stalling";
private static final String WORLD_STATE_TASK_CACHE_SIZE_FLAG =
"--Xsynchronizer-world-state-task-cache-size";
private static final String RECEIPTS_DOWNLOAD_STEP_TIMEOUT_MILLIS_FLAG =
"--Xsynchronizer-receipts-download-step-timeout-millis";

// Regular (stable) flag
private static final String SNAP_SERVER_ENABLED_FLAG = "--snapsync-server-enabled";
Expand Down Expand Up @@ -258,6 +260,15 @@ public void parseBlockPropagationRange(final String arg) {
private int worldStateTaskCacheSize =
SynchronizerConfiguration.DEFAULT_WORLD_STATE_TASK_CACHE_SIZE;

@CommandLine.Option(
names = RECEIPTS_DOWNLOAD_STEP_TIMEOUT_MILLIS_FLAG,
hidden = true,
paramLabel = "<LONG>",
description =
"Maximum time in milliseconds to wait for receipts download step including all retries (default: ${DEFAULT-VALUE})")
private long receiptsDownloadStepTimeoutMillis =
SynchronizerConfiguration.DEFAULT_RECEIPTS_DOWNLOAD_STEP_TIMEOUT_MILLIS;

@CommandLine.Option(
names = SNAP_PIVOT_BLOCK_WINDOW_VALIDITY_FLAG,
hidden = true,
Expand Down Expand Up @@ -509,6 +520,7 @@ public SynchronizerConfiguration.Builder toDomainObject() {
builder.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled);
builder.snapSyncSavePreCheckpointHeadersOnlyEnabled(
snapSyncSavePreCheckpointHeadersOnlyEnabled);
builder.receiptsDownloadStepTimeoutMillis(receiptsDownloadStepTimeoutMillis);
builder.era1ImportPrepipelineEnabled(era1ImportPrepipelineEnabled);
builder.era1DataUri(era1DataUri);
builder.era1ImportPrepipelineConcurrency(era1ImportPrepipelineConcurrency);
Expand Down Expand Up @@ -553,6 +565,8 @@ public List<String> getCLIOptions() {
OptionParser.format(worldStateMinMillisBeforeStalling),
WORLD_STATE_TASK_CACHE_SIZE_FLAG,
OptionParser.format(worldStateTaskCacheSize),
RECEIPTS_DOWNLOAD_STEP_TIMEOUT_MILLIS_FLAG,
OptionParser.format(receiptsDownloadStepTimeoutMillis),
SNAP_PIVOT_BLOCK_WINDOW_VALIDITY_FLAG,
OptionParser.format(snapsyncPivotBlockWindowValidity),
SNAP_PIVOT_BLOCK_DISTANCE_BEFORE_CACHING_FLAG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public Block block(final BlockOptions options) {
}

public Block block() {
return block(new BlockOptions());
return block(blockOptionsSupplier.get());
}

/**
Expand Down Expand Up @@ -477,10 +477,11 @@ public BlockBody body(final BlockOptions options) {
ommers.add(ommer());
}
}
final List<Transaction> defaultTxs = new ArrayList<>();
final List<Transaction> defaultTxs = new ArrayList<>(options.transactionCount());
if (options.hasTransactions()) {
defaultTxs.add(transaction(options.getTransactionTypes()));
defaultTxs.add(transaction(options.getTransactionTypes()));
for (int i = 0; i < options.transactionCount(); i++) {
defaultTxs.add(transaction(options.getTransactionTypes()));
}
}

return new BlockBody(
Expand Down Expand Up @@ -901,6 +902,7 @@ public static class BlockOptions {
private Optional<Long> timestamp = Optional.empty();
private boolean hasOmmers = true;
private boolean hasTransactions = true;
private int transactionCount = 2;
private TransactionType[] transactionTypes = {
TransactionType.FRONTIER, TransactionType.ACCESS_LIST, TransactionType.EIP1559
};
Expand Down Expand Up @@ -997,6 +999,10 @@ public boolean hasTransactions() {
return hasTransactions;
}

public int transactionCount() {
return transactionCount;
}

public boolean hasOmmers() {
return hasOmmers;
}
Expand Down Expand Up @@ -1096,6 +1102,11 @@ public BlockOptions hasTransactions(final boolean hasTransactions) {
return this;
}

public BlockOptions transactionCount(final int transactionCount) {
this.transactionCount = transactionCount;
return this;
}

public BlockOptions transactionTypes(final TransactionType... transactionTypes) {
this.transactionTypes = transactionTypes;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
requestSender.sendRequest(peerTaskSubProtocol, requestMessageData, peer);

if (responseMessageData == null) {
throw new InvalidPeerTaskResponseException();
throw new InvalidPeerTaskResponseException("Null response");
}

result = peerTask.processResponse(responseMessageData);
Expand All @@ -158,7 +158,10 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
peerTask.postProcessResult(executorResult);
} else {
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId());
"Invalid response {} found for {} from peer {}",
validationResponse,
taskClassName,
peer.getLoggableId());
if (validationResponse.recordUselessResponse()) {
peer.recordUselessResponse(taskClassName);
}
Expand Down Expand Up @@ -212,11 +215,13 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR,
List.of(peer));
}
LOG.debug(
"Executed peer task against {}, response code {}, retries remaining {}",
peer.getLoggableId(),
executorResult.responseCode(),
retriesRemaining);
LOG.atDebug()
.setMessage("Executed peer task {} against {}, response code {}, retries remaining {}")
.addArgument(taskClassName)
.addArgument(peer::getLoggableId)
.addArgument(executorResult::responseCode)
.addArgument(retriesRemaining)
.log();
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import static com.google.common.base.Preconditions.checkArgument;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.SyncBlock;
import org.hyperledger.besu.ethereum.core.SyncTransactionReceipt;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.ethereum.core.encoding.receipt.SyncTransactionReceiptEncoder;
Expand All @@ -32,51 +35,35 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.ethereum.rlp.RLPException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;

public class GetSyncReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<SyncTransactionReceipt>>> {
implements PeerTask<Map<SyncBlock, List<SyncTransactionReceipt>>> {

private final Collection<BlockHeader> blockHeaders;
private final Map<BlockHeader, List<SyncTransactionReceipt>> receiptsByBlockHeader =
new HashMap<>();
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final List<SyncBlock> requestedBlocks;
private final List<BlockHeader> requestedHeaders;
private final long requiredBlockchainHeight;
private final boolean isPoS;
private final SyncTransactionReceiptEncoder syncTransactionReceiptEncoder;

public GetSyncReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders,
final List<SyncBlock> blocks,
final ProtocolSchedule protocolSchedule,
final SyncTransactionReceiptEncoder syncTransactionReceiptEncoder) {
this.blockHeaders = new ArrayList<>(blockHeaders);

// pre-fill any headers with an empty receipts root into the result map
this.blockHeaders.stream()
.filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH))
.forEach(header -> receiptsByBlockHeader.put(header, Collections.emptyList()));
this.blockHeaders.removeAll(receiptsByBlockHeader.keySet());

// group headers by their receipts root hash to reduce total number of receipts hashes requested
// for
this.blockHeaders.forEach(
header ->
headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header));
checkArgument(!blocks.isEmpty(), "Requested block list must not be empty");
this.requestedBlocks = blocks;
this.requestedHeaders = blocks.stream().map(SyncBlock::getHeader).toList();

// calculate the minimum required blockchain height a peer will need to be able to fulfil this
// request
requiredBlockchainHeight =
this.blockHeaders.stream()
requestedHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
Expand All @@ -92,62 +79,31 @@ public SubProtocol getSubProtocol() {

@Override
public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts
// for one of the headers with each unique receipt root.
final List<Hash> blockHashes =
headersByReceiptsRoot.values().stream()
.map(headers -> headers.getFirst().getHash())
.toList();
return GetReceiptsMessage.create(blockHashes);
return GetReceiptsMessage.create(requestedHeaders.stream().map(BlockHeader::getHash).toList());
}

@Override
public Map<BlockHeader, List<SyncTransactionReceipt>> processResponse(
final MessageData messageData)
public Map<SyncBlock, List<SyncTransactionReceipt>> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException, MalformedRlpFromPeerException {
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
throw new InvalidPeerTaskResponseException("Null message data");
}
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData);
final List<List<SyncTransactionReceipt>> receiptsByBlock;
try {
receiptsByBlock = receiptsMessage.syncReceipts();
final List<List<SyncTransactionReceipt>> receivedBlocks = receiptsMessage.syncReceipts();
if (receivedBlocks.size() > requestedBlocks.size()) {
throw new InvalidPeerTaskResponseException("Too many result returned");
}
final Map<SyncBlock, List<SyncTransactionReceipt>> response =
HashMap.newHashMap(receivedBlocks.size());
for (int i = 0; i < receivedBlocks.size(); i++) {
response.put(requestedBlocks.get(i), receivedBlocks.get(i));
}
return response;
} catch (RLPException e) {
// indicates a malformed or unexpected RLP result from the peer
throw new MalformedRlpFromPeerException(e, messageData.getData());
}
// take a copy of the pre-filled receiptsByBlockHeader, to ensure idempotency of subsequent
// calls to processResponse
final Map<BlockHeader, List<SyncTransactionReceipt>> receiptsByHeader =
new HashMap<>(receiptsByBlockHeader);
if (!blockHeaders.isEmpty()) {
if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}

for (final List<SyncTransactionReceipt> receiptsInBlock : receiptsByBlock) {
final List<BlockHeader> blockHeaders =
headersByReceiptsRoot.get(
Util.getRootFromListOfBytes(
receiptsInBlock.stream()
.map(
(r) -> {
Bytes rlp =
r.isFormattedForRootCalculation()
? r.getRlpBytes()
: syncTransactionReceiptEncoder.encodeForRootCalculation(r);
r.clearSubVariables();
return rlp;
})
.toList()));
if (blockHeaders == null) {
// Contains receipts that we didn't request, so mustn't be the response we're looking for.
throw new InvalidPeerTaskResponseException();
}
blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock));
}
}
return receiptsByHeader;
}

@Override
Expand All @@ -157,11 +113,50 @@ public Predicate<EthPeerImmutableAttributes> getPeerRequirementFilter() {

@Override
public PeerTaskValidationResponse validateResult(
final Map<BlockHeader, List<SyncTransactionReceipt>> result) {
final Map<SyncBlock, List<SyncTransactionReceipt>> result) {
if (result.isEmpty()) {
return PeerTaskValidationResponse.NO_RESULTS_RETURNED;
}

for (final Map.Entry<SyncBlock, List<SyncTransactionReceipt>> entry : result.entrySet()) {
final SyncBlock requestedBlock = entry.getKey();
final List<SyncTransactionReceipt> receivedReceiptsForBlock = entry.getValue();

// verify that the receipts count is within bounds for every received block
if (receivedReceiptsForBlock.size() > requestedBlock.getBody().getTransactionCount()) {
return PeerTaskValidationResponse.TOO_MANY_RESULTS_RETURNED;
}

// ensure the calculated receipts root matches the one in the requested block header
if (!receiptsRootMatches(requestedBlock.getHeader(), receivedReceiptsForBlock)) {
return PeerTaskValidationResponse.RESULTS_DO_NOT_MATCH_QUERY;
}
}

return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
}

public Collection<BlockHeader> getBlockHeaders() {
return blockHeaders;
private boolean receiptsRootMatches(
final BlockHeader blockHeader, final List<SyncTransactionReceipt> receipts) {
final Hash calculatedReceiptsRoot =
Util.getRootFromListOfBytes(
receipts.stream()
.map(
(r) -> {
Bytes rlp =
r.isFormattedForRootCalculation()
? r.getRlpBytes()
: syncTransactionReceiptEncoder.encodeForRootCalculation(r);
r.clearSubVariables();
return rlp;
})
.toList());

return calculatedReceiptsRoot.equals(blockHeader.getReceiptsRoot());
}

@VisibleForTesting
public List<SyncBlock> getRequestedBlocks() {
return requestedBlocks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.hyperledger.besu.ethereum.rlp.RLPInput;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.tuweni.bytes.Bytes;

Expand Down Expand Up @@ -57,10 +57,10 @@ public int getCode() {
return EthProtocolMessages.GET_RECEIPTS;
}

public Iterable<Hash> hashes() {
public List<Hash> hashes() {
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final Collection<Hash> hashes = new ArrayList<>();
final List<Hash> hashes = new ArrayList<>();
while (!input.isEndOfCurrentList()) {
hashes.add(Hash.wrap(input.readBytes32()));
}
Expand Down
Loading