From dcad700a4b21fc7102f9f84faab9a52b5220b1c6 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 27 Jan 2026 15:32:52 +0100 Subject: [PATCH 1/2] feat: refactor to have historical range handler --- src/block_range_scanner/common.rs | 76 ------ .../historical_range_handler.rs | 217 ++++++++++++++++++ src/block_range_scanner/mod.rs | 1 + src/block_range_scanner/scanner.rs | 26 +-- src/block_range_scanner/sync_handler.rs | 8 +- 5 files changed, 230 insertions(+), 98 deletions(-) create mode 100644 src/block_range_scanner/historical_range_handler.rs diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 0bae1332..639b9473 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -11,7 +11,6 @@ use crate::{ }; use alloy::{ consensus::BlockHeader, - eips::BlockNumberOrTag, network::{BlockResponse, Network}, primitives::BlockNumber, }; @@ -385,81 +384,6 @@ struct LiveStreamingState { previous_batch_end: Option, } -#[must_use] -#[cfg_attr( - feature = "tracing", - tracing::instrument(level = "trace", skip(sender, provider, reorg_handler)) -)] -pub(crate) async fn stream_historical_range>( - start: BlockNumber, - end: BlockNumber, - max_block_range: u64, - sender: &mpsc::Sender, - provider: &RobustProvider, - reorg_handler: &mut R, -) -> Option<()> { - // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized - // depth) just use zero. - // Since we use the finalized block number only to determine whether to run reorg checks - // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0` - // even on errors. Here `0` is used because it effectively just enables reorg checks. - // If there was actually a provider problem, any subsequent provider call will catch and - // properly log it and return the error to the caller. - let finalized_block_num = - provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - - // no reorg check for finalized blocks - let finalized_batch_end = finalized_block_num.min(end); - let finalized_range_count = - RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); - trace!( - start = start, - finalized_batch_end = finalized_batch_end, - batch_count = finalized_range_count, - "Streaming finalized blocks (no reorg check)" - ); - - for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { - trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range"); - if sender.try_stream(range).await.is_closed() { - return None; // channel closed - } - } - - // If start > finalized_batch_end, the loop above was empty and we should - // continue from start. Otherwise, continue from after finalized_batch_end. - let batch_start = start.max(finalized_batch_end + 1); - - // covers case when `end <= finalized` - if batch_start > end { - return Some(()); // we're done - } - - // we have non-finalized block numbers to stream, a reorg can occur - - // Possible minimal common ancestors when a reorg occurs: - // * start > finalized -> the common ancestor we care about is the block before `start`, that's - // where the stream should restart -> this is why we used `start - 1` - // * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart - // on `start + 1` - // * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only - // re-stream non-finalized blocks - let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num); - - stream_range_with_reorg_handling( - min_common_ancestor, - batch_start, - end, - max_block_range, - sender, - provider, - reorg_handler, - ) - .await?; - - Some(()) -} - /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. #[cfg_attr( feature = "tracing", diff --git a/src/block_range_scanner/historical_range_handler.rs b/src/block_range_scanner/historical_range_handler.rs new file mode 100644 index 00000000..1b94f7e3 --- /dev/null +++ b/src/block_range_scanner/historical_range_handler.rs @@ -0,0 +1,217 @@ +use alloy::{ + eips::BlockNumberOrTag, + network::{BlockResponse, Network, primitives::HeaderResponse}, + primitives::BlockNumber, +}; +use tokio::sync::mpsc; + +use robust_provider::RobustProvider; + +use crate::{ + Notification, + block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator}, + types::TryStream, +}; + +pub(crate) struct HistoricalRangeHandler { + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, +} + +impl HistoricalRangeHandler { + pub fn new( + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, + ) -> Self { + Self { provider, max_block_range, start, end, sender } + } + + pub fn run(self) { + let HistoricalRangeHandler { provider, max_block_range, start, end, sender } = self; + + info!( + start_block = start, + end_block = end, + total_blocks = end.saturating_sub(start) + 1, + "Starting historical range stream" + ); + + tokio::spawn(async move { + let _ = Self::handle_stream_historical_range( + start, + end, + max_block_range, + &sender, + &provider, + ) + .await; + debug!("Historical range stream ended"); + }); + } + + #[must_use] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + pub(crate) async fn stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await + } + + #[must_use] + #[allow(clippy::too_many_lines)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + async fn handle_stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < + // finalized depth) just use zero. + // Since we use the finalized block number only to determine whether to run reorg checks + // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0` + // even on errors. Here `0` is used because it effectively just enables reorg checks. + // If there was actually a provider problem, any subsequent provider call will catch and + // properly log it and return the error to the caller. + let finalized_block_num = + provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); + + // Phase 1: Stream all finalized blocks without any reorg checks + let finalized_batch_end = finalized_block_num.min(end); + let finalized_range_count = + RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); + trace!( + start = start, + finalized_batch_end = finalized_batch_end, + batch_count = finalized_range_count, + "Streaming finalized blocks (no reorg check)" + ); + + for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; // channel closed + } + } + + // If start > finalized_batch_end, the loop above was empty and we should + // continue from start. Otherwise, continue from after finalized_batch_end. + let batch_start = start.max(finalized_batch_end + 1); + + // covers case when `end <= finalized` + if batch_start > end { + return Some(()); // we're done + } + + // Phase 2: Stream non-finalized blocks, then check for reorg only after the last range. + // If a reorg occurred, re-stream all non-finalized blocks. Repeat until stable. + let non_finalized_start = batch_start; + + // Get the end block's hash before streaming + let mut end_block_hash = match provider.get_block_by_number(end.into()).await { + Ok(block) => block.header().hash(), + Err(e) => { + error!("Failed to get end block hash"); + _ = sender.try_stream(e).await; + return None; + } + }; + + loop { + // Stream all non-finalized ranges without intermediate reorg checks + let non_finalized_range_count = + RangeIterator::forward(non_finalized_start, end, max_block_range).count(); + trace!( + non_finalized_start = non_finalized_start, + end = end, + batch_count = non_finalized_range_count, + "Streaming non-finalized blocks (deferred reorg check)" + ); + + for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming non-finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; // channel closed + } + } + + // After streaming, fetch the current canonical block and compare hashes (reorg check) + let current_end_block = match provider.get_block_by_number(end.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to fetch end block for reorg check"); + _ = sender.try_stream(e).await; + return None; + } + }; + + let current_hash = current_end_block.header().hash(); + if current_hash == end_block_hash { + // Same hash - no reorg, we're done + debug!( + end_block_hash = %end_block_hash, + "Historical sync completed, end block hash verified" + ); + return Some(()); + } + + // Different hash - reorg detected + warn!( + end_block = end, + old_hash = %end_block_hash, + new_hash = %current_hash, + "Reorg detected after streaming last range, re-streaming non-finalized blocks" + ); + + // For historic mode, using `non_finalized_start - 1` as a reasonable estimate for + // common_ancestor. + let common_ancestor = non_finalized_start.saturating_sub(1); + if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() + { + return None; // channel closed + } + + // Update to the new canonical hash for the next iteration + end_block_hash = current_hash; + + // Check if finalized has advanced past end (all blocks now finalized) + let current_finalized = + match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to get updated finalized block"); + _ = sender.try_stream(e).await; + return None; + } + }; + + if current_finalized >= end { + debug!( + finalized = current_finalized, + end = end, + "End block is now finalized, historical sync completed" + ); + return Some(()); + } + } + } +} diff --git a/src/block_range_scanner/mod.rs b/src/block_range_scanner/mod.rs index c7bd8745..2c561f37 100644 --- a/src/block_range_scanner/mod.rs +++ b/src/block_range_scanner/mod.rs @@ -1,5 +1,6 @@ mod builder; mod common; +mod historical_range_handler; mod range_iterator; mod reorg_handler; mod rewind_handler; diff --git a/src/block_range_scanner/scanner.rs b/src/block_range_scanner/scanner.rs index e9ab0954..6d7ccd23 100644 --- a/src/block_range_scanner/scanner.rs +++ b/src/block_range_scanner/scanner.rs @@ -87,6 +87,7 @@ use crate::{ block_range_scanner::{ RingBufferCapacity, common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::DefaultReorgHandler, rewind_handler::RewindHandler, sync_handler::SyncHandler, @@ -219,7 +220,6 @@ impl BlockRangeScanner { let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let max_block_range = self.max_block_range; - let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let provider = self.provider.clone(); let (start_block, end_block) = tokio::try_join!( @@ -244,22 +244,14 @@ impl BlockRangeScanner { "Starting historical block stream" ); - tokio::spawn(async move { - let mut reorg_handler = - DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); - - _ = common::stream_historical_range( - start_block_num, - end_block_num, - max_block_range, - &blocks_sender, - &provider, - &mut reorg_handler, - ) - .await; - - debug!("Historical block stream completed"); - }); + let handler = HistoricalRangeHandler::new( + provider, + max_block_range, + start_block_num, + end_block_num, + blocks_sender, + ); + handler.run(); Ok(ReceiverStream::new(blocks_receiver)) } diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index 9e8375c3..a1bd9234 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -6,6 +6,7 @@ use crate::{ Notification, ScannerError, block_range_scanner::{ common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::{DefaultReorgHandler, ReorgHandler}, ring_buffer::RingBufferCapacity, }, @@ -114,7 +115,6 @@ impl SyncHandler { max_block_range, &sender, &provider, - &mut reorg_handler, ) .await { @@ -148,23 +148,21 @@ impl SyncHandler { /// Catches up on historical blocks until we reach the chain tip /// Returns the block number where live streaming should begin - async fn catchup_historical_blocks>( + async fn catchup_historical_blocks( mut start_block: BlockNumber, mut confirmed_tip: BlockNumber, block_confirmations: u64, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut R, ) -> Result, ScannerError> { while start_block < confirmed_tip { - if common::stream_historical_range( + if HistoricalRangeHandler::stream_historical_range( start_block, confirmed_tip, max_block_range, sender, provider, - reorg_handler, ) .await .is_none() From b0d96e3cbfd4246d26e8a3272b0f978a26d73bc3 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 27 Jan 2026 16:15:58 +0100 Subject: [PATCH 2/2] ref: better seperation of logic --- .../historical_range_handler.rs | 197 +++++++++--------- 1 file changed, 102 insertions(+), 95 deletions(-) diff --git a/src/block_range_scanner/historical_range_handler.rs b/src/block_range_scanner/historical_range_handler.rs index 1b94f7e3..5f1b88bf 100644 --- a/src/block_range_scanner/historical_range_handler.rs +++ b/src/block_range_scanner/historical_range_handler.rs @@ -1,7 +1,7 @@ use alloy::{ eips::BlockNumberOrTag, network::{BlockResponse, Network, primitives::HeaderResponse}, - primitives::BlockNumber, + primitives::{BlockHash, BlockNumber}, }; use tokio::sync::mpsc; @@ -67,8 +67,6 @@ impl HistoricalRangeHandler { Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await } - #[must_use] - #[allow(clippy::too_many_lines)] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] async fn handle_stream_historical_range( start: BlockNumber, @@ -77,26 +75,42 @@ impl HistoricalRangeHandler { sender: &mpsc::Sender, provider: &RobustProvider, ) -> Option<()> { + // Phase 1: Stream all finalized blocks without any reorg checks + let non_finalized_start = + Self::stream_finalized_blocks(provider, start, end, max_block_range, sender).await?; + + // All blocks already finalized + if non_finalized_start > end { + return Some(()); + } + + // Phase 2: Stream non-finalized blocks with reorg detection + Self::stream_non_finalized_blocks( + non_finalized_start, + end, + max_block_range, + sender, + provider, + ) + .await + } + + /// Streams finalized blocks without reorg checks. + /// Returns the starting block number for non-finalized streaming, or None if channel closed. + async fn stream_finalized_blocks( + provider: &RobustProvider, + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + ) -> Option { // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < - // finalized depth) just use zero. - // Since we use the finalized block number only to determine whether to run reorg checks - // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0` - // even on errors. Here `0` is used because it effectively just enables reorg checks. - // If there was actually a provider problem, any subsequent provider call will catch and - // properly log it and return the error to the caller. + // finalized depth) just use zero. Since we use the finalized block number only to + // determine whether to run reorg checks or not, this is a "low-stakes" RPC call. let finalized_block_num = provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - // Phase 1: Stream all finalized blocks without any reorg checks let finalized_batch_end = finalized_block_num.min(end); - let finalized_range_count = - RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); - trace!( - start = start, - finalized_batch_end = finalized_batch_end, - batch_count = finalized_range_count, - "Streaming finalized blocks (no reorg check)" - ); for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { trace!( @@ -105,23 +119,24 @@ impl HistoricalRangeHandler { "Streaming finalized range" ); if sender.try_stream(range).await.is_closed() { - return None; // channel closed + return None; } } // If start > finalized_batch_end, the loop above was empty and we should // continue from start. Otherwise, continue from after finalized_batch_end. - let batch_start = start.max(finalized_batch_end + 1); - - // covers case when `end <= finalized` - if batch_start > end { - return Some(()); // we're done - } - - // Phase 2: Stream non-finalized blocks, then check for reorg only after the last range. - // If a reorg occurred, re-stream all non-finalized blocks. Repeat until stable. - let non_finalized_start = batch_start; + Some(start.max(finalized_batch_end + 1)) + } + /// Streams non-finalized blocks with reorg detection. + /// Re-streams if a reorg is detected, repeating until stable. + async fn stream_non_finalized_blocks( + non_finalized_start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { // Get the end block's hash before streaming let mut end_block_hash = match provider.get_block_by_number(end.into()).await { Ok(block) => block.header().hash(), @@ -133,85 +148,77 @@ impl HistoricalRangeHandler { }; loop { - // Stream all non-finalized ranges without intermediate reorg checks - let non_finalized_range_count = - RangeIterator::forward(non_finalized_start, end, max_block_range).count(); - trace!( - non_finalized_start = non_finalized_start, - end = end, - batch_count = non_finalized_range_count, - "Streaming non-finalized blocks (deferred reorg check)" - ); - + // Stream all non-finalized ranges for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { - trace!( - range_start = *range.start(), - range_end = *range.end(), - "Streaming non-finalized range" - ); if sender.try_stream(range).await.is_closed() { - return None; // channel closed + return None; } } - // After streaming, fetch the current canonical block and compare hashes (reorg check) - let current_end_block = match provider.get_block_by_number(end.into()).await { + // Check for reorg - returns Some(new_hash) if reorg detected + match Self::check_reorg(end, end_block_hash, non_finalized_start, sender, provider) + .await + { + Some(new_hash) => end_block_hash = new_hash, + None => return Some(()), + } + } + } + + async fn check_reorg( + end: BlockNumber, + expected_hash: BlockHash, + non_finalized_start: BlockNumber, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option { + let current_end_block = match provider.get_block_by_number(end.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to fetch end block for reorg check"); + _ = sender.try_stream(e).await; + return None; + } + }; + + let current_hash = current_end_block.header().hash(); + if current_hash == expected_hash { + debug!(end_block_hash = %expected_hash, "Historical sync completed"); + return None; + } + + warn!( + end_block = end, + old_hash = %expected_hash, + new_hash = %current_hash, + "Reorg detected, re-streaming non-finalized blocks" + ); + + let common_ancestor = non_finalized_start.saturating_sub(1); + if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { + return None; + } + + // Check if finalized has advanced past end + let current_finalized = + match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { Ok(block) => block, Err(e) => { - error!("Failed to fetch end block for reorg check"); + error!("Failed to get updated finalized block"); _ = sender.try_stream(e).await; return None; } }; - let current_hash = current_end_block.header().hash(); - if current_hash == end_block_hash { - // Same hash - no reorg, we're done - debug!( - end_block_hash = %end_block_hash, - "Historical sync completed, end block hash verified" - ); - return Some(()); - } - - // Different hash - reorg detected - warn!( - end_block = end, - old_hash = %end_block_hash, - new_hash = %current_hash, - "Reorg detected after streaming last range, re-streaming non-finalized blocks" + if current_finalized >= end { + debug!( + finalized = current_finalized, + end = end, + "End block is now finalized, historical sync completed" ); - - // For historic mode, using `non_finalized_start - 1` as a reasonable estimate for - // common_ancestor. - let common_ancestor = non_finalized_start.saturating_sub(1); - if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() - { - return None; // channel closed - } - - // Update to the new canonical hash for the next iteration - end_block_hash = current_hash; - - // Check if finalized has advanced past end (all blocks now finalized) - let current_finalized = - match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { - Ok(block) => block, - Err(e) => { - error!("Failed to get updated finalized block"); - _ = sender.try_stream(e).await; - return None; - } - }; - - if current_finalized >= end { - debug!( - finalized = current_finalized, - end = end, - "End block is now finalized, historical sync completed" - ); - return Some(()); - } + return None; } + + Some(current_hash) } }