diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 0bae1332..639b9473 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -11,7 +11,6 @@ use crate::{ }; use alloy::{ consensus::BlockHeader, - eips::BlockNumberOrTag, network::{BlockResponse, Network}, primitives::BlockNumber, }; @@ -385,81 +384,6 @@ struct LiveStreamingState { previous_batch_end: Option, } -#[must_use] -#[cfg_attr( - feature = "tracing", - tracing::instrument(level = "trace", skip(sender, provider, reorg_handler)) -)] -pub(crate) async fn stream_historical_range>( - start: BlockNumber, - end: BlockNumber, - max_block_range: u64, - sender: &mpsc::Sender, - provider: &RobustProvider, - reorg_handler: &mut R, -) -> Option<()> { - // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized - // depth) just use zero. - // Since we use the finalized block number only to determine whether to run reorg checks - // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0` - // even on errors. Here `0` is used because it effectively just enables reorg checks. - // If there was actually a provider problem, any subsequent provider call will catch and - // properly log it and return the error to the caller. - let finalized_block_num = - provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - - // no reorg check for finalized blocks - let finalized_batch_end = finalized_block_num.min(end); - let finalized_range_count = - RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); - trace!( - start = start, - finalized_batch_end = finalized_batch_end, - batch_count = finalized_range_count, - "Streaming finalized blocks (no reorg check)" - ); - - for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { - trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range"); - if sender.try_stream(range).await.is_closed() { - return None; // channel closed - } - } - - // If start > finalized_batch_end, the loop above was empty and we should - // continue from start. Otherwise, continue from after finalized_batch_end. - let batch_start = start.max(finalized_batch_end + 1); - - // covers case when `end <= finalized` - if batch_start > end { - return Some(()); // we're done - } - - // we have non-finalized block numbers to stream, a reorg can occur - - // Possible minimal common ancestors when a reorg occurs: - // * start > finalized -> the common ancestor we care about is the block before `start`, that's - // where the stream should restart -> this is why we used `start - 1` - // * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart - // on `start + 1` - // * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only - // re-stream non-finalized blocks - let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num); - - stream_range_with_reorg_handling( - min_common_ancestor, - batch_start, - end, - max_block_range, - sender, - provider, - reorg_handler, - ) - .await?; - - Some(()) -} - /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. #[cfg_attr( feature = "tracing", diff --git a/src/block_range_scanner/historical_range_handler.rs b/src/block_range_scanner/historical_range_handler.rs new file mode 100644 index 00000000..5f1b88bf --- /dev/null +++ b/src/block_range_scanner/historical_range_handler.rs @@ -0,0 +1,224 @@ +use alloy::{ + eips::BlockNumberOrTag, + network::{BlockResponse, Network, primitives::HeaderResponse}, + primitives::{BlockHash, BlockNumber}, +}; +use tokio::sync::mpsc; + +use robust_provider::RobustProvider; + +use crate::{ + Notification, + block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator}, + types::TryStream, +}; + +pub(crate) struct HistoricalRangeHandler { + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, +} + +impl HistoricalRangeHandler { + pub fn new( + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, + ) -> Self { + Self { provider, max_block_range, start, end, sender } + } + + pub fn run(self) { + let HistoricalRangeHandler { provider, max_block_range, start, end, sender } = self; + + info!( + start_block = start, + end_block = end, + total_blocks = end.saturating_sub(start) + 1, + "Starting historical range stream" + ); + + tokio::spawn(async move { + let _ = Self::handle_stream_historical_range( + start, + end, + max_block_range, + &sender, + &provider, + ) + .await; + debug!("Historical range stream ended"); + }); + } + + #[must_use] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + pub(crate) async fn stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await + } + + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + async fn handle_stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + // Phase 1: Stream all finalized blocks without any reorg checks + let non_finalized_start = + Self::stream_finalized_blocks(provider, start, end, max_block_range, sender).await?; + + // All blocks already finalized + if non_finalized_start > end { + return Some(()); + } + + // Phase 2: Stream non-finalized blocks with reorg detection + Self::stream_non_finalized_blocks( + non_finalized_start, + end, + max_block_range, + sender, + provider, + ) + .await + } + + /// Streams finalized blocks without reorg checks. + /// Returns the starting block number for non-finalized streaming, or None if channel closed. + async fn stream_finalized_blocks( + provider: &RobustProvider, + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + ) -> Option { + // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < + // finalized depth) just use zero. Since we use the finalized block number only to + // determine whether to run reorg checks or not, this is a "low-stakes" RPC call. + let finalized_block_num = + provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); + + let finalized_batch_end = finalized_block_num.min(end); + + for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; + } + } + + // If start > finalized_batch_end, the loop above was empty and we should + // continue from start. Otherwise, continue from after finalized_batch_end. + Some(start.max(finalized_batch_end + 1)) + } + + /// Streams non-finalized blocks with reorg detection. + /// Re-streams if a reorg is detected, repeating until stable. + async fn stream_non_finalized_blocks( + non_finalized_start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + // Get the end block's hash before streaming + let mut end_block_hash = match provider.get_block_by_number(end.into()).await { + Ok(block) => block.header().hash(), + Err(e) => { + error!("Failed to get end block hash"); + _ = sender.try_stream(e).await; + return None; + } + }; + + loop { + // Stream all non-finalized ranges + for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { + if sender.try_stream(range).await.is_closed() { + return None; + } + } + + // Check for reorg - returns Some(new_hash) if reorg detected + match Self::check_reorg(end, end_block_hash, non_finalized_start, sender, provider) + .await + { + Some(new_hash) => end_block_hash = new_hash, + None => return Some(()), + } + } + } + + async fn check_reorg( + end: BlockNumber, + expected_hash: BlockHash, + non_finalized_start: BlockNumber, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option { + let current_end_block = match provider.get_block_by_number(end.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to fetch end block for reorg check"); + _ = sender.try_stream(e).await; + return None; + } + }; + + let current_hash = current_end_block.header().hash(); + if current_hash == expected_hash { + debug!(end_block_hash = %expected_hash, "Historical sync completed"); + return None; + } + + warn!( + end_block = end, + old_hash = %expected_hash, + new_hash = %current_hash, + "Reorg detected, re-streaming non-finalized blocks" + ); + + let common_ancestor = non_finalized_start.saturating_sub(1); + if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { + return None; + } + + // Check if finalized has advanced past end + let current_finalized = + match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to get updated finalized block"); + _ = sender.try_stream(e).await; + return None; + } + }; + + if current_finalized >= end { + debug!( + finalized = current_finalized, + end = end, + "End block is now finalized, historical sync completed" + ); + return None; + } + + Some(current_hash) + } +} diff --git a/src/block_range_scanner/mod.rs b/src/block_range_scanner/mod.rs index c7bd8745..2c561f37 100644 --- a/src/block_range_scanner/mod.rs +++ b/src/block_range_scanner/mod.rs @@ -1,5 +1,6 @@ mod builder; mod common; +mod historical_range_handler; mod range_iterator; mod reorg_handler; mod rewind_handler; diff --git a/src/block_range_scanner/scanner.rs b/src/block_range_scanner/scanner.rs index e9ab0954..6d7ccd23 100644 --- a/src/block_range_scanner/scanner.rs +++ b/src/block_range_scanner/scanner.rs @@ -87,6 +87,7 @@ use crate::{ block_range_scanner::{ RingBufferCapacity, common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::DefaultReorgHandler, rewind_handler::RewindHandler, sync_handler::SyncHandler, @@ -219,7 +220,6 @@ impl BlockRangeScanner { let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let max_block_range = self.max_block_range; - let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let provider = self.provider.clone(); let (start_block, end_block) = tokio::try_join!( @@ -244,22 +244,14 @@ impl BlockRangeScanner { "Starting historical block stream" ); - tokio::spawn(async move { - let mut reorg_handler = - DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); - - _ = common::stream_historical_range( - start_block_num, - end_block_num, - max_block_range, - &blocks_sender, - &provider, - &mut reorg_handler, - ) - .await; - - debug!("Historical block stream completed"); - }); + let handler = HistoricalRangeHandler::new( + provider, + max_block_range, + start_block_num, + end_block_num, + blocks_sender, + ); + handler.run(); Ok(ReceiverStream::new(blocks_receiver)) } diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index 9e8375c3..a1bd9234 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -6,6 +6,7 @@ use crate::{ Notification, ScannerError, block_range_scanner::{ common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::{DefaultReorgHandler, ReorgHandler}, ring_buffer::RingBufferCapacity, }, @@ -114,7 +115,6 @@ impl SyncHandler { max_block_range, &sender, &provider, - &mut reorg_handler, ) .await { @@ -148,23 +148,21 @@ impl SyncHandler { /// Catches up on historical blocks until we reach the chain tip /// Returns the block number where live streaming should begin - async fn catchup_historical_blocks>( + async fn catchup_historical_blocks( mut start_block: BlockNumber, mut confirmed_tip: BlockNumber, block_confirmations: u64, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut R, ) -> Result, ScannerError> { while start_block < confirmed_tip { - if common::stream_historical_range( + if HistoricalRangeHandler::stream_historical_range( start_block, confirmed_tip, max_block_range, sender, provider, - reorg_handler, ) .await .is_none()