From 6718eeec9e1ad5c414e0ceb9552da3b6e8023d30 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Fri, 5 Dec 2025 03:09:34 +0530 Subject: [PATCH] refactor: reorg detection --- src/block_range_scanner/reorg_handler.rs | 332 +++++++++++++++++++---- src/block_range_scanner/ring_buffer.rs | 79 ++++++ 2 files changed, 351 insertions(+), 60 deletions(-) diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index 9fd12614..dc0fde17 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -2,9 +2,9 @@ use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse}, - primitives::BlockHash, + primitives::{BlockHash, BlockNumber}, }; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::{ ScannerError, @@ -12,12 +12,12 @@ use crate::{ robust_provider::{self, RobustProvider}, }; -use super::ring_buffer::RingBuffer; +use super::ring_buffer::{BlockInfo, RingBuffer}; #[derive(Clone)] pub(crate) struct ReorgHandler { provider: RobustProvider, - buffer: RingBuffer, + buffer: RingBuffer>, } impl ReorgHandler { @@ -27,6 +27,11 @@ impl ReorgHandler { /// Checks if a block was reorged and returns the common ancestor if found. /// + /// This implementation uses a hybrid approach: + /// - For previously seen blocks (duplicates), verifies they still exist on-chain + /// - For new blocks, verifies parent hash against the buffer for efficient detection + /// - Falls back to finalized block for deep reorgs beyond buffer capacity + /// /// # Arguments /// /// * `block` - The block to check for reorg. @@ -39,97 +44,304 @@ impl ReorgHandler { /// /// # Edge Cases /// - /// * **Duplicate block detection** - If the incoming block hash matches the last buffered hash, - /// it won't be added again to prevent buffer pollution from duplicate checks. + /// * **Duplicate block detection** - If the incoming block matches a buffered block, it's + /// verified on-chain to detect reorgs affecting previously processed blocks. + /// + /// * **Empty buffer** - If the buffer is empty, no reorg detection is possible. The block is + /// simply added to the buffer. /// - /// * **Empty buffer on reorg** - If a reorg is detected but the buffer is empty (e.g., first - /// block after initialization), the function falls back to the finalized block as the common - /// ancestor. + /// * **Gap in block numbers** - If there's a gap between the incoming block and the buffer, + /// intermediate blocks are fetched to verify chain continuity. /// /// * **Deep reorg beyond buffer capacity** - If all buffered blocks are reorged (buffer - /// exhausted), the finalized block is used as a safe fallback to prevent data loss. + /// exhausted), the finalized block is used as a safe fallback. /// - /// * **Common ancestor beyond finalized** - This can happen if not all sequental blocks are - /// checked and stored. If the found common ancestor has a lower block number than the - /// finalized block, the finalized block is used instead and the buffer is cleared. + /// * **Common ancestor beyond finalized** - If the found common ancestor has a lower block + /// number than the finalized block, the finalized block is used instead. /// /// * **Network errors during lookup** - Non-`BlockNotFound` errors (e.g., RPC failures) are /// propagated immediately rather than being treated as reorgs. - /// - /// * **Finalized block unavailable** - If the finalized block cannot be fetched when needed as - /// a fallback, the error is propagated to the caller. pub async fn check( &mut self, block: &N::BlockResponse, ) -> Result, ScannerError> { - let block = block.header(); - info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged"); - - if !self.reorg_detected(block).await? { - let block_hash = block.hash(); - info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected"); - // store the incoming block's hash for future reference - if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) { - self.buffer.push(block_hash); - } + let header = block.header(); + let block_number = header.number(); + let block_hash = header.hash(); + let parent_hash = header.parent_hash(); + + debug!( + block_hash = %block_hash, + block_number = block_number, + parent_hash = %parent_hash, + "Checking block for reorg" + ); + + // Handle empty buffer case - no reorg detection possible yet + if self.buffer.is_empty() { + info!(block_number = block_number, "Buffer empty, adding first block"); + self.buffer.push(BlockInfo { number: block_number, hash: block_hash }); return Ok(None); } - info!("Reorg detected, searching for common ancestor"); + // Check if this is a previously seen block (duplicate check) + // This happens when checking `previous_batch_end` to verify it wasn't reorged + if let Some(buffered) = self.buffer.find_by_number(block_number) { + if buffered.hash == block_hash { + // This is a known block - verify it still exists on-chain + debug!( + block_number = block_number, + "Block already in buffer, verifying on-chain existence" + ); + return self.verify_block_on_chain(block).await; + } + // Same number but different hash - definite reorg + info!( + block_number = block_number, + buffered_hash = %buffered.hash, + incoming_hash = %block_hash, + "Block number matches but hash differs, reorg detected" + ); + return self.find_common_ancestor(block).await; + } - while let Some(&block_hash) = self.buffer.back() { - info!(block_hash = %block_hash, "Checking if block exists on-chain"); - match self.provider.get_block_by_hash(block_hash).await { - Ok(common_ancestor) => return self.return_common_ancestor(common_ancestor).await, - Err(robust_provider::Error::BlockNotFound(_)) => { - // block was reorged - _ = self.buffer.pop_back(); - } - Err(e) => return Err(e.into()), + // New block - use parent hash verification + let parent_number = block_number.saturating_sub(1); + + // Try to find the parent in our buffer + if let Some(buffered_parent) = self.buffer.find_by_number(parent_number) { + if buffered_parent.hash == parent_hash { + // Parent hash matches - no reorg, add block to buffer + debug!( + block_number = block_number, + parent_number = parent_number, + "Parent hash matches buffer, no reorg detected" + ); + self.buffer.push(BlockInfo { number: block_number, hash: block_hash }); + return Ok(None); + } + + // Parent hash mismatch - reorg detected! + info!( + block_number = block_number, + parent_number = parent_number, + expected_hash = %buffered_parent.hash, + actual_hash = %parent_hash, + "Parent hash mismatch, reorg detected" + ); + return self.find_common_ancestor(block).await; + } + + // Parent not in buffer - might be a gap, need to verify chain continuity + self.handle_gap(block, parent_number).await + } + + /// Verifies that a previously seen block still exists on-chain. + /// Used for duplicate blocks to detect if they were reorged out. + async fn verify_block_on_chain( + &mut self, + block: &N::BlockResponse, + ) -> Result, ScannerError> { + let header = block.header(); + let block_hash = header.hash(); + let block_number = header.number(); + + match self.provider.get_block_by_hash(block_hash).await { + Ok(_) => { + // Block still exists on-chain, no reorg + debug!(block_number = block_number, "Block verified on-chain, no reorg"); + Ok(None) + } + Err(robust_provider::Error::BlockNotFound(_)) => { + // Block was reorged out + info!( + block_number = block_number, + block_hash = %block_hash, + "Block no longer exists on-chain, reorg detected" + ); + self.find_common_ancestor(block).await } + Err(e) => Err(e.into()), } + } - // return last finalized block as common ancestor + /// Handles the case where there's a gap between the incoming block and the buffer. + /// Fetches intermediate blocks to verify chain continuity. + async fn handle_gap( + &mut self, + incoming_block: &N::BlockResponse, + parent_number: BlockNumber, + ) -> Result, ScannerError> { + let incoming_header = incoming_block.header(); + let block_number = incoming_header.number(); + let block_hash = incoming_header.hash(); - // no need to store finalized block's hash in the buffer, as it is returned by default only - // if not buffered hashes exist on-chain + // Get the latest block we have in buffer + let Some(last_buffered) = self.buffer.back().copied() else { + // Buffer became empty somehow, just add the block + self.buffer.push(BlockInfo { number: block_number, hash: block_hash }); + return Ok(None); + }; - warn!("Possible deep reorg detected, setting finalized block as common ancestor"); + // If incoming block is not ahead of buffer, something is wrong + if block_number <= last_buffered.number { + // Block is at or before our buffer tip - this could indicate a reorg + // where we're receiving an alternative chain + info!( + incoming_number = block_number, + buffer_tip = last_buffered.number, + "Incoming block not ahead of buffer, checking for reorg" + ); + return self.find_common_ancestor(incoming_block).await; + } - let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + debug!( + buffer_tip = last_buffered.number, + incoming_number = block_number, + gap_size = block_number - last_buffered.number - 1, + "Gap detected, fetching intermediate blocks" + ); - let header = finalized.header(); - info!(finalized_hash = %header.hash(), block_number = header.number(), "Finalized block set as common ancestor"); + // Fetch blocks from (last_buffered.number + 1) to parent_number to verify chain + let mut current_number = last_buffered.number + 1; + let mut expected_parent_hash = last_buffered.hash; - Ok(Some(finalized)) + while current_number <= parent_number { + let intermediate_block = + match self.provider.get_block_by_number(current_number.into()).await { + Ok(block) => block, + Err(e) => { + warn!( + block_number = current_number, + error = %e, + "Failed to fetch intermediate block" + ); + return Err(e.into()); + } + }; + + let intermediate_header = intermediate_block.header(); + let intermediate_hash = intermediate_header.hash(); + let intermediate_parent = intermediate_header.parent_hash(); + + // Verify this block's parent matches what we expect + if intermediate_parent != expected_parent_hash { + info!( + block_number = current_number, + expected_parent = %expected_parent_hash, + actual_parent = %intermediate_parent, + "Chain discontinuity detected during gap fill, reorg detected" + ); + return self.find_common_ancestor(incoming_block).await; + } + + // Add to buffer and continue + self.buffer.push(BlockInfo { number: current_number, hash: intermediate_hash }); + expected_parent_hash = intermediate_hash; + current_number += 1; + } + + // Now verify the incoming block's parent hash + if incoming_header.parent_hash() != expected_parent_hash { + info!( + block_number = block_number, + expected_parent = %expected_parent_hash, + actual_parent = %incoming_header.parent_hash(), + "Incoming block parent mismatch after gap fill, reorg detected" + ); + return self.find_common_ancestor(incoming_block).await; + } + + // Chain is continuous, add the incoming block + self.buffer.push(BlockInfo { number: block_number, hash: block_hash }); + Ok(None) } - async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result { - match self.provider.get_block_by_hash(block.hash()).await { - Ok(_) => Ok(false), - Err(robust_provider::Error::BlockNotFound(_)) => Ok(true), - Err(e) => Err(e.into()), + /// Finds the common ancestor by walking back through the buffer and verifying + /// each block's existence on-chain. + async fn find_common_ancestor( + &mut self, + _incoming_block: &N::BlockResponse, + ) -> Result, ScannerError> { + info!("Searching for common ancestor"); + + // Walk back through buffer to find a block that still exists on-chain + while let Some(block_info) = self.buffer.pop_back() { + debug!( + block_number = block_info.number, + block_hash = %block_info.hash, + "Checking if buffered block exists on-chain" + ); + + match self.provider.get_block_by_hash(block_info.hash).await { + Ok(block) => { + // Found a block that exists on-chain - this is our common ancestor + return self.validate_and_return_ancestor(block).await; + } + Err(robust_provider::Error::BlockNotFound(_)) => { + // Block was reorged, continue walking back + debug!( + block_number = block_info.number, + "Block was reorged, continuing search" + ); + } + Err(e) => { + // Network error, propagate it + return Err(e.into()); + } + } } + + // Buffer exhausted - fall back to finalized block + self.fallback_to_finalized().await } - async fn return_common_ancestor( + /// Validates the common ancestor against the finalized block and returns it. + async fn validate_and_return_ancestor( &mut self, - common_ancestor: ::BlockResponse, + common_ancestor: N::BlockResponse, ) -> Result, ScannerError> { - let common_ancestor_header = common_ancestor.header(); + let ancestor_header = common_ancestor.header(); + let ancestor_number = ancestor_header.number(); + let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; let finalized_header = finalized.header(); - let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() { - info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found"); - common_ancestor + let finalized_number = finalized_header.number(); + + if ancestor_number >= finalized_number { + info!( + common_ancestor_number = ancestor_number, + common_ancestor_hash = %ancestor_header.hash(), + "Common ancestor found" + ); + // Truncate buffer to remove reorged blocks + self.buffer.truncate_from(ancestor_number + 1); + Ok(Some(common_ancestor)) } else { warn!( - finalized_hash = %finalized_header.hash(), block_number = finalized_header.number(), "Possible deep reorg detected, using finalized block as common ancestor" + ancestor_number = ancestor_number, + finalized_number = finalized_number, + "Common ancestor is before finalized block, using finalized as ancestor" ); - // all buffered blocks are finalized, so no more need to track them self.buffer.clear(); - finalized - }; - Ok(Some(common_ancestor)) + Ok(Some(finalized)) + } + } + + /// Falls back to the finalized block when buffer is exhausted. + async fn fallback_to_finalized(&mut self) -> Result, ScannerError> { + warn!("Buffer exhausted, falling back to finalized block as common ancestor"); + + let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + let finalized_header = finalized.header(); + + info!( + finalized_number = finalized_header.number(), + finalized_hash = %finalized_header.hash(), + "Using finalized block as common ancestor" + ); + + self.buffer.clear(); + Ok(Some(finalized)) } } diff --git a/src/block_range_scanner/ring_buffer.rs b/src/block_range_scanner/ring_buffer.rs index 6b00be69..234f5447 100644 --- a/src/block_range_scanner/ring_buffer.rs +++ b/src/block_range_scanner/ring_buffer.rs @@ -1,5 +1,7 @@ use std::collections::VecDeque; +use alloy::primitives::BlockNumber; + #[derive(Copy, Clone, Debug)] pub enum RingBufferCapacity { Limited(usize), @@ -20,6 +22,13 @@ macro_rules! impl_from_unsigned { impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize); +/// Information about a block stored in the ring buffer. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct BlockInfo { + pub number: BlockNumber, + pub hash: H, +} + #[derive(Clone)] pub(crate) struct RingBuffer { inner: VecDeque, @@ -67,10 +76,32 @@ impl RingBuffer { pub fn clear(&mut self) { self.inner.clear(); } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +impl RingBuffer> { + /// Finds a block by its number. + /// + /// Performs a reverse linear search since we typically look for recent blocks. + pub fn find_by_number(&self, number: BlockNumber) -> Option<&BlockInfo> { + self.inner.iter().rev().find(|info| info.number == number) + } + + /// Truncates the buffer to remove all blocks after (and including) the given block number. + /// This is used when a reorg is detected to remove the reorged blocks. + pub fn truncate_from(&mut self, from_number: BlockNumber) { + self.inner.retain(|info| info.number < from_number); + } } #[cfg(test)] mod tests { + use alloy::primitives::B256; + use super::*; #[test] @@ -79,4 +110,52 @@ mod tests { buf.push(1); assert!(buf.inner.is_empty()); } + + #[test] + fn find_by_number_returns_correct_block() { + let mut buf = RingBuffer::>::new(RingBufferCapacity::Limited(10)); + + let hash1 = B256::repeat_byte(1); + let hash2 = B256::repeat_byte(2); + let hash3 = B256::repeat_byte(3); + + buf.push(BlockInfo { number: 100, hash: hash1 }); + buf.push(BlockInfo { number: 101, hash: hash2 }); + buf.push(BlockInfo { number: 102, hash: hash3 }); + + assert_eq!(buf.find_by_number(100).map(|b| b.hash), Some(hash1)); + assert_eq!(buf.find_by_number(101).map(|b| b.hash), Some(hash2)); + assert_eq!(buf.find_by_number(102).map(|b| b.hash), Some(hash3)); + assert_eq!(buf.find_by_number(99), None); + assert_eq!(buf.find_by_number(103), None); + } + + #[test] + fn truncate_from_removes_blocks_from_given_number() { + let mut buf = RingBuffer::>::new(RingBufferCapacity::Limited(10)); + + buf.push(BlockInfo { number: 100, hash: B256::repeat_byte(1) }); + buf.push(BlockInfo { number: 101, hash: B256::repeat_byte(2) }); + buf.push(BlockInfo { number: 102, hash: B256::repeat_byte(3) }); + buf.push(BlockInfo { number: 103, hash: B256::repeat_byte(4) }); + + buf.truncate_from(102); + + assert!(buf.find_by_number(100).is_some()); + assert!(buf.find_by_number(101).is_some()); + assert!(buf.find_by_number(102).is_none()); + assert!(buf.find_by_number(103).is_none()); + } + + #[test] + fn is_empty_works_correctly() { + let mut buf = RingBuffer::>::new(RingBufferCapacity::Limited(10)); + assert!(buf.is_empty()); + + buf.push(BlockInfo { number: 100, hash: B256::repeat_byte(1) }); + assert!(!buf.is_empty()); + + buf.clear(); + assert!(buf.is_empty()); + } }