From bf2081649a67dd06b543565288b765abe98b9ac3 Mon Sep 17 00:00:00 2001 From: Julian Meyer Date: Wed, 7 Jan 2026 21:39:50 +0000 Subject: [PATCH 1/3] feat: move state/receipt building to new method Moves transaction processing logic of build_pending_state to a new struct. Also separates out reusing a cached execution from executing using the EVM. This also removes block_hash from receipts and transactions which matches the spec and removes the need for calculating a useless block hash for every flashblock. The next step is to move the for loop into this struct which will return an iterator of results. Using this, we'll be able to prewarm accessed addresses using FALs and eventually process transactions in parallel. Moving execution to a new struct allows us to implement a trait for building flashblocks and switch out different implementations (prewarming, parallel, sequential). --- crates/flashblocks/src/processor.rs | 552 ++++++++++++++++------------ 1 file changed, 321 insertions(+), 231 deletions(-) diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 97b52adb..b5f02914 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -7,12 +7,12 @@ use std::{ }; use alloy_consensus::{ - Eip658Value, Header, TxReceipt, + Block, Eip658Value, Header, TxReceipt, transaction::{Recovered, SignerRecoverable, TransactionMeta}, }; use alloy_eips::BlockNumberOrTag; use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; -use alloy_primitives::{Address, B256, BlockNumber, Bytes, Sealable}; +use alloy_primitives::{Address, B256, BlockNumber, Bytes, U256, map::HashMap}; use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; use alloy_rpc_types_eth::state::StateOverride; @@ -21,19 +21,22 @@ use base_flashtypes::Flashblock; use eyre::eyre; use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; use op_alloy_network::TransactionResponse; -use op_alloy_rpc_types::Transaction; +use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use rayon::prelude::*; use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, providers::{BlockReaderIdExt, StateProviderFactory}, revm::{ - DatabaseCommit, State, context::result::ResultAndState, database::StateProviderDatabase, - db::CacheDB, + Database, DatabaseCommit, State, context::result::ResultAndState, + database::StateProviderDatabase, db::CacheDB, state::EvmState, }, }; -use reth_evm::{ConfigureEvm, Evm, eth::receipt_builder::ReceiptBuilderCtx}; +use reth_evm::{ + ConfigureEvm, Evm, FromRecoveredTx, eth::receipt_builder::ReceiptBuilderCtx, + op_revm::L1BlockInfo, +}; use reth_optimism_chainspec::OpHardforks; -use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes, OpRethReceiptBuilder}; use reth_optimism_primitives::{OpBlock, OpPrimitives}; use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; use reth_primitives::RecoveredBlock; @@ -52,6 +55,286 @@ pub enum StateUpdate { Flashblock(Flashblock), } +struct ExecutedPendingTransaction { + rpc_transaction: Transaction, + receipt: OpTransactionReceipt, + state: EvmState, + touched_balances: HashMap, +} + +/// Executes or fetches cached values for transactions in a flashblock. +struct PendingStateBuilder { + cumulative_gas_used: u64, + next_log_index: usize, + + evm: E, + pending_block: Block, + l1_block_info: L1BlockInfo, + chain_spec: ChainSpec, + receipt_builder: OpRethReceiptBuilder, + + prev_pending_blocks: Option>, + state_overrides: StateOverride, +} + +impl PendingStateBuilder +where + E: Evm, + E::DB: DatabaseCommit + Database, + E::Tx: FromRecoveredTx, + ChainSpec: OpHardforks, +{ + const fn new( + chain_spec: ChainSpec, + evm: E, + pending_block: Block, + prev_pending_blocks: Option>, + l1_block_info: L1BlockInfo, + state_overrides: StateOverride, + receipt_builder: OpRethReceiptBuilder, + ) -> Self { + Self { + pending_block, + evm, + cumulative_gas_used: 0, + next_log_index: 0, + prev_pending_blocks, + l1_block_info, + state_overrides, + chain_spec, + receipt_builder, + } + } + + fn execute_transaction( + &mut self, + idx: usize, + transaction: Recovered, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + let effective_gas_price = if transaction.is_deposit() { + 0 + } else { + self.pending_block + .base_fee_per_gas + .map(|base_fee| { + transaction.effective_tip_per_gas(base_fee).unwrap_or_default() + + base_fee as u128 + }) + .unwrap_or_else(|| transaction.max_fee_per_gas()) + }; + + // Check if we have all the data we need (receipt + state) + let cached_data = self.prev_pending_blocks.as_ref().and_then(|p| { + let receipt = p.get_receipt(tx_hash)?; + let state = p.get_transaction_state(&tx_hash)?; + Some((receipt, state)) + }); + + // If cached, we can fill out pending block data using previous execution results + // If not cached, we need to execute the transaction and build pending block data from scratch + if let Some((receipt, state)) = cached_data { + self.execute_with_cached_data(transaction, receipt, state, idx, effective_gas_price) + } else { + self.execute_with_evm(transaction, idx, effective_gas_price) + } + } + + /// Builds transaction result from cached receipt and state data. + fn execute_with_cached_data( + &mut self, + transaction: Recovered, + receipt: OpTransactionReceipt, + state: EvmState, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + + let mut touched_balances = HashMap::default(); + + for (address, account) in state.iter() { + if account.is_touched() { + touched_balances.insert(*address, account.info.balance); + } + } + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(receipt.inner.gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + self.next_log_index += receipt.inner.logs().len(); + + Ok(ExecutedPendingTransaction { rpc_transaction, receipt, state, touched_balances }) + } + + /// Executes the transaction through the EVM and builds the result from scratch. + fn execute_with_evm( + &mut self, + transaction: Recovered, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + match self.evm.transact(transaction.clone()) { + Ok(ResultAndState { state, result }) => { + let gas_used = result.gas_used(); + let mut touched_balances = HashMap::default(); + for (addr, acc) in &state { + if acc.is_touched() { + touched_balances.insert(*addr, acc.info.balance); + } + + let existing_override = self.state_overrides.entry(*addr).or_default(); + existing_override.balance = Some(acc.info.balance); + existing_override.nonce = Some(acc.info.nonce); + existing_override.code = acc.info.code.clone().map(|code| code.bytes()); + + let existing = existing_override.state_diff.get_or_insert(Default::default()); + let changed_slots = acc + .storage + .iter() + .map(|(&key, slot)| (B256::from(key), B256::from(slot.present_value))); + + existing.extend(changed_slots); + } + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + + let is_canyon_active = + self.chain_spec.is_canyon_active_at_timestamp(self.pending_block.timestamp); + + let is_regolith_active = + self.chain_spec.is_regolith_active_at_timestamp(self.pending_block.timestamp); + + let receipt = match self.receipt_builder.build_receipt(ReceiptBuilderCtx { + tx: &transaction, + evm: &mut self.evm, + result, + state: &state, + cumulative_gas_used: self.cumulative_gas_used, + }) { + Ok(receipt) => receipt, + Err(ctx) => { + // This is a deposit transaction, so build the receipt from the context + let receipt = alloy_consensus::Receipt { + status: Eip658Value::Eip658(ctx.result.is_success()), + cumulative_gas_used: ctx.cumulative_gas_used, + logs: ctx.result.into_logs(), + }; + + let deposit_nonce = (is_regolith_active && transaction.is_deposit()) + .then(|| { + self.evm + .db_mut() + .basic(transaction.signer()) + .map(|acc| acc.unwrap_or_default().nonce) + }) + .transpose() + .map_err(|_| eyre!("failed to load cache account for depositor"))?; + + self.receipt_builder.build_deposit_receipt(OpDepositReceipt { + inner: receipt, + deposit_nonce, + deposit_receipt_version: is_canyon_active.then_some(1), + }) + } + }; + + let meta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: B256::ZERO, // block hash is not available yet for flashblocks + block_number: self.pending_block.number, + base_fee: self.pending_block.base_fee_per_gas, + excess_blob_gas: self.pending_block.excess_blob_gas, + timestamp: self.pending_block.timestamp, + }; + + let sender = transaction.signer(); + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(&transaction, sender), + gas_used, + next_log_index: self.next_log_index, + meta, + }; + + let op_receipt = + OpRpcReceiptBuilder::new(&self.chain_spec, input, &mut self.l1_block_info) + .unwrap() + .build(); + self.next_log_index += receipt.logs().len(); + + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = op_receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + self.evm.db_mut().commit(state.clone()); + + Ok(ExecutedPendingTransaction { + rpc_transaction, + receipt: op_receipt, + state, + touched_balances, + }) + } + Err(e) => Err(eyre!( + "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", + e, + tx_hash, + transaction.signer() + )), + } + } +} + /// Processes flashblocks and canonical blocks to keep pending state updated. #[derive(Debug, Clone)] pub struct StateProcessor { @@ -360,9 +643,10 @@ where }; let block: OpBlock = execution_payload.try_into_block()?; - let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; - let header = block.header.clone().seal_slow(); - pending_blocks_builder.with_header(header.clone()); + let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; + let block_header = block.header.clone(); // prevents us from needing to clone the entire block + let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks + pending_blocks_builder.with_header(sealed_header); let block_env_attributes = OpNextBlockEnvAttributes { timestamp: base.timestamp, @@ -374,18 +658,16 @@ where }; let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; - let mut evm = evm_config.evm_with_env(db, evm_env); - - let mut cumulative_gas_used: u64 = 0; - let mut next_log_index = 0; + let evm = evm_config.evm_with_env(db, evm_env); // Parallel sender recovery - batch all ECDSA operations upfront let recovery_start = Instant::now(); - let txs_with_senders: Vec<(&OpTxEnvelope, Address)> = block + let txs_with_senders: Vec<(OpTxEnvelope, Address)> = block .body .transactions .par_iter() - .map(|tx| -> eyre::Result<(&OpTxEnvelope, Address)> { + .cloned() + .map(|tx| -> eyre::Result<(OpTxEnvelope, Address)> { let tx_hash = tx.tx_hash(); let sender = match prev_pending_blocks .as_ref() @@ -399,236 +681,44 @@ where .collect::>()?; self.metrics.sender_recovery_duration.record(recovery_start.elapsed()); + let mut pending_state_builder = PendingStateBuilder::new( + self.client.chain_spec(), + evm, + block, + prev_pending_blocks.clone(), + l1_block_info, + state_overrides, + *evm_config.block_executor_factory().receipt_builder(), + ); + for (idx, (transaction, sender)) in txs_with_senders.into_iter().enumerate() { let tx_hash = transaction.tx_hash(); pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); - let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); + let recovered_transaction = Recovered::new_unchecked(transaction, sender); - let effective_gas_price = if transaction.is_deposit() { - 0 - } else { - block - .base_fee_per_gas - .map(|base_fee| { - transaction.effective_tip_per_gas(base_fee).unwrap_or_default() - + base_fee as u128 - }) - .unwrap_or_else(|| transaction.max_fee_per_gas()) - }; - - // Check if we have all the data we need (receipt + state) - let cached_data = prev_pending_blocks.as_ref().and_then(|p| { - let receipt = p.get_receipt(tx_hash)?; - let state = p.get_transaction_state(&tx_hash)?; - Some((receipt, state)) - }); - - // If cached, we can fill out pending block data using previous execution results - // If not cached, we need to execute the transaction and build pending block data from scratch - // The `pending_blocks_builder.with*` calls should fill out the same data in both cases - // We also need to update the cumulative gas used and next log index in both cases - if let Some((receipt, state)) = cached_data { - let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { - let deposit_receipt = receipt - .inner - .inner - .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; - - (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) - } else { - (None, None) - }; - - let envelope = recovered_transaction.clone().convert::(); - let rpc_txn = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: envelope, - block_hash: Some(header.hash()), - block_number: Some(base.block_number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - - pending_blocks_builder.with_transaction(rpc_txn); - pending_blocks_builder.with_receipt(tx_hash, receipt.clone()); - - for (address, account) in state.iter() { - if account.is_touched() { - pending_blocks_builder - .with_account_balance(*address, account.info.balance); - } - } - pending_blocks_builder.with_transaction_state(tx_hash, state); + let executed_transaction = + pending_state_builder.execute_transaction(idx, recovered_transaction)?; - cumulative_gas_used = cumulative_gas_used - .checked_add(receipt.inner.gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - next_log_index += receipt.inner.logs().len(); - } else { - let envelope = recovered_transaction.clone().convert::(); - - match evm.transact(recovered_transaction.clone()) { - Ok(ResultAndState { state, result }) => { - let gas_used = result.gas_used(); - for (addr, acc) in &state { - if acc.is_touched() { - pending_blocks_builder - .with_account_balance(*addr, acc.info.balance); - } - - let existing_override = state_overrides.entry(*addr).or_default(); - existing_override.balance = Some(acc.info.balance); - existing_override.nonce = Some(acc.info.nonce); - existing_override.code = - acc.info.code.clone().map(|code| code.bytes()); - - let existing = - existing_override.state_diff.get_or_insert(Default::default()); - let changed_slots = acc.storage.iter().map(|(&key, slot)| { - (B256::from(key), B256::from(slot.present_value)) - }); - - existing.extend(changed_slots); - } + pending_blocks_builder.with_transaction(executed_transaction.rpc_transaction); + pending_blocks_builder.with_receipt(tx_hash, executed_transaction.receipt); + pending_blocks_builder.with_transaction_state(tx_hash, executed_transaction.state); - cumulative_gas_used = cumulative_gas_used - .checked_add(gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - - let receipt_builder = - evm_config.block_executor_factory().receipt_builder(); - - let is_canyon_active = self - .client - .chain_spec() - .is_canyon_active_at_timestamp(block.timestamp); - - let is_regolith_active = self - .client - .chain_spec() - .is_regolith_active_at_timestamp(block.timestamp); - - let receipt = match receipt_builder.build_receipt(ReceiptBuilderCtx { - tx: &recovered_transaction, - evm: &evm, - result, - state: &state, - cumulative_gas_used, - }) { - Ok(receipt) => receipt, - Err(ctx) => { - // This is a deposit transaction, so build the receipt from the context - let receipt = alloy_consensus::Receipt { - status: Eip658Value::Eip658(ctx.result.is_success()), - cumulative_gas_used: ctx.cumulative_gas_used, - logs: ctx.result.into_logs(), - }; - - let deposit_nonce = (is_regolith_active - && transaction.is_deposit()) - .then(|| { - evm.db_mut() - .load_account(recovered_transaction.signer()) - .map(|acc| acc.info.nonce) - }) - .transpose() - .map_err(|_| { - eyre!("failed to load cache account for depositor") - })?; - - receipt_builder.build_deposit_receipt(OpDepositReceipt { - inner: receipt, - deposit_nonce, - deposit_receipt_version: is_canyon_active.then_some(1), - }) - } - }; - - let meta = TransactionMeta { - tx_hash, - index: idx as u64, - block_hash: header.hash(), - block_number: block.number, - base_fee: block.base_fee_per_gas, - excess_blob_gas: block.excess_blob_gas, - timestamp: block.timestamp, - }; - - let input: ConvertReceiptInput<'_, OpPrimitives> = - ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(transaction, sender), - gas_used, - next_log_index, - meta, - }; - - let op_receipt = OpRpcReceiptBuilder::new( - self.client.chain_spec().as_ref(), - input, - &mut l1_block_info, - ) - .unwrap() - .build(); - next_log_index += receipt.logs().len(); - - let (deposit_receipt_version, deposit_nonce) = - if transaction.is_deposit() { - let deposit_receipt = - op_receipt.inner.inner.as_deposit_receipt().ok_or( - eyre!("deposit transaction, non deposit receipt"), - )?; - - ( - deposit_receipt.deposit_receipt_version, - deposit_receipt.deposit_nonce, - ) - } else { - (None, None) - }; - - let rpc_txn = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: envelope, - block_hash: Some(header.hash()), - block_number: Some(base.block_number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - - pending_blocks_builder.with_transaction(rpc_txn); - pending_blocks_builder.with_receipt(tx_hash, op_receipt); - pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); - evm.db_mut().commit(state); - } - Err(e) => { - return Err(eyre!( - "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", - e, - tx_hash, - sender - )); - } - } + for (address, balance) in executed_transaction.touched_balances { + pending_blocks_builder.with_account_balance(address, balance); } } - db = evm.into_db(); - last_block_header = block.header.clone(); + db = pending_state_builder.evm.into_db(); + last_block_header = block_header; + state_overrides = pending_state_builder.state_overrides; } - pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); + pending_blocks_builder.with_db_cache(db.cache); + Ok(Some(Arc::new(pending_blocks_builder.build()?))) } From ac294aa6ee624725322f1114365feed0cceedae4 Mon Sep 17 00:00:00 2001 From: Julian Meyer Date: Thu, 8 Jan 2026 04:37:51 +0000 Subject: [PATCH 2/3] Remove unnecessary clone of transaction --- crates/flashblocks/src/processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index b5f02914..d301c828 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -200,7 +200,7 @@ where ) -> eyre::Result { let tx_hash = transaction.tx_hash(); - match self.evm.transact(transaction.clone()) { + match self.evm.transact(&transaction) { Ok(ResultAndState { state, result }) => { let gas_used = result.gas_used(); let mut touched_balances = HashMap::default(); From 6f6e5d606a7144b54d3642345379c0865ac75fe3 Mon Sep 17 00:00:00 2001 From: Julian Meyer Date: Thu, 8 Jan 2026 04:47:00 +0000 Subject: [PATCH 3/3] Move to separate module --- crates/flashblocks/src/lib.rs | 3 + crates/flashblocks/src/processor.rs | 323 ++---------------------- crates/flashblocks/src/state_builder.rs | 299 ++++++++++++++++++++++ 3 files changed, 319 insertions(+), 306 deletions(-) create mode 100644 crates/flashblocks/src/state_builder.rs diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 12a36fe2..b9bec3e5 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -20,3 +20,6 @@ pub use subscription::FlashblocksSubscriber; mod traits; pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI}; + +mod state_builder; +pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder}; diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index d301c828..5524383d 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -7,44 +7,34 @@ use std::{ }; use alloy_consensus::{ - Block, Eip658Value, Header, TxReceipt, - transaction::{Recovered, SignerRecoverable, TransactionMeta}, + Header, + transaction::{Recovered, SignerRecoverable}, }; use alloy_eips::BlockNumberOrTag; -use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; -use alloy_primitives::{Address, B256, BlockNumber, Bytes, U256, map::HashMap}; -use alloy_rpc_types::{TransactionTrait, Withdrawal}; +use alloy_primitives::{Address, B256, BlockNumber, Bytes}; +use alloy_rpc_types::Withdrawal; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; use alloy_rpc_types_eth::state::StateOverride; use arc_swap::ArcSwapOption; use base_flashtypes::Flashblock; use eyre::eyre; -use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; +use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::TransactionResponse; -use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use rayon::prelude::*; use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, providers::{BlockReaderIdExt, StateProviderFactory}, - revm::{ - Database, DatabaseCommit, State, context::result::ResultAndState, - database::StateProviderDatabase, db::CacheDB, state::EvmState, - }, -}; -use reth_evm::{ - ConfigureEvm, Evm, FromRecoveredTx, eth::receipt_builder::ReceiptBuilderCtx, - op_revm::L1BlockInfo, + revm::{State, database::StateProviderDatabase, db::CacheDB}, }; +use reth_evm::ConfigureEvm; use reth_optimism_chainspec::OpHardforks; -use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes, OpRethReceiptBuilder}; -use reth_optimism_primitives::{OpBlock, OpPrimitives}; -use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_primitives::OpBlock; use reth_primitives::RecoveredBlock; -use reth_rpc_convert::transaction::ConvertReceiptInput; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; use tracing::{debug, error, info, warn}; -use crate::{Metrics, PendingBlocks, PendingBlocksBuilder}; +use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder}; /// Messages consumed by the state processor. #[derive(Debug, Clone)] @@ -55,286 +45,6 @@ pub enum StateUpdate { Flashblock(Flashblock), } -struct ExecutedPendingTransaction { - rpc_transaction: Transaction, - receipt: OpTransactionReceipt, - state: EvmState, - touched_balances: HashMap, -} - -/// Executes or fetches cached values for transactions in a flashblock. -struct PendingStateBuilder { - cumulative_gas_used: u64, - next_log_index: usize, - - evm: E, - pending_block: Block, - l1_block_info: L1BlockInfo, - chain_spec: ChainSpec, - receipt_builder: OpRethReceiptBuilder, - - prev_pending_blocks: Option>, - state_overrides: StateOverride, -} - -impl PendingStateBuilder -where - E: Evm, - E::DB: DatabaseCommit + Database, - E::Tx: FromRecoveredTx, - ChainSpec: OpHardforks, -{ - const fn new( - chain_spec: ChainSpec, - evm: E, - pending_block: Block, - prev_pending_blocks: Option>, - l1_block_info: L1BlockInfo, - state_overrides: StateOverride, - receipt_builder: OpRethReceiptBuilder, - ) -> Self { - Self { - pending_block, - evm, - cumulative_gas_used: 0, - next_log_index: 0, - prev_pending_blocks, - l1_block_info, - state_overrides, - chain_spec, - receipt_builder, - } - } - - fn execute_transaction( - &mut self, - idx: usize, - transaction: Recovered, - ) -> eyre::Result { - let tx_hash = transaction.tx_hash(); - - let effective_gas_price = if transaction.is_deposit() { - 0 - } else { - self.pending_block - .base_fee_per_gas - .map(|base_fee| { - transaction.effective_tip_per_gas(base_fee).unwrap_or_default() - + base_fee as u128 - }) - .unwrap_or_else(|| transaction.max_fee_per_gas()) - }; - - // Check if we have all the data we need (receipt + state) - let cached_data = self.prev_pending_blocks.as_ref().and_then(|p| { - let receipt = p.get_receipt(tx_hash)?; - let state = p.get_transaction_state(&tx_hash)?; - Some((receipt, state)) - }); - - // If cached, we can fill out pending block data using previous execution results - // If not cached, we need to execute the transaction and build pending block data from scratch - if let Some((receipt, state)) = cached_data { - self.execute_with_cached_data(transaction, receipt, state, idx, effective_gas_price) - } else { - self.execute_with_evm(transaction, idx, effective_gas_price) - } - } - - /// Builds transaction result from cached receipt and state data. - fn execute_with_cached_data( - &mut self, - transaction: Recovered, - receipt: OpTransactionReceipt, - state: EvmState, - idx: usize, - effective_gas_price: u128, - ) -> eyre::Result { - let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { - let deposit_receipt = receipt - .inner - .inner - .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; - - (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) - } else { - (None, None) - }; - - let rpc_transaction = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: transaction, - block_hash: None, - block_number: Some(self.pending_block.number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - - let mut touched_balances = HashMap::default(); - - for (address, account) in state.iter() { - if account.is_touched() { - touched_balances.insert(*address, account.info.balance); - } - } - - self.cumulative_gas_used = self - .cumulative_gas_used - .checked_add(receipt.inner.gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - self.next_log_index += receipt.inner.logs().len(); - - Ok(ExecutedPendingTransaction { rpc_transaction, receipt, state, touched_balances }) - } - - /// Executes the transaction through the EVM and builds the result from scratch. - fn execute_with_evm( - &mut self, - transaction: Recovered, - idx: usize, - effective_gas_price: u128, - ) -> eyre::Result { - let tx_hash = transaction.tx_hash(); - - match self.evm.transact(&transaction) { - Ok(ResultAndState { state, result }) => { - let gas_used = result.gas_used(); - let mut touched_balances = HashMap::default(); - for (addr, acc) in &state { - if acc.is_touched() { - touched_balances.insert(*addr, acc.info.balance); - } - - let existing_override = self.state_overrides.entry(*addr).or_default(); - existing_override.balance = Some(acc.info.balance); - existing_override.nonce = Some(acc.info.nonce); - existing_override.code = acc.info.code.clone().map(|code| code.bytes()); - - let existing = existing_override.state_diff.get_or_insert(Default::default()); - let changed_slots = acc - .storage - .iter() - .map(|(&key, slot)| (B256::from(key), B256::from(slot.present_value))); - - existing.extend(changed_slots); - } - - self.cumulative_gas_used = self - .cumulative_gas_used - .checked_add(gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - - let is_canyon_active = - self.chain_spec.is_canyon_active_at_timestamp(self.pending_block.timestamp); - - let is_regolith_active = - self.chain_spec.is_regolith_active_at_timestamp(self.pending_block.timestamp); - - let receipt = match self.receipt_builder.build_receipt(ReceiptBuilderCtx { - tx: &transaction, - evm: &mut self.evm, - result, - state: &state, - cumulative_gas_used: self.cumulative_gas_used, - }) { - Ok(receipt) => receipt, - Err(ctx) => { - // This is a deposit transaction, so build the receipt from the context - let receipt = alloy_consensus::Receipt { - status: Eip658Value::Eip658(ctx.result.is_success()), - cumulative_gas_used: ctx.cumulative_gas_used, - logs: ctx.result.into_logs(), - }; - - let deposit_nonce = (is_regolith_active && transaction.is_deposit()) - .then(|| { - self.evm - .db_mut() - .basic(transaction.signer()) - .map(|acc| acc.unwrap_or_default().nonce) - }) - .transpose() - .map_err(|_| eyre!("failed to load cache account for depositor"))?; - - self.receipt_builder.build_deposit_receipt(OpDepositReceipt { - inner: receipt, - deposit_nonce, - deposit_receipt_version: is_canyon_active.then_some(1), - }) - } - }; - - let meta = TransactionMeta { - tx_hash, - index: idx as u64, - block_hash: B256::ZERO, // block hash is not available yet for flashblocks - block_number: self.pending_block.number, - base_fee: self.pending_block.base_fee_per_gas, - excess_blob_gas: self.pending_block.excess_blob_gas, - timestamp: self.pending_block.timestamp, - }; - - let sender = transaction.signer(); - let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(&transaction, sender), - gas_used, - next_log_index: self.next_log_index, - meta, - }; - - let op_receipt = - OpRpcReceiptBuilder::new(&self.chain_spec, input, &mut self.l1_block_info) - .unwrap() - .build(); - self.next_log_index += receipt.logs().len(); - - let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { - let deposit_receipt = op_receipt - .inner - .inner - .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; - - (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) - } else { - (None, None) - }; - - let rpc_transaction = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: transaction, - block_hash: None, - block_number: Some(self.pending_block.number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - self.evm.db_mut().commit(state.clone()); - - Ok(ExecutedPendingTransaction { - rpc_transaction, - receipt: op_receipt, - state, - touched_balances, - }) - } - Err(e) => Err(eyre!( - "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", - e, - tx_hash, - transaction.signer() - )), - } - } -} - /// Processes flashblocks and canonical blocks to keep pending state updated. #[derive(Debug, Clone)] pub struct StateProcessor { @@ -702,18 +412,19 @@ where let executed_transaction = pending_state_builder.execute_transaction(idx, recovered_transaction)?; + for (address, account) in executed_transaction.state.iter() { + if account.is_touched() { + pending_blocks_builder.with_account_balance(*address, account.info.balance); + } + } + pending_blocks_builder.with_transaction(executed_transaction.rpc_transaction); pending_blocks_builder.with_receipt(tx_hash, executed_transaction.receipt); pending_blocks_builder.with_transaction_state(tx_hash, executed_transaction.state); - - for (address, balance) in executed_transaction.touched_balances { - pending_blocks_builder.with_account_balance(address, balance); - } } - db = pending_state_builder.evm.into_db(); + (db, state_overrides) = pending_state_builder.into_db_and_state_overrides(); last_block_header = block_header; - state_overrides = pending_state_builder.state_overrides; } pending_blocks_builder.with_state_overrides(state_overrides); diff --git a/crates/flashblocks/src/state_builder.rs b/crates/flashblocks/src/state_builder.rs new file mode 100644 index 00000000..3505bf67 --- /dev/null +++ b/crates/flashblocks/src/state_builder.rs @@ -0,0 +1,299 @@ +use std::sync::Arc; + +use alloy_consensus::{ + Block, Eip658Value, Header, TxReceipt, + transaction::{Recovered, TransactionMeta}, +}; +use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; +use alloy_primitives::B256; +use alloy_rpc_types::TransactionTrait; +use alloy_rpc_types_eth::state::StateOverride; +use eyre::eyre; +use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; +use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; +use reth::revm::{Database, DatabaseCommit, context::result::ResultAndState, state::EvmState}; +use reth_evm::{ + Evm, FromRecoveredTx, eth::receipt_builder::ReceiptBuilderCtx, op_revm::L1BlockInfo, +}; +use reth_optimism_chainspec::OpHardforks; +use reth_optimism_evm::OpRethReceiptBuilder; +use reth_optimism_primitives::OpPrimitives; +use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; +use reth_rpc_convert::transaction::ConvertReceiptInput; + +use crate::PendingBlocks; + +/// Represents the result of executing or fetching a cached pending transaction. +#[derive(Debug, Clone)] +pub struct ExecutedPendingTransaction { + /// The RPC transaction. + pub rpc_transaction: Transaction, + /// The receipt of the transaction. + pub receipt: OpTransactionReceipt, + /// The updated EVM state. + pub state: EvmState, +} + +/// Executes or fetches cached values for transactions in a flashblock. +#[derive(Debug)] +pub struct PendingStateBuilder { + cumulative_gas_used: u64, + next_log_index: usize, + + evm: E, + pending_block: Block, + l1_block_info: L1BlockInfo, + chain_spec: ChainSpec, + receipt_builder: OpRethReceiptBuilder, + + prev_pending_blocks: Option>, + state_overrides: StateOverride, +} + +impl PendingStateBuilder +where + E: Evm, + DB: Database + DatabaseCommit, + E::Tx: FromRecoveredTx, + ChainSpec: OpHardforks, +{ + /// Creates a new pending state builder. + pub const fn new( + chain_spec: ChainSpec, + evm: E, + pending_block: Block, + prev_pending_blocks: Option>, + l1_block_info: L1BlockInfo, + state_overrides: StateOverride, + receipt_builder: OpRethReceiptBuilder, + ) -> Self { + Self { + pending_block, + evm, + cumulative_gas_used: 0, + next_log_index: 0, + prev_pending_blocks, + l1_block_info, + state_overrides, + chain_spec, + receipt_builder, + } + } + + /// Consumes the builder and returns the database and state overrides. + pub fn into_db_and_state_overrides(self) -> (DB, StateOverride) { + (self.evm.into_db(), self.state_overrides) + } + + /// Executes a single transaction and updates internal state. + /// Should be called in order for each transaction. + pub fn execute_transaction( + &mut self, + idx: usize, + transaction: Recovered, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + let effective_gas_price = if transaction.is_deposit() { + 0 + } else { + self.pending_block + .base_fee_per_gas + .map(|base_fee| { + transaction.effective_tip_per_gas(base_fee).unwrap_or_default() + + base_fee as u128 + }) + .unwrap_or_else(|| transaction.max_fee_per_gas()) + }; + + // Check if we have all the data we need (receipt + state) + let cached_data = self.prev_pending_blocks.as_ref().and_then(|p| { + let receipt = p.get_receipt(tx_hash)?; + let state = p.get_transaction_state(&tx_hash)?; + Some((receipt, state)) + }); + + // If cached, we can fill out pending block data using previous execution results + // If not cached, we need to execute the transaction and build pending block data from scratch + if let Some((receipt, state)) = cached_data { + self.execute_with_cached_data(transaction, receipt, state, idx, effective_gas_price) + } else { + self.execute_with_evm(transaction, idx, effective_gas_price) + } + } + + /// Builds transaction result from cached receipt and state data. + fn execute_with_cached_data( + &mut self, + transaction: Recovered, + receipt: OpTransactionReceipt, + state: EvmState, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(receipt.inner.gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + self.next_log_index += receipt.inner.logs().len(); + + Ok(ExecutedPendingTransaction { rpc_transaction, receipt, state }) + } + + /// Executes the transaction through the EVM and builds the result from scratch. + fn execute_with_evm( + &mut self, + transaction: Recovered, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + match self.evm.transact(&transaction) { + Ok(ResultAndState { state, result }) => { + let gas_used = result.gas_used(); + for (addr, acc) in &state { + let existing_override = self.state_overrides.entry(*addr).or_default(); + existing_override.balance = Some(acc.info.balance); + existing_override.nonce = Some(acc.info.nonce); + existing_override.code = acc.info.code.clone().map(|code| code.bytes()); + + let existing = existing_override.state_diff.get_or_insert(Default::default()); + let changed_slots = acc + .storage + .iter() + .map(|(&key, slot)| (B256::from(key), B256::from(slot.present_value))); + + existing.extend(changed_slots); + } + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + + let is_canyon_active = + self.chain_spec.is_canyon_active_at_timestamp(self.pending_block.timestamp); + + let is_regolith_active = + self.chain_spec.is_regolith_active_at_timestamp(self.pending_block.timestamp); + + let receipt = match self.receipt_builder.build_receipt(ReceiptBuilderCtx { + tx: &transaction, + evm: &mut self.evm, + result, + state: &state, + cumulative_gas_used: self.cumulative_gas_used, + }) { + Ok(receipt) => receipt, + Err(ctx) => { + // This is a deposit transaction, so build the receipt from the context + let receipt = alloy_consensus::Receipt { + status: Eip658Value::Eip658(ctx.result.is_success()), + cumulative_gas_used: ctx.cumulative_gas_used, + logs: ctx.result.into_logs(), + }; + + let deposit_nonce = (is_regolith_active && transaction.is_deposit()) + .then(|| { + self.evm + .db_mut() + .basic(transaction.signer()) + .map(|acc| acc.unwrap_or_default().nonce) + }) + .transpose() + .map_err(|_| eyre!("failed to load cache account for depositor"))?; + + self.receipt_builder.build_deposit_receipt(OpDepositReceipt { + inner: receipt, + deposit_nonce, + deposit_receipt_version: is_canyon_active.then_some(1), + }) + } + }; + + let meta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: B256::ZERO, // block hash is not available yet for flashblocks + block_number: self.pending_block.number, + base_fee: self.pending_block.base_fee_per_gas, + excess_blob_gas: self.pending_block.excess_blob_gas, + timestamp: self.pending_block.timestamp, + }; + + let sender = transaction.signer(); + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(&transaction, sender), + gas_used, + next_log_index: self.next_log_index, + meta, + }; + + let op_receipt = + OpRpcReceiptBuilder::new(&self.chain_spec, input, &mut self.l1_block_info) + .unwrap() + .build(); + self.next_log_index += receipt.logs().len(); + + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = op_receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + self.evm.db_mut().commit(state.clone()); + + Ok(ExecutedPendingTransaction { rpc_transaction, receipt: op_receipt, state }) + } + Err(e) => Err(eyre!( + "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", + e, + tx_hash, + transaction.signer() + )), + } + } +}