From 2424da5868a4e2b7311aff77a3265890c07494de Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Sun, 7 Dec 2025 10:44:54 +0530 Subject: [PATCH 1/7] perf(historic): Optimized reorg handling --- src/block_range_scanner/common.rs | 134 +++++++++++++++++++----- src/block_range_scanner/scanner.rs | 5 - src/block_range_scanner/sync_handler.rs | 3 - 3 files changed, 107 insertions(+), 35 deletions(-) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 8ce36f61..494c3cb4 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -12,7 +12,7 @@ use crate::{ use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, - network::{BlockResponse, Network}, + network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::BlockNumber, }; @@ -385,17 +385,13 @@ struct LiveStreamingState { } #[must_use] -#[cfg_attr( - feature = "tracing", - tracing::instrument(level = "trace", skip(sender, provider, reorg_handler)) -)] +#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] pub(crate) async fn stream_historical_range( start: BlockNumber, end: BlockNumber, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, ) -> Option<()> { // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized // depth) just use zero. 
@@ -407,7 +403,7 @@ pub(crate) async fn stream_historical_range( let finalized_block_num = provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - // no reorg check for finalized blocks + // Phase 1: Stream all finalized blocks without any reorg checks (using RangeIterator) let finalized_batch_end = finalized_block_num.min(end); let finalized_range_count = RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); @@ -434,29 +430,113 @@ pub(crate) async fn stream_historical_range( return Some(()); // we're done } - // we have non-finalized block numbers to stream, a reorg can occur + // Phase 2: Stream non-finalized blocks, then check for reorg only after the last range. + // If a reorg occurred, re-stream all non-finalized blocks. Repeat until stable. + // This is an optimization: we defer reorg checks to the end instead of checking per-batch. + let non_finalized_start = batch_start; + + // Get the end block's hash BEFORE streaming (critical for reorg detection) + // Block numbers persist through reorgs, only hashes change + let mut end_block_hash = match provider.get_block_by_number(end.into()).await { + Ok(block) => block.header().hash(), + Err(e) => { + error!("Failed to get end block hash"); + _ = sender.try_stream(e).await; + return None; + } + }; - // Possible minimal common ancestors when a reorg occurs: - // * start > finalized -> the common ancestor we care about is the block before `start`, that's - // where the stream should restart -> this is why we used `start - 1` - // * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart - // on `start + 1` - // * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only - // re-stream non-finalized blocks - let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num); + loop { + // Stream all non-finalized ranges without intermediate reorg checks (using RangeIterator) + let 
non_finalized_range_count = + RangeIterator::forward(non_finalized_start, end, max_block_range).count(); + trace!( + non_finalized_start = non_finalized_start, + end = end, + batch_count = non_finalized_range_count, + "Streaming non-finalized blocks (deferred reorg check)" + ); - stream_range_with_reorg_handling( - min_common_ancestor, - batch_start, - end, - max_block_range, - sender, - provider, - reorg_handler, - ) - .await?; + for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming non-finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; // channel closed + } + } + + // After streaming the last range, check if end block HASH still exists (reorg check) + // Using hash instead of number because block numbers persist through reorgs + match provider.get_block_by_hash(end_block_hash).await { + Ok(_) => { + // Same hash still exists, no reorg - we're done + debug!( + end_block_hash = %end_block_hash, + "Historical sync completed, end block hash verified" + ); + return Some(()); + } + Err(robust_provider::Error::BlockNotFound) => { + // Reorg detected: the block with our recorded hash no longer exists + warn!( + end_block = end, + end_hash = %end_block_hash, + "Reorg detected after streaming last range, re-streaming non-finalized blocks" + ); + + // Note: For historic mode, we don't have a precise common_ancestor since we + // deferred reorg checks. We use `non_finalized_start - 1` as a reasonable estimate. 
+ let common_ancestor = non_finalized_start.saturating_sub(1); + if sender + .try_stream(Notification::ReorgDetected { common_ancestor }) + .await + .is_closed() + { + return None; // channel closed + } - Some(()) + // Get the new end block hash for the next iteration + end_block_hash = match provider.get_block_by_number(end.into()).await { + Ok(block) => block.header().hash(), + Err(e) => { + error!("Failed to get new end block hash after reorg"); + _ = sender.try_stream(e).await; + return None; + } + }; + + // Re-stream all non-finalized blocks in the next iteration + } + Err(e) => { + error!("Failed to verify end block hash"); + _ = sender.try_stream(e).await; + return None; + } + } + + // Check if finalized has advanced past end (all blocks now finalized) + let current_finalized = + match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to get updated finalized block"); + _ = sender.try_stream(e).await; + return None; + } + }; + + if current_finalized >= end { + debug!( + finalized = current_finalized, + end = end, + "End block is now finalized, historical sync completed" + ); + return Some(()); + } + } } /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. 
diff --git a/src/block_range_scanner/scanner.rs b/src/block_range_scanner/scanner.rs index ecd6d333..67dd8691 100644 --- a/src/block_range_scanner/scanner.rs +++ b/src/block_range_scanner/scanner.rs @@ -219,7 +219,6 @@ impl BlockRangeScanner { let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let max_block_range = self.max_block_range; - let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let provider = self.provider.clone(); let (start_block, end_block) = tokio::try_join!( @@ -245,16 +244,12 @@ impl BlockRangeScanner { ); tokio::spawn(async move { - let mut reorg_handler = - ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); - _ = common::stream_historical_range( start_block_num, end_block_num, max_block_range, &blocks_sender, &provider, - &mut reorg_handler, ) .await; diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index 8f3e5839..e9590be2 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -113,7 +113,6 @@ impl SyncHandler { max_block_range, &sender, &provider, - &mut reorg_handler, ) .await { @@ -154,7 +153,6 @@ impl SyncHandler { max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, ) -> Result, ScannerError> { while start_block < confirmed_tip { if common::stream_historical_range( @@ -163,7 +161,6 @@ impl SyncHandler { max_block_range, sender, provider, - reorg_handler, ) .await .is_none() From d522302950eeb3ba56ef6f0ec3ae0ffddc969c9a Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Wed, 21 Jan 2026 12:28:43 +0530 Subject: [PATCH 2/7] fix: cargo clippy --- src/block_range_scanner/common.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 494c3cb4..a2e144cf 100644 --- a/src/block_range_scanner/common.rs +++ 
b/src/block_range_scanner/common.rs @@ -385,6 +385,7 @@ struct LiveStreamingState { } #[must_use] +#[allow(clippy::too_many_lines)] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] pub(crate) async fn stream_historical_range( start: BlockNumber, From 50fb9957e8009aa4f00b475b10218a1a571383d4 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Wed, 21 Jan 2026 13:13:16 +0530 Subject: [PATCH 3/7] refactor: simplify reorg check by comparing hashes directly --- src/block_range_scanner/common.rs | 83 +++++++++++++------------------ 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index a2e144cf..2ad79d05 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -404,7 +404,7 @@ pub(crate) async fn stream_historical_range( let finalized_block_num = provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - // Phase 1: Stream all finalized blocks without any reorg checks (using RangeIterator) + // Phase 1: Stream all finalized blocks without any reorg checks let finalized_batch_end = finalized_block_num.min(end); let finalized_range_count = RangeIterator::forward(start, finalized_batch_end, max_block_range).count(); @@ -433,11 +433,9 @@ pub(crate) async fn stream_historical_range( // Phase 2: Stream non-finalized blocks, then check for reorg only after the last range. // If a reorg occurred, re-stream all non-finalized blocks. Repeat until stable. - // This is an optimization: we defer reorg checks to the end instead of checking per-batch. 
let non_finalized_start = batch_start; - // Get the end block's hash BEFORE streaming (critical for reorg detection) - // Block numbers persist through reorgs, only hashes change + // Get the end block's hash before streaming let mut end_block_hash = match provider.get_block_by_number(end.into()).await { Ok(block) => block.header().hash(), Err(e) => { @@ -448,7 +446,7 @@ pub(crate) async fn stream_historical_range( }; loop { - // Stream all non-finalized ranges without intermediate reorg checks (using RangeIterator) + // Stream all non-finalized ranges without intermediate reorg checks let non_finalized_range_count = RangeIterator::forward(non_finalized_start, end, max_block_range).count(); trace!( @@ -469,55 +467,44 @@ pub(crate) async fn stream_historical_range( } } - // After streaming the last range, check if end block HASH still exists (reorg check) - // Using hash instead of number because block numbers persist through reorgs - match provider.get_block_by_hash(end_block_hash).await { - Ok(_) => { - // Same hash still exists, no reorg - we're done - debug!( - end_block_hash = %end_block_hash, - "Historical sync completed, end block hash verified" - ); - return Some(()); - } - Err(robust_provider::Error::BlockNotFound) => { - // Reorg detected: the block with our recorded hash no longer exists - warn!( - end_block = end, - end_hash = %end_block_hash, - "Reorg detected after streaming last range, re-streaming non-finalized blocks" - ); - - // Note: For historic mode, we don't have a precise common_ancestor since we - // deferred reorg checks. We use `non_finalized_start - 1` as a reasonable estimate. 
- let common_ancestor = non_finalized_start.saturating_sub(1); - if sender - .try_stream(Notification::ReorgDetected { common_ancestor }) - .await - .is_closed() - { - return None; // channel closed - } - - // Get the new end block hash for the next iteration - end_block_hash = match provider.get_block_by_number(end.into()).await { - Ok(block) => block.header().hash(), - Err(e) => { - error!("Failed to get new end block hash after reorg"); - _ = sender.try_stream(e).await; - return None; - } - }; - - // Re-stream all non-finalized blocks in the next iteration - } + // After streaming, fetch the current canonical block and compare hashes (reorg check) + let current_end_block = match provider.get_block_by_number(end.into()).await { + Ok(block) => block, Err(e) => { - error!("Failed to verify end block hash"); + error!("Failed to fetch end block for reorg check"); _ = sender.try_stream(e).await; return None; } + }; + + let current_hash = current_end_block.header().hash(); + if current_hash == end_block_hash { + // Same hash - no reorg, we're done + debug!( + end_block_hash = %end_block_hash, + "Historical sync completed, end block hash verified" + ); + return Some(()); + } + + // Different hash - reorg detected + warn!( + end_block = end, + old_hash = %end_block_hash, + new_hash = %current_hash, + "Reorg detected after streaming last range, re-streaming non-finalized blocks" + ); + + // For historic mode, using `non_finalized_start - 1` as a reasonable estimate for + // common_ancestor. 
+ let common_ancestor = non_finalized_start.saturating_sub(1); + if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { + return None; // channel closed } + // Update to the new canonical hash for the next iteration + end_block_hash = current_hash; + // Check if finalized has advanced past end (all blocks now finalized) let current_finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { From cbe08c250d96c64ac882d853da58bf2fc9823f5e Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Thu, 29 Jan 2026 02:47:05 +0530 Subject: [PATCH 4/7] fix: remove unused R bound from catchup_historical_blocks --- src/block_range_scanner/sync_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index 77168f51..f5f7116f 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -147,7 +147,7 @@ impl SyncHandler { /// Catches up on historical blocks until we reach the chain tip /// Returns the block number where live streaming should begin - async fn catchup_historical_blocks>( + async fn catchup_historical_blocks( mut start_block: BlockNumber, mut confirmed_tip: BlockNumber, block_confirmations: u64, From 35b2e50e47cf6289baa0b7913c165b42044d6ba9 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Thu, 29 Jan 2026 02:53:35 +0530 Subject: [PATCH 5/7] refactor: remove redundant trace logging for non-finalized range count --- src/block_range_scanner/common.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index de68f18e..76b972a7 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -448,15 +448,6 @@ pub(crate) async fn stream_historical_range( loop { // Stream all non-finalized ranges 
without intermediate reorg checks - let non_finalized_range_count = - RangeIterator::forward(non_finalized_start, end, max_block_range).count(); - trace!( - non_finalized_start = non_finalized_start, - end = end, - batch_count = non_finalized_range_count, - "Streaming non-finalized blocks (deferred reorg check)" - ); - for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { trace!( range_start = *range.start(), From 6973c032589401557c144f0596d5405350fd2d72 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Thu, 29 Jan 2026 17:27:01 +0530 Subject: [PATCH 6/7] refactor: extract HistoricalRangeHandler from common.rs --- src/block_range_scanner/common.rs | 135 +---------- .../historical_range_handler.rs | 225 ++++++++++++++++++ src/block_range_scanner/mod.rs | 1 + src/block_range_scanner/scanner.rs | 21 +- src/block_range_scanner/sync_handler.rs | 3 +- 5 files changed, 238 insertions(+), 147 deletions(-) create mode 100644 src/block_range_scanner/historical_range_handler.rs diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index f146ee8d..639b9473 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -11,8 +11,7 @@ use crate::{ }; use alloy::{ consensus::BlockHeader, - eips::BlockNumberOrTag, - network::{BlockResponse, Network, primitives::HeaderResponse}, + network::{BlockResponse, Network}, primitives::BlockNumber, }; @@ -385,138 +384,6 @@ struct LiveStreamingState { previous_batch_end: Option, } -#[must_use] -#[allow(clippy::too_many_lines)] -#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] -pub(crate) async fn stream_historical_range( - start: BlockNumber, - end: BlockNumber, - max_block_range: u64, - sender: &mpsc::Sender, - provider: &RobustProvider, -) -> Option<()> { - // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized - // depth) just use zero. 
- // Since we use the finalized block number only to determine whether to run reorg checks - // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0` - // even on errors. Here `0` is used because it effectively just enables reorg checks. - // If there was actually a provider problem, any subsequent provider call will catch and - // properly log it and return the error to the caller. - let finalized_block_num = - provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); - - // Phase 1: Stream all finalized blocks without any reorg checks - let finalized_batch_end = finalized_block_num.min(end); - trace!( - start = start, - finalized_batch_end = finalized_batch_end, - batch_count = RangeIterator::forward(start, finalized_batch_end, max_block_range).count(), - "Streaming finalized blocks (no reorg check)" - ); - - for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { - trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range"); - if sender.try_stream(range).await.is_closed() { - return None; // channel closed - } - } - - // If start > finalized_batch_end, the loop above was empty and we should - // continue from start. Otherwise, continue from after finalized_batch_end. - let batch_start = start.max(finalized_batch_end + 1); - - // covers case when `end <= finalized` - if batch_start > end { - return Some(()); // we're done - } - - // Phase 2: Stream non-finalized blocks, then check for reorg only after the last range. - // If a reorg occurred, re-stream all non-finalized blocks. Repeat until stable. 
- let non_finalized_start = batch_start; - - // Get the end block's hash before streaming - let mut end_block_hash = match provider.get_block_by_number(end.into()).await { - Ok(block) => block.header().hash(), - Err(e) => { - error!("Failed to get end block hash"); - _ = sender.try_stream(e).await; - return None; - } - }; - - loop { - // Stream all non-finalized ranges without intermediate reorg checks - for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { - trace!( - range_start = *range.start(), - range_end = *range.end(), - "Streaming non-finalized range" - ); - if sender.try_stream(range).await.is_closed() { - return None; // channel closed - } - } - - // After streaming, fetch the current canonical block and compare hashes (reorg check) - let current_end_block = match provider.get_block_by_number(end.into()).await { - Ok(block) => block, - Err(e) => { - error!("Failed to fetch end block for reorg check"); - _ = sender.try_stream(e).await; - return None; - } - }; - - let current_hash = current_end_block.header().hash(); - if current_hash == end_block_hash { - // Same hash - no reorg, we're done - debug!( - end_block_hash = %end_block_hash, - "Historical sync completed, end block hash verified" - ); - return Some(()); - } - - // Different hash - reorg detected - warn!( - end_block = end, - old_hash = %end_block_hash, - new_hash = %current_hash, - "Reorg detected after streaming last range, re-streaming non-finalized blocks" - ); - - // For historic mode, using `non_finalized_start - 1` as a reasonable estimate for - // common_ancestor. 
- let common_ancestor = non_finalized_start.saturating_sub(1); - if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { - return None; // channel closed - } - - // Update to the new canonical hash for the next iteration - end_block_hash = current_hash; - - // Check if finalized has advanced past end (all blocks now finalized) - let current_finalized = - match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { - Ok(block) => block, - Err(e) => { - error!("Failed to get updated finalized block"); - _ = sender.try_stream(e).await; - return None; - } - }; - - if current_finalized >= end { - debug!( - finalized = current_finalized, - end = end, - "End block is now finalized, historical sync completed" - ); - return Some(()); - } - } -} - /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. #[cfg_attr( feature = "tracing", diff --git a/src/block_range_scanner/historical_range_handler.rs b/src/block_range_scanner/historical_range_handler.rs new file mode 100644 index 00000000..5b38c164 --- /dev/null +++ b/src/block_range_scanner/historical_range_handler.rs @@ -0,0 +1,225 @@ +use alloy::{ + eips::BlockNumberOrTag, + network::{BlockResponse, Network, primitives::HeaderResponse}, + primitives::{BlockHash, BlockNumber}, +}; +use robust_provider::RobustProvider; +use tokio::sync::mpsc; + +use crate::{ + Notification, + block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator}, + types::TryStream, +}; + +pub(crate) struct HistoricalRangeHandler { + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, +} + +impl HistoricalRangeHandler { + pub fn new( + provider: RobustProvider, + max_block_range: u64, + start: BlockNumber, + end: BlockNumber, + sender: mpsc::Sender, + ) -> Self { + Self { provider, max_block_range, start, end, sender } + } + + pub fn run(self) { + let HistoricalRangeHandler { 
provider, max_block_range, start, end, sender } = self; + + tokio::spawn(async move { + let _ = Self::handle_stream_historical_range( + start, + end, + max_block_range, + &sender, + &provider, + ) + .await; + debug!("Historical range stream ended"); + }); + } + + /// Public method for use by `sync_handler` during catchup phase. + #[must_use] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + pub async fn stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await + } + + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] + async fn handle_stream_historical_range( + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + // Phase 1: Stream all finalized blocks without any reorg checks + let non_finalized_start = + Self::stream_finalized_blocks(provider, start, end, max_block_range, sender).await?; + + // All blocks already finalized + if non_finalized_start > end { + return Some(()); + } + + // Phase 2: Stream non-finalized blocks with reorg detection + Self::stream_non_finalized_blocks( + non_finalized_start, + end, + max_block_range, + sender, + provider, + ) + .await + } + + /// Streams finalized blocks without reorg checks. + /// Returns the starting block number for non-finalized streaming, or None if channel closed. + async fn stream_finalized_blocks( + provider: &RobustProvider, + start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + ) -> Option { + // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < + // finalized depth) just use zero. 
Since we use the finalized block number only to + // determine whether to run reorg checks or not, this is a "low-stakes" RPC call. + let finalized_block_num = + provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); + + let finalized_batch_end = finalized_block_num.min(end); + + for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; + } + } + + // If start > finalized_batch_end, the loop above was empty and we should + // continue from start. Otherwise, continue from after finalized_batch_end. + Some(start.max(finalized_batch_end + 1)) + } + + /// Streams non-finalized blocks with reorg detection. + /// Re-streams if a reorg is detected, repeating until stable. + async fn stream_non_finalized_blocks( + non_finalized_start: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option<()> { + // Get the end block's hash before streaming + let mut end_block_hash = match provider.get_block_by_number(end.into()).await { + Ok(block) => block.header().hash(), + Err(e) => { + error!("Failed to get end block hash"); + _ = sender.try_stream(e).await; + return None; + } + }; + + loop { + // Stream all non-finalized ranges + for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { + trace!( + range_start = *range.start(), + range_end = *range.end(), + "Streaming non-finalized range" + ); + if sender.try_stream(range).await.is_closed() { + return None; + } + } + + // Check for reorg - returns Some(new_hash) if reorg detected + match Self::check_reorg(end, end_block_hash, non_finalized_start, sender, provider) + .await + { + Some(new_hash) => end_block_hash = new_hash, + None => return Some(()), + } + } + } + + /// Checks if a reorg occurred by comparing block 
hashes. + /// Returns `Some(new_hash)` if a reorg was detected (caller should re-stream), + /// or `None` if no reorg (caller can finish) or if channel closed / error occurred. + async fn check_reorg( + end: BlockNumber, + expected_hash: BlockHash, + non_finalized_start: BlockNumber, + sender: &mpsc::Sender, + provider: &RobustProvider, + ) -> Option { + let current_end_block = match provider.get_block_by_number(end.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to fetch end block for reorg check"); + _ = sender.try_stream(e).await; + return None; + } + }; + + let current_hash = current_end_block.header().hash(); + if current_hash == expected_hash { + debug!(end_block_hash = %expected_hash, "Historical sync completed"); + return None; + } + + warn!( + end_block = end, + old_hash = %expected_hash, + new_hash = %current_hash, + "Reorg detected, re-streaming non-finalized blocks" + ); + + let common_ancestor = non_finalized_start.saturating_sub(1); + if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { + return None; + } + + // Check if finalized has advanced past end + let current_finalized = + match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { + Ok(block) => block, + Err(e) => { + error!("Failed to get updated finalized block"); + _ = sender.try_stream(e).await; + return None; + } + }; + + if current_finalized >= end { + debug!( + finalized = current_finalized, + end = end, + "End block is now finalized, historical sync completed" + ); + return None; + } + + Some(current_hash) + } +} diff --git a/src/block_range_scanner/mod.rs b/src/block_range_scanner/mod.rs index c7bd8745..2c561f37 100644 --- a/src/block_range_scanner/mod.rs +++ b/src/block_range_scanner/mod.rs @@ -1,5 +1,6 @@ mod builder; mod common; +mod historical_range_handler; mod range_iterator; mod reorg_handler; mod rewind_handler; diff --git a/src/block_range_scanner/scanner.rs b/src/block_range_scanner/scanner.rs 
index 9cf21139..125861e4 100644 --- a/src/block_range_scanner/scanner.rs +++ b/src/block_range_scanner/scanner.rs @@ -87,6 +87,7 @@ use crate::{ block_range_scanner::{ RingBufferCapacity, common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::DefaultReorgHandler, rewind_handler::RewindHandler, sync_handler::SyncHandler, @@ -242,18 +243,14 @@ impl BlockRangeScanner { "Starting historical block stream" ); - tokio::spawn(async move { - _ = common::stream_historical_range( - start_block_num, - end_block_num, - max_block_range, - &blocks_sender, - &provider, - ) - .await; - - debug!("Historical block stream completed"); - }); + let handler = HistoricalRangeHandler::new( + provider, + max_block_range, + start_block_num, + end_block_num, + blocks_sender, + ); + handler.run(); Ok(ReceiverStream::new(blocks_receiver)) } diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index f5f7116f..a1bd9234 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -6,6 +6,7 @@ use crate::{ Notification, ScannerError, block_range_scanner::{ common::{self, BlockScannerResult}, + historical_range_handler::HistoricalRangeHandler, reorg_handler::{DefaultReorgHandler, ReorgHandler}, ring_buffer::RingBufferCapacity, }, @@ -156,7 +157,7 @@ impl SyncHandler { provider: &RobustProvider, ) -> Result, ScannerError> { while start_block < confirmed_tip { - if common::stream_historical_range( + if HistoricalRangeHandler::stream_historical_range( start_block, confirmed_tip, max_block_range, From dc528c9859cce1dfa1e8fed54dc03454c846af55 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Sun, 1 Feb 2026 18:41:13 +0530 Subject: [PATCH 7/7] refactor: remove redundant finalized check in check_reorg() --- .../historical_range_handler.rs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git 
a/src/block_range_scanner/historical_range_handler.rs b/src/block_range_scanner/historical_range_handler.rs index 5b38c164..35f1b6c4 100644 --- a/src/block_range_scanner/historical_range_handler.rs +++ b/src/block_range_scanner/historical_range_handler.rs @@ -200,26 +200,6 @@ impl HistoricalRangeHandler { return None; } - // Check if finalized has advanced past end - let current_finalized = - match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await { - Ok(block) => block, - Err(e) => { - error!("Failed to get updated finalized block"); - _ = sender.try_stream(e).await; - return None; - } - }; - - if current_finalized >= end { - debug!( - finalized = current_finalized, - end = end, - "End block is now finalized, historical sync completed" - ); - return None; - } - Some(current_hash) } }