1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/flashblocks/Cargo.toml
@@ -73,6 +73,7 @@ reth-primitives-traits.workspace = true
reth-optimism-primitives.workspace = true
reth-transaction-pool.workspace = true
serde_json.workspace = true
rstest.workspace = true
criterion = { version = "0.5", features = ["async_tokio"] }

[[bench]]
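
The new rstest dev-dependency suggests the validation logic added by this PR is covered by parameterized tests, which are not rendered in this view. The snippet below is only a generic illustration of the #[rstest]/#[case] pattern the dependency enables, with made-up cases mirroring the catch-up rule in processor.rs.

// Illustration only — not the PR's actual tests.
use rstest::rstest;

#[rstest]
#[case(10, 10, true)]  // canonical reached the latest pending block -> clear
#[case(10, 12, true)]  // canonical passed the pending snapshot -> clear
#[case(12, 10, false)] // pending state still ahead of canonical -> keep building
fn canonical_catch_up_rule(
    #[case] latest_pending: u64,
    #[case] canonical: u64,
    #[case] expect_clear: bool,
) {
    // Mirrors the `latest_block_number() <= block.number` check in the processor.
    assert_eq!(latest_pending <= canonical, expect_clear);
}
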
9 changes: 9 additions & 0 deletions crates/flashblocks/src/lib.rs
@@ -3,6 +3,9 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

#[macro_use]
extern crate tracing;

mod metrics;
pub use metrics::Metrics;

@@ -23,3 +26,9 @@ pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI};

mod state_builder;
pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder};

mod validation;
pub use validation::{
CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy,
ReorgDetectionResult, ReorgDetector, SequenceValidationResult,
};
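
The validation module's source is not part of the rendered hunks. Based on these re-exports, the removed is_next_flashblock helper, and the match in on_flashblock_received further down this diff, the sequence validator plausibly looks like the sketch below; the variant fields, signature, and body are assumptions rather than the actual implementation.

// Hypothetical sketch: only the type names come from lib.rs; the variant
// fields, signature, and body are reconstructed from the processor call site.
#[derive(Debug, PartialEq, Eq)]
pub enum SequenceValidationResult {
    /// The next flashblock index for the block currently being built.
    NextInSequence,
    /// Index 0 of the block immediately after the latest pending block.
    FirstOfNextBlock,
    /// Same block and same index as the latest applied flashblock.
    Duplicate,
    /// A flashblock for a different block that does not start a fresh sequence.
    InvalidNewBlockIndex { block_number: u64, index: u64 },
    /// Same block, but the index is neither the next one nor a duplicate.
    NonSequentialGap { expected: u64, actual: u64 },
}

pub struct FlashblockSequenceValidator;

impl FlashblockSequenceValidator {
    pub fn validate(
        latest_block: u64,
        latest_index: u64,
        incoming_block: u64,
        incoming_index: u64,
    ) -> SequenceValidationResult {
        if incoming_block == latest_block && incoming_index == latest_index + 1 {
            SequenceValidationResult::NextInSequence
        } else if incoming_block == latest_block + 1 && incoming_index == 0 {
            SequenceValidationResult::FirstOfNextBlock
        } else if incoming_block == latest_block && incoming_index == latest_index {
            SequenceValidationResult::Duplicate
        } else if incoming_block != latest_block {
            SequenceValidationResult::InvalidNewBlockIndex {
                block_number: incoming_block,
                index: incoming_index,
            }
        } else {
            SequenceValidationResult::NonSequentialGap {
                expected: latest_index + 1,
                actual: incoming_index,
            }
        }
    }
}
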
280 changes: 139 additions & 141 deletions crates/flashblocks/src/processor.rs
@@ -1,10 +1,6 @@
//! Flashblocks state processor.

use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
time::Instant,
};
use std::{collections::BTreeMap, sync::Arc, time::Instant};

use alloy_consensus::{
Header,
@@ -32,9 +28,14 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_optimism_primitives::OpBlock;
use reth_primitives::RecoveredBlock;
use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver};
use tracing::{debug, error, info, warn};

use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder};
use crate::{
Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder,
validation::{
CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy,
ReorgDetector, SequenceValidationResult,
},
};

/// Messages consumed by the state processor.
#[derive(Debug, Clone)]
@@ -122,86 +123,87 @@ where
prev_pending_blocks: Option<Arc<PendingBlocks>>,
block: &RecoveredBlock<OpBlock>,
) -> eyre::Result<Option<Arc<PendingBlocks>>> {
match &prev_pending_blocks {
Some(pending_blocks) => {
let mut flashblocks = pending_blocks.get_flashblocks();
let num_flashblocks_for_canon = flashblocks
.iter()
.filter(|fb| fb.metadata.block_number == block.number)
.count();
self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64);
self.metrics
.pending_snapshot_height
.set(pending_blocks.latest_block_number() as f64);

if pending_blocks.latest_block_number() <= block.number {
debug!(
message = "pending snapshot cleared because canonical caught up",
latest_pending_block = pending_blocks.latest_block_number(),
canonical_block = block.number,
);
self.metrics.pending_clear_catchup.increment(1);
self.metrics
.pending_snapshot_fb_index
.set(pending_blocks.latest_flashblock_index() as f64);

Ok(None)
} else {
// If we had a reorg, we need to reset all flashblocks state
let tracked_txns = pending_blocks.get_transactions_for_block(block.number);
let tracked_txn_hashes: HashSet<_> =
tracked_txns.iter().map(|tx| tx.tx_hash()).collect();
let block_txn_hashes: HashSet<_> =
block.body().transactions().map(|tx| tx.tx_hash()).collect();
let pending_blocks_depth =
block.number - pending_blocks.earliest_block_number();

debug!(
message = "canonical block behind latest pending block, checking for reorg and max depth",
latest_pending_block = pending_blocks.latest_block_number(),
earliest_pending_block = pending_blocks.earliest_block_number(),
canonical_block = block.number,
pending_txns_for_block = ?tracked_txn_hashes.len(),
canonical_txns_for_block = ?block_txn_hashes.len(),
pending_blocks_depth = pending_blocks_depth,
max_depth = self.max_depth,
);

if tracked_txn_hashes.len() != block_txn_hashes.len()
|| tracked_txn_hashes != block_txn_hashes
{
debug!(
message = "reorg detected, recomputing pending flashblocks going ahead of reorg",
tracked_txn_hashes = ?tracked_txn_hashes,
block_txn_hashes = ?block_txn_hashes,
);
self.metrics.pending_clear_reorg.increment(1);

// If there is a reorg, we re-process all future flashblocks without reusing the existing pending state
flashblocks
.retain(|flashblock| flashblock.metadata.block_number > block.number);
return self.build_pending_state(None, &flashblocks);
}

if pending_blocks_depth > self.max_depth {
debug!(
message =
"pending blocks depth exceeds max depth, resetting pending blocks",
pending_blocks_depth = pending_blocks_depth,
max_depth = self.max_depth,
);

flashblocks
.retain(|flashblock| flashblock.metadata.block_number > block.number);
return self.build_pending_state(None, &flashblocks);
}
let pending_blocks = match &prev_pending_blocks {
Some(pb) => pb,
None => {
debug!(message = "no pending state to update with canonical block, skipping");
return Ok(None);
}
};

// If no reorg, we can continue building on top of the existing pending state
// NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number
self.build_pending_state(prev_pending_blocks, &flashblocks)
}
let mut flashblocks = pending_blocks.get_flashblocks();
let num_flashblocks_for_canon =
flashblocks.iter().filter(|fb| fb.metadata.block_number == block.number).count();
self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64);
self.metrics.pending_snapshot_height.set(pending_blocks.latest_block_number() as f64);

// Check for reorg by comparing transaction sets
let tracked_txns = pending_blocks.get_transactions_for_block(block.number);
let tracked_txn_hashes: Vec<_> = tracked_txns.iter().map(|tx| tx.tx_hash()).collect();
let block_txn_hashes: Vec<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect();

let reorg_result = ReorgDetector::detect(&tracked_txn_hashes, &block_txn_hashes);
let reorg_detected = reorg_result.is_reorg();

// Determine the reconciliation strategy
let strategy = CanonicalBlockReconciler::reconcile(
Some(pending_blocks.earliest_block_number()),
Some(pending_blocks.latest_block_number()),
block.number,
self.max_depth,
reorg_detected,
);

match strategy {
ReconciliationStrategy::CatchUp => {
debug!(
message = "pending snapshot cleared because canonical caught up",
latest_pending_block = pending_blocks.latest_block_number(),
canonical_block = block.number,
);
self.metrics.pending_clear_catchup.increment(1);
self.metrics
.pending_snapshot_fb_index
.set(pending_blocks.latest_flashblock_index() as f64);
Ok(None)
}
None => {
ReconciliationStrategy::HandleReorg => {
warn!(
message = "reorg detected, recomputing pending flashblocks going ahead of reorg",
tracked_txn_hashes = ?tracked_txn_hashes,
block_txn_hashes = ?block_txn_hashes,
);
self.metrics.pending_clear_reorg.increment(1);

// If there is a reorg, we re-process all future flashblocks without reusing the existing pending state
flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number);
self.build_pending_state(None, &flashblocks)
}
ReconciliationStrategy::DepthLimitExceeded { depth, max_depth } => {
debug!(
Collaborator:
I would make this a warn log; it's a sign the node is falling behind.

@haardikk21 (Collaborator), Jan 8, 2026:
DepthLimitExceeded is not a sign of the node lagging. It occurs because the default Continue case doesn't clear pending state, in order to avoid re-executing transactions. When flashblocks regularly outpace canonical blocks, the pending state eventually grows large, which can cause slowness. The depth-limit check ensures that past a certain threshold we force-clear the state and eat the cost of re-executing a small number of transactions (less than one block).

Collaborator:
Thanks for the context.

message = "pending blocks depth exceeds max depth, resetting pending blocks",
pending_blocks_depth = depth,
max_depth = max_depth,
);

flashblocks.retain(|flashblock| flashblock.metadata.block_number > block.number);
self.build_pending_state(None, &flashblocks)
}
ReconciliationStrategy::Continue => {
debug!(
message = "canonical block behind latest pending block, continuing with existing pending state",
latest_pending_block = pending_blocks.latest_block_number(),
earliest_pending_block = pending_blocks.earliest_block_number(),
canonical_block = block.number,
pending_txns_for_block = ?tracked_txn_hashes.len(),
canonical_txns_for_block = ?block_txn_hashes.len(),
);
// If no reorg, we can continue building on top of the existing pending state
// NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number
self.build_pending_state(prev_pending_blocks, &flashblocks)
}
ReconciliationStrategy::NoPendingState => {
// This case is already handled above, but included for completeness
debug!(message = "no pending state to update with canonical block, skipping");
Ok(None)
}
@@ -213,53 +215,64 @@ where
prev_pending_blocks: Option<Arc<PendingBlocks>>,
flashblock: Flashblock,
) -> eyre::Result<Option<Arc<PendingBlocks>>> {
match &prev_pending_blocks {
Some(pending_blocks) => {
if self.is_next_flashblock(pending_blocks, &flashblock) {
// We have received the next flashblock for the current block
// or the first flashblock for the next block
let mut flashblocks = pending_blocks.get_flashblocks();
flashblocks.push(flashblock);
self.build_pending_state(prev_pending_blocks, &flashblocks)
} else if pending_blocks.latest_block_number() != flashblock.metadata.block_number {
// We have received a non-zero flashblock for a new block
self.metrics.unexpected_block_order.increment(1);
error!(
message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_blocks.latest_block_number(),
new_block = %flashblock.metadata.block_number,
);
Ok(None)
} else if pending_blocks.latest_flashblock_index() == flashblock.index {
// We have received a duplicate flashblock for the current block
self.metrics.unexpected_block_order.increment(1);
warn!(
message = "Received duplicate Flashblock for current block, ignoring",
curr_block = %pending_blocks.latest_block_number(),
flashblock_index = %flashblock.index,
);
Ok(prev_pending_blocks)
} else {
// We have received a non-sequential Flashblock for the current block
self.metrics.unexpected_block_order.increment(1);

error!(
message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_blocks.latest_block_number(),
new_block = %flashblock.metadata.block_number,
);

Ok(None)
}
}
let pending_blocks = match &prev_pending_blocks {
Some(pb) => pb,
None => {
if flashblock.index == 0 {
self.build_pending_state(None, &vec![flashblock])
return self.build_pending_state(None, &vec![flashblock]);
} else {
info!(message = "waiting for first Flashblock");
Ok(None)
return Ok(None);
}
}
};

let validation_result = FlashblockSequenceValidator::validate(
pending_blocks.latest_block_number(),
pending_blocks.latest_flashblock_index(),
flashblock.metadata.block_number,
flashblock.index,
);

match validation_result {
SequenceValidationResult::NextInSequence
| SequenceValidationResult::FirstOfNextBlock => {
// We have received the next flashblock for the current block
// or the first flashblock for the next block
let mut flashblocks = pending_blocks.get_flashblocks();
flashblocks.push(flashblock);
self.build_pending_state(prev_pending_blocks, &flashblocks)
}
SequenceValidationResult::Duplicate => {
// We have received a duplicate flashblock for the current block
self.metrics.unexpected_block_order.increment(1);
warn!(
message = "Received duplicate Flashblock for current block, ignoring",
curr_block = %pending_blocks.latest_block_number(),
flashblock_index = %flashblock.index,
);
Ok(prev_pending_blocks)
}
SequenceValidationResult::InvalidNewBlockIndex { block_number, index: _ } => {
// We have received a non-zero flashblock for a new block
self.metrics.unexpected_block_order.increment(1);
error!(
message = "Received non-zero index Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_blocks.latest_block_number(),
new_block = %block_number,
);
Ok(None)
}
SequenceValidationResult::NonSequentialGap { expected: _, actual: _ } => {
// We have received a non-sequential Flashblock for the current block
self.metrics.unexpected_block_order.increment(1);
error!(
message = "Received non-sequential Flashblock for current block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_blocks.latest_block_number(),
new_block = %flashblock.metadata.block_number,
);
Ok(None)
}
}
}

@@ -432,19 +445,4 @@ where

Ok(Some(Arc::new(pending_blocks_builder.build()?)))
}

fn is_next_flashblock(
&self,
pending_blocks: &Arc<PendingBlocks>,
flashblock: &Flashblock,
) -> bool {
let is_next_of_block = flashblock.metadata.block_number
== pending_blocks.latest_block_number()
&& flashblock.index == pending_blocks.latest_flashblock_index() + 1;
let is_first_of_next_block = flashblock.metadata.block_number
== pending_blocks.latest_block_number() + 1
&& flashblock.index == 0;

is_next_of_block || is_first_of_next_block
}
}
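
The reorg detector and canonical-block reconciler are likewise not rendered in this diff. The sketch below reconstructs them from their call sites in on_canonical_block_received and from the review thread's point that DepthLimitExceeded is a periodic forced clear rather than a lagging-node signal; the order-insensitive hash-set comparison and all field names are assumptions carried over from the inline logic this PR removes.

// Hypothetical sketch: type names match the lib.rs re-exports; signatures and
// bodies are inferred from the call sites in processor.rs.
use std::{collections::HashSet, hash::Hash};

#[derive(Debug, PartialEq, Eq)]
pub enum ReorgDetectionResult {
    NoReorg,
    Reorg,
}

impl ReorgDetectionResult {
    pub fn is_reorg(&self) -> bool {
        matches!(self, Self::Reorg)
    }
}

pub struct ReorgDetector;

impl ReorgDetector {
    /// Flags a reorg when the transactions tracked for the canonical block's
    /// height differ from the block's actual transactions, compared
    /// order-insensitively like the HashSet check removed from the processor.
    pub fn detect<T: Eq + Hash>(tracked: &[T], canonical: &[T]) -> ReorgDetectionResult {
        let tracked: HashSet<&T> = tracked.iter().collect();
        let canonical: HashSet<&T> = canonical.iter().collect();
        if tracked == canonical {
            ReorgDetectionResult::NoReorg
        } else {
            ReorgDetectionResult::Reorg
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum ReconciliationStrategy {
    CatchUp,
    HandleReorg,
    DepthLimitExceeded { depth: u64, max_depth: u64 },
    Continue,
    NoPendingState,
}

pub struct CanonicalBlockReconciler;

impl CanonicalBlockReconciler {
    /// Decision order mirrors the pre-refactor processor: catch-up wins, then
    /// reorg handling, then the depth limit, otherwise keep building. Continue
    /// never prunes pending state, so the depth limit is the forced clear the
    /// review thread describes: past the threshold the state is reset and a
    /// small number of transactions (less than one block) is re-executed.
    pub fn reconcile(
        earliest_pending: Option<u64>,
        latest_pending: Option<u64>,
        canonical_block: u64,
        max_depth: u64,
        reorg_detected: bool,
    ) -> ReconciliationStrategy {
        let (Some(earliest), Some(latest)) = (earliest_pending, latest_pending) else {
            return ReconciliationStrategy::NoPendingState;
        };
        if latest <= canonical_block {
            return ReconciliationStrategy::CatchUp;
        }
        if reorg_detected {
            return ReconciliationStrategy::HandleReorg;
        }
        let depth = canonical_block.saturating_sub(earliest);
        if depth > max_depth {
            return ReconciliationStrategy::DepthLimitExceeded { depth, max_depth };
        }
        ReconciliationStrategy::Continue
    }
}
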
1 change: 0 additions & 1 deletion crates/flashblocks/src/state.rs
@@ -17,7 +17,6 @@ use tokio::sync::{
broadcast::{self, Sender},
mpsc,
};
use tracing::{error, info};

use crate::{
FlashblocksAPI, FlashblocksReceiver, PendingBlocks,
1 change: 0 additions & 1 deletion crates/flashblocks/src/subscription.rs
@@ -6,7 +6,6 @@ use base_flashtypes::Flashblock;
use futures_util::{SinkExt as _, StreamExt};
use tokio::{sync::mpsc, time::interval};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tracing::{error, info, trace, warn};
use url::Url;

use crate::{FlashblocksReceiver, Metrics};