Defer Snappy decompression of P2P messages until worker thread processing#10048
Defer Snappy decompression of P2P messages until worker thread processing#10048pinges wants to merge 13 commits intobesu-eth:mainfrom
Conversation
…sing RawMessage now supports a compressed constructor that defers Snappy decompression until getData() is first called on the worker thread. Messages stay in their compressed form while queued in the tx worker pool. Additionally, worker threads now skip processing for messages from already-disconnected peers, and decompression/deserialization failures disconnect the peer with BREACH_OF_PROTOCOL. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
…sing RawMessage now supports a compressed constructor that defers Snappy decompression until getData() is first called on the worker thread. Messages stay in their compressed form while queued in the tx worker pool. Additionally, worker threads now skip processing for messages from already-disconnected peers, and decompression/deserialization failures disconnect the peer with BREACH_OF_PROTOCOL. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
…compressed Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
There was a problem hiding this comment.
Pull request overview
This PR defers Snappy decompression of inbound P2P messages from the Netty I/O thread to worker-thread processing by introducing lazy decompression in RawMessage, reducing queue memory pressure and avoiding accidental eager decompression in trace logging. It also adds worker-side checks/handling for disconnected peers and malformed message payloads.
Changes:
- Add lazy decompression + header-only sizing to
RawMessageand relaxAbstractMessageDataimmutability to support it. - Update
Framerto pass compressed payloads through (after initial negotiation/validation) for deferred decompression. - Guard trace logging and add worker-thread disconnected/malformed-message handling to skip work and disconnect misbehaving peers.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| ethereum/p2p/.../wire/RawMessage.java | Adds lazy Snappy decompression and size calculation from Snappy header. |
| ethereum/p2p/.../wire/AbstractMessageData.java | Makes data mutable and getSize() overridable to support lazy decode. |
| ethereum/p2p/.../framing/Framer.java | Stops eagerly decompressing most messages; passes compressed bytes to RawMessage. |
| ethereum/p2p/.../netty/ApiHandler.java | Avoids evaluating message.getData() on Netty I/O thread when trace is disabled. |
| ethereum/eth/.../TransactionsMessageHandler.java | Defers decoding to tx worker; adds disconnected-peer skip + malformed-message disconnect. |
| ethereum/eth/.../NewPooledTransactionHashesMessageHandler.java | Same worker-side skip + malformed-message disconnect for pooled hash messages. |
You can also share your feedback on Copilot code review. Take the survey.
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
...src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java
Show resolved
Hide resolved
...org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Outdated
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/AbstractMessageData.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/AbstractMessageData.java
Show resolved
Hide resolved
- Restore `data` to final in AbstractMessageData to preserve immutability contract for all subclasses - Use private volatile fields and a `decompressed` flag in RawMessage instead of mutating parent state or using fragile reference equality - Narrow try/catch in TransactionsMessageHandler and NewPooledTransactionHashesMessageHandler to only cover the decode step, avoiding false-positive peer disconnects from processing exceptions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
There was a problem hiding this comment.
Pull request overview
This PR defers Snappy decompression of inbound RLPx P2P messages from the Netty I/O thread to worker threads by introducing lazy decompression in RawMessage, reducing queued-memory pressure and adding additional guards around logging and malformed/queued message handling.
Changes:
- Add lazy Snappy decompression support to
RawMessage(decompress on firstgetData(), size via Snappy frame header). - Update
Framerto emit compressedRawMessageinstances (except for first-message fallback / validation logic). - Avoid eager decompression on the I/O thread via
isTraceEnabled()guard and add disconnected/malformed message handling in tx-related message handlers.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java | Introduces lazy decompression and size reporting without full decompression. |
| ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/AbstractMessageData.java | Allows subclasses (e.g., RawMessage) to override getSize(). |
| ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/framing/Framer.java | Emits lazily-decompressed RawMessage for subsequent compressed frames. |
| ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/ApiHandler.java | Prevents trace logging from forcing eager getData() evaluation. |
| ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java | Defers message parsing to worker, skips disconnected peers, disconnects on malformed messages. |
| ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java | Same as above for pooled transaction hash announcements. |
You can also share your feedback on Copilot code review. Take the survey.
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Outdated
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Outdated
Show resolved
Hide resolved
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
macfarla
left a comment
There was a problem hiding this comment.
Could you add a test for the key behavioral change - first message decompresses eagerly, subsequent messages are lazy?
Also maybe a changelog entry?
- Defer Snappy decompression of inbound P2P messages from the Netty I/O thread to the worker thread, reducing memory held in the transaction worker queue to compressed size
ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/RawMessage.java
Show resolved
Hide resolved
Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
PR description
messages from the Netty I/O thread to the worker
thread that processes them, reducing memory
held in the transaction worker queue from
decompressed size to compressed size
compressed bytes are stored as-is and only
decompressed on the first getData() call;
getSize() reads the Snappy frame header without
decompressing
isTraceEnabled() to prevent eager getData()
evaluation on the Netty I/O thread
in TransactionsMessageHandler and
NewPooledTransactionHashesMessageHandler to skip
work for peers that disconnected while queued,
and disconnect peers that send malformed
messages