-
Notifications
You must be signed in to change notification settings - Fork 4
enhancement: optimized reorg handling by deferring checks to end of stream (historic) #234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2424da5
d522302
50fb995
ebb2634
7d6a1c1
1f9e88f
802a3bd
cbe08c2
35b2e50
3c9d11e
6973c03
dc528c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,205 @@ | ||||||||||||||||||||||||||||||
| use alloy::{ | ||||||||||||||||||||||||||||||
| eips::BlockNumberOrTag, | ||||||||||||||||||||||||||||||
| network::{BlockResponse, Network, primitives::HeaderResponse}, | ||||||||||||||||||||||||||||||
| primitives::{BlockHash, BlockNumber}, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| use robust_provider::RobustProvider; | ||||||||||||||||||||||||||||||
| use tokio::sync::mpsc; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| use crate::{ | ||||||||||||||||||||||||||||||
| Notification, | ||||||||||||||||||||||||||||||
| block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator}, | ||||||||||||||||||||||||||||||
| types::TryStream, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| pub(crate) struct HistoricalRangeHandler<N: Network> { | ||||||||||||||||||||||||||||||
| provider: RobustProvider<N>, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| sender: mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| impl<N: Network> HistoricalRangeHandler<N> { | ||||||||||||||||||||||||||||||
| pub fn new( | ||||||||||||||||||||||||||||||
| provider: RobustProvider<N>, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| sender: mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| ) -> Self { | ||||||||||||||||||||||||||||||
| Self { provider, max_block_range, start, end, sender } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| pub fn run(self) { | ||||||||||||||||||||||||||||||
| let HistoricalRangeHandler { provider, max_block_range, start, end, sender } = self; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||||||||||||||
| let _ = Self::handle_stream_historical_range( | ||||||||||||||||||||||||||||||
| start, | ||||||||||||||||||||||||||||||
| end, | ||||||||||||||||||||||||||||||
| max_block_range, | ||||||||||||||||||||||||||||||
| &sender, | ||||||||||||||||||||||||||||||
| &provider, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| .await; | ||||||||||||||||||||||||||||||
| debug!("Historical range stream ended"); | ||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Public method for use by `sync_handler` during catchup phase. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] | ||||||||||||||||||||||||||||||
| pub async fn stream_historical_range( | ||||||||||||||||||||||||||||||
| start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| sender: &mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| provider: &RobustProvider<N>, | ||||||||||||||||||||||||||||||
| ) -> Option<()> { | ||||||||||||||||||||||||||||||
|
Comment on lines
+50
to
+59
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||
| Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))] | ||||||||||||||||||||||||||||||
| async fn handle_stream_historical_range( | ||||||||||||||||||||||||||||||
| start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| sender: &mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| provider: &RobustProvider<N>, | ||||||||||||||||||||||||||||||
| ) -> Option<()> { | ||||||||||||||||||||||||||||||
| // Phase 1: Stream all finalized blocks without any reorg checks | ||||||||||||||||||||||||||||||
| let non_finalized_start = | ||||||||||||||||||||||||||||||
| Self::stream_finalized_blocks(provider, start, end, max_block_range, sender).await?; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // All blocks already finalized | ||||||||||||||||||||||||||||||
| if non_finalized_start > end { | ||||||||||||||||||||||||||||||
| return Some(()); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Phase 2: Stream non-finalized blocks with reorg detection | ||||||||||||||||||||||||||||||
| Self::stream_non_finalized_blocks( | ||||||||||||||||||||||||||||||
| non_finalized_start, | ||||||||||||||||||||||||||||||
| end, | ||||||||||||||||||||||||||||||
| max_block_range, | ||||||||||||||||||||||||||||||
| sender, | ||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| .await | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Streams finalized blocks without reorg checks. | ||||||||||||||||||||||||||||||
| /// Returns the starting block number for non-finalized streaming, or None if channel closed. | ||||||||||||||||||||||||||||||
| async fn stream_finalized_blocks( | ||||||||||||||||||||||||||||||
| provider: &RobustProvider<N>, | ||||||||||||||||||||||||||||||
| start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| sender: &mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| ) -> Option<BlockNumber> { | ||||||||||||||||||||||||||||||
| // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < | ||||||||||||||||||||||||||||||
| // finalized depth) just use zero. Since we use the finalized block number only to | ||||||||||||||||||||||||||||||
| // determine whether to run reorg checks or not, this is a "low-stakes" RPC call. | ||||||||||||||||||||||||||||||
| let finalized_block_num = | ||||||||||||||||||||||||||||||
| provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| let finalized_batch_end = finalized_block_num.min(end); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { | ||||||||||||||||||||||||||||||
| trace!( | ||||||||||||||||||||||||||||||
| range_start = *range.start(), | ||||||||||||||||||||||||||||||
| range_end = *range.end(), | ||||||||||||||||||||||||||||||
| "Streaming finalized range" | ||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||
| if sender.try_stream(range).await.is_closed() { | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // If start > finalized_batch_end, the loop above was empty and we should | ||||||||||||||||||||||||||||||
| // continue from start. Otherwise, continue from after finalized_batch_end. | ||||||||||||||||||||||||||||||
| Some(start.max(finalized_batch_end + 1)) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Streams non-finalized blocks with reorg detection. | ||||||||||||||||||||||||||||||
| /// Re-streams if a reorg is detected, repeating until stable. | ||||||||||||||||||||||||||||||
| async fn stream_non_finalized_blocks( | ||||||||||||||||||||||||||||||
| non_finalized_start: BlockNumber, | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| max_block_range: u64, | ||||||||||||||||||||||||||||||
| sender: &mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| provider: &RobustProvider<N>, | ||||||||||||||||||||||||||||||
| ) -> Option<()> { | ||||||||||||||||||||||||||||||
| // Get the end block's hash before streaming | ||||||||||||||||||||||||||||||
| let mut end_block_hash = match provider.get_block_by_number(end.into()).await { | ||||||||||||||||||||||||||||||
| Ok(block) => block.header().hash(), | ||||||||||||||||||||||||||||||
| Err(e) => { | ||||||||||||||||||||||||||||||
| error!("Failed to get end block hash"); | ||||||||||||||||||||||||||||||
| _ = sender.try_stream(e).await; | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| loop { | ||||||||||||||||||||||||||||||
| // Stream all non-finalized ranges | ||||||||||||||||||||||||||||||
| for range in RangeIterator::forward(non_finalized_start, end, max_block_range) { | ||||||||||||||||||||||||||||||
|
Comment on lines
+143
to
+145
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This loop could be rewritten to use existing logic to something like: let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num);
let mut iter = RangeIterator::forward(non_finalized_start, end, max_block_range);
for range in iter {
// streaming...
// reorg check...
if let Some(common_ancestor) = check_reorg_result {
let reset_to = (common_ancestor + 1).max(min_common_ancestor);
iter.reset_to(reset_to)
}
}This reuses some of the old tried-and-true logic. |
||||||||||||||||||||||||||||||
| trace!( | ||||||||||||||||||||||||||||||
| range_start = *range.start(), | ||||||||||||||||||||||||||||||
| range_end = *range.end(), | ||||||||||||||||||||||||||||||
| "Streaming non-finalized range" | ||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||
| if sender.try_stream(range).await.is_closed() { | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Check for reorg - returns Some(new_hash) if reorg detected | ||||||||||||||||||||||||||||||
| match Self::check_reorg(end, end_block_hash, non_finalized_start, sender, provider) | ||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't |
||||||||||||||||||||||||||||||
| .await | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| Some(new_hash) => end_block_hash = new_hash, | ||||||||||||||||||||||||||||||
| None => return Some(()), | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Checks if a reorg occurred by comparing block hashes. | ||||||||||||||||||||||||||||||
| /// Returns `Some(new_hash)` if a reorg was detected (caller should re-stream), | ||||||||||||||||||||||||||||||
| /// or `None` if no reorg (caller can finish) or if channel closed / error occurred. | ||||||||||||||||||||||||||||||
| async fn check_reorg( | ||||||||||||||||||||||||||||||
| end: BlockNumber, | ||||||||||||||||||||||||||||||
| expected_hash: BlockHash, | ||||||||||||||||||||||||||||||
| non_finalized_start: BlockNumber, | ||||||||||||||||||||||||||||||
| sender: &mpsc::Sender<BlockScannerResult>, | ||||||||||||||||||||||||||||||
| provider: &RobustProvider<N>, | ||||||||||||||||||||||||||||||
| ) -> Option<BlockHash> { | ||||||||||||||||||||||||||||||
| let current_end_block = match provider.get_block_by_number(end.into()).await { | ||||||||||||||||||||||||||||||
| Ok(block) => block, | ||||||||||||||||||||||||||||||
| Err(e) => { | ||||||||||||||||||||||||||||||
| error!("Failed to fetch end block for reorg check"); | ||||||||||||||||||||||||||||||
| _ = sender.try_stream(e).await; | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| let current_hash = current_end_block.header().hash(); | ||||||||||||||||||||||||||||||
| if current_hash == expected_hash { | ||||||||||||||||||||||||||||||
| debug!(end_block_hash = %expected_hash, "Historical sync completed"); | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| warn!( | ||||||||||||||||||||||||||||||
| end_block = end, | ||||||||||||||||||||||||||||||
| old_hash = %expected_hash, | ||||||||||||||||||||||||||||||
| new_hash = %current_hash, | ||||||||||||||||||||||||||||||
| "Reorg detected, re-streaming non-finalized blocks" | ||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| let common_ancestor = non_finalized_start.saturating_sub(1); | ||||||||||||||||||||||||||||||
| if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() { | ||||||||||||||||||||||||||||||
| return None; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Some(current_hash) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of
Options, returnChannelStates