From 2e725a27c73cd37ea01b61348af5c7b533c336d1 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 11:51:33 -0500 Subject: [PATCH 1/8] feat(flashblocks): validation encapsulation --- crates/flashblocks/src/lib.rs | 6 + crates/flashblocks/src/processor.rs | 268 +++---- crates/flashblocks/src/validation.rs | 998 +++++++++++++++++++++++++++ 3 files changed, 1138 insertions(+), 134 deletions(-) create mode 100644 crates/flashblocks/src/validation.rs diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index b9bec3e5..99c851f3 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -23,3 +23,9 @@ pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI}; mod state_builder; pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder}; + +pub mod validation; +pub use validation::{ + CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, + ReorgDetectionResult, ReorgDetector, SequenceValidationResult, +}; diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 5524383d..5b3323b1 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -1,10 +1,6 @@ //! Flashblocks state processor. -use std::{ - collections::{BTreeMap, HashSet}, - sync::Arc, - time::Instant, -}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; use alloy_consensus::{ Header, @@ -34,7 +30,13 @@ use reth_primitives::RecoveredBlock; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; use tracing::{debug, error, info, warn}; -use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder}; +use crate::{ + Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, + validation::{ + CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, + ReorgDetector, SequenceValidationResult, + }, +}; /// Messages consumed by the state processor. #[derive(Debug, Clone)] @@ -122,86 +124,88 @@ where prev_pending_blocks: Option>, block: &RecoveredBlock, ) -> eyre::Result>> { - match &prev_pending_blocks { - Some(pending_blocks) => { - let mut flashblocks = pending_blocks.get_flashblocks(); - let num_flashblocks_for_canon = flashblocks - .iter() - .filter(|fb| fb.metadata.block_number == block.number) - .count(); - self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); - self.metrics - .pending_snapshot_height - .set(pending_blocks.latest_block_number() as f64); - - if pending_blocks.latest_block_number() <= block.number { - debug!( - message = "pending snapshot cleared because canonical caught up", - latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number, - ); - self.metrics.pending_clear_catchup.increment(1); - self.metrics - .pending_snapshot_fb_index - .set(pending_blocks.latest_flashblock_index() as f64); - - Ok(None) - } else { - // If we had a reorg, we need to reset all flashblocks state - let tracked_txns = pending_blocks.get_transactions_for_block(block.number); - let tracked_txn_hashes: HashSet<_> = - tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); - let block_txn_hashes: HashSet<_> = - block.body().transactions().map(|tx| tx.tx_hash()).collect(); - let pending_blocks_depth = - block.number - pending_blocks.earliest_block_number(); - - debug!( - message = "canonical block behind latest pending block, checking for reorg and max depth", - latest_pending_block = pending_blocks.latest_block_number(), - earliest_pending_block = pending_blocks.earliest_block_number(), - canonical_block = block.number, - pending_txns_for_block = ?tracked_txn_hashes.len(), - canonical_txns_for_block = ?block_txn_hashes.len(), - pending_blocks_depth = pending_blocks_depth, - max_depth = self.max_depth, - ); - - if tracked_txn_hashes.len() != block_txn_hashes.len() - || tracked_txn_hashes != block_txn_hashes - { - debug!( - message = "reorg detected, recomputing pending flashblocks going ahead of reorg", - tracked_txn_hashes = ?tracked_txn_hashes, - block_txn_hashes = ?block_txn_hashes, - ); - self.metrics.pending_clear_reorg.increment(1); - - // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); - return self.build_pending_state(None, &flashblocks); - } - - if pending_blocks_depth > self.max_depth { - debug!( - message = - "pending blocks depth exceeds max depth, resetting pending blocks", - pending_blocks_depth = pending_blocks_depth, - max_depth = self.max_depth, - ); - - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); - return self.build_pending_state(None, &flashblocks); - } + let pending_blocks = match &prev_pending_blocks { + Some(pb) => pb, + None => { + debug!(message = "no pending state to update with canonical block, skipping"); + return Ok(None); + } + }; - // If no reorg, we can continue building on top of the existing pending state - // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number - self.build_pending_state(prev_pending_blocks, &flashblocks) - } + let mut flashblocks = pending_blocks.get_flashblocks(); + let num_flashblocks_for_canon = + flashblocks.iter().filter(|fb| fb.metadata.block_number == block.number).count(); + self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); + self.metrics.pending_snapshot_height.set(pending_blocks.latest_block_number() as f64); + + // Check for reorg by comparing transaction sets + let tracked_txns = pending_blocks.get_transactions_for_block(block.number); + let tracked_txn_hashes: Vec<_> = tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); + let block_txn_hashes: Vec<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); + + let reorg_result = + ReorgDetector::detect(tracked_txn_hashes.iter(), block_txn_hashes.iter()); + let reorg_detected = reorg_result.is_reorg(); + + // Determine the reconciliation strategy + let strategy = CanonicalBlockReconciler::reconcile( + Some(pending_blocks.earliest_block_number()), + Some(pending_blocks.latest_block_number()), + block.number, + self.max_depth, + reorg_detected, + ); + + match strategy { + ReconciliationStrategy::CatchUp => { + debug!( + message = "pending snapshot cleared because canonical caught up", + latest_pending_block = pending_blocks.latest_block_number(), + canonical_block = block.number, + ); + self.metrics.pending_clear_catchup.increment(1); + self.metrics + .pending_snapshot_fb_index + .set(pending_blocks.latest_flashblock_index() as f64); + Ok(None) } - None => { + ReconciliationStrategy::HandleReorg => { + debug!( + message = "reorg detected, recomputing pending flashblocks going ahead of reorg", + tracked_txn_hashes = ?tracked_txn_hashes, + block_txn_hashes = ?block_txn_hashes, + ); + self.metrics.pending_clear_reorg.increment(1); + + // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state + flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number); + self.build_pending_state(None, &flashblocks) + } + ReconciliationStrategy::DepthLimitExceeded { depth, max_depth } => { + debug!( + message = "pending blocks depth exceeds max depth, resetting pending blocks", + pending_blocks_depth = depth, + max_depth = max_depth, + ); + + flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number); + self.build_pending_state(None, &flashblocks) + } + ReconciliationStrategy::Continue => { + debug!( + message = "canonical block behind latest pending block, continuing with existing pending state", + latest_pending_block = pending_blocks.latest_block_number(), + earliest_pending_block = pending_blocks.earliest_block_number(), + canonical_block = block.number, + pending_txns_for_block = ?tracked_txn_hashes.len(), + canonical_txns_for_block = ?block_txn_hashes.len(), + ); + // If no reorg, we can continue building on top of the existing pending state + // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number + self.build_pending_state(prev_pending_blocks, &flashblocks) + } + ReconciliationStrategy::NoPendingState => { + // This case is already handled above, but included for completeness debug!(message = "no pending state to update with canonical block, skipping"); Ok(None) } @@ -215,41 +219,52 @@ where ) -> eyre::Result>> { match &prev_pending_blocks { Some(pending_blocks) => { - if self.is_next_flashblock(pending_blocks, &flashblock) { - // We have received the next flashblock for the current block - // or the first flashblock for the next block - let mut flashblocks = pending_blocks.get_flashblocks(); - flashblocks.push(flashblock); - self.build_pending_state(prev_pending_blocks, &flashblocks) - } else if pending_blocks.latest_block_number() != flashblock.metadata.block_number { - // We have received a non-zero flashblock for a new block - self.metrics.unexpected_block_order.increment(1); - error!( - message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %flashblock.metadata.block_number, - ); - Ok(None) - } else if pending_blocks.latest_flashblock_index() == flashblock.index { - // We have received a duplicate flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - warn!( - message = "Received duplicate Flashblock for current block, ignoring", - curr_block = %pending_blocks.latest_block_number(), - flashblock_index = %flashblock.index, - ); - Ok(prev_pending_blocks) - } else { - // We have received a non-sequential Flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - - error!( - message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %flashblock.metadata.block_number, - ); - - Ok(None) + let validation_result = FlashblockSequenceValidator::validate( + pending_blocks.latest_block_number(), + pending_blocks.latest_flashblock_index(), + flashblock.metadata.block_number, + flashblock.index, + ); + + match validation_result { + SequenceValidationResult::NextInSequence + | SequenceValidationResult::FirstOfNextBlock => { + // We have received the next flashblock for the current block + // or the first flashblock for the next block + let mut flashblocks = pending_blocks.get_flashblocks(); + flashblocks.push(flashblock); + self.build_pending_state(prev_pending_blocks, &flashblocks) + } + SequenceValidationResult::Duplicate => { + // We have received a duplicate flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + warn!( + message = "Received duplicate Flashblock for current block, ignoring", + curr_block = %pending_blocks.latest_block_number(), + flashblock_index = %flashblock.index, + ); + Ok(prev_pending_blocks) + } + SequenceValidationResult::InvalidNewBlockIndex { block_number, index: _ } => { + // We have received a non-zero flashblock for a new block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %block_number, + ); + Ok(None) + } + SequenceValidationResult::NonSequentialGap { expected: _, actual: _ } => { + // We have received a non-sequential Flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %flashblock.metadata.block_number, + ); + Ok(None) + } } } None => { @@ -432,19 +447,4 @@ where Ok(Some(Arc::new(pending_blocks_builder.build()?))) } - - fn is_next_flashblock( - &self, - pending_blocks: &Arc, - flashblock: &Flashblock, - ) -> bool { - let is_next_of_block = flashblock.metadata.block_number - == pending_blocks.latest_block_number() - && flashblock.index == pending_blocks.latest_flashblock_index() + 1; - let is_first_of_next_block = flashblock.metadata.block_number - == pending_blocks.latest_block_number() + 1 - && flashblock.index == 0; - - is_next_of_block || is_first_of_next_block - } } diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs new file mode 100644 index 00000000..09a27b13 --- /dev/null +++ b/crates/flashblocks/src/validation.rs @@ -0,0 +1,998 @@ +//! Flashblock sequence validation and reorganization detection. +//! +//! This module provides pure, stateless validation logic for determining +//! whether an incoming flashblock is valid in the context of the current +//! pending state. The validator is designed to be easily unit-testable +//! without any external dependencies. +//! +//! It also provides utilities for detecting chain reorganizations by comparing +//! tracked transaction sets against canonical chain data. + +use std::collections::HashSet; + +use alloy_primitives::B256; + +/// Result of validating a flashblock's position in the sequence. +/// +/// This enum represents all possible outcomes when validating whether +/// an incoming flashblock follows the expected sequence relative to +/// the current latest flashblock. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SequenceValidationResult { + /// The flashblock is the next consecutive flashblock within the current block. + /// + /// This occurs when: + /// - `incoming_block_number == latest_block_number` + /// - `incoming_index == latest_flashblock_index + 1` + NextInSequence, + + /// The flashblock is the first flashblock (index 0) of the next block. + /// + /// This occurs when: + /// - `incoming_block_number == latest_block_number + 1` + /// - `incoming_index == 0` + FirstOfNextBlock, + + /// The flashblock has the same index as the current latest flashblock. + /// + /// This is a duplicate that should be ignored. + Duplicate, + + /// The flashblock has a non-sequential index within the same block. + /// + /// This indicates a gap in the flashblock sequence, which means + /// some flashblocks were missed. + NonSequentialGap { + /// The expected flashblock index. + expected: u64, + /// The actual incoming flashblock index. + actual: u64, + }, + + /// A new block was received with a non-zero flashblock index. + /// + /// The first flashblock of any new block must have index 0. + /// Receiving a non-zero index for a new block means we missed + /// the base flashblock. + InvalidNewBlockIndex { + /// The block number of the incoming flashblock. + block_number: u64, + /// The invalid (non-zero) index received. + index: u64, + }, +} + +/// Pure validator for flashblock sequence ordering. +/// +/// This validator determines whether an incoming flashblock is valid +/// in the context of the current pending state. It is designed to be +/// stateless and easily testable. +/// +/// # Example +/// +/// ``` +/// use base_reth_flashblocks::validation::{FlashblockSequenceValidator, SequenceValidationResult}; +/// +/// // Validate that flashblock index 3 follows index 2 in block 100 +/// let result = FlashblockSequenceValidator::validate(100, 2, 100, 3); +/// assert_eq!(result, SequenceValidationResult::NextInSequence); +/// +/// // Validate that flashblock index 0 of block 101 follows any flashblock in block 100 +/// let result = FlashblockSequenceValidator::validate(100, 5, 101, 0); +/// assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); +/// ``` +#[derive(Debug, Clone, Copy, Default)] +pub struct FlashblockSequenceValidator; + +impl FlashblockSequenceValidator { + /// Validates whether an incoming flashblock follows the expected sequence. + /// + /// This method implements the core validation logic for flashblock ordering: + /// + /// 1. **Next in sequence**: The incoming flashblock is the next consecutive + /// flashblock within the current block (same block number, index + 1). + /// + /// 2. **First of next block**: The incoming flashblock is the first flashblock + /// (index 0) of the next block (block number + 1). + /// + /// 3. **Duplicate**: The incoming flashblock has the same index as the current + /// latest flashblock within the same block. + /// + /// 4. **Non-sequential gap**: The incoming flashblock has a different block number + /// or a non-consecutive index within the same block. + /// + /// 5. **Invalid new block index**: A new block is received with a non-zero index. + /// + /// # Arguments + /// + /// * `latest_block_number` - The block number of the current latest flashblock. + /// * `latest_flashblock_index` - The index of the current latest flashblock. + /// * `incoming_block_number` - The block number of the incoming flashblock. + /// * `incoming_index` - The index of the incoming flashblock. + /// + /// # Returns + /// + /// A [`SequenceValidationResult`] indicating the validation outcome. + pub const fn validate( + latest_block_number: u64, + latest_flashblock_index: u64, + incoming_block_number: u64, + incoming_index: u64, + ) -> SequenceValidationResult { + // Check if this is the next flashblock within the current block + let is_next_of_block = incoming_block_number == latest_block_number + && incoming_index == latest_flashblock_index + 1; + + // Check if this is the first flashblock of the next block + let is_first_of_next_block = + incoming_block_number == latest_block_number + 1 && incoming_index == 0; + + if is_next_of_block || is_first_of_next_block { + if is_next_of_block { + SequenceValidationResult::NextInSequence + } else { + SequenceValidationResult::FirstOfNextBlock + } + } else if incoming_block_number != latest_block_number { + // New block with non-zero index + SequenceValidationResult::InvalidNewBlockIndex { + block_number: incoming_block_number, + index: incoming_index, + } + } else if incoming_index == latest_flashblock_index { + // Duplicate flashblock + SequenceValidationResult::Duplicate + } else { + // Non-sequential index within the same block + SequenceValidationResult::NonSequentialGap { + expected: latest_flashblock_index + 1, + actual: incoming_index, + } + } + } +} + +/// Result of a reorganization detection check. +/// +/// This enum represents whether a chain reorganization was detected +/// by comparing tracked transaction hashes against canonical chain data. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ReorgDetectionResult { + /// No reorganization detected - transaction sets match exactly. + NoReorg, + /// Reorganization detected - transaction sets differ. + /// + /// Contains the counts from both sets for diagnostic purposes. + ReorgDetected { + /// Number of transactions in the tracked (pending) set. + tracked_count: usize, + /// Number of transactions in the canonical chain set. + canonical_count: usize, + }, +} + +impl ReorgDetectionResult { + /// Returns `true` if a reorganization was detected. + #[inline] + pub const fn is_reorg(&self) -> bool { + matches!(self, Self::ReorgDetected { .. }) + } + + /// Returns `true` if no reorganization was detected. + #[inline] + pub const fn is_no_reorg(&self) -> bool { + matches!(self, Self::NoReorg) + } +} + +/// A pure utility for detecting chain reorganizations. +/// +/// `ReorgDetector` compares two sets of transaction hashes to determine +/// if a reorganization has occurred. A reorg is detected when the tracked +/// transaction set differs from the canonical chain's transaction set, +/// either in count or content. +/// +/// # Example +/// +/// ``` +/// use alloy_primitives::B256; +/// use base_reth_flashblocks::validation::{ReorgDetector, ReorgDetectionResult}; +/// +/// let tracked = vec![B256::ZERO]; +/// let canonical = vec![B256::ZERO]; +/// +/// let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); +/// assert!(result.is_no_reorg()); +/// ``` +#[derive(Debug, Clone, Copy, Default)] +pub struct ReorgDetector; + +impl ReorgDetector { + /// Detects whether a chain reorganization occurred by comparing transaction hash sets. + /// + /// This method compares the tracked (pending) transaction hashes against the + /// canonical chain's transaction hashes. A reorganization is detected if: + /// - The number of transactions differs, or + /// - The sets contain different transaction hashes + /// + /// # Arguments + /// + /// * `tracked_tx_hashes` - Iterator over transaction hashes from the tracked/pending state. + /// * `canonical_tx_hashes` - Iterator over transaction hashes from the canonical chain. + /// + /// # Returns + /// + /// Returns [`ReorgDetectionResult::NoReorg`] if the transaction sets match exactly, + /// or [`ReorgDetectionResult::ReorgDetected`] with the counts if they differ. + /// + /// # Example + /// + /// ``` + /// use alloy_primitives::B256; + /// use base_reth_flashblocks::validation::{ReorgDetector, ReorgDetectionResult}; + /// + /// // Same transactions - no reorg + /// let hash = B256::repeat_byte(0x42); + /// let tracked = vec![hash]; + /// let canonical = vec![hash]; + /// + /// match ReorgDetector::detect(tracked.iter(), canonical.iter()) { + /// ReorgDetectionResult::NoReorg => println!("No reorg detected"), + /// ReorgDetectionResult::ReorgDetected { tracked_count, canonical_count } => { + /// println!("Reorg! tracked: {}, canonical: {}", tracked_count, canonical_count); + /// } + /// } + /// ``` + pub fn detect<'a, I1, I2>( + tracked_tx_hashes: I1, + canonical_tx_hashes: I2, + ) -> ReorgDetectionResult + where + I1: Iterator, + I2: Iterator, + { + let tracked_set: HashSet<&B256> = tracked_tx_hashes.collect(); + let canonical_set: HashSet<&B256> = canonical_tx_hashes.collect(); + + let tracked_count = tracked_set.len(); + let canonical_count = canonical_set.len(); + + // Check both count and content - if counts differ or sets are not equal, it's a reorg + if tracked_count != canonical_count || tracked_set != canonical_set { + ReorgDetectionResult::ReorgDetected { tracked_count, canonical_count } + } else { + ReorgDetectionResult::NoReorg + } + } +} + +/// Defines explicit handling approaches for reconciling pending state with canonical state. +/// +/// When a canonical block is received, the reconciliation strategy determines +/// how to handle the pending flashblock state based on the relationship between +/// the canonical chain and the pending blocks. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ReconciliationStrategy { + /// Canonical chain has caught up to or passed the pending state. + /// + /// This occurs when the canonical block number is >= the latest pending block number. + /// The pending state should be cleared/reset as it's no longer ahead of canonical. + CatchUp, + + /// A chain reorganization has been detected. + /// + /// This occurs when the transactions in the pending state for a given block + /// don't match the transactions in the canonical block. The pending state + /// should be rebuilt from canonical without reusing existing state. + HandleReorg, + + /// The pending blocks have grown too far ahead of the canonical chain. + /// + /// This occurs when the depth (canonical_block - earliest_pending_block) + /// exceeds the configured maximum depth. Contains the current depth and + /// the configured maximum for diagnostic purposes. + DepthLimitExceeded { + /// The current depth of pending blocks. + depth: u64, + /// The configured maximum depth. + max_depth: u64, + }, + + /// No issues detected, continue building on existing pending state. + /// + /// This occurs when the canonical block is behind the pending state, + /// no reorg is detected, and depth limits are not exceeded. + Continue, + + /// No pending state exists yet. + /// + /// This occurs when there is no pending flashblock state to reconcile. + /// Typically happens at startup or after the pending state has been cleared. + NoPendingState, +} + +/// Reconciler for determining how to handle canonical block updates. +/// +/// This struct encapsulates the logic for determining which [`ReconciliationStrategy`] +/// should be used when a new canonical block is received. +/// +/// # Example +/// +/// ``` +/// use base_reth_flashblocks::validation::{CanonicalBlockReconciler, ReconciliationStrategy}; +/// +/// // Determine strategy when canonical catches up +/// let strategy = CanonicalBlockReconciler::reconcile( +/// Some(100), // earliest pending block +/// Some(105), // latest pending block +/// 105, // canonical block number (caught up) +/// 10, // max depth +/// false, // no reorg detected +/// ); +/// assert_eq!(strategy, ReconciliationStrategy::CatchUp); +/// ``` +#[derive(Debug, Clone, Copy, Default)] +pub struct CanonicalBlockReconciler; + +impl CanonicalBlockReconciler { + /// Determines the appropriate reconciliation strategy based on the current state. + /// + /// # Arguments + /// + /// * `pending_earliest_block` - The earliest block number in the pending state, if any. + /// * `pending_latest_block` - The latest block number in the pending state, if any. + /// * `canonical_block_number` - The block number of the new canonical block. + /// * `max_depth` - The maximum allowed depth between canonical and earliest pending block. + /// * `reorg_detected` - Whether a reorg was detected (transaction mismatch). + /// + /// # Returns + /// + /// The [`ReconciliationStrategy`] that should be used to handle this situation. + /// + /// # Strategy Selection Logic + /// + /// 1. If no pending state exists (`pending_earliest_block` or `pending_latest_block` is `None`), + /// returns [`ReconciliationStrategy::NoPendingState`]. + /// + /// 2. If canonical has caught up or passed pending (`canonical_block_number >= pending_latest_block`), + /// returns [`ReconciliationStrategy::CatchUp`]. + /// + /// 3. If a reorg is detected, returns [`ReconciliationStrategy::HandleReorg`]. + /// + /// 4. If depth limit is exceeded (`canonical_block_number - pending_earliest_block > max_depth`), + /// returns [`ReconciliationStrategy::DepthLimitExceeded`]. + /// + /// 5. Otherwise, returns [`ReconciliationStrategy::Continue`]. + pub const fn reconcile( + pending_earliest_block: Option, + pending_latest_block: Option, + canonical_block_number: u64, + max_depth: u64, + reorg_detected: bool, + ) -> ReconciliationStrategy { + // Check if pending state exists + let (earliest, latest) = match (pending_earliest_block, pending_latest_block) { + (Some(e), Some(l)) => (e, l), + _ => return ReconciliationStrategy::NoPendingState, + }; + + // Check if canonical has caught up or passed pending + if latest <= canonical_block_number { + return ReconciliationStrategy::CatchUp; + } + + // Check for reorg + if reorg_detected { + return ReconciliationStrategy::HandleReorg; + } + + // Check depth limit + let depth = canonical_block_number.saturating_sub(earliest); + if depth > max_depth { + return ReconciliationStrategy::DepthLimitExceeded { depth, max_depth }; + } + + // No issues, continue building + ReconciliationStrategy::Continue + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ==================== FlashblockSequenceValidator Tests ==================== + + /// Test the first flashblock ever (bootstrap case). + /// When starting fresh, we expect index 0 to be valid for any block. + #[test] + fn test_first_flashblock_bootstrap() { + // Simulating bootstrap: latest is block 0, index 0 (initial state) + // Incoming is block 1, index 0 (first real flashblock) + let result = FlashblockSequenceValidator::validate(0, 0, 1, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + } + + /// Test normal sequential flashblocks within the same block. + #[test] + fn test_next_in_sequence() { + // Block 100, index 2 -> Block 100, index 3 + let result = FlashblockSequenceValidator::validate(100, 2, 100, 3); + assert_eq!(result, SequenceValidationResult::NextInSequence); + + // Block 100, index 0 -> Block 100, index 1 + let result = FlashblockSequenceValidator::validate(100, 0, 100, 1); + assert_eq!(result, SequenceValidationResult::NextInSequence); + + // Large index values + let result = FlashblockSequenceValidator::validate(100, 999, 100, 1000); + assert_eq!(result, SequenceValidationResult::NextInSequence); + } + + /// Test first flashblock of a new block. + #[test] + fn test_first_of_next_block() { + // Block 100, index 5 -> Block 101, index 0 + let result = FlashblockSequenceValidator::validate(100, 5, 101, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + + // Block 100, index 0 -> Block 101, index 0 + let result = FlashblockSequenceValidator::validate(100, 0, 101, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + + // Large block numbers + let result = FlashblockSequenceValidator::validate(999999, 10, 1000000, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + } + + /// Test duplicate detection. + #[test] + fn test_duplicate() { + // Same block and same index + let result = FlashblockSequenceValidator::validate(100, 5, 100, 5); + assert_eq!(result, SequenceValidationResult::Duplicate); + + // Duplicate at index 0 + let result = FlashblockSequenceValidator::validate(100, 0, 100, 0); + assert_eq!(result, SequenceValidationResult::Duplicate); + } + + /// Test gap detection within the same block. + #[test] + fn test_non_sequential_gap() { + // Skipping an index: 2 -> 4 (expected 3) + let result = FlashblockSequenceValidator::validate(100, 2, 100, 4); + assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 3, actual: 4 }); + + // Large gap: 0 -> 10 (expected 1) + let result = FlashblockSequenceValidator::validate(100, 0, 100, 10); + assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 1, actual: 10 }); + + // Going backwards within same block: 5 -> 3 (expected 6) + let result = FlashblockSequenceValidator::validate(100, 5, 100, 3); + assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 6, actual: 3 }); + } + + /// Test non-zero index on a new block. + #[test] + fn test_invalid_new_block_index() { + // New block with non-zero index + let result = FlashblockSequenceValidator::validate(100, 5, 101, 1); + assert_eq!( + result, + SequenceValidationResult::InvalidNewBlockIndex { block_number: 101, index: 1 } + ); + + // Skipping blocks with non-zero index + let result = FlashblockSequenceValidator::validate(100, 5, 105, 3); + assert_eq!( + result, + SequenceValidationResult::InvalidNewBlockIndex { block_number: 105, index: 3 } + ); + + // Future block with index 0 is NOT first of next block (block gap) + let result = FlashblockSequenceValidator::validate(100, 5, 102, 0); + assert_eq!( + result, + SequenceValidationResult::InvalidNewBlockIndex { block_number: 102, index: 0 } + ); + } + + /// Test edge case: block number going backwards. + #[test] + fn test_block_number_regression() { + // Incoming block number is less than current + let result = FlashblockSequenceValidator::validate(100, 5, 99, 0); + assert_eq!( + result, + SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 0 } + ); + + let result = FlashblockSequenceValidator::validate(100, 5, 99, 5); + assert_eq!( + result, + SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 5 } + ); + } + + /// Test edge case: maximum u64 values. + #[test] + fn test_max_values() { + // Near max block number + let result = FlashblockSequenceValidator::validate(u64::MAX - 1, 0, u64::MAX, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + + // Near max index + let result = FlashblockSequenceValidator::validate(100, u64::MAX - 1, 100, u64::MAX); + assert_eq!(result, SequenceValidationResult::NextInSequence); + } + + /// Test edge case: zero block number. + #[test] + fn test_zero_block_number() { + // Block 0 to block 1 + let result = FlashblockSequenceValidator::validate(0, 5, 1, 0); + assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); + + // Sequential within block 0 + let result = FlashblockSequenceValidator::validate(0, 0, 0, 1); + assert_eq!(result, SequenceValidationResult::NextInSequence); + } + + /// Test that the validator is stateless and consistent. + #[test] + fn test_validator_is_stateless() { + // Same inputs should always produce the same output + for _ in 0..100 { + let result = FlashblockSequenceValidator::validate(100, 5, 100, 6); + assert_eq!(result, SequenceValidationResult::NextInSequence); + } + } + + /// Test comprehensive sequence of flashblocks. + #[test] + fn test_full_sequence() { + // Simulate a full sequence of flashblocks across two blocks + let test_cases = vec![ + // Block 100: index 0 -> 1 -> 2 -> 3 + ((100, 0, 100, 1), SequenceValidationResult::NextInSequence), + ((100, 1, 100, 2), SequenceValidationResult::NextInSequence), + ((100, 2, 100, 3), SequenceValidationResult::NextInSequence), + // Block 100 -> Block 101 (first flashblock) + ((100, 3, 101, 0), SequenceValidationResult::FirstOfNextBlock), + // Block 101: index 0 -> 1 + ((101, 0, 101, 1), SequenceValidationResult::NextInSequence), + ]; + + for ((latest_block, latest_idx, incoming_block, incoming_idx), expected) in test_cases { + let result = FlashblockSequenceValidator::validate( + latest_block, + latest_idx, + incoming_block, + incoming_idx, + ); + assert_eq!( + result, expected, + "Failed for latest=({}, {}), incoming=({}, {})", + latest_block, latest_idx, incoming_block, incoming_idx + ); + } + } + + // ==================== ReorgDetector Tests ==================== + + #[test] + fn test_reorg_identical_transaction_sets_no_reorg() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + + let tracked = vec![hash1, hash2, hash3]; + let canonical = vec![hash1, hash2, hash3]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!(result, ReorgDetectionResult::NoReorg); + assert!(result.is_no_reorg()); + assert!(!result.is_reorg()); + } + + #[test] + fn test_reorg_identical_sets_different_order_no_reorg() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + + // Different order should still be considered equal (set comparison) + let tracked = vec![hash1, hash2, hash3]; + let canonical = vec![hash3, hash1, hash2]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!(result, ReorgDetectionResult::NoReorg); + assert!(result.is_no_reorg()); + } + + #[test] + fn test_reorg_different_counts_reorg_detected() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + + let tracked = vec![hash1, hash2, hash3]; + let canonical = vec![hash1, hash2]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 } + ); + assert!(result.is_reorg()); + assert!(!result.is_no_reorg()); + } + + #[test] + fn test_reorg_canonical_has_more_transactions() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + + let tracked = vec![hash1]; + let canonical = vec![hash1, hash2, hash3]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 3 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_same_count_different_hashes() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + let hash4 = B256::repeat_byte(0x04); + + let tracked = vec![hash1, hash2]; + let canonical = vec![hash3, hash4]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_partial_overlap_different_hashes() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + let hash3 = B256::repeat_byte(0x03); + + // One hash in common, but different overall sets + let tracked = vec![hash1, hash2]; + let canonical = vec![hash1, hash3]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_empty_sets_no_reorg() { + let tracked: Vec = vec![]; + let canonical: Vec = vec![]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!(result, ReorgDetectionResult::NoReorg); + assert!(result.is_no_reorg()); + } + + #[test] + fn test_reorg_empty_tracked_non_empty_canonical() { + let hash1 = B256::repeat_byte(0x01); + + let tracked: Vec = vec![]; + let canonical = vec![hash1]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 0, canonical_count: 1 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_non_empty_tracked_empty_canonical() { + let hash1 = B256::repeat_byte(0x01); + + let tracked = vec![hash1]; + let canonical: Vec = vec![]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 0 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_single_transaction_match_no_reorg() { + let hash = B256::repeat_byte(0x42); + + let tracked = vec![hash]; + let canonical = vec![hash]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!(result, ReorgDetectionResult::NoReorg); + assert!(result.is_no_reorg()); + } + + #[test] + fn test_reorg_single_transaction_mismatch() { + let hash1 = B256::repeat_byte(0x42); + let hash2 = B256::repeat_byte(0x43); + + let tracked = vec![hash1]; + let canonical = vec![hash2]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + assert_eq!( + result, + ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 1 } + ); + assert!(result.is_reorg()); + } + + #[test] + fn test_reorg_duplicate_hashes_are_deduplicated() { + let hash1 = B256::repeat_byte(0x01); + let hash2 = B256::repeat_byte(0x02); + + // Duplicates should be deduplicated by the HashSet + let tracked = vec![hash1, hash1, hash2]; + let canonical = vec![hash1, hash2]; + + let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + + // After deduplication, both sets have 2 unique hashes + assert_eq!(result, ReorgDetectionResult::NoReorg); + } + + #[test] + fn test_reorg_detection_result_debug_impl() { + let result = ReorgDetectionResult::NoReorg; + assert_eq!(format!("{:?}", result), "NoReorg"); + + let result = ReorgDetectionResult::ReorgDetected { tracked_count: 5, canonical_count: 3 }; + assert!(format!("{:?}", result).contains("ReorgDetected")); + assert!(format!("{:?}", result).contains("5")); + assert!(format!("{:?}", result).contains("3")); + } + + #[test] + fn test_reorg_detector_is_copy() { + let detector = ReorgDetector; + let _copied = detector; + let _also_copied = detector; // Should compile since ReorgDetector is Copy + } + + #[test] + fn test_reorg_detector_default() { + let _detector = ReorgDetector::default(); + } + + // ==================== CanonicalBlockReconciler Tests ==================== + + /// Test that canonical catching up to pending returns CatchUp strategy. + #[test] + fn test_reconciler_canonical_catches_up_to_pending() { + // Canonical block equals latest pending block + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 105, // canonical (equal to latest pending) + 10, // max depth + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::CatchUp); + } + + /// Test that canonical passing pending returns CatchUp strategy. + #[test] + fn test_reconciler_canonical_passes_pending() { + // Canonical block is ahead of latest pending block + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 110, // canonical (ahead of latest pending) + 10, // max depth + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::CatchUp); + } + + /// Test that reorg detection returns HandleReorg strategy. + #[test] + fn test_reconciler_reorg_detected() { + // Canonical is behind pending but reorg detected + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(110), // latest pending + 102, // canonical (behind pending) + 10, // max depth + true, // reorg detected! + ); + assert_eq!(strategy, ReconciliationStrategy::HandleReorg); + } + + /// Test that exceeding depth limit returns DepthLimitExceeded strategy. + #[test] + fn test_reconciler_depth_limit_exceeded() { + // Pending blocks are too far ahead of canonical + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(120), // latest pending + 115, // canonical + 10, // max depth (115 - 100 = 15 > 10) + false, // no reorg + ); + assert_eq!( + strategy, + ReconciliationStrategy::DepthLimitExceeded { depth: 15, max_depth: 10 } + ); + } + + /// Test that normal operation returns Continue strategy. + #[test] + fn test_reconciler_continue_no_issues() { + // Everything is fine, continue building + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(110), // latest pending + 105, // canonical (behind pending) + 10, // max depth (105 - 100 = 5 <= 10) + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::Continue); + } + + /// Test that missing pending state returns NoPendingState strategy. + #[test] + fn test_reconciler_no_pending_state() { + // No pending state exists + let strategy = CanonicalBlockReconciler::reconcile(None, None, 100, 10, false); + assert_eq!(strategy, ReconciliationStrategy::NoPendingState); + + // Only earliest is Some + let strategy = CanonicalBlockReconciler::reconcile(Some(100), None, 100, 10, false); + assert_eq!(strategy, ReconciliationStrategy::NoPendingState); + + // Only latest is Some + let strategy = CanonicalBlockReconciler::reconcile(None, Some(100), 100, 10, false); + assert_eq!(strategy, ReconciliationStrategy::NoPendingState); + } + + /// Test edge case: depth exactly at limit should continue. + #[test] + fn test_reconciler_depth_at_limit_continues() { + // Depth exactly equals max_depth (not exceeded) + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(120), // latest pending + 110, // canonical (110 - 100 = 10, exactly at limit) + 10, // max depth + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::Continue); + } + + /// Test that reorg takes priority over depth limit. + #[test] + fn test_reconciler_reorg_priority_over_depth() { + // Both reorg and depth limit exceeded - reorg should take priority + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(130), // latest pending + 120, // canonical (120 - 100 = 20 > 10) + 10, // max depth + true, // reorg detected! + ); + // Reorg is checked before depth limit + assert_eq!(strategy, ReconciliationStrategy::HandleReorg); + } + + /// Test that CatchUp takes priority over reorg. + #[test] + fn test_reconciler_catchup_priority_over_reorg() { + // Canonical caught up and reorg detected - CatchUp should take priority + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 105, // canonical (caught up) + 10, // max depth + true, // reorg detected (but doesn't matter, canonical caught up) + ); + // CatchUp is checked before reorg + assert_eq!(strategy, ReconciliationStrategy::CatchUp); + } + + /// Test with zero depth. + #[test] + fn test_reconciler_zero_depth_canonical_at_earliest() { + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 100, // canonical at earliest + 10, // max depth + false, // no reorg + ); + // Canonical is behind latest, depth is 0, should continue + assert_eq!(strategy, ReconciliationStrategy::Continue); + } + + /// Test with max_depth of zero (strictest setting). + #[test] + fn test_reconciler_zero_max_depth() { + // Any depth > 0 should exceed limit + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 101, // canonical (101 - 100 = 1 > 0) + 0, // max depth of 0 + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::DepthLimitExceeded { depth: 1, max_depth: 0 }); + + // Depth of 0 should still work + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest pending + Some(105), // latest pending + 100, // canonical (100 - 100 = 0, not exceeded) + 0, // max depth of 0 + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::Continue); + } + + /// Test that earliest equals latest (single pending block). + #[test] + fn test_reconciler_single_pending_block() { + // Single pending block, canonical behind + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest = latest + Some(100), // single pending block + 99, // canonical behind + 10, // max depth + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::Continue); + + // Single pending block, canonical caught up + let strategy = CanonicalBlockReconciler::reconcile( + Some(100), // earliest = latest + Some(100), // single pending block + 100, // canonical caught up + 10, // max depth + false, // no reorg + ); + assert_eq!(strategy, ReconciliationStrategy::CatchUp); + } +} From 49e73533438e464ede917c2dc2172de059b36c2f Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 11:59:50 -0500 Subject: [PATCH 2/8] chore(flashblocks): trim down validation with rstest --- Cargo.lock | 1 + crates/flashblocks/Cargo.toml | 1 + crates/flashblocks/src/validation.rs | 677 ++++----------------------- 3 files changed, 97 insertions(+), 582 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8826fe35..4a83d0d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1577,6 +1577,7 @@ dependencies = [ "reth-rpc-eth-api", "reth-testing-utils", "reth-transaction-pool", + "rstest", "serde_json", "tokio", "tokio-tungstenite 0.28.0", diff --git a/crates/flashblocks/Cargo.toml b/crates/flashblocks/Cargo.toml index 0f604b5b..31598f74 100644 --- a/crates/flashblocks/Cargo.toml +++ b/crates/flashblocks/Cargo.toml @@ -73,6 +73,7 @@ reth-primitives-traits.workspace = true reth-optimism-primitives.workspace = true reth-transaction-pool.workspace = true serde_json.workspace = true +rstest.workspace = true criterion = { version = "0.5", features = ["async_tokio"] } [[bench]] diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs index 09a27b13..03e91a4b 100644 --- a/crates/flashblocks/src/validation.rs +++ b/crates/flashblocks/src/validation.rs @@ -400,599 +400,112 @@ impl CanonicalBlockReconciler { #[cfg(test)] mod tests { use super::*; + use rstest::rstest; // ==================== FlashblockSequenceValidator Tests ==================== - /// Test the first flashblock ever (bootstrap case). - /// When starting fresh, we expect index 0 to be valid for any block. - #[test] - fn test_first_flashblock_bootstrap() { - // Simulating bootstrap: latest is block 0, index 0 (initial state) - // Incoming is block 1, index 0 (first real flashblock) - let result = FlashblockSequenceValidator::validate(0, 0, 1, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - } - - /// Test normal sequential flashblocks within the same block. - #[test] - fn test_next_in_sequence() { - // Block 100, index 2 -> Block 100, index 3 - let result = FlashblockSequenceValidator::validate(100, 2, 100, 3); - assert_eq!(result, SequenceValidationResult::NextInSequence); - - // Block 100, index 0 -> Block 100, index 1 - let result = FlashblockSequenceValidator::validate(100, 0, 100, 1); - assert_eq!(result, SequenceValidationResult::NextInSequence); - - // Large index values - let result = FlashblockSequenceValidator::validate(100, 999, 100, 1000); - assert_eq!(result, SequenceValidationResult::NextInSequence); - } - - /// Test first flashblock of a new block. - #[test] - fn test_first_of_next_block() { - // Block 100, index 5 -> Block 101, index 0 - let result = FlashblockSequenceValidator::validate(100, 5, 101, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - - // Block 100, index 0 -> Block 101, index 0 - let result = FlashblockSequenceValidator::validate(100, 0, 101, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - - // Large block numbers - let result = FlashblockSequenceValidator::validate(999999, 10, 1000000, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - } - - /// Test duplicate detection. - #[test] - fn test_duplicate() { - // Same block and same index - let result = FlashblockSequenceValidator::validate(100, 5, 100, 5); - assert_eq!(result, SequenceValidationResult::Duplicate); - - // Duplicate at index 0 - let result = FlashblockSequenceValidator::validate(100, 0, 100, 0); - assert_eq!(result, SequenceValidationResult::Duplicate); - } - - /// Test gap detection within the same block. - #[test] - fn test_non_sequential_gap() { - // Skipping an index: 2 -> 4 (expected 3) - let result = FlashblockSequenceValidator::validate(100, 2, 100, 4); - assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 3, actual: 4 }); - - // Large gap: 0 -> 10 (expected 1) - let result = FlashblockSequenceValidator::validate(100, 0, 100, 10); - assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 1, actual: 10 }); - - // Going backwards within same block: 5 -> 3 (expected 6) - let result = FlashblockSequenceValidator::validate(100, 5, 100, 3); - assert_eq!(result, SequenceValidationResult::NonSequentialGap { expected: 6, actual: 3 }); - } - - /// Test non-zero index on a new block. - #[test] - fn test_invalid_new_block_index() { - // New block with non-zero index - let result = FlashblockSequenceValidator::validate(100, 5, 101, 1); - assert_eq!( - result, - SequenceValidationResult::InvalidNewBlockIndex { block_number: 101, index: 1 } - ); - - // Skipping blocks with non-zero index - let result = FlashblockSequenceValidator::validate(100, 5, 105, 3); - assert_eq!( - result, - SequenceValidationResult::InvalidNewBlockIndex { block_number: 105, index: 3 } - ); - - // Future block with index 0 is NOT first of next block (block gap) - let result = FlashblockSequenceValidator::validate(100, 5, 102, 0); - assert_eq!( - result, - SequenceValidationResult::InvalidNewBlockIndex { block_number: 102, index: 0 } - ); - } - - /// Test edge case: block number going backwards. - #[test] - fn test_block_number_regression() { - // Incoming block number is less than current - let result = FlashblockSequenceValidator::validate(100, 5, 99, 0); - assert_eq!( - result, - SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 0 } - ); - - let result = FlashblockSequenceValidator::validate(100, 5, 99, 5); - assert_eq!( - result, - SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 5 } - ); - } - - /// Test edge case: maximum u64 values. - #[test] - fn test_max_values() { - // Near max block number - let result = FlashblockSequenceValidator::validate(u64::MAX - 1, 0, u64::MAX, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - - // Near max index - let result = FlashblockSequenceValidator::validate(100, u64::MAX - 1, 100, u64::MAX); - assert_eq!(result, SequenceValidationResult::NextInSequence); - } - - /// Test edge case: zero block number. - #[test] - fn test_zero_block_number() { - // Block 0 to block 1 - let result = FlashblockSequenceValidator::validate(0, 5, 1, 0); - assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); - - // Sequential within block 0 - let result = FlashblockSequenceValidator::validate(0, 0, 0, 1); - assert_eq!(result, SequenceValidationResult::NextInSequence); - } - - /// Test that the validator is stateless and consistent. - #[test] - fn test_validator_is_stateless() { - // Same inputs should always produce the same output - for _ in 0..100 { - let result = FlashblockSequenceValidator::validate(100, 5, 100, 6); - assert_eq!(result, SequenceValidationResult::NextInSequence); - } - } - - /// Test comprehensive sequence of flashblocks. - #[test] - fn test_full_sequence() { - // Simulate a full sequence of flashblocks across two blocks - let test_cases = vec![ - // Block 100: index 0 -> 1 -> 2 -> 3 - ((100, 0, 100, 1), SequenceValidationResult::NextInSequence), - ((100, 1, 100, 2), SequenceValidationResult::NextInSequence), - ((100, 2, 100, 3), SequenceValidationResult::NextInSequence), - // Block 100 -> Block 101 (first flashblock) - ((100, 3, 101, 0), SequenceValidationResult::FirstOfNextBlock), - // Block 101: index 0 -> 1 - ((101, 0, 101, 1), SequenceValidationResult::NextInSequence), - ]; - - for ((latest_block, latest_idx, incoming_block, incoming_idx), expected) in test_cases { - let result = FlashblockSequenceValidator::validate( - latest_block, - latest_idx, - incoming_block, - incoming_idx, - ); - assert_eq!( - result, expected, - "Failed for latest=({}, {}), incoming=({}, {})", - latest_block, latest_idx, incoming_block, incoming_idx - ); - } + #[rstest] + // NextInSequence: consecutive indices within the same block + #[case(100, 2, 100, 3, SequenceValidationResult::NextInSequence)] + #[case(100, 0, 100, 1, SequenceValidationResult::NextInSequence)] + #[case(100, 999, 100, 1000, SequenceValidationResult::NextInSequence)] + #[case(0, 0, 0, 1, SequenceValidationResult::NextInSequence)] + #[case(100, u64::MAX - 1, 100, u64::MAX, SequenceValidationResult::NextInSequence)] + // FirstOfNextBlock: index 0 of the next block + #[case(0, 0, 1, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(100, 5, 101, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(100, 0, 101, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(999999, 10, 1000000, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(0, 5, 1, 0, SequenceValidationResult::FirstOfNextBlock)] + #[case(u64::MAX - 1, 0, u64::MAX, 0, SequenceValidationResult::FirstOfNextBlock)] + // Duplicate: same block and index + #[case(100, 5, 100, 5, SequenceValidationResult::Duplicate)] + #[case(100, 0, 100, 0, SequenceValidationResult::Duplicate)] + // NonSequentialGap: non-consecutive indices within the same block + #[case(100, 2, 100, 4, SequenceValidationResult::NonSequentialGap { expected: 3, actual: 4 })] + #[case(100, 0, 100, 10, SequenceValidationResult::NonSequentialGap { expected: 1, actual: 10 })] + #[case(100, 5, 100, 3, SequenceValidationResult::NonSequentialGap { expected: 6, actual: 3 })] + // InvalidNewBlockIndex: new block with non-zero index or block gap + #[case(100, 5, 101, 1, SequenceValidationResult::InvalidNewBlockIndex { block_number: 101, index: 1 })] + #[case(100, 5, 105, 3, SequenceValidationResult::InvalidNewBlockIndex { block_number: 105, index: 3 })] + #[case(100, 5, 102, 0, SequenceValidationResult::InvalidNewBlockIndex { block_number: 102, index: 0 })] + #[case(100, 5, 99, 0, SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 0 })] + #[case(100, 5, 99, 5, SequenceValidationResult::InvalidNewBlockIndex { block_number: 99, index: 5 })] + fn test_sequence_validator( + #[case] latest_block: u64, + #[case] latest_idx: u64, + #[case] incoming_block: u64, + #[case] incoming_idx: u64, + #[case] expected: SequenceValidationResult, + ) { + let result = + FlashblockSequenceValidator::validate(latest_block, latest_idx, incoming_block, incoming_idx); + assert_eq!(result, expected); } // ==================== ReorgDetector Tests ==================== - #[test] - fn test_reorg_identical_transaction_sets_no_reorg() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - - let tracked = vec![hash1, hash2, hash3]; - let canonical = vec![hash1, hash2, hash3]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!(result, ReorgDetectionResult::NoReorg); - assert!(result.is_no_reorg()); - assert!(!result.is_reorg()); - } - - #[test] - fn test_reorg_identical_sets_different_order_no_reorg() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - - // Different order should still be considered equal (set comparison) - let tracked = vec![hash1, hash2, hash3]; - let canonical = vec![hash3, hash1, hash2]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!(result, ReorgDetectionResult::NoReorg); - assert!(result.is_no_reorg()); - } - - #[test] - fn test_reorg_different_counts_reorg_detected() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - - let tracked = vec![hash1, hash2, hash3]; - let canonical = vec![hash1, hash2]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 } - ); - assert!(result.is_reorg()); - assert!(!result.is_no_reorg()); - } - - #[test] - fn test_reorg_canonical_has_more_transactions() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - - let tracked = vec![hash1]; - let canonical = vec![hash1, hash2, hash3]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 3 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_same_count_different_hashes() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - let hash4 = B256::repeat_byte(0x04); - - let tracked = vec![hash1, hash2]; - let canonical = vec![hash3, hash4]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_partial_overlap_different_hashes() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - let hash3 = B256::repeat_byte(0x03); - - // One hash in common, but different overall sets - let tracked = vec![hash1, hash2]; - let canonical = vec![hash1, hash3]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_empty_sets_no_reorg() { - let tracked: Vec = vec![]; - let canonical: Vec = vec![]; - + #[rstest] + // No reorg cases + #[case(&[], &[], ReorgDetectionResult::NoReorg)] + #[case(&[0x01], &[0x01], ReorgDetectionResult::NoReorg)] + #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02, 0x03], ReorgDetectionResult::NoReorg)] + #[case(&[0x01, 0x02, 0x03], &[0x03, 0x01, 0x02], ReorgDetectionResult::NoReorg)] // order doesn't matter + #[case(&[0x01, 0x01, 0x02], &[0x01, 0x02], ReorgDetectionResult::NoReorg)] // duplicates deduplicated + // Reorg cases - different counts + #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 })] + #[case(&[0x01], &[0x01, 0x02, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 3 })] + #[case(&[], &[0x01], ReorgDetectionResult::ReorgDetected { tracked_count: 0, canonical_count: 1 })] + #[case(&[0x01], &[], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 0 })] + // Reorg cases - same count, different hashes + #[case(&[0x01, 0x02], &[0x03, 0x04], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] + #[case(&[0x01, 0x02], &[0x01, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] + #[case(&[0x42], &[0x43], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 1 })] + fn test_reorg_detector( + #[case] tracked_bytes: &[u8], + #[case] canonical_bytes: &[u8], + #[case] expected: ReorgDetectionResult, + ) { + let tracked: Vec = tracked_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); + let canonical: Vec = canonical_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!(result, ReorgDetectionResult::NoReorg); - assert!(result.is_no_reorg()); - } - - #[test] - fn test_reorg_empty_tracked_non_empty_canonical() { - let hash1 = B256::repeat_byte(0x01); - - let tracked: Vec = vec![]; - let canonical = vec![hash1]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 0, canonical_count: 1 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_non_empty_tracked_empty_canonical() { - let hash1 = B256::repeat_byte(0x01); - - let tracked = vec![hash1]; - let canonical: Vec = vec![]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 0 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_single_transaction_match_no_reorg() { - let hash = B256::repeat_byte(0x42); - - let tracked = vec![hash]; - let canonical = vec![hash]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!(result, ReorgDetectionResult::NoReorg); - assert!(result.is_no_reorg()); - } - - #[test] - fn test_reorg_single_transaction_mismatch() { - let hash1 = B256::repeat_byte(0x42); - let hash2 = B256::repeat_byte(0x43); - - let tracked = vec![hash1]; - let canonical = vec![hash2]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - assert_eq!( - result, - ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 1 } - ); - assert!(result.is_reorg()); - } - - #[test] - fn test_reorg_duplicate_hashes_are_deduplicated() { - let hash1 = B256::repeat_byte(0x01); - let hash2 = B256::repeat_byte(0x02); - - // Duplicates should be deduplicated by the HashSet - let tracked = vec![hash1, hash1, hash2]; - let canonical = vec![hash1, hash2]; - - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); - - // After deduplication, both sets have 2 unique hashes - assert_eq!(result, ReorgDetectionResult::NoReorg); - } - - #[test] - fn test_reorg_detection_result_debug_impl() { - let result = ReorgDetectionResult::NoReorg; - assert_eq!(format!("{:?}", result), "NoReorg"); - - let result = ReorgDetectionResult::ReorgDetected { tracked_count: 5, canonical_count: 3 }; - assert!(format!("{:?}", result).contains("ReorgDetected")); - assert!(format!("{:?}", result).contains("5")); - assert!(format!("{:?}", result).contains("3")); - } - - #[test] - fn test_reorg_detector_is_copy() { - let detector = ReorgDetector; - let _copied = detector; - let _also_copied = detector; // Should compile since ReorgDetector is Copy - } - - #[test] - fn test_reorg_detector_default() { - let _detector = ReorgDetector::default(); + assert_eq!(result, expected); + assert_eq!(result.is_reorg(), matches!(expected, ReorgDetectionResult::ReorgDetected { .. })); } // ==================== CanonicalBlockReconciler Tests ==================== - /// Test that canonical catching up to pending returns CatchUp strategy. - #[test] - fn test_reconciler_canonical_catches_up_to_pending() { - // Canonical block equals latest pending block - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 105, // canonical (equal to latest pending) - 10, // max depth - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::CatchUp); - } - - /// Test that canonical passing pending returns CatchUp strategy. - #[test] - fn test_reconciler_canonical_passes_pending() { - // Canonical block is ahead of latest pending block - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 110, // canonical (ahead of latest pending) - 10, // max depth - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::CatchUp); - } - - /// Test that reorg detection returns HandleReorg strategy. - #[test] - fn test_reconciler_reorg_detected() { - // Canonical is behind pending but reorg detected - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(110), // latest pending - 102, // canonical (behind pending) - 10, // max depth - true, // reorg detected! - ); - assert_eq!(strategy, ReconciliationStrategy::HandleReorg); - } - - /// Test that exceeding depth limit returns DepthLimitExceeded strategy. - #[test] - fn test_reconciler_depth_limit_exceeded() { - // Pending blocks are too far ahead of canonical - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(120), // latest pending - 115, // canonical - 10, // max depth (115 - 100 = 15 > 10) - false, // no reorg - ); - assert_eq!( - strategy, - ReconciliationStrategy::DepthLimitExceeded { depth: 15, max_depth: 10 } - ); - } - - /// Test that normal operation returns Continue strategy. - #[test] - fn test_reconciler_continue_no_issues() { - // Everything is fine, continue building - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(110), // latest pending - 105, // canonical (behind pending) - 10, // max depth (105 - 100 = 5 <= 10) - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::Continue); - } - - /// Test that missing pending state returns NoPendingState strategy. - #[test] - fn test_reconciler_no_pending_state() { - // No pending state exists - let strategy = CanonicalBlockReconciler::reconcile(None, None, 100, 10, false); - assert_eq!(strategy, ReconciliationStrategy::NoPendingState); - - // Only earliest is Some - let strategy = CanonicalBlockReconciler::reconcile(Some(100), None, 100, 10, false); - assert_eq!(strategy, ReconciliationStrategy::NoPendingState); - - // Only latest is Some - let strategy = CanonicalBlockReconciler::reconcile(None, Some(100), 100, 10, false); - assert_eq!(strategy, ReconciliationStrategy::NoPendingState); - } - - /// Test edge case: depth exactly at limit should continue. - #[test] - fn test_reconciler_depth_at_limit_continues() { - // Depth exactly equals max_depth (not exceeded) - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(120), // latest pending - 110, // canonical (110 - 100 = 10, exactly at limit) - 10, // max depth - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::Continue); - } - - /// Test that reorg takes priority over depth limit. - #[test] - fn test_reconciler_reorg_priority_over_depth() { - // Both reorg and depth limit exceeded - reorg should take priority - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(130), // latest pending - 120, // canonical (120 - 100 = 20 > 10) - 10, // max depth - true, // reorg detected! - ); - // Reorg is checked before depth limit - assert_eq!(strategy, ReconciliationStrategy::HandleReorg); - } - - /// Test that CatchUp takes priority over reorg. - #[test] - fn test_reconciler_catchup_priority_over_reorg() { - // Canonical caught up and reorg detected - CatchUp should take priority - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 105, // canonical (caught up) - 10, // max depth - true, // reorg detected (but doesn't matter, canonical caught up) - ); - // CatchUp is checked before reorg - assert_eq!(strategy, ReconciliationStrategy::CatchUp); - } - - /// Test with zero depth. - #[test] - fn test_reconciler_zero_depth_canonical_at_earliest() { - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 100, // canonical at earliest - 10, // max depth - false, // no reorg - ); - // Canonical is behind latest, depth is 0, should continue - assert_eq!(strategy, ReconciliationStrategy::Continue); - } - - /// Test with max_depth of zero (strictest setting). - #[test] - fn test_reconciler_zero_max_depth() { - // Any depth > 0 should exceed limit - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 101, // canonical (101 - 100 = 1 > 0) - 0, // max depth of 0 - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::DepthLimitExceeded { depth: 1, max_depth: 0 }); - - // Depth of 0 should still work - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest pending - Some(105), // latest pending - 100, // canonical (100 - 100 = 0, not exceeded) - 0, // max depth of 0 - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::Continue); - } - - /// Test that earliest equals latest (single pending block). - #[test] - fn test_reconciler_single_pending_block() { - // Single pending block, canonical behind - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest = latest - Some(100), // single pending block - 99, // canonical behind - 10, // max depth - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::Continue); - - // Single pending block, canonical caught up - let strategy = CanonicalBlockReconciler::reconcile( - Some(100), // earliest = latest - Some(100), // single pending block - 100, // canonical caught up - 10, // max depth - false, // no reorg - ); - assert_eq!(strategy, ReconciliationStrategy::CatchUp); + #[rstest] + // NoPendingState + #[case(None, None, 100, 10, false, ReconciliationStrategy::NoPendingState)] + #[case(Some(100), None, 100, 10, false, ReconciliationStrategy::NoPendingState)] + #[case(None, Some(100), 100, 10, false, ReconciliationStrategy::NoPendingState)] + // CatchUp: canonical >= latest pending + #[case(Some(100), Some(105), 105, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(105), 110, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(100), 100, 10, false, ReconciliationStrategy::CatchUp)] + #[case(Some(100), Some(105), 105, 10, true, ReconciliationStrategy::CatchUp)] // catchup > reorg priority + // HandleReorg + #[case(Some(100), Some(110), 102, 10, true, ReconciliationStrategy::HandleReorg)] + #[case(Some(100), Some(130), 120, 10, true, ReconciliationStrategy::HandleReorg)] // reorg > depth priority + // DepthLimitExceeded + #[case(Some(100), Some(120), 115, 10, false, ReconciliationStrategy::DepthLimitExceeded { depth: 15, max_depth: 10 })] + #[case(Some(100), Some(105), 101, 0, false, ReconciliationStrategy::DepthLimitExceeded { depth: 1, max_depth: 0 })] + // Continue + #[case(Some(100), Some(110), 105, 10, false, ReconciliationStrategy::Continue)] + #[case(Some(100), Some(120), 110, 10, false, ReconciliationStrategy::Continue)] // depth exactly at limit + #[case(Some(100), Some(105), 100, 10, false, ReconciliationStrategy::Continue)] + #[case(Some(100), Some(105), 100, 0, false, ReconciliationStrategy::Continue)] // zero depth ok with max_depth=0 + #[case(Some(100), Some(100), 99, 10, false, ReconciliationStrategy::Continue)] // single pending block + fn test_reconciler( + #[case] earliest: Option, + #[case] latest: Option, + #[case] canonical: u64, + #[case] max_depth: u64, + #[case] reorg: bool, + #[case] expected: ReconciliationStrategy, + ) { + let result = CanonicalBlockReconciler::reconcile(earliest, latest, canonical, max_depth, reorg); + assert_eq!(result, expected); } } From eaaf3d32cb3484124d668d637b0ec3a652a4366e Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 12:12:47 -0500 Subject: [PATCH 3/8] chore(flashblocks): trim down validation code docs --- crates/flashblocks/src/validation.rs | 276 +++++---------------------- 1 file changed, 46 insertions(+), 230 deletions(-) diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs index 03e91a4b..c3e13921 100644 --- a/crates/flashblocks/src/validation.rs +++ b/crates/flashblocks/src/validation.rs @@ -1,118 +1,49 @@ //! Flashblock sequence validation and reorganization detection. //! -//! This module provides pure, stateless validation logic for determining -//! whether an incoming flashblock is valid in the context of the current -//! pending state. The validator is designed to be easily unit-testable -//! without any external dependencies. -//! -//! It also provides utilities for detecting chain reorganizations by comparing -//! tracked transaction sets against canonical chain data. +//! Provides stateless validation logic for flashblock sequencing and chain reorg detection. use std::collections::HashSet; use alloy_primitives::B256; /// Result of validating a flashblock's position in the sequence. -/// -/// This enum represents all possible outcomes when validating whether -/// an incoming flashblock follows the expected sequence relative to -/// the current latest flashblock. #[derive(Debug, Clone, PartialEq, Eq)] pub enum SequenceValidationResult { - /// The flashblock is the next consecutive flashblock within the current block. - /// - /// This occurs when: - /// - `incoming_block_number == latest_block_number` - /// - `incoming_index == latest_flashblock_index + 1` + /// Next consecutive flashblock within the current block (same block, index + 1). NextInSequence, - - /// The flashblock is the first flashblock (index 0) of the next block. - /// - /// This occurs when: - /// - `incoming_block_number == latest_block_number + 1` - /// - `incoming_index == 0` + /// First flashblock (index 0) of the next block (block + 1). FirstOfNextBlock, - - /// The flashblock has the same index as the current latest flashblock. - /// - /// This is a duplicate that should be ignored. + /// Duplicate flashblock (same block and index) - should be ignored. Duplicate, - - /// The flashblock has a non-sequential index within the same block. - /// - /// This indicates a gap in the flashblock sequence, which means - /// some flashblocks were missed. + /// Non-sequential index within the same block - indicates missed flashblocks. NonSequentialGap { - /// The expected flashblock index. + /// Expected flashblock index. expected: u64, - /// The actual incoming flashblock index. + /// Actual incoming flashblock index. actual: u64, }, - - /// A new block was received with a non-zero flashblock index. - /// - /// The first flashblock of any new block must have index 0. - /// Receiving a non-zero index for a new block means we missed - /// the base flashblock. + /// New block received with non-zero index - missed the base flashblock. InvalidNewBlockIndex { - /// The block number of the incoming flashblock. + /// Block number of the incoming flashblock. block_number: u64, /// The invalid (non-zero) index received. index: u64, }, } -/// Pure validator for flashblock sequence ordering. -/// -/// This validator determines whether an incoming flashblock is valid -/// in the context of the current pending state. It is designed to be -/// stateless and easily testable. -/// -/// # Example -/// -/// ``` -/// use base_reth_flashblocks::validation::{FlashblockSequenceValidator, SequenceValidationResult}; -/// -/// // Validate that flashblock index 3 follows index 2 in block 100 -/// let result = FlashblockSequenceValidator::validate(100, 2, 100, 3); -/// assert_eq!(result, SequenceValidationResult::NextInSequence); -/// -/// // Validate that flashblock index 0 of block 101 follows any flashblock in block 100 -/// let result = FlashblockSequenceValidator::validate(100, 5, 101, 0); -/// assert_eq!(result, SequenceValidationResult::FirstOfNextBlock); -/// ``` +/// Stateless validator for flashblock sequence ordering. #[derive(Debug, Clone, Copy, Default)] pub struct FlashblockSequenceValidator; impl FlashblockSequenceValidator { /// Validates whether an incoming flashblock follows the expected sequence. /// - /// This method implements the core validation logic for flashblock ordering: - /// - /// 1. **Next in sequence**: The incoming flashblock is the next consecutive - /// flashblock within the current block (same block number, index + 1). - /// - /// 2. **First of next block**: The incoming flashblock is the first flashblock - /// (index 0) of the next block (block number + 1). - /// - /// 3. **Duplicate**: The incoming flashblock has the same index as the current - /// latest flashblock within the same block. - /// - /// 4. **Non-sequential gap**: The incoming flashblock has a different block number - /// or a non-consecutive index within the same block. - /// - /// 5. **Invalid new block index**: A new block is received with a non-zero index. - /// - /// # Arguments - /// - /// * `latest_block_number` - The block number of the current latest flashblock. - /// * `latest_flashblock_index` - The index of the current latest flashblock. - /// * `incoming_block_number` - The block number of the incoming flashblock. - /// * `incoming_index` - The index of the incoming flashblock. - /// - /// # Returns - /// - /// A [`SequenceValidationResult`] indicating the validation outcome. + /// Returns the appropriate [`SequenceValidationResult`] based on: + /// - Same block, index + 1 → `NextInSequence` + /// - Next block, index 0 → `FirstOfNextBlock` + /// - Same block and index → `Duplicate` + /// - Same block, wrong index → `NonSequentialGap` + /// - Different block, non-zero index or block gap → `InvalidNewBlockIndex` pub const fn validate( latest_block_number: u64, latest_flashblock_index: u64, @@ -153,16 +84,11 @@ impl FlashblockSequenceValidator { } /// Result of a reorganization detection check. -/// -/// This enum represents whether a chain reorganization was detected -/// by comparing tracked transaction hashes against canonical chain data. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ReorgDetectionResult { - /// No reorganization detected - transaction sets match exactly. + /// Transaction sets match exactly. NoReorg, - /// Reorganization detected - transaction sets differ. - /// - /// Contains the counts from both sets for diagnostic purposes. + /// Transaction sets differ (counts included for diagnostics). ReorgDetected { /// Number of transactions in the tracked (pending) set. tracked_count: usize, @@ -185,64 +111,14 @@ impl ReorgDetectionResult { } } -/// A pure utility for detecting chain reorganizations. -/// -/// `ReorgDetector` compares two sets of transaction hashes to determine -/// if a reorganization has occurred. A reorg is detected when the tracked -/// transaction set differs from the canonical chain's transaction set, -/// either in count or content. -/// -/// # Example -/// -/// ``` -/// use alloy_primitives::B256; -/// use base_reth_flashblocks::validation::{ReorgDetector, ReorgDetectionResult}; -/// -/// let tracked = vec![B256::ZERO]; -/// let canonical = vec![B256::ZERO]; -/// -/// let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); -/// assert!(result.is_no_reorg()); -/// ``` +/// Detects chain reorganizations by comparing transaction hash sets. #[derive(Debug, Clone, Copy, Default)] pub struct ReorgDetector; impl ReorgDetector { - /// Detects whether a chain reorganization occurred by comparing transaction hash sets. - /// - /// This method compares the tracked (pending) transaction hashes against the - /// canonical chain's transaction hashes. A reorganization is detected if: - /// - The number of transactions differs, or - /// - The sets contain different transaction hashes - /// - /// # Arguments - /// - /// * `tracked_tx_hashes` - Iterator over transaction hashes from the tracked/pending state. - /// * `canonical_tx_hashes` - Iterator over transaction hashes from the canonical chain. - /// - /// # Returns - /// - /// Returns [`ReorgDetectionResult::NoReorg`] if the transaction sets match exactly, - /// or [`ReorgDetectionResult::ReorgDetected`] with the counts if they differ. - /// - /// # Example + /// Compares tracked vs canonical transaction hashes to detect reorgs. /// - /// ``` - /// use alloy_primitives::B256; - /// use base_reth_flashblocks::validation::{ReorgDetector, ReorgDetectionResult}; - /// - /// // Same transactions - no reorg - /// let hash = B256::repeat_byte(0x42); - /// let tracked = vec![hash]; - /// let canonical = vec![hash]; - /// - /// match ReorgDetector::detect(tracked.iter(), canonical.iter()) { - /// ReorgDetectionResult::NoReorg => println!("No reorg detected"), - /// ReorgDetectionResult::ReorgDetected { tracked_count, canonical_count } => { - /// println!("Reorg! tracked: {}, canonical: {}", tracked_count, canonical_count); - /// } - /// } - /// ``` + /// Returns `ReorgDetected` if counts differ or sets contain different hashes. pub fn detect<'a, I1, I2>( tracked_tx_hashes: I1, canonical_tx_hashes: I2, @@ -266,103 +142,34 @@ impl ReorgDetector { } } -/// Defines explicit handling approaches for reconciling pending state with canonical state. -/// -/// When a canonical block is received, the reconciliation strategy determines -/// how to handle the pending flashblock state based on the relationship between -/// the canonical chain and the pending blocks. +/// Strategy for reconciling pending state with canonical state on new canonical blocks. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ReconciliationStrategy { - /// Canonical chain has caught up to or passed the pending state. - /// - /// This occurs when the canonical block number is >= the latest pending block number. - /// The pending state should be cleared/reset as it's no longer ahead of canonical. + /// Canonical caught up or passed pending (canonical >= latest pending). Clear pending state. CatchUp, - - /// A chain reorganization has been detected. - /// - /// This occurs when the transactions in the pending state for a given block - /// don't match the transactions in the canonical block. The pending state - /// should be rebuilt from canonical without reusing existing state. + /// Reorg detected (tx mismatch). Rebuild pending from canonical. HandleReorg, - - /// The pending blocks have grown too far ahead of the canonical chain. - /// - /// This occurs when the depth (canonical_block - earliest_pending_block) - /// exceeds the configured maximum depth. Contains the current depth and - /// the configured maximum for diagnostic purposes. + /// Pending too far ahead of canonical. DepthLimitExceeded { - /// The current depth of pending blocks. + /// Current depth of pending blocks. depth: u64, - /// The configured maximum depth. + /// Configured maximum depth. max_depth: u64, }, - - /// No issues detected, continue building on existing pending state. - /// - /// This occurs when the canonical block is behind the pending state, - /// no reorg is detected, and depth limits are not exceeded. + /// No issues - continue building on pending state. Continue, - - /// No pending state exists yet. - /// - /// This occurs when there is no pending flashblock state to reconcile. - /// Typically happens at startup or after the pending state has been cleared. + /// No pending state exists (startup or after clear). NoPendingState, } -/// Reconciler for determining how to handle canonical block updates. -/// -/// This struct encapsulates the logic for determining which [`ReconciliationStrategy`] -/// should be used when a new canonical block is received. -/// -/// # Example -/// -/// ``` -/// use base_reth_flashblocks::validation::{CanonicalBlockReconciler, ReconciliationStrategy}; -/// -/// // Determine strategy when canonical catches up -/// let strategy = CanonicalBlockReconciler::reconcile( -/// Some(100), // earliest pending block -/// Some(105), // latest pending block -/// 105, // canonical block number (caught up) -/// 10, // max depth -/// false, // no reorg detected -/// ); -/// assert_eq!(strategy, ReconciliationStrategy::CatchUp); -/// ``` +/// Determines reconciliation strategy for canonical block updates. #[derive(Debug, Clone, Copy, Default)] pub struct CanonicalBlockReconciler; impl CanonicalBlockReconciler { - /// Determines the appropriate reconciliation strategy based on the current state. - /// - /// # Arguments - /// - /// * `pending_earliest_block` - The earliest block number in the pending state, if any. - /// * `pending_latest_block` - The latest block number in the pending state, if any. - /// * `canonical_block_number` - The block number of the new canonical block. - /// * `max_depth` - The maximum allowed depth between canonical and earliest pending block. - /// * `reorg_detected` - Whether a reorg was detected (transaction mismatch). + /// Returns the appropriate [`ReconciliationStrategy`] based on pending vs canonical state. /// - /// # Returns - /// - /// The [`ReconciliationStrategy`] that should be used to handle this situation. - /// - /// # Strategy Selection Logic - /// - /// 1. If no pending state exists (`pending_earliest_block` or `pending_latest_block` is `None`), - /// returns [`ReconciliationStrategy::NoPendingState`]. - /// - /// 2. If canonical has caught up or passed pending (`canonical_block_number >= pending_latest_block`), - /// returns [`ReconciliationStrategy::CatchUp`]. - /// - /// 3. If a reorg is detected, returns [`ReconciliationStrategy::HandleReorg`]. - /// - /// 4. If depth limit is exceeded (`canonical_block_number - pending_earliest_block > max_depth`), - /// returns [`ReconciliationStrategy::DepthLimitExceeded`]. - /// - /// 5. Otherwise, returns [`ReconciliationStrategy::Continue`]. + /// Priority: `NoPendingState` → `CatchUp` → `HandleReorg` → `DepthLimitExceeded` → `Continue` pub const fn reconcile( pending_earliest_block: Option, pending_latest_block: Option, @@ -399,9 +206,10 @@ impl CanonicalBlockReconciler { #[cfg(test)] mod tests { - use super::*; use rstest::rstest; + use super::*; + // ==================== FlashblockSequenceValidator Tests ==================== #[rstest] @@ -438,8 +246,12 @@ mod tests { #[case] incoming_idx: u64, #[case] expected: SequenceValidationResult, ) { - let result = - FlashblockSequenceValidator::validate(latest_block, latest_idx, incoming_block, incoming_idx); + let result = FlashblockSequenceValidator::validate( + latest_block, + latest_idx, + incoming_block, + incoming_idx, + ); assert_eq!(result, expected); } @@ -470,7 +282,10 @@ mod tests { let canonical: Vec = canonical_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); assert_eq!(result, expected); - assert_eq!(result.is_reorg(), matches!(expected, ReorgDetectionResult::ReorgDetected { .. })); + assert_eq!( + result.is_reorg(), + matches!(expected, ReorgDetectionResult::ReorgDetected { .. }) + ); } // ==================== CanonicalBlockReconciler Tests ==================== @@ -505,7 +320,8 @@ mod tests { #[case] reorg: bool, #[case] expected: ReconciliationStrategy, ) { - let result = CanonicalBlockReconciler::reconcile(earliest, latest, canonical, max_depth, reorg); + let result = + CanonicalBlockReconciler::reconcile(earliest, latest, canonical, max_depth, reorg); assert_eq!(result, expected); } } From 5112ad10f12cf3e2d268227823153a7a87efc139 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 12:15:48 -0500 Subject: [PATCH 4/8] fix(flashblocks): pub mod --- crates/flashblocks/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 99c851f3..17d619fd 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -24,7 +24,7 @@ pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI}; mod state_builder; pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder}; -pub mod validation; +mod validation; pub use validation::{ CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, ReorgDetectionResult, ReorgDetector, SequenceValidationResult, From 50eba136987db9a6ae5fadd34b6100b8a0526232 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 12:46:57 -0500 Subject: [PATCH 5/8] fix(flashblocks): missing validation case with unordered blocks = reorg --- crates/flashblocks/src/validation.rs | 29 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs index c3e13921..a50c1489 100644 --- a/crates/flashblocks/src/validation.rs +++ b/crates/flashblocks/src/validation.rs @@ -2,8 +2,6 @@ //! //! Provides stateless validation logic for flashblock sequencing and chain reorg detection. -use std::collections::HashSet; - use alloy_primitives::B256; /// Result of validating a flashblock's position in the sequence. @@ -118,7 +116,7 @@ pub struct ReorgDetector; impl ReorgDetector { /// Compares tracked vs canonical transaction hashes to detect reorgs. /// - /// Returns `ReorgDetected` if counts differ or sets contain different hashes. + /// Returns `ReorgDetected` if counts differ, hashes differ, or order differs. pub fn detect<'a, I1, I2>( tracked_tx_hashes: I1, canonical_tx_hashes: I2, @@ -127,15 +125,15 @@ impl ReorgDetector { I1: Iterator, I2: Iterator, { - let tracked_set: HashSet<&B256> = tracked_tx_hashes.collect(); - let canonical_set: HashSet<&B256> = canonical_tx_hashes.collect(); - - let tracked_count = tracked_set.len(); - let canonical_count = canonical_set.len(); + let tracked: Vec<&B256> = tracked_tx_hashes.collect(); + let canonical: Vec<&B256> = canonical_tx_hashes.collect(); - // Check both count and content - if counts differ or sets are not equal, it's a reorg - if tracked_count != canonical_count || tracked_set != canonical_set { - ReorgDetectionResult::ReorgDetected { tracked_count, canonical_count } + // Check count, content, AND order - any difference indicates a reorg + if tracked != canonical { + ReorgDetectionResult::ReorgDetected { + tracked_count: tracked.len(), + canonical_count: canonical.len(), + } } else { ReorgDetectionResult::NoReorg } @@ -258,17 +256,20 @@ mod tests { // ==================== ReorgDetector Tests ==================== #[rstest] - // No reorg cases + // No reorg cases - identical sequences #[case(&[], &[], ReorgDetectionResult::NoReorg)] #[case(&[0x01], &[0x01], ReorgDetectionResult::NoReorg)] #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02, 0x03], ReorgDetectionResult::NoReorg)] - #[case(&[0x01, 0x02, 0x03], &[0x03, 0x01, 0x02], ReorgDetectionResult::NoReorg)] // order doesn't matter - #[case(&[0x01, 0x01, 0x02], &[0x01, 0x02], ReorgDetectionResult::NoReorg)] // duplicates deduplicated + #[case(&[0x01, 0x01, 0x02], &[0x01, 0x01, 0x02], ReorgDetectionResult::NoReorg)] + // Reorg cases - different order (order matters!) + #[case(&[0x01, 0x02, 0x03], &[0x03, 0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 3 })] + #[case(&[0x01, 0x02], &[0x02, 0x01], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] // Reorg cases - different counts #[case(&[0x01, 0x02, 0x03], &[0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 })] #[case(&[0x01], &[0x01, 0x02, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 3 })] #[case(&[], &[0x01], ReorgDetectionResult::ReorgDetected { tracked_count: 0, canonical_count: 1 })] #[case(&[0x01], &[], ReorgDetectionResult::ReorgDetected { tracked_count: 1, canonical_count: 0 })] + #[case(&[0x01, 0x01, 0x02], &[0x01, 0x02], ReorgDetectionResult::ReorgDetected { tracked_count: 3, canonical_count: 2 })] // Reorg cases - same count, different hashes #[case(&[0x01, 0x02], &[0x03, 0x04], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] #[case(&[0x01, 0x02], &[0x01, 0x03], ReorgDetectionResult::ReorgDetected { tracked_count: 2, canonical_count: 2 })] From f0433cbf7001da9a3a8327f05afb1933566fc588 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 12:56:48 -0500 Subject: [PATCH 6/8] chore(flashblocks): flatten nested branches in validation --- crates/flashblocks/src/validation.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs index a50c1489..31dc549f 100644 --- a/crates/flashblocks/src/validation.rs +++ b/crates/flashblocks/src/validation.rs @@ -48,22 +48,16 @@ impl FlashblockSequenceValidator { incoming_block_number: u64, incoming_index: u64, ) -> SequenceValidationResult { - // Check if this is the next flashblock within the current block - let is_next_of_block = incoming_block_number == latest_block_number - && incoming_index == latest_flashblock_index + 1; - - // Check if this is the first flashblock of the next block - let is_first_of_next_block = - incoming_block_number == latest_block_number + 1 && incoming_index == 0; - - if is_next_of_block || is_first_of_next_block { - if is_next_of_block { - SequenceValidationResult::NextInSequence - } else { - SequenceValidationResult::FirstOfNextBlock - } + // Next flashblock within the current block + if incoming_block_number == latest_block_number + && incoming_index == latest_flashblock_index + 1 + { + SequenceValidationResult::NextInSequence + // First flashblock of the next block + } else if incoming_block_number == latest_block_number + 1 && incoming_index == 0 { + SequenceValidationResult::FirstOfNextBlock + // New block with non-zero index or block gap } else if incoming_block_number != latest_block_number { - // New block with non-zero index SequenceValidationResult::InvalidNewBlockIndex { block_number: incoming_block_number, index: incoming_index, From 73021ab672fed8a93332d72cecd94bb090672e29 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 13:09:02 -0500 Subject: [PATCH 7/8] fix(flashblocks): accept slices in detection --- crates/flashblocks/src/processor.rs | 107 +++++++++++++-------------- crates/flashblocks/src/validation.rs | 24 ++---- 2 files changed, 61 insertions(+), 70 deletions(-) diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 5b3323b1..ebdbc818 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -143,8 +143,7 @@ where let tracked_txn_hashes: Vec<_> = tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); let block_txn_hashes: Vec<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); - let reorg_result = - ReorgDetector::detect(tracked_txn_hashes.iter(), block_txn_hashes.iter()); + let reorg_result = ReorgDetector::detect(&tracked_txn_hashes, &block_txn_hashes); let reorg_detected = reorg_result.is_reorg(); // Determine the reconciliation strategy @@ -217,64 +216,64 @@ where prev_pending_blocks: Option>, flashblock: Flashblock, ) -> eyre::Result>> { - match &prev_pending_blocks { - Some(pending_blocks) => { - let validation_result = FlashblockSequenceValidator::validate( - pending_blocks.latest_block_number(), - pending_blocks.latest_flashblock_index(), - flashblock.metadata.block_number, - flashblock.index, - ); - - match validation_result { - SequenceValidationResult::NextInSequence - | SequenceValidationResult::FirstOfNextBlock => { - // We have received the next flashblock for the current block - // or the first flashblock for the next block - let mut flashblocks = pending_blocks.get_flashblocks(); - flashblocks.push(flashblock); - self.build_pending_state(prev_pending_blocks, &flashblocks) - } - SequenceValidationResult::Duplicate => { - // We have received a duplicate flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - warn!( - message = "Received duplicate Flashblock for current block, ignoring", - curr_block = %pending_blocks.latest_block_number(), - flashblock_index = %flashblock.index, - ); - Ok(prev_pending_blocks) - } - SequenceValidationResult::InvalidNewBlockIndex { block_number, index: _ } => { - // We have received a non-zero flashblock for a new block - self.metrics.unexpected_block_order.increment(1); - error!( - message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %block_number, - ); - Ok(None) - } - SequenceValidationResult::NonSequentialGap { expected: _, actual: _ } => { - // We have received a non-sequential Flashblock for the current block - self.metrics.unexpected_block_order.increment(1); - error!( - message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_blocks.latest_block_number(), - new_block = %flashblock.metadata.block_number, - ); - Ok(None) - } - } - } + let pending_blocks = match &prev_pending_blocks { + Some(pb) => pb, None => { if flashblock.index == 0 { - self.build_pending_state(None, &vec![flashblock]) + return self.build_pending_state(None, &vec![flashblock]); } else { info!(message = "waiting for first Flashblock"); - Ok(None) + return Ok(None); } } + }; + + let validation_result = FlashblockSequenceValidator::validate( + pending_blocks.latest_block_number(), + pending_blocks.latest_flashblock_index(), + flashblock.metadata.block_number, + flashblock.index, + ); + + match validation_result { + SequenceValidationResult::NextInSequence + | SequenceValidationResult::FirstOfNextBlock => { + // We have received the next flashblock for the current block + // or the first flashblock for the next block + let mut flashblocks = pending_blocks.get_flashblocks(); + flashblocks.push(flashblock); + self.build_pending_state(prev_pending_blocks, &flashblocks) + } + SequenceValidationResult::Duplicate => { + // We have received a duplicate flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + warn!( + message = "Received duplicate Flashblock for current block, ignoring", + curr_block = %pending_blocks.latest_block_number(), + flashblock_index = %flashblock.index, + ); + Ok(prev_pending_blocks) + } + SequenceValidationResult::InvalidNewBlockIndex { block_number, index: _ } => { + // We have received a non-zero flashblock for a new block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %block_number, + ); + Ok(None) + } + SequenceValidationResult::NonSequentialGap { expected: _, actual: _ } => { + // We have received a non-sequential Flashblock for the current block + self.metrics.unexpected_block_order.increment(1); + error!( + message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock", + curr_block = %pending_blocks.latest_block_number(), + new_block = %flashblock.metadata.block_number, + ); + Ok(None) + } } } diff --git a/crates/flashblocks/src/validation.rs b/crates/flashblocks/src/validation.rs index 31dc549f..7d5607a9 100644 --- a/crates/flashblocks/src/validation.rs +++ b/crates/flashblocks/src/validation.rs @@ -111,22 +111,14 @@ impl ReorgDetector { /// Compares tracked vs canonical transaction hashes to detect reorgs. /// /// Returns `ReorgDetected` if counts differ, hashes differ, or order differs. - pub fn detect<'a, I1, I2>( - tracked_tx_hashes: I1, - canonical_tx_hashes: I2, - ) -> ReorgDetectionResult - where - I1: Iterator, - I2: Iterator, - { - let tracked: Vec<&B256> = tracked_tx_hashes.collect(); - let canonical: Vec<&B256> = canonical_tx_hashes.collect(); - - // Check count, content, AND order - any difference indicates a reorg - if tracked != canonical { + pub fn detect( + tracked_tx_hashes: &[B256], + canonical_tx_hashes: &[B256], + ) -> ReorgDetectionResult { + if tracked_tx_hashes != canonical_tx_hashes { ReorgDetectionResult::ReorgDetected { - tracked_count: tracked.len(), - canonical_count: canonical.len(), + tracked_count: tracked_tx_hashes.len(), + canonical_count: canonical_tx_hashes.len(), } } else { ReorgDetectionResult::NoReorg @@ -275,7 +267,7 @@ mod tests { ) { let tracked: Vec = tracked_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); let canonical: Vec = canonical_bytes.iter().map(|b| B256::repeat_byte(*b)).collect(); - let result = ReorgDetector::detect(tracked.iter(), canonical.iter()); + let result = ReorgDetector::detect(&tracked, &canonical); assert_eq!(result, expected); assert_eq!( result.is_reorg(), From d7d9806ec28b38d3b49f22089f6ae6387fd4aa3e Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Thu, 8 Jan 2026 15:03:05 -0500 Subject: [PATCH 8/8] chore(flashblocks): nits :sparkles: --- crates/flashblocks/src/lib.rs | 3 +++ crates/flashblocks/src/processor.rs | 3 +-- crates/flashblocks/src/state.rs | 1 - crates/flashblocks/src/subscription.rs | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 17d619fd..24b76bc8 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -3,6 +3,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +#[macro_use] +extern crate tracing; + mod metrics; pub use metrics::Metrics; diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index ebdbc818..5c705531 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -28,7 +28,6 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_primitives::OpBlock; use reth_primitives::RecoveredBlock; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; -use tracing::{debug, error, info, warn}; use crate::{ Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, @@ -169,7 +168,7 @@ where Ok(None) } ReconciliationStrategy::HandleReorg => { - debug!( + warn!( message = "reorg detected, recomputing pending flashblocks going ahead of reorg", tracked_txn_hashes = ?tracked_txn_hashes, block_txn_hashes = ?block_txn_hashes, diff --git a/crates/flashblocks/src/state.rs b/crates/flashblocks/src/state.rs index 5b01f827..f2bcefe9 100644 --- a/crates/flashblocks/src/state.rs +++ b/crates/flashblocks/src/state.rs @@ -17,7 +17,6 @@ use tokio::sync::{ broadcast::{self, Sender}, mpsc, }; -use tracing::{error, info}; use crate::{ FlashblocksAPI, FlashblocksReceiver, PendingBlocks, diff --git a/crates/flashblocks/src/subscription.rs b/crates/flashblocks/src/subscription.rs index 34e208c7..7322fd49 100644 --- a/crates/flashblocks/src/subscription.rs +++ b/crates/flashblocks/src/subscription.rs @@ -6,7 +6,6 @@ use base_flashtypes::Flashblock; use futures_util::{SinkExt as _, StreamExt}; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -use tracing::{error, info, trace, warn}; use url::Url; use crate::{FlashblocksReceiver, Metrics};