diff --git a/Cargo.lock b/Cargo.lock index 8826fe35..4a83d0d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1577,6 +1577,7 @@ dependencies = [ "reth-rpc-eth-api", "reth-testing-utils", "reth-transaction-pool", + "rstest", "serde_json", "tokio", "tokio-tungstenite 0.28.0", diff --git a/crates/flashblocks/Cargo.toml b/crates/flashblocks/Cargo.toml index 0f604b5b..31598f74 100644 --- a/crates/flashblocks/Cargo.toml +++ b/crates/flashblocks/Cargo.toml @@ -73,6 +73,7 @@ reth-primitives-traits.workspace = true reth-optimism-primitives.workspace = true reth-transaction-pool.workspace = true serde_json.workspace = true +rstest.workspace = true criterion = { version = "0.5", features = ["async_tokio"] } [[bench]] diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index b9bec3e5..24b76bc8 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -3,6 +3,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +#[macro_use] +extern crate tracing; + mod metrics; pub use metrics::Metrics; @@ -23,3 +26,9 @@ pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI}; mod state_builder; pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder}; + +mod validation; +pub use validation::{ + CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, + ReorgDetectionResult, ReorgDetector, SequenceValidationResult, +}; diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 5524383d..5c705531 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -1,10 +1,6 @@ //! Flashblocks state processor. -use std::{ - collections::{BTreeMap, HashSet}, - sync::Arc, - time::Instant, -}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; use alloy_consensus::{ Header, @@ -32,9 +28,14 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_primitives::OpBlock; use reth_primitives::RecoveredBlock; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; -use tracing::{debug, error, info, warn}; -use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder}; +use crate::{ + Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, + validation::{ + CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, + ReorgDetector, SequenceValidationResult, + }, +}; /// Messages consumed by the state processor. #[derive(Debug, Clone)] @@ -122,86 +123,87 @@ where prev_pending_blocks: Option>, block: &RecoveredBlock, ) -> eyre::Result>> { - match &prev_pending_blocks { - Some(pending_blocks) => { - let mut flashblocks = pending_blocks.get_flashblocks(); - let num_flashblocks_for_canon = flashblocks - .iter() - .filter(|fb| fb.metadata.block_number == block.number) - .count(); - self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); - self.metrics - .pending_snapshot_height - .set(pending_blocks.latest_block_number() as f64); - - if pending_blocks.latest_block_number() <= block.number { - debug!( - message = "pending snapshot cleared because canonical caught up", - latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number, - ); - self.metrics.pending_clear_catchup.increment(1); - self.metrics - .pending_snapshot_fb_index - .set(pending_blocks.latest_flashblock_index() as f64); - - Ok(None) - } else { - // If we had a reorg, we need to reset all flashblocks state - let tracked_txns = pending_blocks.get_transactions_for_block(block.number); - let tracked_txn_hashes: HashSet<_> = - tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); - let block_txn_hashes: HashSet<_> = - block.body().transactions().map(|tx| tx.tx_hash()).collect(); - let pending_blocks_depth = - block.number - pending_blocks.earliest_block_number(); - - debug!( - message = "canonical block behind latest pending block, checking for reorg and max depth", - latest_pending_block = pending_blocks.latest_block_number(), - earliest_pending_block = pending_blocks.earliest_block_number(), - canonical_block = block.number, - pending_txns_for_block = ?tracked_txn_hashes.len(), - canonical_txns_for_block = ?block_txn_hashes.len(), - pending_blocks_depth = pending_blocks_depth, - max_depth = self.max_depth, - ); - - if tracked_txn_hashes.len() != block_txn_hashes.len() - || tracked_txn_hashes != block_txn_hashes - { - debug!( - message = "reorg detected, recomputing pending flashblocks going ahead of reorg", - tracked_txn_hashes = ?tracked_txn_hashes, - block_txn_hashes = ?block_txn_hashes, - ); - self.metrics.pending_clear_reorg.increment(1); - - // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); - return self.build_pending_state(None, &flashblocks); - } - - if pending_blocks_depth > self.max_depth { - debug!( - message = - "pending blocks depth exceeds max depth, resetting pending blocks", - pending_blocks_depth = pending_blocks_depth, - max_depth = self.max_depth, - ); - - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); - return self.build_pending_state(None, &flashblocks); - } + let pending_blocks = match &prev_pending_blocks { + Some(pb) => pb, + None => { + debug!(message = "no pending state to update with canonical block, skipping"); + return Ok(None); + } + }; - // If no reorg, we can continue building on top of the existing pending state - // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number - self.build_pending_state(prev_pending_blocks, &flashblocks) - } + let mut flashblocks = pending_blocks.get_flashblocks(); + let num_flashblocks_for_canon = + flashblocks.iter().filter(|fb| fb.metadata.block_number == block.number).count(); + self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); + self.metrics.pending_snapshot_height.set(pending_blocks.latest_block_number() as f64); + + // Check for reorg by comparing transaction sets + let tracked_txns = pending_blocks.get_transactions_for_block(block.number); + let tracked_txn_hashes: Vec<_> = tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); + let block_txn_hashes: Vec<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); + + let reorg_result = ReorgDetector::detect(&tracked_txn_hashes, &block_txn_hashes); + let reorg_detected = reorg_result.is_reorg(); + + // Determine the reconciliation strategy + let strategy = CanonicalBlockReconciler::reconcile( + Some(pending_blocks.earliest_block_number()), + Some(pending_blocks.latest_block_number()), + block.number, + self.max_depth, + reorg_detected, + ); + + match strategy { + ReconciliationStrategy::CatchUp => { + debug!( + message = "pending snapshot cleared because canonical caught up", + latest_pending_block = pending_blocks.latest_block_number(), + canonical_block = block.number, + ); + self.metrics.pending_clear_catchup.increment(1); + self.metrics + .pending_snapshot_fb_index + .set(pending_blocks.latest_flashblock_index() as f64); + Ok(None) } - None => { + ReconciliationStrategy::HandleReorg => { + warn!( + message = "reorg detected, recomputing pending flashblocks going ahead of reorg", + tracked_txn_hashes = ?tracked_txn_hashes, + block_txn_hashes = ?block_txn_hashes, + ); + self.metrics.pending_clear_reorg.increment(1); + + // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state + flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number); + self.build_pending_state(None, &flashblocks) + } + ReconciliationStrategy::DepthLimitExceeded { depth, max_depth } => { + debug!( + message = "pending blocks depth exceeds max depth, resetting pending blocks", + pending_blocks_depth = depth, + max_depth = max_depth, + ); + + flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number); + self.build_pending_state(None, &flashblocks) + } + ReconciliationStrategy::Continue => { + debug!( + message = "canonical block behind latest pending block, continuing with existing pending state", + latest_pending_block = pending_blocks.latest_block_number(), + earliest_pending_block = pending_blocks.earliest_block_number(), + canonical_block = block.number, + pending_txns_for_block = ?tracked_txn_hashes.len(), + canonical_txns_for_block = ?block_txn_hashes.len(), + ); + // If no reorg, we can continue building on top of the existing pending state + // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number + self.build_pending_state(prev_pending_blocks, &flashblocks) + } + ReconciliationStrategy::NoPendingState => { + // This case is already handled above, but included for completeness debug!(message = "no pending state to update with canonical block, skipping"); Ok(None) } @@ -213,53 +215,64 @@ where prev_pending_blocks: Option>, flashblock: Flashblock, ) -> eyre::Result>> { - match &prev_pending_blocks { - Some(pending_blocks) => { - if self.is_next_flashblock(pending_blocks, &flashblock) { - // We have received the next flashblock for the current block - // or the first flashblock for the next block - let mut flashblocks = pending_blocks.get_flashblocks(); - flashblocks.push(flashblock); - self.build_pending_state(prev_pending_blocks, &flashblocks) - } else if pending_blocks.latest_block_number() != flashblock.metadata.block_number { - // We have received a non-zero flashblock for a new block - self.metrics.unexpected_block_order.increment(1); - error!( - message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %flashblock.metadata.block_number, - ); - Ok(None) - } else if pending_blocks.latest_flashblock_index() == flashblock.index { - // We have received a duplicate flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - warn!( - message = "Received duplicate Flashblock for current block, ignoring", - curr_block = %pending_blocks.latest_block_number(), - flashblock_index = %flashblock.index, - ); - Ok(prev_pending_blocks) - } else { - // We have received a non-sequential Flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - - error!( - message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %flashblock.metadata.block_number, - ); - - Ok(None) - } - } + let pending_blocks = match &prev_pending_blocks { + Some(pb) => pb, None => { if flashblock.index == 0 { - self.build_pending_state(None, &vec![flashblock]) + return self.build_pending_state(None, &vec![flashblock]); } else { info!(message = "waiting for first Flashblock"); - Ok(None) + return Ok(None); } } + }; + + let validation_result = FlashblockSequenceValidator::validate( + pending_blocks.latest_block_number(), + pending_blocks.latest_flashblock_index(), + flashblock.metadata.block_number, + flashblock.index, + ); + + match validation_result { + SequenceValidationResult::NextInSequence + | SequenceValidationResult::FirstOfNextBlock => { + // We have received the next flashblock for the current block + // or the first flashblock for the next block + let mut flashblocks = pending_blocks.get_flashblocks(); + flashblocks.push(flashblock); + self.build_pending_state(prev_pending_blocks, &flashblocks) + } + SequenceValidationResult::Duplicate => { + // We have received a duplicate flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + warn!( + message = "Received duplicate Flashblock for current block, ignoring", + curr_block = %pending_blocks.latest_block_number(), + flashblock_index = %flashblock.index, + ); + Ok(prev_pending_blocks) + } + SequenceValidationResult::InvalidNewBlockIndex { block_number, index: _ } => { + // We have received a non-zero flashblock for a new block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %block_number, + ); + Ok(None) + } + SequenceValidationResult::NonSequentialGap { expected: _, actual: _ } => { + // We have received a non-sequential Flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %flashblock.metadata.block_number, + ); + Ok(None) + } } } @@ -432,19 +445,4 @@ where Ok(Some(Arc::new(pending_blocks_builder.build()?))) } - - fn is_next_flashblock( - &self, - pending_blocks: &Arc, - flashblock: &Flashblock, - ) -> bool { - let is_next_of_block = flashblock.metadata.block_number - == pending_blocks.latest_block_number() - && flashblock.index == pending_blocks.latest_flashblock_index() + 1; - let is_first_of_next_block = flashblock.metadata.block_number - == pending_blocks.latest_block_number() + 1 - && flashblock.index == 0; - - is_next_of_block || is_first_of_next_block - } } diff --git a/crates/flashblocks/src/state.rs b/crates/flashblocks/src/state.rs index 5b01f827..f2bcefe9 100644 --- a/crates/flashblocks/src/state.rs +++ b/crates/flashblocks/src/state.rs @@ -17,7 +17,6 @@ use tokio::sync::{ broadcast::{self, Sender}, mpsc, }; -use tracing::{error, info}; use crate::{ FlashblocksAPI, FlashblocksReceiver, PendingBlocks, diff --git a/crates/flashblocks/src/subscription.rs b/crates/flashblocks/src/subscription.rs index 34e208c7..7322fd49 100644 --- a/crates/flashblocks/src/subscription.rs +++ b/crates/flashblocks/src/subscription.rs @@ -6,7 +6,6 @@ use base_flashtypes::Flashblock; use futures_util::{SinkExt as _, StreamExt}; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -use tracing::{error, info, trace, warn}; use url::Url; use crate::{FlashblocksReceiver, Metrics}; diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs new file mode 100644 index 00000000..7d5607a9 --- /dev/null +++ b/crates/flashblocks/src/validation.rs @@ -0,0 +1,314 @@ +//! Flashblock sequence validation and reorganization detection. +//! +//! Provides stateless validation logic for flashblock sequencing and chain reorg detection. + +use alloy_primitives::B256; + +/// Result of validating a flashblock's position in the sequence. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SequenceValidationResult { + /// Next consecutive flashblock within the current block (same block, index + 1). + NextInSequence, + /// First flashblock (index 0) of the next block (block + 1). + FirstOfNextBlock, + /// Duplicate flashblock (same block and index) - should be ignored. + Duplicate, + /// Non-sequential index within the same block - indicates missed flashblocks. + NonSequentialGap { + /// Expected flashblock index. + expected: u64, + /// Actual incoming flashblock index. + actual: u64, + }, + /// New block received with non-zero index - missed the base flashblock. + InvalidNewBlockIndex { + /// Block number of the incoming flashblock. + block_number: u64, + /// The invalid (non-zero) index received. + index: u64, + }, +} + +/// Stateless validator for flashblock sequence ordering. +#[derive(Debug, Clone, Copy, Default)] +pub struct FlashblockSequenceValidator; + +impl FlashblockSequenceValidator { + /// Validates whether an incoming flashblock follows the expected sequence. + /// + /// Returns the appropriate [`SequenceValidationResult`] based on: + /// - Same block, index + 1 → `NextInSequence` + /// - Next block, index 0 → `FirstOfNextBlock` + /// - Same block and index → `Duplicate` + /// - Same block, wrong index → `NonSequentialGap` + /// - Different block, non-zero index or block gap → `InvalidNewBlockIndex` + pub const fn validate( + latest_block_number: u64, + latest_flashblock_index: u64, + incoming_block_number: u64, + incoming_index: u64, + ) -> SequenceValidationResult { + // Next flashblock within the current block + if incoming_block_number == latest_block_number + && incoming_index == latest_flashblock_index + 1 + { + SequenceValidationResult::NextInSequence + // First flashblock of the next block + } else if incoming_block_number == latest_block_number + 1 && incoming_index == 0 { + SequenceValidationResult::FirstOfNextBlock + // New block with non-zero index or block gap + } else if incoming_block_number != latest_block_number { + SequenceValidationResult::InvalidNewBlockIndex { + block_number: incoming_block_number, + index: incoming_index, + } + } else if incoming_index == latest_flashblock_index { + // Duplicate flashblock + SequenceValidationResult::Duplicate + } else { + // Non-sequential index within the same block + SequenceValidationResult::NonSequentialGap { + expected: latest_flashblock_index + 1, + actual: incoming_index, + } + } + } +} + +/// Result of a reorganization detection check. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ReorgDetectionResult { + /// Transaction sets match exactly. + NoReorg, + /// Transaction sets differ (counts included for diagnostics). + ReorgDetected { + /// Number of transactions in the tracked (pending) set. + tracked_count: usize, + /// Number of transactions in the canonical chain set. + canonical_count: usize, + }, +} + +impl ReorgDetectionResult { + /// Returns `true` if a reorganization was detected. + #[inline] + pub const fn is_reorg(&self) -> bool { + matches!(self, Self::ReorgDetected { .. }) + } + + /// Returns `true` if no reorganization was detected. + #[inline] + pub const fn is_no_reorg(&self) -> bool { + matches!(self, Self::NoReorg) + } +} + +/// Detects chain reorganizations by comparing transaction hash sets. +#[derive(Debug, Clone, Copy, Default)] +pub struct ReorgDetector; + +impl ReorgDetector { + /// Compares tracked vs canonical transaction hashes to detect reorgs. + /// + /// Returns `ReorgDetected` if counts differ, hashes differ, or order differs. + pub fn detect( + tracked_tx_hashes: &[B256], + canonical_tx_hashes: &[B256], + ) -> ReorgDetectionResult { + if tracked_tx_hashes != canonical_tx_hashes { + ReorgDetectionResult::ReorgDetected { + tracked_count: tracked_tx_hashes.len(), + canonical_count: canonical_tx_hashes.len(), + } + } else { + ReorgDetectionResult::NoReorg + } + } +} + +/// Strategy for reconciling pending state with canonical state on new canonical blocks. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ReconciliationStrategy { + /// Canonical caught up or passed pending (canonical >= latest pending). Clear pending state. + CatchUp, + /// Reorg detected (tx mismatch). Rebuild pending from canonical. + HandleReorg, + /// Pending too far ahead of canonical. + DepthLimitExceeded { + /// Current depth of pending blocks. + depth: u64, + /// Configured maximum depth. + max_depth: u64, + }, + /// No issues - continue building on pending state. + Continue, + /// No pending state exists (startup or after clear). + NoPendingState, +} + +/// Determines reconciliation strategy for canonical block updates. +#[derive(Debug, Clone, Copy, Default)] +pub struct CanonicalBlockReconciler; + +impl CanonicalBlockReconciler { + /// Returns the appropriate [`ReconciliationStrategy`] based on pending vs canonical state. + /// + /// Priority: `NoPendingState` → `CatchUp` → `HandleReorg` → `DepthLimitExceeded` → `Continue` + pub const fn reconcile( + pending_earliest_block: Option, + pending_latest_block: Option, + canonical_block_number: u64, + max_depth: u64, + reorg_detected: bool, + ) -> ReconciliationStrategy { + // Check if pending state exists + let (earliest, latest) = match (pending_earliest_block, pending_latest_block) { + (Some(e), Some(l)) => (e, l), + _ => return ReconciliationStrategy::NoPendingState, + }; + + // Check if canonical has caught up or passed pending + if latest <= canonical_block_number { + return ReconciliationStrategy::CatchUp; + } + + // Check for reorg + if reorg_detected { + return ReconciliationStrategy::HandleReorg; + } + + // Check depth limit + let depth = canonical_block_number.saturating_sub(earliest); + if depth > max_depth { + return ReconciliationStrategy::DepthLimitExceeded { depth, max_depth }; + } + + // No issues, continue building + ReconciliationStrategy::Continue + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + // ==================== FlashblockSequenceValidator Tests ==================== + + #[rstest] + // NextInSequence: consecutive indices within the same block + #[case(100, 2, 100, 3, SequenceValidationResult::NextInSequence)] + #[case(100, 0, 100, 1, SequenceValidationResult::NextInSequence)] + #[case(100, 999, 100, 1000, SequenceValidationResult::NextInSequence)] + #[case(0, 0, 0, 1, SequenceValidationResult::NextInSequence)] + #[case(100, u64::MAX - 1, 100, u64::MAX, SequenceValidationResult::NextInSequence)] + // FirstOfNextBlock: index 0 of the next block + #[case(0, 0, 1, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(100, 5, 101, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(100, 0, 101, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(999999, 10, 1000000, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(0, 5, 1, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(u64::MAX - 1, 0, u64::MAX, 0, SequenceValidationResult::FirstOfNextBlock)] + // Duplicate: same block and index + #[case(100, 5, 100, 5, SequenceValidationResult::Duplicate)] + #[case(100, 0, 100, 0, SequenceValidationResult::Duplicate)] + // NonSequentialGap: non-consecutive indices within the same block + #[case(100, 2, 100, 4, SequenceValidationResult::NonSequentialGap { expected: 3, actual: 4 })] + #[case(100, 0, 100, 10, SequenceValidationResult::NonSequentialGap { expected: 1, actual: 10 })] + #[case(100, 5, 100, 3, SequenceValidationResult::NonSequentialGap { expected: 6, actual: 3 })] + // InvalidNewBlockIndex: new block with non-zero index or block gap + #[case(100, 5, 101, 1, SequenceValidationResult::InvalidNewBlockIndex { block_number: 101, index: 1 })] + #[case(100, 5, 105, 3, SequenceValidationResult::InvalidNewBlockIndex { block_number: 105, index: 3 })] + #[case(100, 5, 102, 0, SequenceValidationResult::InvalidNewBlockIndex { block_number: 102, index: 0 })] + #[case(100, 5, 99, 0, SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 0 })] + #[case(100, 5, 99, 5, SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 5 })] + fn test_sequence_validator( + #[case] latest_block: u64, + #[case] latest_idx: u64, + #[case] incoming_block: u64, + #[case] incoming_idx: u64, + #[case] expected: SequenceValidationResult, + ) { + let result = FlashblockSequenceValidator::validate( + latest_block, + latest_idx, + incoming_block, + incoming_idx, + ); + assert_eq!(result, expected); + } + + // ==================== ReorgDetector Tests ==================== + + #[rstest] + // No reorg cases - identical sequences + #[case(&[], &[], ReorgDetectionResult::NoReorg)] + #[case(&[0x01], &[0x01], ReorgDetectionResult::NoReorg)] + #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02, 0x03], ReorgDetectionResult::NoReorg)] + #[case(&[0x01, 0x01, 0x02], &[0x01, 0x01, 0x02], ReorgDetectionResult::NoReorg)] + // Reorg cases - different order (order matters!) + #[case(&[0x01, 0x02, 0x03], &[0x03, 0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 3 })] + #[case(&[0x01, 0x02], &[0x02, 0x01], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] + // Reorg cases - different counts + #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 })] + #[case(&[0x01], &[0x01, 0x02, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 3 })] + #[case(&[], &[0x01], ReorgDetectionResult::ReorgDetected { tracked_count: 0, canonical_count: 1 })] + #[case(&[0x01], &[], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 0 })] + #[case(&[0x01, 0x01, 0x02], &[0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 })] + // Reorg cases - same count, different hashes + #[case(&[0x01, 0x02], &[0x03, 0x04], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] + #[case(&[0x01, 0x02], &[0x01, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] + #[case(&[0x42], &[0x43], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 1 })] + fn test_reorg_detector( + #[case] tracked_bytes: &[u8], + #[case] canonical_bytes: &[u8], + #[case] expected: ReorgDetectionResult, + ) { + let tracked: Vec = tracked_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); + let canonical: Vec = canonical_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); + let result = ReorgDetector::detect(&tracked, &canonical); + assert_eq!(result, expected); + assert_eq!( + result.is_reorg(), + matches!(expected, ReorgDetectionResult::ReorgDetected { .. }) + ); + } + + // ==================== CanonicalBlockReconciler Tests ==================== + + #[rstest] + // NoPendingState + #[case(None, None, 100, 10, false, ReconciliationStrategy::NoPendingState)] + #[case(Some(100), None, 100, 10, false, ReconciliationStrategy::NoPendingState)] + #[case(None, Some(100), 100, 10, false, ReconciliationStrategy::NoPendingState)] + // CatchUp: canonical >= latest pending + #[case(Some(100), Some(105), 105, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(105), 110, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(100), 100, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(105), 105, 10, true, ReconciliationStrategy::CatchUp)] // catchup > reorg priority + // HandleReorg + #[case(Some(100), Some(110), 102, 10, true, ReconciliationStrategy::HandleReorg)] + #[case(Some(100), Some(130), 120, 10, true, ReconciliationStrategy::HandleReorg)] // reorg > depth priority + // DepthLimitExceeded + #[case(Some(100), Some(120), 115, 10, false, ReconciliationStrategy::DepthLimitExceeded { depth: 15, max_depth: 10 })] + #[case(Some(100), Some(105), 101, 0, false, ReconciliationStrategy::DepthLimitExceeded { depth: 1, max_depth: 0 })] + // Continue + #[case(Some(100), Some(110), 105, 10, false, ReconciliationStrategy::Continue)] + #[case(Some(100), Some(120), 110, 10, false, ReconciliationStrategy::Continue)] // depth exactly at limit + #[case(Some(100), Some(105), 100, 10, false, ReconciliationStrategy::Continue)] + #[case(Some(100), Some(105), 100, 0, false, ReconciliationStrategy::Continue)] // zero depth ok with max_depth=0 + #[case(Some(100), Some(100), 99, 10, false, ReconciliationStrategy::Continue)] // single pending block + fn test_reconciler( + #[case] earliest: Option, + #[case] latest: Option, + #[case] canonical: u64, + #[case] max_depth: u64, + #[case] reorg: bool, + #[case] expected: ReconciliationStrategy, + ) { + let result = + CanonicalBlockReconciler::reconcile(earliest, latest, canonical, max_depth, reorg); + assert_eq!(result, expected); + } +}