Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/eectl/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl<E: ExecEngineCtl> ExecWorkerState<E> {
})
}

#[expect(unused, reason = "will be used later")]
fn update_finalized_tip(&mut self, new_finalized: &L2BlockCommitment) -> EngineResult<()> {
self.call_engine("engine_update_finalized_tip", |eng| {
eng.update_finalized_block(*new_finalized.blkid())?;
Expand Down Expand Up @@ -264,7 +263,7 @@ pub fn worker_task_inner<E: ExecEngineCtl>(
let _ = completion.send(res);
}
ExecCommand::NewFinalizedTip(ts, completion) => {
let res = state.update_safe_tip(&ts);
let res = state.update_finalized_tip(&ts);
let _ = completion.send(res);
}
}
Expand Down
5 changes: 3 additions & 2 deletions crates/reth/exex/src/prover_exex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ impl<
.get(&block_number)
.map(|block| block.hash())
.zip(chain.execution_outcome_at_block(block_number))
.map(|(hash, outcome)| (block_number, hash, outcome))
});

for (block_hash, outcome) in bundles {
for (block_number, block_hash, outcome) in bundles {
#[cfg(debug_assertions)]
assert!(outcome.len() == 1, "should only contain single block");

Expand All @@ -71,7 +72,7 @@ impl<
break;
}

finished_height = Some(BlockNumHash::new(outcome.first_block(), block_hash))
finished_height = Some(BlockNumHash::new(block_number, block_hash))
}

Ok(finished_height)
Expand Down
32 changes: 31 additions & 1 deletion functional-tests-new/common/services/strata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import logging
from typing import TypedDict
from typing import Any, TypedDict

from common.rpc import JsonRpcClient
from common.rpc_types.strata import *
Expand Down Expand Up @@ -92,6 +92,36 @@ def wait_for_rpc_ready(
wait_until(lambda: rpc.call(method) is not None, error_with=err, timeout=timeout)
return rpc

def wait_for_account_genesis_epoch_commitment(
self,
account_id: int,
rpc: JsonRpcClient | None = None,
timeout: int = 20,
poll_interval: float = 0.5,
) -> Any:
"""
Wait until an account's genesis epoch commitment is available.

Args:
account_id: Account identifier to query.
rpc: Optional RPC client. If None, creates a new one.
timeout: Maximum time to wait in seconds.
poll_interval: How often to poll.

Returns:
The genesis epoch commitment returned by the RPC once available.
"""
if rpc is None:
rpc = self.create_rpc()

return wait_until_with_value(
lambda: rpc.strata_getAccountGenesisEpochCommitment(account_id),
lambda commitment: commitment is not None,
error_with=f"Timed out waiting for account {account_id} genesis commitment",
timeout=timeout,
step=poll_interval,
)

def get_sync_status(self, rpc: JsonRpcClient | None = None) -> ChainSyncStatus:
"""
Get the current block height from chain status.
Expand Down
125 changes: 125 additions & 0 deletions functional-tests-new/tests/alpen_client/test_exex_wal_pruning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Verify ExEx WAL files are pruned in EL<>OL mode."""

import glob
import logging
import os

import flexitest

from common.accounts import get_dev_account
from common.base_test import BaseTest
from common.config.constants import ALPEN_ACCOUNT_ID, ServiceType
from common.evm_utils import create_funded_account, send_raw_transaction, wait_for_receipt
from common.services import AlpenClientService, BitcoinService, StrataService
from common.wait import wait_until, wait_until_with_value

logger = logging.getLogger(__name__)


def _list_wal_files(datadir: str) -> set[str]:
"""List ExEx WAL files under node datadir."""
pattern = os.path.join(datadir, "**", "exex", "wal", "*.wal")
return set(glob.glob(pattern, recursive=True))


def _wal_file_id(path: str) -> int:
return int(os.path.basename(path).removesuffix(".wal"))


@flexitest.register
class TestExexWalPruning(BaseTest):
"""Check that ExEx eventually prunes old WAL files."""

def __init__(self, ctx: flexitest.InitContext):
ctx.set_env("el_ol")

def main(self, ctx):
ee_sequencer: AlpenClientService = self.get_service(ServiceType.AlpenSequencer)
strata_seq: StrataService = self.get_service(ServiceType.Strata)
bitcoin: BitcoinService = self.get_service(ServiceType.Bitcoin)

ee_rpc = ee_sequencer.create_rpc()
strata_rpc = strata_seq.wait_for_rpc_ready(timeout=10)
btc_rpc = bitcoin.create_rpc()

strata_seq.wait_for_account_genesis_epoch_commitment(
ALPEN_ACCOUNT_ID,
rpc=strata_rpc,
timeout=20,
)

# Give EE enough time to produce several blocks so ExEx WAL files accumulate.
ee_sequencer.wait_for_block(10, timeout=60)
wal_dir_root = ee_sequencer.props["datadir"]
wal_files_before = wait_until_with_value(
lambda: _list_wal_files(wal_dir_root),
lambda files: len(files) > 0,
error_with="Expected ExEx WAL files to exist before pruning check",
timeout=60,
step=2,
)
logger.info("Captured %s WAL files before pruning check", len(wal_files_before))

# Submit a few txs to ensure there are non-empty blocks while pruning is observed.
dev_account = get_dev_account(ee_rpc)
sender = create_funded_account(ee_rpc, dev_account, 3 * 10**18)
recipient = "0x000000000000000000000000000000000000dEaD"
gas_price = int(ee_rpc.eth_gasPrice(), 16)
for _ in range(3):
raw_tx = sender.sign_transfer(
to=recipient,
value=1_000_000_000,
gas_price=gas_price,
gas=21_000,
)
tx_hash = send_raw_transaction(ee_rpc, raw_tx)
wait_for_receipt(ee_rpc, tx_hash, timeout=30)

mine_address = btc_rpc.proxy.getnewaddress()

def wal_files_pruned() -> bool:
# Track only files from the original snapshot. New WAL files can appear while
# blocks are still being produced, so a shrinking total file count would be a
# flaky signal. A set difference tells us whether any preexisting file was
# deleted regardless of how many new files were created meanwhile.
btc_rpc.proxy.generatetoaddress(2, mine_address)
current = _list_wal_files(wal_dir_root)
pruned_files = wal_files_before - current
if pruned_files:
logger.info(
"Observed pruning of WAL IDs: %s",
sorted(_wal_file_id(path) for path in pruned_files),
)
return bool(pruned_files)

wait_until(
wal_files_pruned,
error_with=(
"No preexisting ExEx WAL files were pruned while L1 kept advancing. "
"FinishedHeight may be reporting incorrect block numbers."
),
timeout=90,
step=2,
)

wal_files_after = _list_wal_files(wal_dir_root)
pruned_files = wal_files_before - wal_files_after
remaining_original = wal_files_before & wal_files_after
pruned_ids = sorted(_wal_file_id(f) for f in pruned_files)
remaining_ids = sorted(_wal_file_id(f) for f in remaining_original)

logger.info(
"WAL files: %s before, %s after, pruned IDs: %s, remaining original IDs: %s",
len(wal_files_before),
len(wal_files_after),
pruned_ids,
remaining_ids,
)

if remaining_ids:
assert max(pruned_ids) < min(remaining_ids), (
"Pruning did not remove the oldest files first. "
f"Pruned IDs: {pruned_ids}, Remaining original IDs: {remaining_ids}"
)

return True
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def main(self, ctx):
# Wait for chains to be active
logger.info("Waiting for Strata RPC to be ready...")
strata_rpc = strata_seq.wait_for_rpc_ready(timeout=10)
logger.info("Waiting for Alpen account genesis commitment...")
strata_seq.wait_for_account_genesis_epoch_commitment(
ALPEN_ACCOUNT_ID,
rpc=strata_rpc,
timeout=20,
)
alpen_seq.wait_for_block(5, timeout=60)

# Get alpen account summary at epoch 0 which should be none
Expand Down
7 changes: 6 additions & 1 deletion functional-tests/factory/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ def create_exec_client(
if sequencer_reth_rpc is not None:
cmd.extend(["--sequencer-http", sequencer_reth_rpc])

props = {"rpc_port": authrpc_port, "eth_rpc_http_port": ethrpc_http_port}
props = {
"rpc_port": authrpc_port,
"eth_rpc_http_port": ethrpc_http_port,
"datadir": datadir,
}

ethrpc_url = f"ws://localhost:{ethrpc_ws_port}"

Expand Down Expand Up @@ -389,6 +393,7 @@ def _restore_snapshot(idx: int):
svc.create_web3 = _create_web3
svc.snapshot_datadir = _snapshot_datadir
svc.restore_snapshot = _restore_snapshot
svc.datadir = datadir

return svc

Expand Down
111 changes: 111 additions & 0 deletions functional-tests/tests/el_exex_wal_pruning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import glob
import logging
import os

import flexitest
from web3 import Web3

from envs import net_settings, testenv
from utils import ProverClientSettings, wait_until


def send_tx(web3: Web3):
dest = web3.to_checksum_address("deedf001900dca3ebeefdeadf001900dca3ebeef")
txid = web3.eth.send_transaction(
{
"to": dest,
"value": hex(1),
"gas": hex(21000),
"from": web3.address,
}
)
web3.eth.wait_for_transaction_receipt(txid, timeout=30)


@flexitest.register
class ElExexWalPruningTest(testenv.StrataTestBase):
"""
Verifies that ExEx WAL files are pruned after epoch finalization.

The ProverWitnessGenerator ExEx emits FinishedHeight after processing each block.
Once an epoch is finalized (via forkchoiceUpdated with finalizedBlockHash), reth
should call finalize_wal() and delete old WAL files.

This test catches a regression where FinishedHeight reported block number 0
instead of the actual block number, causing WAL files to never be pruned.
"""

def __init__(self, ctx: flexitest.InitContext):
ctx.set_env(
testenv.BasicEnvConfig(
110,
prover_client_settings=ProverClientSettings.new_with_proving(),
rollup_settings=net_settings.get_fast_batch_settings(),
)
)

def main(self, ctx: flexitest.RunContext):
seqrpc = ctx.get_service("sequencer").create_rpc()
reth = ctx.get_service("reth")
rethrpc = reth.create_rpc()
reth_waiter = self.create_reth_waiter(rethrpc)
seq_waiter = self.create_strata_waiter(seqrpc, timeout=20, interval=2)

seq_waiter.wait_until_genesis()

# WAL directory is under reth's datadir.
reth_datadir = reth.get_prop("datadir")
wal_dir = os.path.join(reth_datadir, "exex", "wal")

# Wait for some blocks to be produced so WAL files accumulate
reth_waiter.wait_until_eth_block_exceeds(10, message="blocks not advancing")

# Record WAL files before finalization
wal_files_before = set(glob.glob(os.path.join(wal_dir, "*.wal")))
logging.info(f"WAL files before finalization: {len(wal_files_before)}")
assert wal_files_before, "Expected WAL files to exist before finalization"

# Send some transactions to ensure non-empty blocks (workaround for reth restart issues)
web3 = reth.create_web3()
for _ in range(3):
send_tx(web3)

# Epoch 1 finalization triggers forkchoiceUpdated with finalizedBlockHash,
# which triggers finalize_wal() in the ExEx manager.
seq_waiter.wait_until_epoch_finalized(1, timeout=120)
logging.info("Epoch 1 finalized")

# Wait for reth to process the finalization and prune WAL
def wal_files_pruned():
current = set(glob.glob(os.path.join(wal_dir, "*.wal")))
return len(wal_files_before - current) > 0

wait_until(
wal_files_pruned,
error_with="No WAL files were pruned after finalization. "
"FinishedHeight may be reporting incorrect block numbers.",
timeout=90,
)

# Verify WAL pruning by tracking specific files by their IDs.
wal_files_after = set(glob.glob(os.path.join(wal_dir, "*.wal")))
pruned_files = wal_files_before - wal_files_after
remaining_original = wal_files_before & wal_files_after

def wal_file_id(path):
return int(os.path.basename(path).removesuffix(".wal"))

pruned_ids = sorted(wal_file_id(f) for f in pruned_files)
remaining_ids = sorted(wal_file_id(f) for f in remaining_original)

logging.info(
f"WAL files: {len(wal_files_before)} before, {len(wal_files_after)} after, "
f"pruned IDs: {pruned_ids}, remaining original IDs: {remaining_ids}"
)

# Pruned IDs must all be lower than surviving IDs.
if remaining_ids:
assert max(pruned_ids) < min(remaining_ids), (
f"Pruning did not remove the oldest files first. "
f"Pruned IDs: {pruned_ids}, Remaining original IDs: {remaining_ids}"
)
2 changes: 1 addition & 1 deletion functional-tests/tests/sync/sync_bitcoin_reorg.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def main(self, ctx: flexitest.RunContext):
cfg: RollupConfig = ctx.env.rollup_cfg()
finality_depth = cfg.l1_reorg_safe_depth

seq_waiter.wait_for_genesis()
seq_waiter.wait_until_genesis()

# Wait for prover
wait_until(
Expand Down
Loading