diff --git a/Cargo.lock b/Cargo.lock index 8b120441..be103a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1551,7 +1551,6 @@ dependencies = [ "base-flashtypes", "base-reth-test-utils", "criterion", - "eyre", "futures-util", "metrics", "metrics-derive", @@ -1579,6 +1578,7 @@ dependencies = [ "reth-transaction-pool", "rstest", "serde_json", + "thiserror 2.0.17", "tokio", "tokio-tungstenite 0.28.0", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 6d99fbcc..86e1125d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,3 +162,4 @@ derive_more = "2.1.0" serde_json = "1.0.145" metrics-derive = "0.1.0" tracing-subscriber = "0.3.22" +thiserror = "2.0" diff --git a/crates/flashblocks/Cargo.toml b/crates/flashblocks/Cargo.toml index 31598f74..364f71e3 100644 --- a/crates/flashblocks/Cargo.toml +++ b/crates/flashblocks/Cargo.toml @@ -50,7 +50,7 @@ futures-util.workspace = true # misc url.workspace = true -eyre.workspace = true +thiserror.workspace = true tracing.workspace = true metrics.workspace = true arc-swap.workspace = true @@ -58,6 +58,7 @@ metrics-derive.workspace = true rayon.workspace = true [dev-dependencies] +rstest.workspace = true rand.workspace = true reth-db.workspace = true once_cell.workspace = true @@ -73,7 +74,6 @@ reth-primitives-traits.workspace = true reth-optimism-primitives.workspace = true reth-transaction-pool.workspace = true serde_json.workspace = true -rstest.workspace = true criterion = { version = "0.5", features = ["async_tokio"] } [[bench]] diff --git a/crates/flashblocks/src/error.rs b/crates/flashblocks/src/error.rs new file mode 100644 index 00000000..8574125f --- /dev/null +++ b/crates/flashblocks/src/error.rs @@ -0,0 +1,302 @@ +//! Error types for the flashblocks state processor. + +use alloy_consensus::crypto::RecoveryError; +use alloy_primitives::{Address, B256}; +use thiserror::Error; + +/// Errors related to flashblock protocol sequencing and ordering. +#[derive(Debug, Clone, Eq, PartialEq, Error)] +pub enum ProtocolError { + /// Invalid flashblock sequence or ordering. + #[error("invalid flashblock sequence: flashblocks must be processed in order")] + InvalidSequence, + + /// First flashblock in a sequence must contain a base payload. + #[error("missing base: first flashblock in sequence must contain a base payload")] + MissingBase, + + /// Cannot build from an empty flashblocks collection. + #[error("empty flashblocks: cannot build state from zero flashblocks")] + EmptyFlashblocks, +} + +/// Errors related to state provider and infrastructure operations. +#[derive(Debug, Clone, Eq, PartialEq, Error)] +pub enum ProviderError { + /// Missing canonical header for a given block number. + #[error( + "missing canonical header for block {block_number}. This can be ignored if the node has recently restarted, restored from a snapshot or is still syncing." + )] + MissingCanonicalHeader { + /// The block number for which the header is missing. + block_number: u64, + }, + + /// State provider error with context. + #[error("state provider error: {0}")] + StateProvider(String), +} + +/// Errors related to transaction execution and processing. +#[derive(Debug, Clone, Eq, PartialEq, Error)] +pub enum ExecutionError { + /// Transaction execution failed. + #[error("transaction execution failed for tx {tx_hash} from sender {sender}: {reason}")] + TransactionFailed { + /// The hash of the failed transaction. + tx_hash: B256, + /// The sender address of the failed transaction. + sender: Address, + /// The reason for the execution failure. + reason: String, + }, + + /// ECDSA signature recovery failed. + #[error("sender recovery failed: {0}")] + SenderRecovery(String), + + /// Deposit transaction paired with a non-deposit receipt. + #[error("deposit receipt mismatch: deposit transaction must have a deposit receipt")] + DepositReceiptMismatch, + + /// Cumulative gas used overflow. + #[error("gas overflow: cumulative gas used exceeded u64::MAX")] + GasOverflow, + + /// EVM environment setup error. + #[error("EVM environment error: {0}")] + EvmEnv(String), + + /// L1 block info extraction error. + #[error("L1 block info extraction error: {0}")] + L1BlockInfo(String), + + /// Payload to block conversion error. + #[error("block conversion error: {0}")] + BlockConversion(String), + + /// Failed to load cache account for depositor. + #[error("failed to load cache account for deposit transaction sender")] + DepositAccountLoad, +} + +impl From for ExecutionError { + fn from(err: RecoveryError) -> Self { + Self::SenderRecovery(err.to_string()) + } +} + +/// Errors related to pending blocks construction. +#[derive(Debug, Clone, Eq, PartialEq, Error)] +pub enum BuildError { + /// Cannot build pending blocks without headers. + #[error("missing headers: cannot build pending blocks without header information")] + MissingHeaders, + + /// Cannot build pending blocks with no flashblocks. + #[error("no flashblocks: cannot build pending blocks from empty flashblock collection")] + NoFlashblocks, +} + +/// Errors that can occur during flashblock state processing. +#[derive(Debug, Clone, Eq, PartialEq, Error)] +pub enum StateProcessorError { + /// Protocol-level errors (sequencing, ordering). + #[error(transparent)] + Protocol(#[from] ProtocolError), + + /// Provider/infrastructure errors. + #[error(transparent)] + Provider(#[from] ProviderError), + + /// Transaction execution errors. + #[error(transparent)] + Execution(#[from] ExecutionError), + + /// Pending blocks build errors. + #[error(transparent)] + Build(#[from] BuildError), +} + +impl From for StateProcessorError { + fn from(err: RecoveryError) -> Self { + Self::Execution(ExecutionError::from(err)) + } +} + +/// A type alias for `Result`. +pub type Result = std::result::Result; + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + #[rstest] + #[case::invalid_sequence( + ProtocolError::InvalidSequence, + "invalid flashblock sequence: flashblocks must be processed in order" + )] + #[case::missing_base( + ProtocolError::MissingBase, + "missing base: first flashblock in sequence must contain a base payload" + )] + #[case::empty_flashblocks( + ProtocolError::EmptyFlashblocks, + "empty flashblocks: cannot build state from zero flashblocks" + )] + fn test_protocol_error_display(#[case] error: ProtocolError, #[case] expected: &str) { + assert_eq!(error.to_string(), expected); + } + + #[rstest] + #[case::missing_canonical_header( + ProviderError::MissingCanonicalHeader { block_number: 12345 }, + "missing canonical header for block 12345. This can be ignored if the node has recently restarted, restored from a snapshot or is still syncing." + )] + #[case::state_provider( + ProviderError::StateProvider("connection failed".to_string()), + "state provider error: connection failed" + )] + fn test_provider_error_display(#[case] error: ProviderError, #[case] expected: &str) { + assert_eq!(error.to_string(), expected); + } + + #[rstest] + #[case::deposit_receipt_mismatch( + ExecutionError::DepositReceiptMismatch, + "deposit receipt mismatch: deposit transaction must have a deposit receipt" + )] + #[case::gas_overflow( + ExecutionError::GasOverflow, + "gas overflow: cumulative gas used exceeded u64::MAX" + )] + #[case::evm_env( + ExecutionError::EvmEnv("invalid chain id".to_string()), + "EVM environment error: invalid chain id" + )] + #[case::l1_block_info( + ExecutionError::L1BlockInfo("missing l1 data".to_string()), + "L1 block info extraction error: missing l1 data" + )] + #[case::block_conversion( + ExecutionError::BlockConversion("invalid payload".to_string()), + "block conversion error: invalid payload" + )] + #[case::deposit_account_load( + ExecutionError::DepositAccountLoad, + "failed to load cache account for deposit transaction sender" + )] + #[case::sender_recovery( + ExecutionError::SenderRecovery("invalid signature".to_string()), + "sender recovery failed: invalid signature" + )] + fn test_execution_error_display(#[case] error: ExecutionError, #[case] expected: &str) { + assert_eq!(error.to_string(), expected); + } + + #[rstest] + #[case::transaction_failed( + ExecutionError::TransactionFailed { + tx_hash: B256::ZERO, + sender: Address::ZERO, + reason: "out of gas".to_string(), + }, + &["transaction execution failed", "out of gas"] + )] + fn test_execution_error_display_contains( + #[case] error: ExecutionError, + #[case] substrings: &[&str], + ) { + let display = error.to_string(); + for substring in substrings { + assert!(display.contains(substring), "expected '{display}' to contain '{substring}'"); + } + } + + #[rstest] + #[case::missing_headers( + BuildError::MissingHeaders, + "missing headers: cannot build pending blocks without header information" + )] + #[case::no_flashblocks( + BuildError::NoFlashblocks, + "no flashblocks: cannot build pending blocks from empty flashblock collection" + )] + fn test_build_error_display(#[case] error: BuildError, #[case] expected: &str) { + assert_eq!(error.to_string(), expected); + } + + #[rstest] + #[case::protocol(StateProcessorError::from(ProtocolError::InvalidSequence))] + #[case::provider(StateProcessorError::from(ProviderError::MissingCanonicalHeader { block_number: 100 }))] + #[case::execution(StateProcessorError::from(ExecutionError::GasOverflow))] + #[case::build(StateProcessorError::from(BuildError::MissingHeaders))] + fn test_state_processor_error_from_variants(#[case] error: StateProcessorError) { + let debug_str = format!("{:?}", error); + assert!(!debug_str.is_empty()); + let display_str = error.to_string(); + assert!(!display_str.is_empty()); + } + + #[test] + fn test_error_in_result() { + fn returns_ok() -> Result { + Ok(42) + } + + fn returns_err() -> Result { + Err(ExecutionError::GasOverflow.into()) + } + + assert!(returns_ok().is_ok()); + assert_eq!(returns_ok().unwrap(), 42); + assert!(returns_err().is_err()); + assert!(matches!( + returns_err().unwrap_err(), + StateProcessorError::Execution(ExecutionError::GasOverflow) + )); + } + + #[test] + fn test_error_is_send_sync() { + fn assert_send() {} + fn assert_sync() {} + + assert_send::(); + assert_sync::(); + assert_send::(); + assert_sync::(); + assert_send::(); + assert_sync::(); + assert_send::(); + assert_sync::(); + assert_send::(); + assert_sync::(); + } + + #[test] + fn test_sender_recovery_from_impl() { + let recovery_err = RecoveryError::new(); + let err: StateProcessorError = recovery_err.into(); + assert!(matches!(err, StateProcessorError::Execution(ExecutionError::SenderRecovery(_)))); + assert!(err.to_string().contains("sender recovery failed")); + } + + #[test] + fn test_error_category_matching() { + let protocol_err: StateProcessorError = ProtocolError::InvalidSequence.into(); + assert!(matches!(protocol_err, StateProcessorError::Protocol(_))); + + let provider_err: StateProcessorError = + ProviderError::MissingCanonicalHeader { block_number: 1 }.into(); + assert!(matches!(provider_err, StateProcessorError::Provider(_))); + + let execution_err: StateProcessorError = ExecutionError::GasOverflow.into(); + assert!(matches!(execution_err, StateProcessorError::Execution(_))); + + let build_err: StateProcessorError = BuildError::MissingHeaders.into(); + assert!(matches!(build_err, StateProcessorError::Build(_))); + } +} diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 24b76bc8..e75be71a 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -6,6 +6,11 @@ #[macro_use] extern crate tracing; +mod error; +pub use error::{ + BuildError, ExecutionError, ProtocolError, ProviderError, Result, StateProcessorError, +}; + mod metrics; pub use metrics::Metrics; diff --git a/crates/flashblocks/src/pending_blocks.rs b/crates/flashblocks/src/pending_blocks.rs index eaf31ed5..f343cece 100644 --- a/crates/flashblocks/src/pending_blocks.rs +++ b/crates/flashblocks/src/pending_blocks.rs @@ -11,14 +11,13 @@ use alloy_rpc_types::{BlockTransactions, state::StateOverride}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use arc_swap::Guard; use base_flashtypes::Flashblock; -use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{db::Cache, state::EvmState}; use reth_rpc_convert::RpcTransaction; use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use crate::PendingBlocksAPI; +use crate::{BuildError, PendingBlocksAPI, StateProcessorError}; /// Builder for [`PendingBlocks`]. #[derive(Debug)] @@ -122,13 +121,13 @@ impl PendingBlocksBuilder { self } - pub(crate) fn build(self) -> eyre::Result { + pub(crate) fn build(self) -> Result { if self.headers.is_empty() { - return Err(eyre!("missing headers")); + return Err(BuildError::MissingHeaders.into()); } if self.flashblocks.is_empty() { - return Err(eyre!("no flashblocks")); + return Err(BuildError::NoFlashblocks.into()); } Ok(PendingBlocks { diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 5c705531..61d087d7 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -13,7 +13,6 @@ use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPa use alloy_rpc_types_eth::state::StateOverride; use arc_swap::ArcSwapOption; use base_flashtypes::Flashblock; -use eyre::eyre; use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::TransactionResponse; use rayon::prelude::*; @@ -30,7 +29,8 @@ use reth_primitives::RecoveredBlock; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; use crate::{ - Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, + ExecutionError, Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, + ProtocolError, ProviderError, Result, validation::{ CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, ReorgDetector, SequenceValidationResult, @@ -122,7 +122,7 @@ where &self, prev_pending_blocks: Option>, block: &RecoveredBlock, - ) -> eyre::Result>> { + ) -> Result>> { let pending_blocks = match &prev_pending_blocks { Some(pb) => pb, None => { @@ -214,7 +214,7 @@ where &self, prev_pending_blocks: Option>, flashblock: Flashblock, - ) -> eyre::Result>> { + ) -> Result>> { let pending_blocks = match &prev_pending_blocks { Some(pb) => pb, None => { @@ -280,7 +280,7 @@ where &self, prev_pending_blocks: Option>, flashblocks: &Vec, - ) -> eyre::Result>> { + ) -> Result>> { // BTreeMap guarantees ascending order of keys while iterating let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { @@ -292,14 +292,17 @@ where let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; - let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( - "Failed to extract header for canonical block number {}. This can be ignored if the node has recently restarted, restored from a snapshot or is still syncing.", - canonical_block - ))?; + let mut last_block_header = self + .client + .header_by_number(canonical_block) + .map_err(|e| ProviderError::StateProvider(e.to_string()))? + .ok_or(ProviderError::MissingCanonicalHeader { block_number: canonical_block })?; let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); - let state_provider = - self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; + let state_provider = self + .client + .state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block)) + .map_err(|e| ProviderError::StateProvider(e.to_string()))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); @@ -317,15 +320,13 @@ where for (_block_number, flashblocks) in flashblocks_per_block { let base = flashblocks .first() - .ok_or(eyre!("cannot build a pending block from no flashblocks"))? + .ok_or(ProtocolError::EmptyFlashblocks)? .base .clone() - .ok_or(eyre!("first flashblock does not contain a base"))?; + .ok_or(ProtocolError::MissingBase)?; - let latest_flashblock = flashblocks - .last() - .cloned() - .ok_or(eyre!("cannot build a pending block from no flashblocks"))?; + let latest_flashblock = + flashblocks.last().cloned().ok_or(ProtocolError::EmptyFlashblocks)?; let transactions: Vec = flashblocks .iter() @@ -365,8 +366,11 @@ where }, }; - let block: OpBlock = execution_payload.try_into_block()?; - let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; + let block: OpBlock = execution_payload + .try_into_block() + .map_err(|e| ExecutionError::BlockConversion(e.to_string()))?; + let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body) + .map_err(|e| ExecutionError::L1BlockInfo(e.to_string()))?; let block_header = block.header.clone(); // prevents us from needing to clone the entire block let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks pending_blocks_builder.with_header(sealed_header); @@ -380,7 +384,9 @@ where extra_data: base.extra_data.clone(), }; - let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; + let evm_env = evm_config + .next_evm_env(&last_block_header, &block_env_attributes) + .map_err(|e| ExecutionError::EvmEnv(e.to_string()))?; let evm = evm_config.evm_with_env(db, evm_env); // Parallel sender recovery - batch all ECDSA operations upfront @@ -390,7 +396,7 @@ where .transactions .par_iter() .cloned() - .map(|tx| -> eyre::Result<(OpTxEnvelope, Address)> { + .map(|tx| -> Result<(OpTxEnvelope, Address)> { let tx_hash = tx.tx_hash(); let sender = match prev_pending_blocks .as_ref() @@ -401,7 +407,7 @@ where }; Ok((tx, sender)) }) - .collect::>()?; + .collect::>()?; self.metrics.sender_recovery_duration.record(recovery_start.elapsed()); let mut pending_state_builder = PendingStateBuilder::new( diff --git a/crates/flashblocks/src/state_builder.rs b/crates/flashblocks/src/state_builder.rs index 3505bf67..5a655eb7 100644 --- a/crates/flashblocks/src/state_builder.rs +++ b/crates/flashblocks/src/state_builder.rs @@ -8,7 +8,6 @@ use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; use alloy_primitives::B256; use alloy_rpc_types::TransactionTrait; use alloy_rpc_types_eth::state::StateOverride; -use eyre::eyre; use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{Database, DatabaseCommit, context::result::ResultAndState, state::EvmState}; @@ -21,7 +20,7 @@ use reth_optimism_primitives::OpPrimitives; use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; use reth_rpc_convert::transaction::ConvertReceiptInput; -use crate::PendingBlocks; +use crate::{ExecutionError, PendingBlocks, StateProcessorError}; /// Represents the result of executing or fetching a cached pending transaction. #[derive(Debug, Clone)] @@ -91,7 +90,7 @@ where &mut self, idx: usize, transaction: Recovered, - ) -> eyre::Result { + ) -> Result { let tx_hash = transaction.tx_hash(); let effective_gas_price = if transaction.is_deposit() { @@ -130,13 +129,13 @@ where state: EvmState, idx: usize, effective_gas_price: u128, - ) -> eyre::Result { + ) -> Result { let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { let deposit_receipt = receipt .inner .inner .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + .ok_or(ExecutionError::DepositReceiptMismatch)?; (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) } else { @@ -158,7 +157,7 @@ where self.cumulative_gas_used = self .cumulative_gas_used .checked_add(receipt.inner.gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; + .ok_or(ExecutionError::GasOverflow)?; self.next_log_index += receipt.inner.logs().len(); Ok(ExecutedPendingTransaction { rpc_transaction, receipt, state }) @@ -170,7 +169,7 @@ where transaction: Recovered, idx: usize, effective_gas_price: u128, - ) -> eyre::Result { + ) -> Result { let tx_hash = transaction.tx_hash(); match self.evm.transact(&transaction) { @@ -194,7 +193,7 @@ where self.cumulative_gas_used = self .cumulative_gas_used .checked_add(gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; + .ok_or(ExecutionError::GasOverflow)?; let is_canyon_active = self.chain_spec.is_canyon_active_at_timestamp(self.pending_block.timestamp); @@ -226,7 +225,7 @@ where .map(|acc| acc.unwrap_or_default().nonce) }) .transpose() - .map_err(|_| eyre!("failed to load cache account for depositor"))?; + .map_err(|_| ExecutionError::DepositAccountLoad)?; self.receipt_builder.build_deposit_receipt(OpDepositReceipt { inner: receipt, @@ -266,7 +265,7 @@ where .inner .inner .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + .ok_or(ExecutionError::DepositReceiptMismatch)?; (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) } else { @@ -288,12 +287,12 @@ where Ok(ExecutedPendingTransaction { rpc_transaction, receipt: op_receipt, state }) } - Err(e) => Err(eyre!( - "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", - e, + Err(e) => Err(ExecutionError::TransactionFailed { tx_hash, - transaction.signer() - )), + sender: transaction.signer(), + reason: format!("{:?}", e), + } + .into()), } } }