Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 0 additions & 76 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
};
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network},
primitives::BlockNumber,
};
Expand Down Expand Up @@ -385,81 +384,6 @@ struct LiveStreamingState<N: Network> {
previous_batch_end: Option<N::BlockResponse>,
}

/// Streams the block ranges covering `start..=end` to `sender`, in chunks bounded by
/// `max_block_range`.
///
/// Ranges up to the finalized block are sent without any reorg checks; whatever remains is
/// delegated to `stream_range_with_reorg_handling`.
///
/// Returns `None` when the channel is closed while streaming finalized ranges, or when the
/// reorg-aware streaming step yields `None`; returns `Some(())` otherwise.
#[must_use]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_historical_range<N: Network, R: ReorgHandler<N>>(
    start: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut R,
) -> Option<()> {
    // NOTE: The finalized block number only decides whether reorg checks are needed, so this
    // is a "low-stakes" RPC call. Any failure (including a chain too young to expose a
    // finalized block) falls back to `0`, which simply enables reorg checks for everything.
    // A genuine provider problem will surface on the next provider call.
    let finalized_tip = provider
        .get_block_number_by_id(BlockNumberOrTag::Finalized.into())
        .await
        .unwrap_or(0);

    // Finalized blocks cannot be reorged, so this part of the range is streamed as-is.
    let finalized_cap = end.min(finalized_tip);
    let batch_total = RangeIterator::forward(start, finalized_cap, max_block_range).count();
    trace!(
        start = start,
        finalized_batch_end = finalized_cap,
        batch_count = batch_total,
        "Streaming finalized blocks (no reorg check)"
    );

    for chunk in RangeIterator::forward(start, finalized_cap, max_block_range) {
        trace!(range_start = *chunk.start(), range_end = *chunk.end(), "Streaming finalized range");
        let channel_closed = sender.try_stream(chunk).await.is_closed();
        if channel_closed {
            return None;
        }
    }

    // When `start` lies beyond the finalized cap nothing was streamed above, so resume at
    // `start`; otherwise resume right after the last finalized block that was streamed.
    let resume_from = std::cmp::max(start, finalized_cap + 1);

    // `resume_from > end` means the whole requested range was already finalized.
    if resume_from <= end {
        // Non-finalized blocks remain, so a reorg can still affect them.
        //
        // Lowest common ancestor a reorg may rewind to:
        // * start > finalized  -> the block right before `start` (hence `start - 1`)
        // * start == finalized -> `start` is never re-streamed; restart at `start + 1`
        // * start < finalized  -> only the non-finalized blocks are re-streamed
        let ancestor_floor = std::cmp::max(start.saturating_sub(1), finalized_tip);

        stream_range_with_reorg_handling(
            ancestor_floor,
            resume_from,
            end,
            max_block_range,
            sender,
            provider,
            reorg_handler,
        )
        .await?;
    }

    Some(())
}

/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
#[cfg_attr(
feature = "tracing",
Expand Down
224 changes: 224 additions & 0 deletions src/block_range_scanner/historical_range_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use alloy::{
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{BlockHash, BlockNumber},
};
use tokio::sync::mpsc;

use robust_provider::RobustProvider;

use crate::{
Notification,
block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator},
types::TryStream,
};

/// State needed to stream one historical block range over a channel.
///
/// Built with `new` and consumed by `run`, which moves every field into a spawned task.
pub(crate) struct HistoricalRangeHandler<N: Network> {
    /// Provider queried for the finalized block number and for block hashes.
    provider: RobustProvider<N>,
    /// Range size limit handed to `RangeIterator::forward` when splitting the range.
    max_block_range: u64,
    /// First block number of the requested range.
    start: BlockNumber,
    /// Last block number of the requested range.
    end: BlockNumber,
    /// Channel on which block ranges, notifications and errors are emitted.
    sender: mpsc::Sender<BlockScannerResult>,
}

impl<N: Network> HistoricalRangeHandler<N> {
pub fn new(
provider: RobustProvider<N>,
max_block_range: u64,
start: BlockNumber,
end: BlockNumber,
sender: mpsc::Sender<BlockScannerResult>,
) -> Self {
Self { provider, max_block_range, start, end, sender }
}

pub fn run(self) {
let HistoricalRangeHandler { provider, max_block_range, start, end, sender } = self;

info!(
start_block = start,
end_block = end,
total_blocks = end.saturating_sub(start) + 1,
"Starting historical range stream"
);

tokio::spawn(async move {
let _ = Self::handle_stream_historical_range(
start,
end,
max_block_range,
&sender,
&provider,
)
.await;
debug!("Historical range stream ended");
});
}

#[must_use]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))]
pub(crate) async fn stream_historical_range(
start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> Option<()> {
Self::handle_stream_historical_range(start, end, max_block_range, sender, provider).await
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(sender, provider)))]
async fn handle_stream_historical_range(
start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> Option<()> {
// Phase 1: Stream all finalized blocks without any reorg checks
let non_finalized_start =
Self::stream_finalized_blocks(provider, start, end, max_block_range, sender).await?;

// All blocks already finalized
if non_finalized_start > end {
return Some(());
}

// Phase 2: Stream non-finalized blocks with reorg detection
Self::stream_non_finalized_blocks(
non_finalized_start,
end,
max_block_range,
sender,
provider,
)
.await
}

/// Streams finalized blocks without reorg checks.
/// Returns the starting block number for non-finalized streaming, or None if channel closed.
async fn stream_finalized_blocks(
provider: &RobustProvider<N>,
start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
) -> Option<BlockNumber> {
// NOTE: Edge case - If the chain is too young to expose finalized blocks (height <
// finalized depth) just use zero. Since we use the finalized block number only to
// determine whether to run reorg checks or not, this is a "low-stakes" RPC call.
let finalized_block_num =
provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0);

let finalized_batch_end = finalized_block_num.min(end);

for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
trace!(
range_start = *range.start(),
range_end = *range.end(),
"Streaming finalized range"
);
if sender.try_stream(range).await.is_closed() {
return None;
}
}

// If start > finalized_batch_end, the loop above was empty and we should
// continue from start. Otherwise, continue from after finalized_batch_end.
Some(start.max(finalized_batch_end + 1))
}

/// Streams non-finalized blocks with reorg detection.
/// Re-streams if a reorg is detected, repeating until stable.
async fn stream_non_finalized_blocks(
non_finalized_start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> Option<()> {
// Get the end block's hash before streaming
let mut end_block_hash = match provider.get_block_by_number(end.into()).await {
Ok(block) => block.header().hash(),
Err(e) => {
error!("Failed to get end block hash");
_ = sender.try_stream(e).await;
return None;
}
};

loop {
// Stream all non-finalized ranges
for range in RangeIterator::forward(non_finalized_start, end, max_block_range) {
if sender.try_stream(range).await.is_closed() {
return None;
}
}

// Check for reorg - returns Some(new_hash) if reorg detected
match Self::check_reorg(end, end_block_hash, non_finalized_start, sender, provider)
.await
{
Some(new_hash) => end_block_hash = new_hash,
None => return Some(()),
}
}
}

async fn check_reorg(
end: BlockNumber,
expected_hash: BlockHash,
non_finalized_start: BlockNumber,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> Option<BlockHash> {
let current_end_block = match provider.get_block_by_number(end.into()).await {
Ok(block) => block,
Err(e) => {
error!("Failed to fetch end block for reorg check");
_ = sender.try_stream(e).await;
return None;
}
};

let current_hash = current_end_block.header().hash();
if current_hash == expected_hash {
debug!(end_block_hash = %expected_hash, "Historical sync completed");
return None;
}

warn!(
end_block = end,
old_hash = %expected_hash,
new_hash = %current_hash,
"Reorg detected, re-streaming non-finalized blocks"
);

let common_ancestor = non_finalized_start.saturating_sub(1);
if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() {
return None;
}

// Check if finalized has advanced past end
let current_finalized =
match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await {
Ok(block) => block,
Err(e) => {
error!("Failed to get updated finalized block");
_ = sender.try_stream(e).await;
return None;
}
};

if current_finalized >= end {
debug!(
finalized = current_finalized,
end = end,
"End block is now finalized, historical sync completed"
);
return None;
}

Some(current_hash)
}
}
1 change: 1 addition & 0 deletions src/block_range_scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod builder;
mod common;
mod historical_range_handler;
mod range_iterator;
mod reorg_handler;
mod rewind_handler;
Expand Down
26 changes: 9 additions & 17 deletions src/block_range_scanner/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use crate::{
block_range_scanner::{
RingBufferCapacity,
common::{self, BlockScannerResult},
historical_range_handler::HistoricalRangeHandler,
reorg_handler::DefaultReorgHandler,
rewind_handler::RewindHandler,
sync_handler::SyncHandler,
Expand Down Expand Up @@ -219,7 +220,6 @@ impl<N: Network> BlockRangeScanner<N> {
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);

let max_block_range = self.max_block_range;
let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
let provider = self.provider.clone();

let (start_block, end_block) = tokio::try_join!(
Expand All @@ -244,22 +244,14 @@ impl<N: Network> BlockRangeScanner<N> {
"Starting historical block stream"
);

tokio::spawn(async move {
let mut reorg_handler =
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);

_ = common::stream_historical_range(
start_block_num,
end_block_num,
max_block_range,
&blocks_sender,
&provider,
&mut reorg_handler,
)
.await;

debug!("Historical block stream completed");
});
let handler = HistoricalRangeHandler::new(
provider,
max_block_range,
start_block_num,
end_block_num,
blocks_sender,
);
handler.run();

Ok(ReceiverStream::new(blocks_receiver))
}
Expand Down
Loading
Loading