Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.StreamingJsonRpcSuccessResponse;
import org.hyperledger.besu.plugin.services.rpc.RpcResponseType;

import java.io.IOException;
Expand Down Expand Up @@ -72,6 +73,22 @@ private void handleJsonObjectResponse(
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == RpcResponseType.NONE) {
response.end();
} else if (jsonRpcResponse instanceof StreamingJsonRpcSuccessResponse streamingResponse) {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
final JsonGenerator generator =
getJsonObjectMapper().getFactory().createGenerator(streamer);
try {
streamingResponse.writeTo(generator);
} finally {
try {
generator.close();
} catch (final IOException ignored) {
// Generator close flushes the buffer, which may fail if the connection was
// reset mid-stream. Swallow it — the primary exception is already propagating.
}
}
}
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,46 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods;

import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;
import static org.hyperledger.besu.ethereum.mainnet.feemarket.ExcessBlobGasCalculator.calculateExcessBlobGasForParent;

import org.hyperledger.besu.datatypes.BlobGas;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.JsonRpcParameter;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.TransactionTraceParams;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.Tracer;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.TransactionTrace;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.DebugTraceTransactionResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.StreamingJsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.debug.TraceOptions;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ImmutableTransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.vm.DebugOperationTracer;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.ethereum.mainnet.block.access.list.AccessLocationTracker;
import org.hyperledger.besu.ethereum.mainnet.block.access.list.BlockAccessList;
import org.hyperledger.besu.ethereum.processing.TransactionProcessingResult;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.vm.StreamingDebugOperationTracer;
import org.hyperledger.besu.evm.blockhash.BlockHashLookup;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.services.pipeline.Pipeline;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import com.google.common.base.Suppliers;

public abstract class AbstractDebugTraceBlock implements JsonRpcMethod {

private final ProtocolSchedule protocolSchedule;
private final LabelledMetric<Counter> outputCounter;
private final Supplier<BlockchainQueries> blockchainQueriesSupplier;
private final EthScheduler ethScheduler;

public AbstractDebugTraceBlock(
final ProtocolSchedule protocolSchedule,
Expand All @@ -62,14 +62,6 @@ public AbstractDebugTraceBlock(
final EthScheduler ethScheduler) {
this.blockchainQueriesSupplier = Suppliers.ofInstance(blockchainQueries);
this.protocolSchedule = protocolSchedule;
this.outputCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.BLOCKCHAIN,
"transactions_debugTraceblock_pipeline_processed_total",
"Number of transactions processed for each block",
"step",
"action");
this.ethScheduler = ethScheduler;
}

protected BlockchainQueries getBlockchainQueries() {
Expand Down Expand Up @@ -97,61 +89,103 @@ protected TraceOptions getTraceOptions(final JsonRpcRequestContext requestContex
return traceOptions;
}

protected Collection<DebugTraceTransactionResult> getTraces(
final JsonRpcRequestContext requestContext,
final TraceOptions traceOptions,
final Optional<Block> maybeBlock) {
return maybeBlock
.flatMap(
block ->
Tracer.processTracing(
getBlockchainQueries(),
Optional.of(block.getHeader()),
traceableState -> {
List<DebugTraceTransactionResult> tracesList =
Collections.synchronizedList(new ArrayList<>());
final ProtocolSpec protocolSpec =
protocolSchedule.getByBlockHeader(block.getHeader());
final MainnetTransactionProcessor transactionProcessor =
protocolSpec.getTransactionProcessor();
final TraceBlock.ChainUpdater chainUpdater =
new TraceBlock.ChainUpdater(traceableState);

TransactionSource transactionSource = new TransactionSource(block);
DebugOperationTracer debugOperationTracer =
new DebugOperationTracer(traceOptions.opCodeTracerConfig(), true);
ExecuteTransactionStep executeTransactionStep =
new ExecuteTransactionStep(
chainUpdater,
transactionProcessor,
getBlockchainQueries().getBlockchain(),
debugOperationTracer,
protocolSpec,
block);

Pipeline<TransactionTrace> traceBlockPipeline =
createPipelineFrom(
"getTransactions",
transactionSource,
4,
outputCounter,
false,
"debug_trace_block")
.thenProcess("executeTransaction", executeTransactionStep)
.thenProcessAsyncOrdered(
"debugTraceTransactionStep",
DebugTraceTransactionStepFactory.createAsync(
traceOptions, protocolSpec),
4)
.andFinishWith("collect_results", tracesList::add);

try {
ethScheduler.startPipeline(traceBlockPipeline).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return Optional.of(tracesList);
}))
.orElse(null);
/**
* Creates a streaming response that writes each transaction trace directly to the JSON output.
* Uses StreamingDebugOperationTracer which writes each struct log inline during EVM execution,
* achieving O(1) frame memory. Memory is only captured for the 24 opcodes that touch it.
*/
protected StreamingJsonRpcSuccessResponse.ResultWriter getStreamingTraces(
final TraceOptions traceOptions, final Optional<Block> maybeBlock) {
return generator -> {
if (maybeBlock.isEmpty()) {
generator.writeNull();
return;
}
final Block block = maybeBlock.get();

generator.writeStartArray();
final AtomicBoolean lambdaEntered = new AtomicBoolean(false);

final Optional<?> tracingResult = Tracer.processTracing(
getBlockchainQueries(),
Optional.of(block.getHeader()),
traceableState -> {
lambdaEntered.set(true);
final ProtocolSpec protocolSpec =
protocolSchedule.getByBlockHeader(block.getHeader());
final MainnetTransactionProcessor transactionProcessor =
protocolSpec.getTransactionProcessor();
final TraceBlock.ChainUpdater chainUpdater =
new TraceBlock.ChainUpdater(traceableState);
final BlockHeader header = block.getHeader();
final Optional<BlockHeader> maybeParentHeader =
getBlockchainQueries().getBlockchain().getBlockHeader(header.getParentHash());
final Wei blobGasPrice =
protocolSpec
.getFeeMarket()
.blobGasPricePerGas(
maybeParentHeader
.map(parent -> calculateExcessBlobGasForParent(protocolSpec, parent))
.orElse(BlobGas.ZERO));
final BlockHashLookup blockHashLookup =
protocolSpec.getPreExecutionProcessor().createBlockHashLookup(
getBlockchainQueries().getBlockchain(), header);

try {
for (final Transaction tx : block.getBody().getTransactions()) {
generator.writeStartObject();
generator.writeStringField("txHash", tx.getHash().toHexString());
generator.writeFieldName("result");
generator.writeStartObject();

generator.writeFieldName("structLogs");
generator.writeStartArray();

final StreamingDebugOperationTracer tracer =
new StreamingDebugOperationTracer(
traceOptions.opCodeTracerConfig(), true, generator);

final AccessLocationTracker accessListTracker =
BlockAccessList.BlockAccessListBuilder
.createTransactionAccessLocationTracker(0);
final TransactionProcessingResult result =
transactionProcessor.processTransaction(
chainUpdater.getNextUpdater(),
header,
tx,
header.getCoinbase(),
tracer,
blockHashLookup,
ImmutableTransactionValidationParams.builder().build(),
blobGasPrice,
Optional.of(accessListTracker));

generator.writeEndArray(); // structLogs

final long gas = tx.getGasLimit() - result.getGasRemaining();
final String returnValue = result.getOutput().toString().substring(2);
generator.writeNumberField("gas", gas);
generator.writeBooleanField("failed", !result.isSuccessful());
generator.writeStringField("returnValue", returnValue);

generator.writeEndObject(); // result
generator.writeEndObject(); // tx
generator.flush();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return Optional.of(Boolean.TRUE);
});

if (tracingResult.isEmpty() && lambdaEntered.get()) {
// Lambda ran but failed (e.g. connection reset mid-trace). The generator is in an
// unknown nested context — abandon the response rather than producing broken JSON.
throw new IOException("debug_traceBlock failed mid-stream: tracing aborted");
}
// Either tracing succeeded (all tx objects closed cleanly) or worldstate was unavailable
// (lambda never ran, generator is at top-level array context → produces []).
generator.writeEndArray();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.JsonRpcParameter.JsonRpcParameterException;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.DebugTraceTransactionResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.StreamingJsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
Expand All @@ -34,7 +33,6 @@
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;

import java.util.Collection;
import java.util.Optional;

import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -80,9 +78,9 @@ public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
.getBlockchain()
.getBlockByHash(block.getHeader().getParentHash())
.isPresent()) {
final Collection<DebugTraceTransactionResult> results =
getTraces(requestContext, traceOptions, Optional.ofNullable(block));
return new JsonRpcSuccessResponse(requestContext.getRequest().getId(), results);
return new StreamingJsonRpcSuccessResponse(
requestContext.getRequest().getId(),
getStreamingTraces(traceOptions, Optional.ofNullable(block)));
} else {
return new JsonRpcErrorResponse(
requestContext.getRequest().getId(), RpcErrorType.PARENT_BLOCK_NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.JsonRpcParameter.JsonRpcParameterException;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.DebugTraceTransactionResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.StreamingJsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.debug.TraceOptions;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;

import java.util.Collection;
import java.util.Optional;

public class DebugTraceBlockByHash extends AbstractDebugTraceBlock {
Expand Down Expand Up @@ -61,8 +59,8 @@ public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
TraceOptions traceOptions = getTraceOptions(requestContext);
Optional<Block> maybeBlock = getBlockchainQueries().getBlockchain().getBlockByHash(blockHash);

final Collection<DebugTraceTransactionResult> results =
getTraces(requestContext, traceOptions, maybeBlock);
return new JsonRpcSuccessResponse(requestContext.getRequest().getId(), results);
return new StreamingJsonRpcSuccessResponse(
requestContext.getRequest().getId(),
getStreamingTraces(traceOptions, maybeBlock));
}
}
Loading
Loading