diff --git a/Cargo.lock b/Cargo.lock index f35228950..bab6c6041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7696,6 +7696,7 @@ dependencies = [ "alloy", "clap", "common", + "contract-bindings", "ctor", "derivative", "eyre", diff --git a/shared/src/types.rs b/shared/src/types.rs index ccff3f357..2a135b98a 100644 --- a/shared/src/types.rs +++ b/shared/src/types.rs @@ -4,7 +4,7 @@ use alloy::{ consensus::Eip658Value, hex, network::EthereumWallet, - primitives::{Address, Bloom, Log, B256}, + primitives::{Address, Bloom, Bytes, FixedBytes, Log, B256, U256}, providers::{ fillers::{ BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, @@ -12,12 +12,12 @@ use alloy::{ }, Identity, ProviderBuilder, RootProvider, }, - rpc::types::Block, signers::local::PrivateKeySigner, }; use async_trait::async_trait; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::{ + collections::HashMap, fmt::{Display, Formatter}, str::FromStr as _, }; @@ -28,11 +28,14 @@ pub trait GetBlockRef { fn block_ref(&self) -> &BlockRef; } +/// auxiliary data for delayed messages in a hashmap by sequence number to msg contents +pub type DelayedMsgsData = HashMap; + /// A trait for building blocks from the sequencing and settlement chains. #[async_trait] pub trait BlockBuilder: Send { /// Process a single slot - fn build_block(&self, block: &PartialBlock) -> eyre::Result; + fn build_block(&self, block: &PartialBlock, msgs_data: DelayedMsgsData) -> eyre::Result; } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] @@ -86,27 +89,16 @@ pub struct Receipt { } /// `PartialBlock` contains block transactions, event logs, and metadata -#[allow(missing_docs)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] pub struct PartialBlock { + /// reference to the block that this partial block was built from pub block_ref: BlockRef, + /// hash of the parent block pub parent_hash: B256, + /// log data pub logs: Vec, -} - -/// Convert a block and its receipts to a `PartialBlock` -pub fn convert_block_to_partial_block(block: &Block, receipts: &[Receipt]) -> PartialBlock { - let filtered_logs: Vec = - receipts.iter().flat_map(|receipt| receipt.logs.clone()).collect(); - PartialBlock { - block_ref: BlockRef { - number: block.header.number, - hash: block.header.hash, - timestamp: block.header.timestamp, - }, - parent_hash: block.header.parent_hash, - logs: filtered_logs, - } + /// associated transaction hashes for each log + pub log_tx_hashes: Vec>, } impl GetBlockRef for PartialBlock { diff --git a/shared/test-utils/src/nitro_chain.rs b/shared/test-utils/src/nitro_chain.rs index 37bab0dcc..df15e0a30 100644 --- a/shared/test-utils/src/nitro_chain.rs +++ b/shared/test-utils/src/nitro_chain.rs @@ -4,7 +4,7 @@ use crate::{chain_info::PRIVATE_KEY, docker::E2EProcess}; use alloy::{ consensus::{EthereumTxEnvelope, TxEip4844Variant}, network::TransactionBuilder, - primitives::{address, Address, Bytes, B256, U256}, + primitives::{address, Address, Bytes, B256, U160, U256}, providers::{Provider, WalletProvider}, }; use contract_bindings::synd::{ @@ -126,6 +126,19 @@ pub struct NitroBlock { pub timestamp: U256, } +const L1_TO_L2_ALIAS_OFFSET: Address = address!("0x1111000000000000000000000000000000001111"); +/// Computes the L2 alias of an L1 address. +/// +/// When a contract on L1 sends a message to L2 via the Inbox, the sender address +/// on L2 is aliased by adding this offset. This prevents address collisions and +/// distinguishes L1-originated messages from native L2 messages. +pub fn apply_l1_to_l2_alias(l1_address: Address) -> Address { + Address::from( + U160::from_be_slice(&l1_address[..]) + .wrapping_add(U160::from_be_slice(&L1_TO_L2_ALIAS_OFFSET[..])), + ) +} + pub async fn init_withdrawal_tx( to_address: Address, withdrawal_value: U256, @@ -147,51 +160,68 @@ pub async fn init_withdrawal_tx( Ok(tx) } -pub async fn execute_withdrawal( - to_address: Address, - withdrawal_value: U256, - bridge_address: Address, - settlement_provider: &FilledProvider, - appchain_provider: &FilledProvider, -) -> eyre::Result<()> { +pub struct ExecuteWithdrawalParams<'a> { + pub to_address: Address, + pub withdrawal_value: U256, + pub appchain_block_hash_to_prove: B256, + pub bridge_address: Address, + pub settlement_provider: &'a FilledProvider, + pub appchain_provider: &'a FilledProvider, + pub l2_sender: Address, + pub send_root_size: u64, + pub withdrawal_position: u64, +} + +pub async fn execute_withdrawal(params: ExecuteWithdrawalParams<'_>) { // Generate proof - let node_interface = NodeInterface::new(NODE_INTERFACE_PRECOMPILE_ADDRESS, &appchain_provider); - let proof = node_interface.constructOutboxProof(1, 0).call().await?; + let node_interface = + NodeInterface::new(NODE_INTERFACE_PRECOMPILE_ADDRESS, ¶ms.appchain_provider); + let proof = node_interface + .constructOutboxProof(params.send_root_size, params.withdrawal_position) + .call() + .await + .unwrap(); // Execute withdrawal - let bridge = IBridge::new(bridge_address, &settlement_provider); + let bridge = IBridge::new(params.bridge_address, ¶ms.settlement_provider); let outbox = IOutbox::new( - IRollupCore::new(bridge.rollup().call().await?, &settlement_provider) + IRollupCore::new(bridge.rollup().call().await.unwrap(), ¶ms.settlement_provider) .outbox() .call() - .await?, - &settlement_provider, + .await + .unwrap(), + ¶ms.settlement_provider, ); - let block: NitroBlock = - appchain_provider.raw_request("eth_getBlockByNumber".into(), ("latest", false)).await?; + let block: NitroBlock = params + .appchain_provider + .raw_request("eth_getBlockByHash".into(), (params.appchain_block_hash_to_prove, false)) + .await + .unwrap(); let _ = outbox .executeTransaction( - proof.proof, // proof - U256::from(0), // index - settlement_provider.default_signer_address(), // l2Sender - to_address, // to - block.number, // l2Block, - block.l1_block_number, // l1Block, - block.timestamp, // l2Timestamp, - withdrawal_value, // value - Bytes::new(), // data (always empty) + proof.proof, // proof + U256::from(params.withdrawal_position), // index + params.l2_sender, // l2Sender + params.to_address, // to + block.number, // l2Block, + block.l1_block_number, // l1Block, + block.timestamp, // l2Timestamp, + params.withdrawal_value, // value + Bytes::new(), // data (always empty) ) - // NOTE: manually setting the nonce shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually setting the nonce shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 .nonce( - settlement_provider - .get_transaction_count(settlement_provider.default_signer_address()) - .await?, + params + .settlement_provider + .get_transaction_count(params.settlement_provider.default_signer_address()) + .await + .unwrap(), ) .send() - .await?; - Ok(()) + .await + .unwrap(); } #[allow(dead_code)] diff --git a/synd-chain-ingestor/Cargo.toml b/synd-chain-ingestor/Cargo.toml index f91fabc7c..176cdebbc 100644 --- a/synd-chain-ingestor/Cargo.toml +++ b/synd-chain-ingestor/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true [dependencies] alloy = { workspace = true, features = ["provider-ws"] } clap = { workspace = true, features = ["derive", "env"] } +contract-bindings = { workspace = true } derivative = { workspace = true } eyre = { workspace = true } fs2 = { workspace = true } diff --git a/synd-chain-ingestor/src/client.rs b/synd-chain-ingestor/src/client.rs index f2d307430..1469791ea 100644 --- a/synd-chain-ingestor/src/client.rs +++ b/synd-chain-ingestor/src/client.rs @@ -2,9 +2,17 @@ use crate::{db::ITEM_SIZE, eth_client::EthClient, server::Message}; use alloy::{ + consensus::Transaction, eips::BlockNumberOrTag, - primitives::{Address, Bytes, B256}, + primitives::{Address, Bytes, B256, U256}, rpc::types::Filter, + sol_types::{SolCall, SolEvent as _}, +}; +use contract_bindings::synd::{ + i_delayed_message_provider::IDelayedMessageProvider::{ + InboxMessageDelivered, InboxMessageDeliveredFromOrigin, + }, + i_inbox::IInbox::sendL2MessageFromOriginCall, }; use eyre::{eyre, OptionExt as _}; use futures_util::{ @@ -23,25 +31,19 @@ use jsonrpsee::{ use serde::de::DeserializeOwned; use shared::{ tracing::SpanKind, - types::{BlockBuilder, BlockRef, GetBlockRef, PartialBlock}, + types::{BlockBuilder, BlockRef, DelayedMsgsData, GetBlockRef, PartialBlock}, }; -use std::{ - collections::{HashSet, VecDeque}, - future::Future, - pin::Pin, - sync::Arc, - time::Duration, -}; -use tracing::{info, instrument}; +use std::{collections::VecDeque, future::Future, pin::Pin, sync::Arc, time::Duration}; +use tracing::{info, instrument, trace}; /// Uses the [`EthClient`] to fetch log data for blocks in a range and combines them with raw /// (timestamp, block hash) data from the db to build partial blocks #[allow(clippy::unwrap_used, clippy::cognitive_complexity)] -async fn build_partial_blocks( +async fn build_partial_blocks_from_init_requests( start_block: u64, data: &IndexedBlockData, client: &EthClient, - addrs: Vec
, + addrs: &[Address], ) -> eyre::Result> { let count = data.count(); let mut blocks = Vec::default(); @@ -56,6 +58,7 @@ async fn build_partial_blocks( block_ref: BlockRef { number: i, timestamp, hash }, parent_hash, logs: Default::default(), + log_tx_hashes: Default::default(), }); parent_hash = hash; } @@ -69,7 +72,9 @@ async fn build_partial_blocks( info!("fetching partial logs from blocks {} to {}", start_block, end_block); let mut logs = client - .get_logs(&Filter::new().address(addrs.clone()).from_block(start_block).to_block(end_block)) + .get_logs( + &Filter::new().address(addrs.to_vec()).from_block(start_block).to_block(end_block), + ) .await?; if let Some(log) = logs.last() { @@ -91,6 +96,7 @@ async fn build_partial_blocks( // Fetch all logs for unsafe blocks. This makes it more likely that a log is included which // contains block hash info with it. let first_unsafe_block = safe_block + 1; + info!("fetching full logs from blocks {} to {}", first_unsafe_block, end_block); let mut unsafe_logs = client .get_logs(&Filter::new().from_block(first_unsafe_block).to_block(end_block)) @@ -107,11 +113,7 @@ async fn build_partial_blocks( blocks[(safe_block - start_block) as usize].block_ref.hash )); } - let mut addr_set = HashSet::new(); - for addr in &addrs { - addr_set.insert(addr); - } - unsafe_logs.retain(|x| addr_set.contains(&x.address())); + unsafe_logs.retain(|x| addrs.contains(&x.address())); logs.append(&mut unsafe_logs); } @@ -122,7 +124,7 @@ async fn build_partial_blocks( let mut block_logs = client .get_logs( &Filter::new() - .address(addrs.clone()) + .address(addrs.to_vec()) .at_block_hash(blocks[(i - start_block) as usize].block_ref.hash), ) .await?; @@ -130,22 +132,68 @@ async fn build_partial_blocks( } } - let mut block = start_block - 1; - let mut index = 0; + let mut prev_block_index = start_block - 1; + let mut prev_log_index = 0; for log in logs { assert!(!log.removed); let log_block = log.block_number.unwrap(); assert_eq!(log.block_hash, Some(blocks[(log_block - start_block) as usize].block_ref.hash)); let log_index = log.log_index.unwrap(); - assert!(log_block > block || (log_block == block && log_index > index), "out of order log found from rpc provider: previous (block, index) = ({block} {index}), current = ({log_block}, {log_index})"); - block = log_block; - index = log_index; - blocks[(log.block_number.unwrap() - start_block) as usize].logs.push(log.into_inner()); + assert!(log_block > prev_block_index || (log_block == prev_block_index && log_index > prev_log_index), + "out of order log found from rpc provider: previous (block, index) = ({prev_block_index} {prev_log_index}), current = ({log_block}, {log_index})"); + prev_block_index = log_block; + prev_log_index = log_index; + let block_index = (log.block_number.unwrap() - start_block) as usize; + blocks[block_index] + .log_tx_hashes + .push(log.transaction_hash.unwrap_or_else(|| panic!("log without txhash"))); + blocks[block_index].logs.push(log.into_inner()); } Ok(blocks) } +/// Obtains delayed message data for the delayed messages in the block +#[allow(clippy::cognitive_complexity)] +pub async fn delayed_msgs_data_from_partial_block( + block: &PartialBlock, + client: &EthClient, + inbox_addr: Option
, +) -> eyre::Result { + let mut result = DelayedMsgsData::new(); + + // if no inbox_addr is set, it's a seq chain block and there's no need to iterate the logs + let Some(inbox_addr) = inbox_addr else { + return Ok(result); + }; + + for (i, log) in block.logs.iter().enumerate() { + if log.address != inbox_addr { + continue; + }; + + match log.topics()[0] { + InboxMessageDelivered::SIGNATURE_HASH => { + let seq_num: U256 = log.topics()[1].into(); + let decoded = InboxMessageDelivered::abi_decode_data_validate(&log.data.data)?; + result.insert(seq_num, decoded.0); + } + InboxMessageDeliveredFromOrigin::SIGNATURE_HASH => { + let tx = client + .get_transaction_by_hash(block.log_tx_hashes[i]) + .await + .unwrap_or_else(|| panic!("tx for log not found: {:?}", log)); + + let decoded_tx = sendL2MessageFromOriginCall::abi_decode_validate(tx.input())?; + let seq_num: U256 = log.topics()[1].into(); + result.insert(seq_num, decoded_tx.messageData); + } + e => trace!("unsupported event type: {e}"), + }; + } + Ok(result) +} + struct BlockStream< S: Stream>, Block: GetBlockRef, @@ -155,7 +203,9 @@ struct BlockStream< buffer: VecDeque, block_builder: Arc, indexed_block_number: u64, - init_data: Option<(EthClient, Vec
, u64)>, + inbox_addr: Option
, + client: EthClient, + init_data: Option<(Vec
, u64)>, #[allow(clippy::type_complexity)] init_requests: VecDeque< Pin< @@ -178,22 +228,26 @@ impl< stream: S, block_builder: Arc, start_block: u64, - init_data: (EthClient, Vec
, u64), + client: EthClient, + init_data: (Vec
, u64), + inbox_addr: Option
, ) -> Self { Self { stream: Box::pin(stream.ready_chunks(1024).peekable()), block_builder, buffer: Default::default(), indexed_block_number: start_block, + client, init_data: Some(init_data), init_requests: Default::default(), + inbox_addr, } } /// Process the init message into initial requests to be processed later #[allow(clippy::unwrap_used)] async fn process_init_message(&mut self) -> eyre::Result<(), eyre::Error> { - if let Some((client, addrs, max_blocks_per_request)) = self.init_data.take() { + if let Some((addrs, max_blocks_per_request)) = self.init_data.take() { // fetch initial blocks from the stream let mut init_blocks = self.stream.next().await.ok_or_eyre("stream closed")?; // remove the first block from the stream, which is a special init message @@ -203,35 +257,32 @@ impl< // get start and end blocks for batching let mut start_block = self.indexed_block_number; - if max_blocks_per_request == 0 { - self.init_requests.push_back(Box::pin(async move { - let blocks = build_partial_blocks(start_block, &init, &client, addrs) - .await? - .into_iter() - .map(|x| Ok(Message::Block(x))) - .collect(); + let addrs_arc = Arc::new(addrs.clone()); + + let create_request = |init_data: IndexedBlockData, block_num: u64| { + let client_clone = self.client.clone(); + let addrs_clone = addrs_arc.clone(); + Box::pin(async move { + let blocks = build_partial_blocks_from_init_requests( + block_num, + &init_data, + &client_clone, + &addrs_clone, + ) + .await? + .into_iter() + .map(|x| Ok(Message::Block(x))) + .collect(); Ok(blocks) - })); + }) + }; + + if max_blocks_per_request == 0 { + self.init_requests.push_back(create_request(init, start_block)); } else { while init.count() > 0 { let (init_batch, remaining) = init.split_at(max_blocks_per_request)?; - - let client_clone = client.clone(); - let addrs_clone = addrs.clone(); - - self.init_requests.push_back(Box::pin(async move { - let blocks = build_partial_blocks( - start_block, - &init_batch, - &client_clone, - addrs_clone, - ) - .await? - .into_iter() - .map(|x| Ok(Message::Block(x))) - .collect(); - Ok(blocks) - })); + self.init_requests.push_back(create_request(init_batch, start_block)); start_block += max_blocks_per_request; init = remaining; } @@ -265,7 +316,7 @@ impl< { #[allow(clippy::unwrap_used)] async fn recv(&mut self, timestamp: u64) -> eyre::Result { - let mut blocks = vec![]; + let mut responses = vec![]; // If there is init data, handle the initial message // This happens only once, the first time this function is called @@ -274,16 +325,23 @@ impl< } if !self.init_requests.is_empty() { - blocks = self.init_requests.pop_front().unwrap().await?; + responses = self.init_requests.pop_front().unwrap().await?; } else if self.stream.as_mut().peek().now_or_never().is_some() { // If there are no init requests, and there is data in the stream, pop it off // This is to try to catch any reorgs ASAP - blocks = self.stream.next().await.ok_or_eyre("stream closed")?; + responses = self.stream.next().await.ok_or_eyre("stream closed")?; } loop { - for partial_block in blocks { - let block = self.block_builder.build_block(&partial_block?.block())?; + for resp in responses { + let partial_block = resp?.block(); + let delayed_msgs_data = delayed_msgs_data_from_partial_block( + &partial_block, + &self.client, + self.inbox_addr, + ) + .await?; + let block = self.block_builder.build_block(&partial_block, delayed_msgs_data)?; let block_number = block.block_ref().number; assert!( block_number <= self.indexed_block_number, @@ -318,7 +376,7 @@ impl< } // If there are no valid blocks in the buffer, await the next block from the stream - blocks = self.stream.next().await.ok_or_eyre("stream closed")? + responses = self.stream.next().await.ok_or_eyre("stream closed")? } } } @@ -392,7 +450,7 @@ impl IndexedBlockData { #[allow(missing_docs)] #[async_trait] -pub trait Provider: Sync { +pub trait IngestorProvider: Sync { async fn request( &self, method: &'static str, @@ -443,6 +501,7 @@ pub trait Provider: Sync { addresses: Vec
, block_builder: Arc + Sync + 'static>, client: EthClient, + inbox_addr: Option
, ) -> Result, ClientError> { Ok(BlockStream::new( self.subscribe::<_, Message>( @@ -453,7 +512,9 @@ pub trait Provider: Sync { .await?, block_builder, start_block, - (client, addresses, 0), + client, + (addresses, 0), + inbox_addr, )) } } @@ -482,12 +543,13 @@ impl Default for IngestorProviderConfig { } } +/// Ingestor provider - tuple of a [`WsClient`] and the maximum number of blocks to fetch per +/// request #[derive(Debug, Clone)] -#[allow(missing_docs)] -pub struct IngestorProvider(Arc, u64); +pub struct IngestorProviderImpl(Arc, u64); #[allow(missing_docs)] -impl IngestorProvider { +impl IngestorProviderImpl { pub async fn new(url: &str, config: IngestorProviderConfig) -> Self { match tokio::time::timeout( config.timeout, @@ -514,7 +576,7 @@ impl IngestorProvider { } #[async_trait] -impl Provider for IngestorProvider { +impl IngestorProvider for IngestorProviderImpl { async fn request( &self, method: &'static str, @@ -539,17 +601,22 @@ impl Provider for IngestorProvider { addresses: Vec
, block_builder: Arc + Sync + 'static>, client: EthClient, + inbox_addr: Option
, ) -> Result, ClientError> { - Ok(BlockStream::new( - self.subscribe::<_, Message>( + let stream = self + .subscribe::<_, Message>( "subscribe_blocks", (start_block, addresses.clone()), "unsubscribe_blocks", ) - .await?, + .await?; + Ok(BlockStream::new( + stream, block_builder, start_block, - (client, addresses, self.1), + client, + (addresses, self.1), + inbox_addr, )) } } @@ -599,7 +666,7 @@ mod tests { } #[async_trait] - impl Provider for RpcModule { + impl IngestorProvider for RpcModule { async fn request( &self, method: &'static str, diff --git a/synd-chain-ingestor/src/eth_client.rs b/synd-chain-ingestor/src/eth_client.rs index 02ac38978..2b46068dd 100644 --- a/synd-chain-ingestor/src/eth_client.rs +++ b/synd-chain-ingestor/src/eth_client.rs @@ -3,7 +3,8 @@ use alloy::{ eips::BlockNumberOrTag, - providers::{Provider as _, ProviderBuilder, RootProvider, WsConnect}, + primitives::TxHash, + providers::{Provider, ProviderBuilder, RootProvider, WsConnect}, pubsub::Subscription, rpc::types::{Filter, FilterBlockOption, Header}, transports::{ws::WebSocketConfig, RpcError, TransportErrorKind}, @@ -187,6 +188,26 @@ impl EthClient { } } + /// Gets a transaction by hash + #[instrument(skip(self), fields(otel.kind = ?SpanKind::Client))] + pub async fn get_transaction_by_hash( + &self, + hash: TxHash, + ) -> Option { + loop { + match timeout(self.timeout, self.client.get_transaction_by_hash(hash)).await { + Err(_) => { + error!("eth_getTransactionByHash request timed out"); + } + Ok(Err(err)) => { + handle_rpc_error("failed to get block by hash", &err); + } + Ok(Ok(tx)) => return tx, + } + tokio::time::sleep(self.retry_interval).await; + } + } + async fn handle_split_range( &self, filter: &Filter, diff --git a/synd-chain-ingestor/src/ingestor.rs b/synd-chain-ingestor/src/ingestor.rs index a51a9f915..8913a3ad4 100644 --- a/synd-chain-ingestor/src/ingestor.rs +++ b/synd-chain-ingestor/src/ingestor.rs @@ -107,44 +107,44 @@ pub async fn run( { let _guard = info_span!("send_subscriptions").entered(); - let partial_block = PartialBlock { - block_ref: BlockRef { - number: block.number, - timestamp: block.timestamp, - hash: block.hash, - }, - parent_hash: block.parent_hash, - logs: receipts - .into_iter() - .enumerate() - .flat_map(|(i, x)| { - assert_eq!(x.block_hash, block.hash); - assert_eq!(x.transaction_index, i as u64); - x.logs - }) - .collect(), - }; - ctx.lock() .map_err(|e| eyre::eyre!("Failed to acquire mutex lock: {}", e))? .subs .retain_mut(|(sink, addrs)| { - !sink.is_closed() && - sink.try_send(SubscriptionMessage::from( - serde_json::value::to_raw_value(&Message::Block(PartialBlock { - logs: partial_block - .logs - .clone() - .into_iter() - .filter(|log| addrs.contains(&log.address)) - .collect(), - block_ref: partial_block.block_ref.clone(), - parent_hash: partial_block.parent_hash, - })) + if sink.is_closed() { + return false; + } + + let mut logs = Vec::new(); + let mut log_tx_hashes = Vec::new(); + for (i, receipt) in receipts.iter().enumerate() { + assert_eq!(receipt.block_hash, block.hash); + assert_eq!(receipt.transaction_index, i as u64); + for log in &receipt.logs { + if addrs.contains(&log.address) { + logs.push(log.clone()); + log_tx_hashes.push(receipt.transaction_hash); + } + } + } + + let partial_block = PartialBlock { + block_ref: BlockRef { + number: block.number, + timestamp: block.timestamp, + hash: block.hash, + }, + parent_hash: block.parent_hash, + logs, + log_tx_hashes, + }; + + sink.try_send(SubscriptionMessage::from( + serde_json::value::to_raw_value(&Message::Block(partial_block)) .unwrap_or_else(|e| panic!("failed to serialize message: {e}")), - )) - .inspect_err(|err| error!("try_send failed: {err}")) - .is_ok() + )) + .inspect_err(|err| error!("try_send failed: {err}")) + .is_ok() }); } } diff --git a/synd-chain-ingestor/src/server.rs b/synd-chain-ingestor/src/server.rs index b0953cbeb..e6960cfe9 100644 --- a/synd-chain-ingestor/src/server.rs +++ b/synd-chain-ingestor/src/server.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use shared::{tracing::SpanKind, types::PartialBlock}; use std::{ - collections::{HashSet, VecDeque}, + collections::VecDeque, io::Error, sync::{atomic::AtomicBool, Arc, Mutex}, }; @@ -29,7 +29,7 @@ use url::Url; #[allow(missing_docs)] pub struct Context { pub db: Option, - pub subs: Vec<(SubscriptionSink, HashSet
)>, + pub subs: Vec<(SubscriptionSink, Vec
)>, } #[derive(Debug, Serialize, Deserialize)] @@ -122,6 +122,8 @@ pub async fn sync_db( Ok(db) } +const MAX_ADDRESS_PER_SUB: usize = 20; + #[allow(clippy::unwrap_used)] fn handle_subscription( mut sink: SubscriptionSink, @@ -153,15 +155,19 @@ fn handle_subscription( return Err(eyre!("start block {} after next db block {}", start_block, next_block).into()); } - let mut addrs = HashSet::new(); - for addr in addresses { - addrs.insert(addr); + if addresses.len() > MAX_ADDRESS_PER_SUB { + return Err(eyre!( + "subscribing to more addresses than allowed: {}, max: {}", + addresses.len(), + MAX_ADDRESS_PER_SUB + ) + .into()); } let message = Message::Init(db.get_block_bytes(start_block - 1)); sink.try_send(SubscriptionMessage::from(serde_json::value::to_raw_value(&message).unwrap()))?; - lock.subs.push((sink, addrs)); + lock.subs.push((sink, addresses)); drop(lock); Ok(()) } diff --git a/synd-chain-ingestor/tests/integration_test.rs b/synd-chain-ingestor/tests/integration_test.rs index f2531c062..4534a6f5e 100644 --- a/synd-chain-ingestor/tests/integration_test.rs +++ b/synd-chain-ingestor/tests/integration_test.rs @@ -4,7 +4,7 @@ use common::types::SequencingBlock; use shared::types::{BlockBuilder, PartialBlock}; use std::{sync::Arc, time::Duration}; use synd_chain_ingestor::{ - client::{BlockStreamT, IngestorProvider, IngestorProviderConfig, Provider}, + client::{BlockStreamT, IngestorProvider, IngestorProviderConfig, IngestorProviderImpl}, eth_client::EthClient, }; use test_framework::components::chain_ingestor::ChainIngestorConfig; @@ -20,12 +20,17 @@ use tracing::info; mod tests { use super::*; + use shared::types::DelayedMsgsData; use url::Url; struct MockBlockBuilder; impl BlockBuilder for MockBlockBuilder { - fn build_block(&self, block: &PartialBlock) -> eyre::Result { + fn build_block( + &self, + block: &PartialBlock, + _msgs_data: DelayedMsgsData, + ) -> eyre::Result { Ok(SequencingBlock { block_ref: block.block_ref.clone(), parent_hash: block.parent_hash, @@ -82,7 +87,7 @@ mod tests { let ingestor_ws_url = format!("ws://localhost:{}", seq_chain_ingestor_cfg.port); let client = - IngestorProvider::new(&ingestor_ws_url, IngestorProviderConfig::default()).await; + IngestorProviderImpl::new(&ingestor_ws_url, IngestorProviderConfig::default()).await; wait_until!(client.ready().await?, Duration::from_secs(10)); @@ -103,7 +108,7 @@ mod tests { let anvil = anvil.unwrap(); let client = - IngestorProvider::new(&ingestor_ws_url, IngestorProviderConfig::default()).await; + IngestorProviderImpl::new(&ingestor_ws_url, IngestorProviderConfig::default()).await; for _ in 0..initial_blocks { mine_block(&anvil.provider, 10).await?; @@ -122,8 +127,9 @@ mod tests { ) .await; - let mut block_stream = - client.get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client).await?; + let mut block_stream = client + .get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client, None) + .await?; for _ in 0..post_init_blocks { mine_block(&anvil.provider, 10).await?; @@ -153,7 +159,7 @@ mod tests { let (anvil, _ingestor, ingestor_ws_url) = setup(None).await?; let anvil = anvil.unwrap(); - let client = IngestorProvider::new( + let client = IngestorProviderImpl::new( &ingestor_ws_url, IngestorProviderConfig { max_blocks_per_request: 5, ..Default::default() }, ) @@ -178,8 +184,9 @@ mod tests { ) .await; - let mut block_stream = - client.get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client).await?; + let mut block_stream = client + .get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client, None) + .await?; for _ in 0..post_init_blocks { mine_block(&anvil.provider, 10).await?; diff --git a/synd-mchain/src/client.rs b/synd-mchain/src/client.rs index 716e407f6..305468416 100644 --- a/synd-mchain/src/client.rs +++ b/synd-mchain/src/client.rs @@ -83,8 +83,8 @@ pub trait Provider: Send + Sync { #[instrument(skip_all, err, fields(otel.kind = ?SpanKind::Client))] async fn reconcile_mchain_with_source_chains( &self, - sequencing_client: &impl synd_chain_ingestor::client::Provider, - settlement_client: &impl synd_chain_ingestor::client::Provider, + sequencing_client: &impl synd_chain_ingestor::client::IngestorProvider, + settlement_client: &impl synd_chain_ingestor::client::IngestorProvider, ) -> eyre::Result> { let (safe_state, mchain_block_number) = self.get_safe_state(sequencing_client, settlement_client).await; @@ -106,8 +106,8 @@ pub trait Provider: Send + Sync { #[instrument(skip_all, fields(otel.kind = ?SpanKind::Client))] async fn get_safe_state( &self, - sequencing_client: &impl synd_chain_ingestor::client::Provider, - settlement_client: &impl synd_chain_ingestor::client::Provider, + sequencing_client: &impl synd_chain_ingestor::client::IngestorProvider, + settlement_client: &impl synd_chain_ingestor::client::IngestorProvider, ) -> (Option, Option) { info!("getting safe state"); let mut current_block = BlockNumberOrTag::Pending; @@ -153,7 +153,7 @@ pub trait Provider: Send + Sync { } async fn validate_block_add_timestamp( - client: &impl synd_chain_ingestor::client::Provider, + client: &impl synd_chain_ingestor::client::IngestorProvider, expected_block: &mut BlockRef, ) -> bool { #[allow(clippy::unwrap_used)] @@ -280,7 +280,7 @@ mod tests { } #[async_trait] - impl synd_chain_ingestor::client::Provider for MockRPCClient { + impl synd_chain_ingestor::client::IngestorProvider for MockRPCClient { async fn request( &self, _: &'static str, diff --git a/synd-translator/bin/synd-translator/src/config_manager.rs b/synd-translator/bin/synd-translator/src/config_manager.rs index f5ec829d8..fc0cea2bf 100644 --- a/synd-translator/bin/synd-translator/src/config_manager.rs +++ b/synd-translator/bin/synd-translator/src/config_manager.rs @@ -8,7 +8,7 @@ use contract_bindings::synd::{ arb_chain_config::ArbChainConfig, arb_config_manager::ArbConfigManager, }; use eyre::Result; -use synd_chain_ingestor::client::{IngestorProvider, IngestorProviderConfig, Provider as _}; +use synd_chain_ingestor::client::{IngestorProvider, IngestorProviderConfig, IngestorProviderImpl}; use tracing::{debug, error, info, warn}; async fn rpc_client_from_urls(urls: &[String]) -> RpcClient { @@ -102,7 +102,7 @@ pub async fn with_onchain_config(config: &TranslatorConfig) -> TranslatorConfig } }; - let ingestor_provider = IngestorProvider::new( + let ingestor_provider = IngestorProviderImpl::new( config.settlement.settlement_ws_url.as_ref(), IngestorProviderConfig { timeout: config.ws_request_timeout, diff --git a/synd-translator/bin/synd-translator/src/spawn.rs b/synd-translator/bin/synd-translator/src/spawn.rs index 5a442731d..20013666d 100644 --- a/synd-translator/bin/synd-translator/src/spawn.rs +++ b/synd-translator/bin/synd-translator/src/spawn.rs @@ -11,7 +11,7 @@ use shared::{ use std::{sync::Arc, time::Duration}; use synd_block_builder::appchains::arbitrum::arbitrum_adapter::ArbitrumAdapter; use synd_chain_ingestor::{ - client::{IngestorProvider, IngestorProviderConfig, Provider as IProvider}, + client::{IngestorProvider, IngestorProviderConfig, IngestorProviderImpl}, eth_client::EthClient, }; use synd_mchain::client::{MProvider, Provider}; @@ -46,7 +46,7 @@ async fn start_slotter(config: &TranslatorConfig, metrics: &TranslatorMetrics) - .await .map_err(|e| RuntimeError::InvalidConfig(format!("Invalid synd-mchain rpc url: {e}")))?; - let sequencing_client = IngestorProvider::new( + let sequencing_client = IngestorProviderImpl::new( config.sequencing.sequencing_ws_url.as_ref().unwrap(), IngestorProviderConfig { timeout: config.ws_request_timeout, @@ -57,7 +57,7 @@ async fn start_slotter(config: &TranslatorConfig, metrics: &TranslatorMetrics) - ) .await; - let settlement_client = IngestorProvider::new( + let settlement_client = IngestorProviderImpl::new( config.settlement.settlement_ws_url.as_ref(), IngestorProviderConfig { timeout: config.ws_request_timeout, @@ -115,6 +115,7 @@ async fn start_slotter(config: &TranslatorConfig, metrics: &TranslatorMetrics) - adapter.sequencer_addresses(), adapter, seq_client, + None, ) .await?; @@ -133,12 +134,14 @@ async fn start_slotter(config: &TranslatorConfig, metrics: &TranslatorMetrics) - config.rpc_retry_interval, ) .await; + let inbox_address = Some(arbitrum_adapter.inbox_address); let settlement = settlement_client .get_blocks( settlement_config.start_block, arbitrum_adapter.settlement_addresses(), arbitrum_adapter, set_client, + inbox_address, ) .await?; @@ -155,8 +158,8 @@ async fn start_slotter(config: &TranslatorConfig, metrics: &TranslatorMetrics) - } async fn wait_until_ingestors_are_ready( - sequencing_client: &IngestorProvider, - settlement_client: &IngestorProvider, + sequencing_client: &IngestorProviderImpl, + settlement_client: &IngestorProviderImpl, ingestor_ready_check_interval: Duration, ) -> Result<()> { let interval_str = humantime::format_duration(ingestor_ready_check_interval); diff --git a/synd-translator/crates/synd-block-builder/src/appchains/arbitrum/arbitrum_adapter.rs b/synd-translator/crates/synd-block-builder/src/appchains/arbitrum/arbitrum_adapter.rs index 8cfacb0cb..841304a94 100644 --- a/synd-translator/crates/synd-block-builder/src/appchains/arbitrum/arbitrum_adapter.rs +++ b/synd-translator/crates/synd-block-builder/src/appchains/arbitrum/arbitrum_adapter.rs @@ -14,18 +14,13 @@ use crate::{ config::BlockBuilderConfig, }; use alloy::{ - primitives::{Address, Bytes, FixedBytes, Log, U256}, + primitives::{Address, Bytes, Log, U256}, sol_types::SolEvent as _, }; use common::types::{SequencingBlock, SettlementBlock}; -use contract_bindings::synd::{ - i_bridge::IBridge::MessageDelivered, - i_delayed_message_provider::IDelayedMessageProvider::{ - InboxMessageDelivered, InboxMessageDeliveredFromOrigin, - }, -}; +use contract_bindings::synd::i_bridge::IBridge::MessageDelivered; use eyre::Result; -use shared::types::{BlockBuilder, PartialBlock}; +use shared::types::{BlockBuilder, DelayedMsgsData, PartialBlock}; use std::collections::HashMap; use synd_mchain::db::DelayedMessage; use thiserror::Error; @@ -37,11 +32,6 @@ use tracing::{debug, error, info, trace}; // into blocks based on gas usage instead. const TX_PER_BLOCK: usize = 100; -const MSG_DELIVERED_EVENT_HASH: FixedBytes<32> = MessageDelivered::SIGNATURE_HASH; -const INBOX_MSG_DELIVERED_EVENT_HASH: FixedBytes<32> = InboxMessageDelivered::SIGNATURE_HASH; -const INBOX_MSG_DELIVERED_FROM_ORIGIN_EVENT_HASH: FixedBytes<32> = - InboxMessageDeliveredFromOrigin::SIGNATURE_HASH; - #[allow(missing_docs)] // self-documenting #[derive(Debug, Error)] pub enum ArbitrumBlockBuilderError { @@ -164,50 +154,23 @@ impl ArbitrumAdapter { } /// Processes settlement chain receipts into delayed messages - pub fn process_delayed_messages(&self, block: &PartialBlock) -> Result> { - // Create a local map to store message data - let mut message_data: HashMap = HashMap::new(); + pub fn process_delayed_messages( + &self, + block: &PartialBlock, + msgs_data: DelayedMsgsData, + ) -> Result> { // Process all bridge logs in all receipts let delayed_messages = block.logs.iter().filter(|log| { - log.address == self.bridge_address && log.topics()[0] == MSG_DELIVERED_EVENT_HASH + log.address == self.bridge_address && + log.topics()[0] == MessageDelivered::SIGNATURE_HASH }); - // Process all inbox logs in all receipts - block.logs.iter().filter(|log| log.address == self.inbox_address).for_each(|log| { - match log.topics()[0] { - INBOX_MSG_DELIVERED_EVENT_HASH => { - let message_num = log.topics()[1].into(); - - // Decode the event using the contract bindings - match InboxMessageDelivered::abi_decode_data_validate(&log.data.data) { - Ok(decoded) => { - message_data.insert(message_num, decoded.0); - } - Err(e) => { - panic!( - "{}", - ArbitrumBlockBuilderError::DecodingError( - "InboxMessageDelivered", - e.into() - ) - ); - } - } - } - - INBOX_MSG_DELIVERED_FROM_ORIGIN_EVENT_HASH => { - panic!("unsupported inbox message delivered from origin: {}", log.topics()[1]); - } - _ => {} - } - }); - - trace!("Delayed message data: {:?}", message_data); + trace!("Delayed message data: {:?}", msgs_data); trace!("Delayed messages: {:?}", delayed_messages); let delayed_msg_txns = delayed_messages .filter_map(|msg_log| { - match self.delayed_message_to_mchain_txn(msg_log, &message_data) { + match self.delayed_message_to_mchain_txn(msg_log, &msgs_data) { Ok(txn) => Some(txn), Err(ArbitrumBlockBuilderError::DelayedMessageIgnored( L1MessageType::Initialize, @@ -348,7 +311,15 @@ impl ArbitrumAdapter { } impl BlockBuilder for ArbitrumAdapter { - fn build_block(&self, block: &PartialBlock) -> Result { + fn build_block( + &self, + block: &PartialBlock, + msgs_data: DelayedMsgsData, + ) -> Result { + assert!( + msgs_data.is_empty(), + "delayed messages found on sequencing block: {block:?}, {msgs_data:?}" + ); let (tx_count, batch) = self.build_batch(block)?; Ok(SequencingBlock { block_ref: block.block_ref.clone(), @@ -360,11 +331,15 @@ impl BlockBuilder for ArbitrumAdapter { } impl BlockBuilder for ArbitrumAdapter { - fn build_block(&self, block: &PartialBlock) -> Result { + fn build_block( + &self, + block: &PartialBlock, + msgs_data: DelayedMsgsData, + ) -> Result { Ok(SettlementBlock { block_ref: block.block_ref.clone(), parent_hash: block.parent_hash, - messages: self.process_delayed_messages(block)?, + messages: self.process_delayed_messages(block, msgs_data)?, }) } } @@ -376,7 +351,7 @@ mod tests { use alloy::{ eips::Encodable2718, network::{EthereumWallet, TransactionBuilder as _}, - primitives::{hex, keccak256}, + primitives::{hex, keccak256, FixedBytes}, rpc::types::TransactionRequest, signers::local::PrivateKeySigner, }; @@ -531,7 +506,11 @@ mod tests { // Create the log let log = Log::new_unchecked( builder.bridge_address, - vec![MSG_DELIVERED_EVENT_HASH, message_index.into(), FixedBytes::from([1u8; 32])], + vec![ + MessageDelivered::SIGNATURE_HASH, + message_index.into(), + FixedBytes::from([1u8; 32]), + ], msg_delivered.encode_data().into(), ); @@ -573,7 +552,11 @@ mod tests { let log = Log::new_unchecked( builder.bridge_address, - vec![MSG_DELIVERED_EVENT_HASH, message_index.into(), FixedBytes::from([1u8; 32])], + vec![ + MessageDelivered::SIGNATURE_HASH, + message_index.into(), + FixedBytes::from([1u8; 32]), + ], msg_delivered.encode_data().into(), ); @@ -592,7 +575,7 @@ mod tests { // Create log with invalid event data let log = Log::new_unchecked( builder.bridge_address, - vec![MSG_DELIVERED_EVENT_HASH], + vec![MessageDelivered::SIGNATURE_HASH], Bytes::from(vec![1, 2, 3]), // Invalid data that can't be decoded ); @@ -627,7 +610,11 @@ mod tests { // Create the log let log = Log::new_unchecked( builder.bridge_address, - vec![MSG_DELIVERED_EVENT_HASH, message_index.into(), FixedBytes::from([1u8; 32])], + vec![ + MessageDelivered::SIGNATURE_HASH, + message_index.into(), + FixedBytes::from([1u8; 32]), + ], msg_delivered.encode_data().into(), ); diff --git a/synd-withdrawals/synd-enclave/README.md b/synd-withdrawals/synd-enclave/README.md index 179713d25..d8ce2666d 100644 --- a/synd-withdrawals/synd-enclave/README.md +++ b/synd-withdrawals/synd-enclave/README.md @@ -7,7 +7,7 @@ - Make sure it is in your PATH via `which abigen || echo "$(go env GOPATH)/bin/abigen"`, and if it isn't you can temporarily add with `export PATH="$(go env GOPATH)/bin:$PATH"` 3. from /shared, run `make create-withdrawal-contract-bindings-go`. - You may need to run `make build-node-deps` as well. -- If you encounter yarn errors, you may need to upgrade yarn to v4. +- If you encounter yarn errors, you may need to upgrade yarn to v4 (`yarn set version stable`). 4. from /synd-withdrawals/synd-enclave/nitro run `make contracts`. Must use a supported version of npm, like v18. Run `nvm use` to select the proper node version. - You may need to modify nitro/Makefile to remove CGO_LDFLAGS linker flag values - -Wl,-no_warn_duplicate_libraries that are unsupported on Mac 5. from /synd-withdrawals/synd-enclave, run `go mod tidy` diff --git a/synd-withdrawals/synd-proposer/pkg/helpers.go b/synd-withdrawals/synd-proposer/pkg/helpers.go index 8a4393ff6..717f853c2 100644 --- a/synd-withdrawals/synd-proposer/pkg/helpers.go +++ b/synd-withdrawals/synd-proposer/pkg/helpers.go @@ -11,6 +11,7 @@ import ( "github.com/SyndicateProtocol/synd-appchains/synd-enclave/teetypes" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -42,8 +43,11 @@ type ValidationData struct { } var ( - messageDeliveredID common.Hash - inboxMessageDeliveredID common.Hash + messageDeliveredEventHash common.Hash + inboxMessageDeliveredEventHash common.Hash + inboxMessageDeliveredFromOriginEventHash common.Hash + l2MessageFromOriginCallABI abi.Method + l2MessageFromOriginCallSelector common.Hash ) var ArbSysPrecompileAddress = common.HexToAddress("0x0000000000000000000000000000000000000064") @@ -53,12 +57,19 @@ func init() { if err != nil { panic(err) } - messageDeliveredID = parsedIBridgeABI.Events["MessageDelivered"].ID + messageDeliveredEventHash = parsedIBridgeABI.Events["MessageDelivered"].ID parsedIMessageProviderABI, err := bridgegen.IDelayedMessageProviderMetaData.GetAbi() if err != nil { panic(err) } - inboxMessageDeliveredID = parsedIMessageProviderABI.Events["InboxMessageDelivered"].ID + inboxMessageDeliveredEventHash = parsedIMessageProviderABI.Events["InboxMessageDelivered"].ID + inboxMessageDeliveredFromOriginEventHash = parsedIMessageProviderABI.Events["InboxMessageDeliveredFromOrigin"].ID + parsedIInboxABI, err := bridgegen.IInboxMetaData.GetAbi() + if err != nil { + panic(err) + } + l2MessageFromOriginCallABI = parsedIInboxABI.Methods["sendL2MessageFromOrigin"] + l2MessageFromOriginCallSelector = common.BytesToHash(l2MessageFromOriginCallABI.ID) } // once the target qty is reached or exceeded, getLogs stops fetching logs @@ -234,7 +245,7 @@ func GetDelayedMessages( 0, endBlock, []common.Address{bridge}, - [][]common.Hash{{messageDeliveredID}, nil, {endAcc}}, + [][]common.Hash{{messageDeliveredEventHash}, nil, {endAcc}}, 1) if err != nil { return common.Hash{}, nil, false, err @@ -269,7 +280,7 @@ func GetDelayedMessages( 0, endBlock, []common.Address{bridge}, - [][]common.Hash{{messageDeliveredID}, indexes}, + [][]common.Hash{{messageDeliveredEventHash}, indexes}, uint64(len(indexes))) if err != nil { return common.Hash{}, nil, false, err @@ -311,7 +322,7 @@ func GetDelayedMessages( logs, err = getLogs(ctx, c, logs[0].BlockNumber, logs[len(logs)-1].BlockNumber, addrs, [][]common.Hash{ - {messageDeliveredID, inboxMessageDeliveredID}, + {messageDeliveredEventHash, inboxMessageDeliveredEventHash, inboxMessageDeliveredFromOriginEventHash}, }, 0) if err != nil { return common.Hash{}, nil, false, err @@ -333,22 +344,63 @@ func GetDelayedMessages( var msgs [][]byte var prevAcc *common.Hash for i := 0; i < len(logs); i += 2 { - log, err := ibridge.ParseMessageDelivered(logs[i]) + msgDeliveredLog, err := ibridge.ParseMessageDelivered(logs[i]) if err != nil { return common.Hash{}, nil, false, errors.Wrap(err, "failed to parse message delivered log") } - dataLog, err := iinbox.ParseInboxMessageDelivered(logs[i+1]) - if err != nil { - return common.Hash{}, nil, false, errors.Wrap(err, "failed to parse message delivered log") + + inboxLog := logs[i+1] + var logData []byte + var logBlockNum uint64 + var logMsgIndex *big.Int + + switch inboxLog.Topics[0] { + case inboxMessageDeliveredEventHash: + dataLog, err := iinbox.ParseInboxMessageDelivered(inboxLog) + if err != nil { + return common.Hash{}, nil, false, errors.Wrap(err, "failed to parse message delivered log") + } + logData = dataLog.Data + logBlockNum = dataLog.Raw.BlockNumber + logMsgIndex = dataLog.MessageNum + + case inboxMessageDeliveredFromOriginEventHash: + dataLog, err := iinbox.ParseInboxMessageDeliveredFromOrigin(inboxLog) + if err != nil { + return common.Hash{}, nil, false, errors.Wrap(err, "failed to parse message delivered from origin log") + } + // fetch the tx from the event + tx, _, err := c.TransactionByHash(ctx, inboxLog.TxHash) + if err != nil { + return common.Hash{}, nil, false, errors.Wrap(err, fmt.Sprintf("failed to get tx by hash %s", inboxLog.TxHash.String())) + } + if len(tx.Data()) < 4 { + return common.Hash{}, nil, false, errors.New("tx data too short") + } + if l2MessageFromOriginCallSelector.Cmp(common.BytesToHash(tx.Data()[:4])) != 0 { + return common.Hash{}, nil, false, errors.New("invalid function selector") + } + args := make(map[string]interface{}) + err = l2MessageFromOriginCallABI.Inputs.UnpackIntoMap(args, tx.Data()[4:]) + if err != nil { + return common.Hash{}, nil, false, errors.Wrap(err, "failed to parse inputs of sendL2MessageFromOrigin") + } + var ok bool + logData, ok = args["messageData"].([]byte) + if !ok { + return common.Hash{}, nil, false, errors.New("failed to cast messageData to []byte") + } + logBlockNum = dataLog.Raw.BlockNumber + logMsgIndex = dataLog.MessageNum } - if log.MessageIndex.Cmp(dataLog.MessageNum) != 0 { + if msgDeliveredLog.MessageIndex.Cmp(logMsgIndex) != 0 { return common.Hash{}, nil, false, errors.New("event log msg index mismatch") } - if log.Raw.BlockNumber != dataLog.Raw.BlockNumber { + if msgDeliveredLog.Raw.BlockNumber != logBlockNum { return common.Hash{}, nil, false, errors.New("event log block number mismatch") } // skip events prior to the start one - if log.MessageIndex.Cmp(big.NewInt(int64(start))) != 0 { + if msgDeliveredLog.MessageIndex.Cmp(big.NewInt(int64(start))) != 0 { continue } // exit once we have processed the end message @@ -357,25 +409,25 @@ func GetDelayedMessages( } start++ if prevAcc == nil { - hash := common.Hash(log.BeforeInboxAcc) + hash := common.Hash(msgDeliveredLog.BeforeInboxAcc) prevAcc = &hash } - requestId := common.BigToHash(log.MessageIndex) + requestId := common.BigToHash(msgDeliveredLog.MessageIndex) msg := arbostypes.L1IncomingMessage{ Header: &arbostypes.L1IncomingMessageHeader{ - Kind: log.Kind, - Poster: log.Sender, - BlockNumber: log.Raw.BlockNumber, - Timestamp: log.Timestamp, + Kind: msgDeliveredLog.Kind, + Poster: msgDeliveredLog.Sender, + BlockNumber: msgDeliveredLog.Raw.BlockNumber, + Timestamp: msgDeliveredLog.Timestamp, RequestId: &requestId, - L1BaseFee: log.BaseFeeL1, + L1BaseFee: msgDeliveredLog.BaseFeeL1, }, - L2msg: dataLog.Data, + L2msg: logData, } if settlesToArbitrumRollup { - block, err := c.BlockByHash(ctx, log.Raw.BlockHash) + block, err := c.BlockByHash(ctx, msgDeliveredLog.Raw.BlockHash) if err != nil { return common.Hash{}, nil, false, errors.Wrap(err, "failed to get block by hash") } diff --git a/test-framework/tests/e2e/e2e_tests.rs b/test-framework/tests/e2e/e2e_tests.rs index 43b89f1ab..833dfb1dc 100644 --- a/test-framework/tests/e2e/e2e_tests.rs +++ b/test-framework/tests/e2e/e2e_tests.rs @@ -12,7 +12,7 @@ use contract_bindings::synd::{i_inbox::IInbox, rollup::Rollup}; use eyre::Result; use std::time::Duration; use synd_block_builder::appchains::shared::sequencing_transaction_parser::L2MessageKind; -use synd_chain_ingestor::client::{IngestorProvider, IngestorProviderConfig}; +use synd_chain_ingestor::client::{IngestorProviderConfig, IngestorProviderImpl}; use synd_mchain::client::Provider as _; use test_framework::components::{ configuration::{BaseChainsType, ConfigurationOptions}, @@ -794,12 +794,12 @@ async fn e2e_reboot_without_settlement_processed() -> Result<()> { // assert that restarting and rolling back here will not make synd-mchain go back to // block 1 - let seq_mchain_client = IngestorProvider::new( + let seq_mchain_client = IngestorProviderImpl::new( components.sequencing_ingestor_rpc_url.as_ref(), IngestorProviderConfig { timeout: Duration::from_secs(1), ..Default::default() }, ) .await; - let settlement_client = IngestorProvider::new( + let settlement_client = IngestorProviderImpl::new( components.settlement_ingestor_rpc_url.as_ref(), IngestorProviderConfig { timeout: Duration::from_secs(1), ..Default::default() }, ) diff --git a/test-framework/tests/e2e/e2e_tests_withdrawals.rs b/test-framework/tests/e2e/e2e_tests_withdrawals.rs index 3900b3d13..6dc6fd5c3 100644 --- a/test-framework/tests/e2e/e2e_tests_withdrawals.rs +++ b/test-framework/tests/e2e/e2e_tests_withdrawals.rs @@ -3,7 +3,7 @@ use alloy::{ contract::CallBuilder, eips::{BlockNumberOrTag, Encodable2718}, network::{Ethereum, TransactionBuilder as _}, - primitives::{address, keccak256, utils::parse_ether, Address, B256, U160, U256}, + primitives::{address, keccak256, utils::parse_ether, Address, B256, U256}, providers::{ ext::{AnvilApi, DebugApi}, Provider, ProviderBuilder, WalletProvider as _, @@ -11,7 +11,7 @@ use alloy::{ rpc::types::{ anvil::MineOptions, trace::geth::{GethDebugTracingOptions, GethTrace}, - TransactionRequest, + TransactionReceipt, TransactionRequest, }, signers::local::PrivateKeySigner, sol, @@ -33,11 +33,13 @@ use test_framework::components::{ use test_utils::{ chain_info::{test_account1, test_account2, test_account3, PRIVATE_KEY3}, docker::{launch_enclave_server, start_component}, - nitro_chain::{execute_withdrawal, init_withdrawal_tx}, + nitro_chain::{ + apply_l1_to_l2_alias, execute_withdrawal, init_withdrawal_tx, ExecuteWithdrawalParams, + }, port_manager::PortManager, wait_until, }; -use tokio::task::JoinHandle; +use tokio::{task::JoinHandle, time}; #[ctor::ctor] fn init() { @@ -88,7 +90,7 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu })) .await .unwrap(); // NOTE: this will crash once the test ends that's fine - tokio::time::sleep(Duration::from_secs(10)).await; + time::sleep(Duration::from_secs(10)).await; } }); @@ -96,7 +98,7 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu let inbox = IInbox::new(components.appchain_deployment.inbox, &components.settlement_provider); - // NOTE: manually setting the nonce shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually setting the nonce shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 let receipt = inbox .depositEth() .value(parse_ether("1")?) @@ -237,7 +239,7 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu &components.settlement_provider, ); - // NOTE: manually setting the nonce shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually setting the nonce shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 let receipt = assertion_poster .transferOwnership(tee_module_addr) .nonce( @@ -303,7 +305,7 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu let public_values = tee_public_key.abi_encode(); let proof_bytes = vec![]; - // NOTE: manually setting the nonce shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually setting the nonce shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 let receipt = key_mgr .addKey(public_values.into(), proof_bytes.into()) .nonce( @@ -329,23 +331,22 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu ) .await?; - // send a dummy tx so that the sequencing chain progresses and the deposit is - // slotted in - components.sequence_tx(b"dummy_tx", 0, false).await?; - wait_until!( - components.appchain_provider.get_balance(test_account1().address).await? >= - parse_ether("1")?, - Duration::from_secs(60) + { + // send a dummy tx so that the sequencing chain progresses and the deposit is + // slotted in + components.sequence_tx(b"dummy_tx", 0, false).await?; + components.appchain_provider.get_balance(test_account1().address).await? >= + parse_ether("1")? + }, + Duration::from_secs(60), + Duration::from_millis(500) ); // send 101 valid txs plus some invalid ones to trigger the block splitting code which // does not require the nitro fork to be enabled let latest = components.appchain_provider.get_block_number().await?; - let offset = address!("0x1000000000000000000000000000000000000001").into(); - let alias_address = Address::from( - U160::from_be_slice(&test_account1().address[..]).wrapping_add(offset), - ); + let alias_address = apply_l1_to_l2_alias(test_account1().address); let dummy_tx = vec![L2MessageKind::SignedTx as u8, 0xc0]; let mut txs = vec![]; for _ in 0..100 { @@ -448,14 +449,93 @@ async fn e2e_tee_withdrawal_basic_flow(base_chains_type: BaseChainsType) -> Resu ); // finish the withdrawal on the settlement chain - execute_withdrawal( + execute_withdrawal(ExecuteWithdrawalParams { to_address, withdrawal_value, - components.appchain_deployment.bridge, - &components.settlement_provider, - &components.appchain_provider, - ) - .await?; + appchain_block_hash_to_prove, + bridge_address: components.appchain_deployment.bridge, + settlement_provider: &components.settlement_provider, + appchain_provider: &components.appchain_provider, + l2_sender: components.appchain_provider.default_signer_address(), + send_root_size: 1, + withdrawal_position: 0, + }) + .await; + + // Assert new balance is equal to withdrawal amount + let balance_after = components.settlement_provider.get_balance(to_address).await?; + assert_eq!(balance_after, withdrawal_value); + + // lets withdraw using sendL2MessageFromOrigin + let withdrawal_value = parse_ether("0.5")?; + let to_address = address!("0x0000000000000000000000000000000000000002"); + let withdraw_from_origin_tx = + init_withdrawal_tx(to_address, withdrawal_value, &components.appchain_provider) + .await?; + let tx_hash = withdraw_from_origin_tx.hash(); + let mut raw_tx_with_prefix = withdraw_from_origin_tx.encoded_2718(); + raw_tx_with_prefix.insert(0, L2MessageKind::SignedTx as u8); + + let nonce = components + .settlement_provider + .get_transaction_count(components.settlement_provider.default_signer_address()) + .await?; + assert!(inbox + .sendL2MessageFromOrigin(raw_tx_with_prefix.into()) + .nonce(nonce) + .send() + .await? + .get_receipt() + .await? + .status()); + + let mut receipt: Option = None; + wait_until!( + { + // send a dummy tx so that the sequencing chain progresses and the deposit is + // slotted in + components.sequence_tx(b"dummy_tx", 0, false).await?; + receipt = + components.appchain_provider.get_transaction_receipt(*tx_hash).await?; + receipt.is_some() + }, + Duration::from_secs(60), + Duration::from_millis(500) + ); + let receipt = receipt.unwrap(); + assert!(receipt.status()); + + // wait for the sendroot to be updated + let appchain_block_hash_to_prove = receipt.block_hash.unwrap(); + wait_until!( + rollup_core + .NodeConfirmed_filter() + .query() + .await? + .iter() + .any(|event| event.0.blockHash == appchain_block_hash_to_prove), + Duration::from_secs(10 * 60) + ); + + // topic 3 of the L2ToL1Tx event is the withdrawal position + let withdrawal_position: u64 = + U256::from_be_bytes(receipt.logs()[1].clone().topics()[3].into()) + .try_into() + .unwrap(); + + // finish the withdrawal on the settlement chain + execute_withdrawal(ExecuteWithdrawalParams { + to_address, + withdrawal_value, + appchain_block_hash_to_prove, + bridge_address: components.appchain_deployment.bridge, + settlement_provider: &components.settlement_provider, + appchain_provider: &components.appchain_provider, + l2_sender: components.appchain_provider.default_signer_address(), + send_root_size: 2, + withdrawal_position, + }) + .await; // Assert new balance is equal to withdrawal amount let balance_after = components.settlement_provider.get_balance(to_address).await?; @@ -613,7 +693,7 @@ async fn setup_l1_oracle + Clone + Send + Sync + 'static>( l1_provider: T, target_chain_provider: T, ) -> (JoinHandle<()>, Address) { - // NOTE: manually constructing the deployment tx shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually constructing the deployment tx shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 // instead should just be: // let oracle_contract = L1BlockOracle::deploy(target_chain_provider).await.unwrap(); let receipt = L1BlockOracle::deploy_builder(target_chain_provider.clone()) @@ -636,7 +716,7 @@ async fn setup_l1_oracle + Clone + Send + Sync + 'static>( println!("l1 block: {l1_block:?}"); let l1_hash = l1_block.hash; let l1_timestamp = l1_block.timestamp; - // NOTE: manually setting the nonce shouldn't be necessary, likey an artifact of: https://github.com/alloy-rs/alloy/issues/2668 + // NOTE: manually setting the nonce shouldn't be necessary, likely an artifact of: https://github.com/alloy-rs/alloy/issues/2668 let receipt = oracle_contract .setL1Block(l1_timestamp, l1_hash) .nonce(