diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 06541dc35f0cc..0fbd564f95d87 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -5883,6 +5883,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "kona-karst-derive" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "kona-derive", + "kona-genesis", + "kona-protocol", + "tracing", +] + [[package]] name = "kona-macros" version = "0.1.2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b46e8f85db595..bbbd79cb611ce 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -237,6 +237,7 @@ kona-genesis = { path = "kona/crates/protocol/genesis", version = "0.4.5", defau kona-protocol = { path = "kona/crates/protocol/protocol", version = "0.4.5", default-features = false } kona-registry = { path = "kona/crates/protocol/registry", version = "0.4.5", default-features = false } kona-hardforks = { path = "kona/crates/protocol/hardforks", version = "0.4.5", default-features = false } +kona-karst-derive = { path = "kona/crates/protocol/karst-derive", version = "0.1.0", default-features = false } # Node kona-rpc = { path = "kona/crates/node/rpc", version = "0.3.2", default-features = false } diff --git a/rust/kona/crates/protocol/karst-derive/Cargo.toml b/rust/kona/crates/protocol/karst-derive/Cargo.toml new file mode 100644 index 0000000000000..a6c1e3b49849c --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "kona-karst-derive" +version = "0.1.0" +description = "Flat, inlined OP Stack derivation pipeline (Holocene+ only)" + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +[lints] +workspace = true + +[dependencies] +# Protocol +kona-genesis.workspace = true +kona-protocol.workspace = true +kona-derive.workspace = true + +# Alloy +alloy-primitives.workspace = true + +# General +tracing.workspace = true + 
+[dev-dependencies] + +[features] +default = [] diff --git a/rust/kona/crates/protocol/karst-derive/src/attributes.rs b/rust/kona/crates/protocol/karst-derive/src/attributes.rs new file mode 100644 index 0000000000000..a89548ae75e5a --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/attributes.rs @@ -0,0 +1,89 @@ +//! Attributes construction: transforming batches into payload attributes. + +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineResult, ResetError, +}; +use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; +use tracing::info; + +use crate::Pipeline; + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Returns the next `OpAttributesWithParent` for the given parent block. + /// + /// This is the top-level method that: + /// 1. Loads a pending batch (validating it if needed) + /// 2. Transforms the batch into payload attributes + /// 3. Returns the populated attributes + pub(crate) async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> PipelineResult { + // Load a batch if we don't have one pending. + if self.pending_batch.is_none() { + let batch = self.validate_and_get_batch(parent).await?; + self.pending_batch = Some(batch); + self.is_last_in_span = self.single_batch_buffer.is_empty(); + } + + let batch = self + .pending_batch + .take() + .ok_or(PipelineError::Eof.temp())?; + + // Sanity check parent hash. + if batch.parent_hash != parent.block_info.hash { + return Err(ResetError::BadParentHash(batch.parent_hash, parent.block_info.hash).into()); + } + + // Sanity check timestamp. + let expected_timestamp = parent.block_info.timestamp + self.cfg.block_time; + if batch.timestamp != expected_timestamp { + return Err(ResetError::BadTimestamp(batch.timestamp, expected_timestamp).into()); + } + + // Build payload attributes. 
+ let tx_count = batch.transactions.len(); + let mut attributes = self + .attributes_builder + .prepare_payload_attributes(parent, batch.epoch()) + .await?; + + attributes.no_tx_pool = Some(true); + match attributes.transactions { + Some(ref mut txs) => txs.extend(batch.transactions), + None => { + if !batch.transactions.is_empty() { + attributes.transactions = Some(batch.transactions); + } + } + } + + info!( + target: "karst", + txs = tx_count, + timestamp = batch.timestamp, + "generated attributes in payload queue", + ); + + let origin = self.l1_origin.ok_or(PipelineError::MissingOrigin.crit())?; + let populated = OpAttributesWithParent::new( + attributes, + parent, + Some(origin), + self.is_last_in_span, + ); + + // Clear pending state (batch already consumed by take() above). + self.is_last_in_span = false; + + Ok(populated) + } +} diff --git a/rust/kona/crates/protocol/karst-derive/src/batch.rs b/rust/kona/crates/protocol/karst-derive/src/batch.rs new file mode 100644 index 0000000000000..20bc66c3c68bc --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/batch.rs @@ -0,0 +1,234 @@ +//! Batch processing: span batch decomposition and batch validation. + +use alloc::vec::Vec; + +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineErrorKind, PipelineResult, ResetError, +}; +use kona_protocol::{ + Batch, BatchValidity, BatchWithInclusionBlock, L2BlockInfo, SingleBatch, +}; +use tracing::{debug, error, info, warn}; + +use crate::Pipeline; + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Pulls a single batch, decomposing span batches if necessary. + /// + /// This is the Holocene-only batch stream logic (always active). 
+ pub(crate) async fn pull_single_batch( + &mut self, + parent: L2BlockInfo, + ) -> PipelineResult { + // If we have buffered single batches, return the next one. + if let Some(batch) = self.single_batch_buffer.pop_front() { + return Ok(batch); + } + + // Pull a batch from the channel reader. + let origin = self.l1_origin.ok_or(PipelineError::MissingOrigin.crit())?; + let batch = self.next_batch_from_reader().await?; + let batch_with_inclusion = BatchWithInclusionBlock::new(origin, batch); + + match batch_with_inclusion.batch { + Batch::Single(b) => Ok(b), + Batch::Span(b) => { + // Validate the span batch prefix. + let (validity, _) = b + .check_batch_prefix( + self.cfg.as_ref(), + self.l1_blocks.as_ref(), + parent, + &batch_with_inclusion.inclusion_block, + &mut self.l2_provider, + ) + .await; + + match validity { + BatchValidity::Accept => { + // Decompose span batch into single batches. + match b.get_singular_batches(self.l1_blocks.as_ref(), parent) { + Ok(singles) => { + self.single_batch_buffer.extend(singles); + self.single_batch_buffer + .pop_front() + .ok_or(PipelineError::NotEnoughData.temp()) + } + Err(e) => { + warn!(target: "karst", "Span batch decomposition failed: {}", e); + self.flush_channel(); + Err(PipelineError::NotEnoughData.temp()) + } + } + } + BatchValidity::Drop(_) => { + self.flush_channel(); + Err(PipelineError::NotEnoughData.temp()) + } + BatchValidity::Past + | BatchValidity::Undecided + | BatchValidity::Future => Err(PipelineError::NotEnoughData.temp()), + } + } + } + } + + /// Returns `true` if the pipeline origin is behind the parent's L1 origin. + fn origin_behind(&self, parent: &L2BlockInfo) -> bool { + self.l1_origin + .is_none_or(|origin| origin.number < parent.l1_origin.number) + } + + /// Updates the batch validator's view of L1 origin blocks. 
+ pub(crate) fn update_origins(&mut self, parent: &L2BlockInfo) -> PipelineResult<()> { + let origin_behind = self.origin_behind(parent); + + if self.batch_origin != self.l1_origin { + self.batch_origin = self.l1_origin; + if origin_behind { + self.l1_blocks.clear(); + } else { + let origin = self.batch_origin.ok_or(PipelineError::MissingOrigin.crit())?; + self.l1_blocks.push(origin); + } + debug!( + target: "karst", + "Advancing batch validator origin to L1 block #{}.{}", + self.batch_origin.map(|b| b.number).unwrap_or_default(), + if origin_behind { " (origin behind)" } else { "" } + ); + } + + // Advance epoch if parent has moved past the first L1 block. + if !self.l1_blocks.is_empty() && parent.l1_origin.number > self.l1_blocks[0].number { + for (i, block) in self.l1_blocks.iter().enumerate() { + if parent.l1_origin.number == block.number { + self.l1_blocks.drain(0..i); + debug!(target: "karst", "Advancing internal L1 epoch"); + break; + } + } + } + + Ok(()) + } + + /// Attempts to derive an empty batch when the sequencing window has expired. 
+ pub(crate) fn try_derive_empty_batch( + &mut self, + parent: &L2BlockInfo, + ) -> PipelineResult { + let epoch = self.l1_blocks[0]; + + let stage_origin = self.batch_origin.ok_or(PipelineError::MissingOrigin.crit())?; + let expiry_epoch = epoch.number + self.cfg.seq_window_size; + let force_empty_batches = expiry_epoch <= stage_origin.number; + let first_of_epoch = epoch.number == parent.l1_origin.number + 1; + let next_timestamp = parent.block_info.timestamp + self.cfg.block_time; + + if !force_empty_batches { + return Err(PipelineError::Eof.temp()); + } + + if self.l1_blocks.len() < 2 { + return Err(PipelineError::Eof.temp()); + } + + let next_epoch = self.l1_blocks[1]; + + if next_timestamp < next_epoch.timestamp || first_of_epoch { + info!(target: "karst", "Generating empty batch for epoch #{}", epoch.number); + return Ok(SingleBatch { + parent_hash: parent.block_info.hash, + epoch_num: epoch.number, + epoch_hash: epoch.hash, + timestamp: next_timestamp, + transactions: Vec::new(), + }); + } + + debug!( + target: "karst", + "Advancing batch validator epoch: {}, timestamp: {}, epoch timestamp: {}", + next_epoch.number, next_timestamp, next_epoch.timestamp + ); + self.l1_blocks.remove(0); + Err(PipelineError::Eof.temp()) + } + + /// Validates and returns the next batch for the given parent. + /// + /// This combines the batch stream (decomposition) and batch validator logic. + pub(crate) async fn validate_and_get_batch( + &mut self, + parent: L2BlockInfo, + ) -> PipelineResult { + // Update L1 origin blocks. + self.update_origins(&parent)?; + + let stage_origin = self.batch_origin.ok_or(PipelineError::MissingOrigin.crit())?; + + // If origin is behind parent, drain to catch up. + if self.origin_behind(&parent) || parent.l1_origin.number == stage_origin.number { + self.pull_single_batch(parent).await?; + return Err(PipelineError::NotEnoughData.temp()); + } + + // Need at least 2 L1 blocks for validation. 
+ if self.l1_blocks.len() < 2 { + return Err(PipelineError::MissingOrigin.crit()); + } + + let epoch = self.l1_blocks[0]; + if parent.l1_origin != epoch.id() && parent.l1_origin.number != epoch.number - 1 { + return Err(PipelineErrorKind::Reset(ResetError::L1OriginMismatch( + parent.l1_origin.number, + epoch.number - 1, + ))); + } + + // Pull the next batch. + let mut next_batch = match self.pull_single_batch(parent).await { + Ok(batch) => batch, + Err(PipelineErrorKind::Temporary(PipelineError::Eof)) => { + return self.try_derive_empty_batch(&parent); + } + Err(e) => return Err(e), + }; + next_batch.parent_hash = parent.block_info.hash; + + // Validate the batch. + match next_batch.check_batch( + self.cfg.as_ref(), + self.l1_blocks.as_ref(), + parent, + &stage_origin, + ) { + BatchValidity::Accept => { + info!(target: "karst", "Found next batch (epoch #{})", next_batch.epoch_num); + Ok(next_batch) + } + BatchValidity::Past => { + warn!(target: "karst", "Dropping old batch"); + Err(PipelineError::NotEnoughData.temp()) + } + BatchValidity::Drop(reason) => { + warn!(target: "karst", "Invalid batch ({}), flushing channel.", reason); + self.flush_channel(); + Err(PipelineError::NotEnoughData.temp()) + } + BatchValidity::Undecided => Err(PipelineError::NotEnoughData.temp()), + BatchValidity::Future => { + error!(target: "karst", "Future batch detected."); + Err(PipelineError::InvalidBatchValidity.crit()) + } + } + } +} diff --git a/rust/kona/crates/protocol/karst-derive/src/channel.rs b/rust/kona/crates/protocol/karst-derive/src/channel.rs new file mode 100644 index 0000000000000..f3acaa578f64f --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/channel.rs @@ -0,0 +1,109 @@ +//! Channel assembly: accumulating frames into a complete channel. 
+ +use alloy_primitives::{Bytes, hex}; +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineResult, +}; +use kona_genesis::MAX_RLP_BYTES_PER_CHANNEL_FJORD; +use kona_protocol::Channel; +use tracing::{debug, error, info, warn}; + +use crate::Pipeline; + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Assembles frames into a complete channel and returns the compressed channel bytes. + /// + /// This is the Holocene-only channel assembler. It always uses + /// `MAX_RLP_BYTES_PER_CHANNEL_FJORD` for the size limit. + pub(crate) async fn assemble_next_channel(&mut self) -> PipelineResult> { + let origin = self.l1_origin.ok_or(PipelineError::MissingOrigin.crit())?; + + // Time out the channel if it has exceeded the timeout. + if let Some(channel) = self.channel.as_ref() { + let timed_out = channel.open_block_number() + + self.cfg.channel_timeout(origin.timestamp) + < origin.number; + if timed_out { + warn!( + target: "karst", + "Channel (ID: {}) timed out at L1 origin #{}, open block #{}. Discarding.", + hex::encode(channel.id()), + origin.number, + channel.open_block_number() + ); + self.channel = None; + } + } + + // Get the next frame. + let next_frame = self.next_frame().await?; + + // Start a new channel if frame number is 0. 
+ if next_frame.number == 0 { + info!( + target: "karst", + "Starting new channel (ID: {}) at L1 origin #{}", + hex::encode(next_frame.id), + origin.number + ); + self.channel = Some(Channel::new(next_frame.id, origin)); + } + + if let Some(channel) = self.channel.as_mut() { + debug!( + target: "karst", + "Adding frame #{} to channel (ID: {}) at L1 origin #{}", + next_frame.number, + hex::encode(channel.id()), + origin.number + ); + + if channel.add_frame(next_frame, origin).is_err() { + error!( + target: "karst", + "Failed to add frame to channel (ID: {}) at L1 origin #{}", + hex::encode(channel.id()), + origin.number + ); + return Err(PipelineError::NotEnoughData.temp()); + } + + // Always use Fjord size limit (Holocene+ only). + if channel.size() > MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize { + warn!( + target: "karst", + "Channel size exceeded max RLP bytes, dropping channel (ID: {}) with {} bytes", + hex::encode(channel.id()), + channel.size() + ); + self.channel = None; + return Err(PipelineError::NotEnoughData.temp()); + } + + // If the channel is complete, return the compressed bytes. + if channel.is_ready() { + let channel_bytes = channel + .frame_data() + .ok_or(PipelineError::ChannelNotFound.crit())?; + + info!( + target: "karst", + "Channel (ID: {}) ready for decompression.", + hex::encode(channel.id()), + ); + + self.channel = None; + return Ok(Some(channel_bytes)); + } + } + + Err(PipelineError::NotEnoughData.temp()) + } +} diff --git a/rust/kona/crates/protocol/karst-derive/src/frames.rs b/rust/kona/crates/protocol/karst-derive/src/frames.rs new file mode 100644 index 0000000000000..e71504f32891f --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/frames.rs @@ -0,0 +1,133 @@ +//! Frame queue: loading, parsing, and Holocene pruning of frames. 
+
+use alloy_primitives::Bytes;
+use kona_derive::{
+    AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError,
+    PipelineResult,
+};
+use kona_protocol::Frame;
+use tracing::{debug, error};
+
+use crate::Pipeline;
+
+impl<CP, DAP, L2P, AB> Pipeline<CP, DAP, L2P, AB>
+where
+    CP: ChainProvider + Send,
+    DAP: DataAvailabilityProvider + Send,
+    L2P: L2ChainProvider + Send,
+    AB: AttributesBuilder + Send,
+{
+    /// Loads frames from L1 data into the frame queue.
+    ///
+    /// Fetches batcher data from the DA provider, parses it into frames,
+    /// and applies Holocene pruning rules.
+    ///
+    /// Does nothing while the queue still holds frames. Data that cannot be parsed into
+    /// frames is logged and skipped; the call then returns `Ok(())` with an empty queue.
+    ///
+    /// # Errors
+    ///
+    /// Returns any error from `next_l1_block` or from the DA provider. When the provider
+    /// reports a temporary `Eof`, the retrieval block and the provider are cleared first.
+    pub(crate) async fn load_frames(&mut self) -> PipelineResult<()> {
+        // Skip if queue already has frames.
+        if !self.frame_queue.is_empty() {
+            return Ok(());
+        }
+
+        // Get the L1 block to retrieve data from, taking a new one from traversal if needed.
+        let block = match self.retrieval_block {
+            Some(block) => block,
+            None => {
+                let block = self.next_l1_block()?;
+                self.retrieval_block = Some(block);
+                block
+            }
+        };
+        let batcher_addr = self.system_config.batcher_address;
+
+        let data = match self.da_provider.next(&block, batcher_addr).await {
+            Ok(data) => data,
+            Err(e) => {
+                // On EOF, clear retrieval state so next call fetches a new block.
+                if e == PipelineError::Eof.temp() {
+                    self.retrieval_block = None;
+                    self.da_provider.clear();
+                }
+                debug!(target: "karst", "Failed to retrieve data: {:?}", e);
+                return Err(e);
+            }
+        };
+
+        let raw: Bytes = data.into();
+        let Ok(frames) = Frame::parse_frames(&raw) else {
+            error!(target: "karst", "Failed to parse frames from data.");
+            return Ok(());
+        };
+
+        self.frame_queue.extend(frames);
+
+        // Always apply Holocene pruning (karst only supports Holocene+).
+        self.prune_frames();
+
+        Ok(())
+    }
+
+    /// Holocene frame pruning.
+    ///
+    /// Enforces:
+    /// - Sequential frame numbers within a channel
+    /// - No frames after a closing frame within the same channel
+    /// - New channels must start with frame 0
+    /// - Incomplete channels are dropped when a new channel starts
+    pub(crate) fn prune_frames(&mut self) {
+        let mut idx = 0;
+        // Walk adjacent pairs (idx, idx + 1); queues with fewer than two frames are
+        // left untouched because the condition is false right away.
+        while idx + 1 < self.frame_queue.len() {
+            let (cur_id, cur_number, cur_is_last) = {
+                let cur = &self.frame_queue[idx];
+                (cur.id, cur.number, cur.is_last)
+            };
+            let (next_id, next_number) = {
+                let next = &self.frame_queue[idx + 1];
+                (next.id, next.number)
+            };
+            let same_channel = cur_id == next_id;
+
+            let drop_next = if same_channel {
+                // Out-of-sequence frame, or a frame following the closing frame.
+                cur_number + 1 != next_number || cur_is_last
+            } else {
+                // New channel must start with frame 0.
+                next_number != 0
+            };
+            if drop_next {
+                self.frame_queue.remove(idx + 1);
+                continue;
+            }
+
+            // At this point a different channel always starts with frame 0. If the
+            // current channel was never closed, drop all of its queued frames.
+            if !same_channel && !cur_is_last {
+                let first_of_channel = self
+                    .frame_queue
+                    .iter()
+                    .position(|frame| frame.id == cur_id)
+                    .expect("infallible");
+                let removed = self.frame_queue.drain(first_of_channel..=idx).len();
+                idx = idx.saturating_sub(removed);
+                continue;
+            }
+
+            idx += 1;
+        }
+    }
+
+    /// Pops and returns the next frame from the queue, loading more if needed.
+    ///
+    /// # Errors
+    ///
+    /// Returns any error from `load_frames`, or a temporary
+    /// `PipelineError::NotEnoughData` when the queue is still empty after loading.
+    pub(crate) async fn next_frame(&mut self) -> PipelineResult<Frame> {
+        self.load_frames().await?;
+
+        // An empty queue after loading means the fetched data held no usable frame.
+        self.frame_queue.pop_front().ok_or(PipelineError::NotEnoughData.temp())
+    }
+}
diff --git a/rust/kona/crates/protocol/karst-derive/src/lib.rs b/rust/kona/crates/protocol/karst-derive/src/lib.rs
new file mode 100644
index 0000000000000..49244c2862ebd
--- /dev/null
+++ b/rust/kona/crates/protocol/karst-derive/src/lib.rs
@@ -0,0 +1,24 @@
+//! # karst-derive
+//!
+//! A flat, inlined OP Stack derivation pipeline that only supports Holocene+.
+//! This replaces the staged, trait-composed `kona-derive` pipeline with a single
+//! `Pipeline` struct that holds all intermediate state as fields and runs all
+//! derivation steps as direct method calls.
+
+extern crate alloc;
+
+mod pipeline;
+mod traversal;
+mod frames;
+mod channel;
+mod reader;
+mod batch;
+mod attributes;
+
+pub use pipeline::{Pipeline, StepResult};
+
+// Re-export key types and traits from kona-derive that consumers need.
+pub use kona_derive::{
+    AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError,
+    PipelineErrorKind, PipelineResult,
+};
diff --git a/rust/kona/crates/protocol/karst-derive/src/pipeline.rs b/rust/kona/crates/protocol/karst-derive/src/pipeline.rs
new file mode 100644
index 0000000000000..2208d9b323b5d
--- /dev/null
+++ b/rust/kona/crates/protocol/karst-derive/src/pipeline.rs
@@ -0,0 +1,207 @@
+//! The core `Pipeline` struct and public API.
+ +use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; +use core::fmt::Debug; + +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineErrorKind, PipelineResult, +}; +use kona_genesis::{RollupConfig, SystemConfig}; +use kona_protocol::{ + BatchReader, BlockInfo, Channel, Frame, L2BlockInfo, OpAttributesWithParent, SingleBatch, +}; + +/// Result of a single pipeline step. +#[derive(Debug, PartialEq, Eq)] +pub enum StepResult { + /// Payload attributes were prepared and buffered. + PreparedAttributes, + /// The L1 origin was advanced. + AdvancedOrigin, + /// Origin advance failed. + OriginAdvanceErr(PipelineErrorKind), + /// Step failed with an error. + StepFailed(PipelineErrorKind), +} + +/// A flat, inlined derivation pipeline for the OP Stack (Holocene+ only). +/// +/// All derivation stages are collapsed into fields on this struct. Methods +/// implement each stage's logic directly, without trait-based stage composition. +#[derive(Debug)] +pub struct Pipeline { + // === Config === + pub(crate) cfg: Arc, + + // === L1 Traversal state === + /// The current L1 origin block. + pub(crate) l1_origin: Option, + /// Whether the current L1 origin has been consumed by the retrieval step. + pub(crate) l1_traversal_done: bool, + /// The current system config, updated from L1 receipts. + pub(crate) system_config: SystemConfig, + + // === L1 Retrieval state === + /// The current L1 block being used for data retrieval. 
+ pub(crate) retrieval_block: Option, + + // === Frame Queue state === + pub(crate) frame_queue: VecDeque, + + // === Channel Assembler state (Holocene-only, no ChannelBank) === + pub(crate) channel: Option, + + // === Channel Reader state === + pub(crate) batch_reader: Option, + + // === Batch Stream state (span batch decomposition) === + pub(crate) single_batch_buffer: VecDeque, + + // === Batch Validator state === + /// The L1 origin tracked by the batch validator (may differ from pipeline origin). + pub(crate) batch_origin: Option, + /// A window of L1 blocks used for batch validation. + pub(crate) l1_blocks: Vec, + + // === Attributes Queue state === + pub(crate) pending_batch: Option, + pub(crate) is_last_in_span: bool, + + // === Output buffer === + pub(crate) prepared: VecDeque, + + // === External providers === + pub(crate) chain_provider: CP, + pub(crate) da_provider: DAP, + pub(crate) l2_provider: L2P, + pub(crate) attributes_builder: AB, +} + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Creates a new pipeline with the given configuration and providers. + pub fn new( + cfg: Arc, + chain_provider: CP, + da_provider: DAP, + l2_provider: L2P, + attributes_builder: AB, + ) -> Self { + Self { + cfg, + l1_origin: None, + l1_traversal_done: false, + system_config: SystemConfig::default(), + retrieval_block: None, + frame_queue: VecDeque::new(), + channel: None, + batch_reader: None, + single_batch_buffer: VecDeque::new(), + batch_origin: None, + l1_blocks: Vec::new(), + pending_batch: None, + is_last_in_span: false, + prepared: VecDeque::new(), + chain_provider, + da_provider, + l2_provider, + attributes_builder, + } + } + + /// Steps through the derivation pipeline once. + /// + /// On success, buffers a new `OpAttributesWithParent`. On EOF from the + /// attributes stage, advances the L1 origin instead. 
+    ///
+    /// Every outcome, including failures, is reported through the returned `StepResult`.
+    pub async fn step(&mut self, cursor: L2BlockInfo) -> StepResult {
+        match self.next_attributes(cursor).await {
+            Ok(attrs) => {
+                self.prepared.push_back(attrs);
+                StepResult::PreparedAttributes
+            }
+            // The current origin is exhausted: move on to the next L1 block.
+            Err(PipelineErrorKind::Temporary(PipelineError::Eof)) => {
+                if let Err(e) = self.advance_origin().await {
+                    return StepResult::OriginAdvanceErr(e);
+                }
+                StepResult::AdvancedOrigin
+            }
+            Err(e) => StepResult::StepFailed(e),
+        }
+    }
+
+    /// Resets the pipeline to a known state.
+    ///
+    /// This clears all intermediate state and sets the L1 origin and system config.
+    /// Attributes already buffered in `prepared` are left untouched.
+    ///
+    /// # Errors
+    ///
+    /// Returns the converted L2 provider error when the system config for
+    /// `l2_safe_head` cannot be fetched; no state is modified in that case.
+    pub async fn reset(
+        &mut self,
+        l1_origin: BlockInfo,
+        l2_safe_head: L2BlockInfo,
+    ) -> PipelineResult<()> {
+        // Fetch system config for the L2 safe head.
+        let system_config = self
+            .l2_provider
+            .system_config_by_number(l2_safe_head.block_info.number, Arc::clone(&self.cfg))
+            .await
+            .map_err(Into::into)?;
+
+        // Reset L1 traversal.
+        self.l1_origin = Some(l1_origin);
+        self.l1_traversal_done = false;
+        self.system_config = system_config;
+
+        // Reset L1 retrieval. The DA provider is cleared as well, as is done
+        // everywhere else the retrieval block changes, so that data buffered
+        // for the previous block is not served for the new origin.
+        self.retrieval_block = Some(l1_origin);
+        self.da_provider.clear();
+
+        // Reset frame queue.
+        self.frame_queue.clear();
+
+        // Reset channel assembler.
+        self.channel = None;
+
+        // Reset channel reader.
+        self.batch_reader = None;
+
+        // Reset batch stream.
+        self.single_batch_buffer.clear();
+
+        // Reset batch validator.
+        self.batch_origin = Some(l1_origin);
+        self.l1_blocks.clear();
+        self.l1_blocks.push(l1_origin);
+
+        // Reset attributes queue.
+        self.pending_batch = None;
+        self.is_last_in_span = false;
+
+        Ok(())
+    }
+
+    /// Flushes the current in-progress channel, discarding partial data.
+    ///
+    /// Besides the channel itself this drops the batch reader, the buffered single
+    /// batches and the pending batch.
+    pub fn flush_channel(&mut self) {
+        self.batch_reader = None;
+        self.channel = None;
+        self.single_batch_buffer.clear();
+        self.pending_batch = None;
+    }
+
+    /// Returns the current L1 origin block, if set.
+    pub const fn origin(&self) -> Option<BlockInfo> {
+        self.l1_origin
+    }
+
+    /// Peeks at the next prepared attributes without consuming them.
+ pub fn peek(&self) -> Option<&OpAttributesWithParent> { + self.prepared.front() + } + + /// Consumes and returns the next prepared attributes. + pub fn pop_attributes(&mut self) -> Option { + self.prepared.pop_front() + } +} diff --git a/rust/kona/crates/protocol/karst-derive/src/reader.rs b/rust/kona/crates/protocol/karst-derive/src/reader.rs new file mode 100644 index 0000000000000..5046be4a8cfa4 --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/reader.rs @@ -0,0 +1,66 @@ +//! Channel reader: decompressing channel data into batches. + +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineResult, +}; +use kona_genesis::MAX_RLP_BYTES_PER_CHANNEL_FJORD; +use kona_protocol::{Batch, BatchReader}; +use tracing::debug; + +use crate::Pipeline; + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Ensures a `BatchReader` is available by assembling a channel if needed. + pub(crate) async fn ensure_batch_reader(&mut self) -> PipelineResult<()> { + if self.batch_reader.is_none() { + let channel_data = self + .assemble_next_channel() + .await? + .ok_or(PipelineError::ChannelReaderEmpty.temp())?; + + // Always use Fjord max size (Holocene+ only). + self.batch_reader = Some(BatchReader::new( + &channel_data[..], + MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, + )); + } + Ok(()) + } + + /// Reads the next batch from the channel reader. + /// + /// Decompresses and decodes a batch from the current `BatchReader`. + /// If decompression or decoding fails, the reader is discarded. 
+ pub(crate) async fn next_batch_from_reader(&mut self) -> PipelineResult { + if let Err(e) = self.ensure_batch_reader().await { + debug!(target: "karst", "Failed to set batch reader: {:?}", e); + self.batch_reader = None; + return Err(e); + } + + let reader = self.batch_reader.as_mut().expect("batch reader set above"); + + // Decompress. + if let Err(err) = reader.decompress() { + debug!(target: "karst", ?err, "Failed to decompress batch"); + self.batch_reader = None; + return Err(PipelineError::NotEnoughData.temp()); + } + + // Decode the next batch. + match reader.next_batch(self.cfg.as_ref()) { + Some(batch) => Ok(batch), + None => { + self.batch_reader = None; + Err(PipelineError::NotEnoughData.temp()) + } + } + } +} diff --git a/rust/kona/crates/protocol/karst-derive/src/traversal.rs b/rust/kona/crates/protocol/karst-derive/src/traversal.rs new file mode 100644 index 0000000000000..5cdde9c3d53a6 --- /dev/null +++ b/rust/kona/crates/protocol/karst-derive/src/traversal.rs @@ -0,0 +1,87 @@ +//! L1 traversal and origin advancing. + +use kona_derive::{ + AttributesBuilder, ChainProvider, DataAvailabilityProvider, L2ChainProvider, PipelineError, + PipelineResult, ResetError, +}; +use kona_protocol::BlockInfo; +use tracing::{error, info, warn}; + +use crate::Pipeline; + +impl Pipeline +where + CP: ChainProvider + Send, + DAP: DataAvailabilityProvider + Send, + L2P: L2ChainProvider + Send, + AB: AttributesBuilder + Send, +{ + /// Advances the L1 origin to the next block. + /// + /// Fetches the next L1 block, checks for reorgs, and updates the system config. 
+ pub(crate) async fn advance_origin(&mut self) -> PipelineResult<()> { + let block = match self.l1_origin { + Some(block) => block, + None => { + warn!(target: "karst", "Missing current block, can't advance origin."); + return Err(PipelineError::Eof.temp()); + } + }; + + let next_l1_origin = self + .chain_provider + .block_info_by_number(block.number + 1) + .await + .map_err(Into::into)?; + + // Check for reorgs. + if block.hash != next_l1_origin.parent_hash { + return Err(ResetError::ReorgDetected(block.hash, next_l1_origin.parent_hash).into()); + } + + // Fetch receipts and update system config. + let receipts = self + .chain_provider + .receipts_by_hash(next_l1_origin.hash) + .await + .map_err(Into::into)?; + + let addr = self.cfg.l1_system_config_address; + let ecotone_active = self.cfg.is_ecotone_active(next_l1_origin.timestamp); + match self + .system_config + .update_with_receipts(&receipts[..], addr, ecotone_active) + { + Ok(true) => { + info!(target: "karst", "System config updated at block {}.", next_l1_origin.number); + } + Ok(false) => { /* No update applied */ } + Err(err) => { + error!(target: "karst", ?err, "Failed to update system config at block {}", next_l1_origin.number); + return Err(PipelineError::SystemConfigUpdate(err).crit()); + } + } + + // Update origin. + self.l1_origin = Some(next_l1_origin); + self.l1_traversal_done = false; + + // Reset retrieval state for the new origin. + self.retrieval_block = None; + self.da_provider.clear(); + + Ok(()) + } + + /// Returns the next L1 block for data retrieval, consuming it. + /// + /// Can only be called once per origin; subsequent calls return EOF until + /// `advance_origin()` is called. + pub(crate) fn next_l1_block(&mut self) -> PipelineResult { + if self.l1_traversal_done { + return Err(PipelineError::Eof.temp()); + } + self.l1_traversal_done = true; + self.l1_origin.ok_or(PipelineError::MissingOrigin.crit()) + } +}