From ce7328eb19db429ea349cb5abf086a85ad351c0d Mon Sep 17 00:00:00 2001 From: f3kilo Date: Fri, 17 Jan 2025 11:15:48 +0300 Subject: [PATCH 1/2] blockchain provider v2 --- bin/reth/Cargo.toml | 1 + bin/reth/src/commands/bitfinity_import2.rs | 287 +++++++ bin/reth/src/commands/mod.rs | 1 + bin/reth/src/main.rs | 43 +- .../tests/commands/bitfinity_import_it2.rs | 136 ++++ bin/reth/tests/commands/bitfinity_node_it2.rs | 723 ++++++++++++++++++ bin/reth/tests/commands/mod.rs | 3 + bin/reth/tests/commands/utils2.rs | 300 ++++++++ 8 files changed, 1493 insertions(+), 1 deletion(-) create mode 100644 bin/reth/src/commands/bitfinity_import2.rs create mode 100644 bin/reth/tests/commands/bitfinity_import_it2.rs create mode 100644 bin/reth/tests/commands/bitfinity_node_it2.rs create mode 100644 bin/reth/tests/commands/utils2.rs diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index b6fcda2834c..1dfd82865c5 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -121,6 +121,7 @@ reth-db-common.workspace = true reth-db = { workspace = true, features = ["mdbx", "test-utils"] } serial_test.workspace = true reth-discv5.workspace = true +reth-node-ethereum = { workspace = true, features = ["test-utils"] } [features] default = ["jemalloc"] diff --git a/bin/reth/src/commands/bitfinity_import2.rs b/bin/reth/src/commands/bitfinity_import2.rs new file mode 100644 index 00000000000..0721266f2ed --- /dev/null +++ b/bin/reth/src/commands/bitfinity_import2.rs @@ -0,0 +1,287 @@ +//! Command that initializes the node by importing a chain from a remote EVM node. + +use crate::{dirs::DataDirPath, version::SHORT_VERSION}; +use futures::{Stream, StreamExt}; +use lightspeed_scheduler::{job::Job, scheduler::Scheduler, JobExecutor}; +use reth_beacon_consensus::EthBeaconConsensus; +use reth_chainspec::ChainSpec; +use reth_config::{config::EtlConfig, Config}; +use reth_db::DatabaseEnv; + +use alloy_primitives::B256; +use reth_consensus::Consensus; +use reth_downloaders::{ + bitfinity_evm_client::{BitfinityEvmClient, CertificateCheckSettings, RpcClientConfig}, + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_exex::ExExManagerHandle; +use reth_node_api::NodeTypesWithDBAdapter; +use reth_node_core::{args::BitfinityImportArgs, dirs::ChainPath}; +use reth_node_ethereum::{EthExecutorProvider, EthereumNode}; +use reth_node_events::node::NodeEvent; +use reth_primitives::{EthPrimitives, SealedHeader}; +use reth_provider::providers::BlockchainProvider2; +use reth_provider::{ + BlockNumReader, CanonChainTracker, ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, + ProviderError, ProviderFactory, +}; +use reth_prune::PruneModes; +use reth_stages::{ + prelude::*, + stages::{ExecutionStage, SenderRecoveryStage}, + ExecutionStageThresholds, Pipeline, StageSet, +}; +use reth_static_file::StaticFileProducer; +use std::{path::PathBuf, sync::Arc, time::Duration}; +use tokio::sync::watch; +use tracing::{debug, info}; + +/// Syncs RLP encoded blocks from a file. +#[derive(Clone)] +pub struct BitfinityImportCommand { + config: Config, + + datadir: ChainPath, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file. 
+ chain: Arc, + + /// Bitfinity Related Args + bitfinity: BitfinityImportArgs, + + provider_factory: ProviderFactory>>, + + blockchain_provider: + BlockchainProvider2>>, +} + +/// Manually implement `Debug` for `ImportCommand` because `BlockchainProvider` doesn't implement it. +impl std::fmt::Debug for BitfinityImportCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ImportCommand") + .field("config", &self.config) + .field("datadir", &self.datadir) + .field("chain", &self.chain) + .field("bitfinity", &self.bitfinity) + .finish() + } +} + +type TypedPipeline = Pipeline>>; + +impl BitfinityImportCommand { + /// Create a new `ImportCommand` with the given arguments. + pub fn new( + config: Option, + datadir: ChainPath, + chain: Arc, + bitfinity: BitfinityImportArgs, + provider_factory: ProviderFactory>>, + blockchain_provider: BlockchainProvider2< + NodeTypesWithDBAdapter>, + >, + ) -> Self { + // add network name to data dir + let config_path = config.unwrap_or_else(|| datadir.config()); + + info!(target: "reth::cli - BitfinityImportCommand", path = ?config_path, "Configuration loaded"); + let mut config = Config::from_path(config_path) + .expect("Failed to load BitfinityImportCommand configuration"); + + // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to + if config.stages.etl.dir.is_none() { + config.stages.etl.dir = Some(EtlConfig::from_datadir(datadir.data_dir())); + } + + Self { config, datadir, chain, bitfinity, provider_factory, blockchain_provider } + } + + /// Schedule the import job and return a handle to it. + pub async fn schedule_execution( + self, + job_executor: Option, + ) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> { + info!(target: "reth::cli - BitfinityImportCommand", "reth {} starting", SHORT_VERSION); + + let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz); + + // Schedule the import job + { + let interval = Duration::from_secs(self.bitfinity.import_interval); + job_executor + .add_job_with_scheduler( + Scheduler::Interval { interval_duration: interval, execute_at_startup: true }, + Job::new("import", "block importer", None, move || { + let import = self.clone(); + Box::pin(async move { + import.single_execution().await?; + import.update_chain_info()?; + Ok(()) + }) + }), + ) + .await; + } + + let job_handle = job_executor.run().await?; + Ok((job_executor, job_handle)) + } + + /// Execute the import job. + async fn single_execution(&self) -> eyre::Result<()> { + let consensus = Arc::new(EthBeaconConsensus::new(self.chain.clone())); + debug!(target: "reth::cli - BitfinityImportCommand", "Consensus engine initialized"); + let provider_factory = self.provider_factory.clone(); + + // Get the local block number + let start_block = provider_factory.provider()?.last_block_number()? 
+ 1; + + debug!(target: "reth::cli - BitfinityImportCommand", "Starting block: {}", start_block); + + let rpc_config = RpcClientConfig { + primary_url: self.bitfinity.rpc_url.clone(), + backup_url: self.bitfinity.backup_rpc_url.clone(), + max_retries: self.bitfinity.max_retries, + retry_delay: Duration::from_secs(self.bitfinity.retry_delay_secs), + }; + + let remote_client = Arc::new( + BitfinityEvmClient::from_rpc_url( + rpc_config, + start_block, + self.bitfinity.end_block, + self.bitfinity.batch_size, + self.bitfinity.max_fetch_blocks, + Some(CertificateCheckSettings { + evmc_principal: self.bitfinity.evmc_principal.clone(), + ic_root_key: self.bitfinity.ic_root_key.clone(), + }), + ) + .await?, + ); + + // override the tip + let tip = if let Some(tip) = remote_client.tip() { + tip + } else { + debug!(target: "reth::cli - BitfinityImportCommand", "No tip found, skipping import"); + return Ok(()); + }; + + info!(target: "reth::cli - BitfinityImportCommand", "Chain blocks imported"); + + let (mut pipeline, _events) = self.build_import_pipeline( + &self.config, + provider_factory.clone(), + &consensus, + remote_client, + StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), + )?; + + // override the tip + pipeline.set_tip(tip); + debug!(target: "reth::cli - BitfinityImportCommand", ?tip, "Tip manually set"); + + // Run pipeline + debug!(target: "reth::cli - BitfinityImportCommand", "Starting sync pipeline"); + pipeline.run().await?; + + info!(target: "reth::cli - BitfinityImportCommand", "Finishing up"); + Ok(()) + } + + /// Update the chain info tracker with the latest header from the database. + fn update_chain_info(&self) -> eyre::Result<()> { + let provider = self.blockchain_provider.database_provider_ro()?; + let chain_info = provider.chain_info()?; + + match provider.header_by_number(chain_info.best_number)? 
{ + Some(header) => { + let sealed_header = header.seal(chain_info.best_hash); + let hash = sealed_header.seal(); + let sealed_header = SealedHeader::new(sealed_header.into_inner(), hash); + self.blockchain_provider.set_canonical_head(sealed_header.clone()); + self.blockchain_provider.set_finalized(sealed_header.clone()); + self.blockchain_provider.set_safe(sealed_header); + Ok(()) + } + None => Err(ProviderError::HeaderNotFound(chain_info.best_number.into()))?, + } + } + + fn build_import_pipeline( + &self, + config: &Config, + provider_factory: ProviderFactory>>, + consensus: &Arc, + remote_client: Arc, + static_file_producer: StaticFileProducer< + ProviderFactory>>, + >, + ) -> eyre::Result<(TypedPipeline, impl Stream>)> + where + C: Consensus + 'static, + { + if !remote_client.has_canonical_blocks() { + eyre::bail!("unable to import non canonical blocks"); + } + + let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) + .build(remote_client.clone(), consensus.clone()) + .into_task(); + + let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) + .build(remote_client.clone(), consensus.clone(), provider_factory.clone()) + .into_task(); + + let (tip_tx, tip_rx) = watch::channel(B256::ZERO); + let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec()); + + let max_block = remote_client.max_block().unwrap_or(0); + let pipeline = + Pipeline::>>::builder() + .with_tip_sender(tip_tx) + // we want to sync all blocks the file client provides or 0 if empty + .with_max_block(max_block) + .add_stages( + DefaultStages::new( + provider_factory.clone(), + tip_rx, + consensus.clone(), + header_downloader, + body_downloader, + executor.clone(), + config.stages.clone(), + PruneModes::default(), + ) + .set(SenderRecoveryStage { + commit_threshold: config.stages.sender_recovery.commit_threshold, + }) + .set(ExecutionStage::new( + executor, + ExecutionStageThresholds { + max_blocks: config.stages.execution.max_blocks, + max_changes: config.stages.execution.max_changes, + max_cumulative_gas: config.stages.execution.max_cumulative_gas, + max_duration: config.stages.execution.max_duration, + }, + config + .stages + .merkle + .clean_threshold + .max(config.stages.account_hashing.clean_threshold) + .max(config.stages.storage_hashing.clean_threshold), + config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), + ExExManagerHandle::empty(), + )), + ) + .build(provider_factory, static_file_producer); + + let events = pipeline.events().map(Into::into); + + Ok((pipeline, events)) + } +} diff --git a/bin/reth/src/commands/mod.rs b/bin/reth/src/commands/mod.rs index 9380056c8b1..1ab0e8cd38c 100644 --- a/bin/reth/src/commands/mod.rs +++ b/bin/reth/src/commands/mod.rs @@ -1,5 +1,6 @@ //! 
This contains all of the `reth` commands pub mod bitfinity_import; +pub mod bitfinity_import2; pub mod bitfinity_reset_evm_state; pub mod debug_cmd; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 88d13499ea7..2af41212866 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -6,7 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne use std::{sync::Arc, time::Duration}; use clap::{Args, Parser}; -use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; +use reth::{bitfinity_tasks::send_txs::BitfinityTransactionSender, commands::bitfinity_import2::{self, BitfinityImportCommand}}; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_node_builder::{ engine_tree_config::{ @@ -96,6 +96,47 @@ fn main() { builder.launch_with(launcher) }) .await?; + + let blockchain_provider = handle.node.provider.clone(); + let config = handle.node.config.config.clone(); + let chain = handle.node.chain_spec(); + let datadir = handle.node.data_dir.clone(); + let (provider_factory, bitfinity) = handle.bitfinity_import.clone().expect("Bitfinity import not configured"); + + // Init bitfinity import + let executor = { + let import = bitfinity_import2::BitfinityImportCommand::new( + config, + datadir, + chain, + bitfinity.clone(), + provider_factory, + blockchain_provider, + ); + let (executor, _import_handle) = import.schedule_execution(None).await?; + executor + }; + + if bitfinity.tx_queue { + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(1000))); + + // Make EthApi handler move new txs to queue. + let queue_clone = Arc::clone(&queue); + handle.node.add_ons_handle.eth_api().set_bitfinity_tx_forwarder(BitfinityTransactionsForwarder::new(queue_clone)); + + // Run batch transaction sender. + let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url); + let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs); + let transaction_sending = BitfinityTransactionSender::new( + queue, + url, + period, + bitfinity.queued_txs_batch_size, + bitfinity.queued_txs_per_execution_threshold, + ); + let _sending_handle = transaction_sending.schedule_execution(Some(executor)).await?; + } + handle.node_exit_future.await } true => { diff --git a/bin/reth/tests/commands/bitfinity_import_it2.rs b/bin/reth/tests/commands/bitfinity_import_it2.rs new file mode 100644 index 00000000000..2d967de33fd --- /dev/null +++ b/bin/reth/tests/commands/bitfinity_import_it2.rs @@ -0,0 +1,136 @@ +//! +//! Integration tests for the bitfinity import command with BlockchainProvider2. +//! These tests requires a running EVM node or EVM block extractor node at the specified URL. +//! 
+ +use super::utils::*; +use alloy_eips::BlockNumberOrTag; +use ethereum_json_rpc_client::{reqwest::ReqwestClient, EthJsonRpcClient}; +use reth_provider::{BlockNumReader, BlockReader, BlockReaderIdExt}; +use std::time::Duration; + +#[tokio::test] +async fn bitfinity_test_should_import_data_from_evm() { + // Arrange + let _log = init_logs(); + let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; + let (_temp_dir, mut import_data) = + bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); + + let end_block = 100; + import_data.bitfinity_args.end_block = Some(end_block); + import_data.bitfinity_args.batch_size = (end_block as usize) * 10; + + // Act + import_blocks(import_data.clone(), Duration::from_secs(20), false).await; + + // Assert + { + let provider = import_data.provider_factory.provider().unwrap(); + assert_eq!(end_block, provider.last_block_number().unwrap()); + + // create evm client + let evm_rpc_client = + EthJsonRpcClient::new(ReqwestClient::new(evm_datasource_url.to_string())); + + let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); + let local_block = provider.block_by_number(end_block).unwrap().unwrap(); + + assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); + assert_eq!(remote_block.state_root.0, local_block.state_root.0); + } +} + +#[tokio::test] +async fn bitfinity_test_should_import_with_small_batch_size() { + // Arrange + let _log = init_logs(); + let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; + let (_temp_dir, mut import_data) = + bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); + + let end_block = 101; + import_data.bitfinity_args.end_block = Some(end_block); + import_data.bitfinity_args.batch_size = 10; + + // Act + import_blocks(import_data.clone(), Duration::from_secs(20), false).await; + + // Assert + { + let provider = import_data.provider_factory.provider().unwrap(); + assert_eq!(end_block, provider.last_block_number().unwrap()); + + // create evm client + let evm_rpc_client = + EthJsonRpcClient::new(ReqwestClient::new(evm_datasource_url.to_string())); + + let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); + let local_block = provider.block_by_number(end_block).unwrap().unwrap(); + + assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); + assert_eq!(remote_block.state_root.0, local_block.state_root.0); + } +} + +#[tokio::test] +async fn bitfinity_test_finalized_and_safe_query_params_works() { + // Arrange + let _log = init_logs(); + let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; + let (_temp_dir, mut import_data) = + bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); + + let end_block = 100; + import_data.bitfinity_args.end_block = Some(end_block); + import_data.bitfinity_args.batch_size = (end_block as usize) * 10; + + // Act + import_blocks(import_data.clone(), Duration::from_secs(20), true).await; + + let latest_block = import_data + .blockchain_db + .block_by_number_or_tag(BlockNumberOrTag::Finalized) + .unwrap() + .unwrap(); + assert_eq!(end_block, latest_block.number); + + let safe_block = + import_data.blockchain_db.block_by_number_or_tag(BlockNumberOrTag::Safe).unwrap().unwrap(); + assert_eq!(end_block, safe_block.number); +} + +#[tokio::test] +async fn bitfinity_test_should_import_data_from_evm_with_backup_rpc_url() { + // Arrange + let _log = init_logs(); + let evm_datasource_url = "https://fake_rpc_url"; + let backup_rpc_url = DEFAULT_EVM_DATASOURCE_URL; + + let 
(_temp_dir, mut import_data) = + bitfinity_import_config_data(evm_datasource_url, Some(backup_rpc_url.to_owned()), None) + .await + .unwrap(); + + let end_block = 100; + import_data.bitfinity_args.end_block = Some(end_block); + import_data.bitfinity_args.batch_size = (end_block as usize) * 10; + + // Act + import_blocks(import_data.clone(), Duration::from_secs(200), false).await; + + // Assert + { + let provider = import_data.provider_factory.provider().unwrap(); + assert_eq!(end_block, provider.last_block_number().unwrap()); + + // create evm client + let evm_rpc_client = EthJsonRpcClient::new(ReqwestClient::new(backup_rpc_url.to_string())); + + let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); + let local_block = provider.block_by_number(end_block).unwrap().unwrap(); + + assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); + assert_eq!(remote_block.state_root.0, local_block.state_root.0); + } +} diff --git a/bin/reth/tests/commands/bitfinity_node_it2.rs b/bin/reth/tests/commands/bitfinity_node_it2.rs new file mode 100644 index 00000000000..0ffde13bdb9 --- /dev/null +++ b/bin/reth/tests/commands/bitfinity_node_it2.rs @@ -0,0 +1,723 @@ +//! +//! Integration tests for the bitfinity node command with BlockchainProvider2. +//! + +use super::utils::*; +use did::keccak; +use eth_server::{EthImpl, EthServer}; +use ethereum_json_rpc_client::CertifiedResult; +use ethereum_json_rpc_client::{reqwest::ReqwestClient, EthJsonRpcClient}; +use jsonrpsee::{ + server::{Server, ServerHandle}, + Methods, RpcModule, +}; +use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; +use reth::{ + args::{DatadirArgs, RpcServerArgs}, + dirs::{DataDirPath, MaybePlatformPath}, +}; +use reth_consensus::FullConsensus; +use reth_db::test_utils::TempDatabase; +use reth_db::DatabaseEnv; +use reth_db::{init_db, test_utils::tempdir_path}; +use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; +use reth_network::NetworkHandle; +use reth_node_api::{FullNodeTypesAdapter, NodeTypesWithDBAdapter}; +use reth_node_builder::components::Components; +use reth_node_builder::engine_tree_config::TreeConfig; +use reth_node_builder::rpc::RpcAddOns; +use reth_node_builder::{EngineNodeLauncher, NodeAdapter, NodeBuilder, NodeConfig, NodeHandle}; +use reth_node_ethereum::node::{EthereumAddOns, EthereumEngineValidatorBuilder}; +use reth_node_ethereum::{ + BasicBlockExecutorProvider, EthEvmConfig, EthExecutionStrategyFactory, EthereumNode, +}; +use reth_primitives::{Transaction, TransactionSigned}; +use reth_provider::providers::BlockchainProvider2; +use reth_rpc::EthApi; +use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{ + BitfinityTransactionsForwarder, SharedQueue, TransactionsPriorityQueue, +}; +use reth_tasks::TaskManager; +use reth_transaction_pool::blobstore::DiskFileBlobStore; +use reth_transaction_pool::test_utils::MockTransaction; +use reth_transaction_pool::{ + CoinbaseTipOrdering, EthPooledTransaction, EthTransactionValidator, Pool, + TransactionValidationTaskExecutor, +}; +use revm_primitives::{Address, B256, U256}; +use std::time::Duration; +use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; + +#[tokio::test] +async fn bitfinity_test_should_start_local_reth_node() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + let (reth_client, _reth_node) = start_reth_node(&tasks, None, None, None).await; + + // Act & Assert + assert!(reth_client.get_chain_id().await.is_ok()); 
+} + +#[tokio::test] +async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::default(); + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; + + // Act + let result = reth_client.get_last_certified_block().await; + + // Assert + assert!(result.is_ok()); + + // Try with `eth_getLastCertifiedBlock` alias + let result: CertifiedResult> = reth_client + .single_request( + "eth_getLastCertifiedBlock".to_owned(), + ethereum_json_rpc_client::Params::None, + ethereum_json_rpc_client::Id::Num(1), + ) + .await + .unwrap(); + + assert_eq!(result.certificate, vec![1u8, 3, 11]); +} + +#[tokio::test] +async fn bitfinity_test_node_forward_get_gas_price_requests() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::default(); + let gas_price = eth_server.gas_price; + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; + + // Act + let gas_price_result = reth_client.gas_price().await; + + // Assert + assert_eq!(gas_price_result.unwrap(), gas_price.into()); +} + +#[tokio::test] +async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::default(); + let max_priority_fee_per_gas = eth_server.max_priority_fee_per_gas; + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; + + // Act + let result = reth_client.max_priority_fee_per_gas().await; + + // Assert + assert_eq!(result.unwrap(), max_priority_fee_per_gas.into()); +} + +#[tokio::test] +async fn bitfinity_test_node_forward_eth_get_genesis_balances() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::default(); + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; + + // Act + let result: Vec<(did::H160, did::U256)> = reth_client + .single_request( + "eth_getGenesisBalances".to_owned(), + ethereum_json_rpc_client::Params::None, + ethereum_json_rpc_client::Id::Num(1), + ) + .await + .unwrap(); + + // Assert + assert_eq!(result.len(), 3); + + assert_eq!(result[0].0, Address::from_slice(&[1u8; 20]).into()); + assert_eq!(result[0].1, U256::from(10).into()); + + assert_eq!(result[1].0, Address::from_slice(&[2u8; 20]).into()); + assert_eq!(result[1].1, U256::from(20).into()); + + assert_eq!(result[2].0, Address::from_slice(&[3u8; 20]).into()); + assert_eq!(result[2].1, U256::from(30).into()); +} + +#[tokio::test] +async fn bitfinity_test_node_forward_ic_get_genesis_balances() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::default(); + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let (reth_client, 
_reth_node) = + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; + + // Act + let result = reth_client.get_genesis_balances().await.unwrap(); + + // Assert + assert_eq!(result.len(), 3); + + assert_eq!(result[0].0, Address::from_slice(&[1u8; 20]).into()); + assert_eq!(result[0].1, U256::from(10).into()); + + assert_eq!(result[1].0, Address::from_slice(&[2u8; 20]).into()); + assert_eq!(result[1].1, U256::from(20).into()); + + assert_eq!(result[2].0, Address::from_slice(&[3u8; 20]).into()); + assert_eq!(result[2].1, U256::from(30).into()); +} + +#[tokio::test] +async fn bitfinity_test_node_forward_send_raw_transaction_requests() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); + + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); + + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let bitfinity_evm_url = format!("http://{}", eth_server_address); + let (reth_client, _reth_node) = start_reth_node( + &tasks, + Some(format!("http://{}", eth_server_address)), + None, + Some(queue.clone()), + ) + .await; + + // Create a random transaction + let tx = transaction_with_gas_price(100); + let encoded = alloy_rlp::encode(&tx); + let expected_tx_hash = keccak::keccak_hash(&encoded); + + // Act + let result = reth_client.send_raw_transaction_bytes(&encoded).await.unwrap(); + + // Assert + assert_eq!(result, expected_tx_hash); + + let transaction_sending = BitfinityTransactionSender::new( + queue, + bitfinity_evm_url, + Duration::from_millis(200), + 10, + 100, + ); + transaction_sending.single_execution().await.unwrap(); + + let received_txs = consume_received_txs(&mut tx_receiver, 1).await.unwrap(); + + assert_eq!(received_txs[0], expected_tx_hash.0); +} + +#[tokio::test] +async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); + + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); + + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let bitfinity_evm_url = format!("http://{}", eth_server_address); + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await; + + const TXS_NUMBER: usize = 100; + + // Create a random transactions + let transactions = (1..=TXS_NUMBER) + .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128))) + .collect::>(); + + // Only highest price transactions should be sent. + let expected_hashes = + transactions.iter().rev().take(10).map(|tx| keccak::keccak_hash(tx)).collect::>(); + + // Act + for tx in &transactions { + reth_client.send_raw_transaction_bytes(tx).await.unwrap(); + } + + let transaction_sending = BitfinityTransactionSender::new( + queue.clone(), + bitfinity_evm_url, + Duration::from_millis(200), + 10, + 100, + ); + transaction_sending.single_execution().await.unwrap(); + + let received_txs = consume_received_txs(&mut tx_receiver, 10).await.unwrap(); + + // Check all queued transactions sent. 
+    assert!(queue.lock().await.is_empty());
+
+    for expected_hash in expected_hashes.iter().rev() {
+        assert!(received_txs.contains(&expected_hash.0));
+    }
+}
+
+#[tokio::test]
+async fn bitfinity_test_node_get_transaction_when_it_is_queued() {
+    // Arrange
+    let _log = init_logs();
+    let tasks = TaskManager::current();
+
+    let eth_server = EthImpl::new(None);
+
+    let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10)));
+
+    let (_server, eth_server_address) =
+        mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
+    let bitfinity_evm_url = format!("http://{}", eth_server_address);
+    let (reth_client, _reth_node) =
+        start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await;
+
+    const TXS_NUMBER: usize = 10;
+
+    // Create random transactions
+    let transactions = (1..=TXS_NUMBER)
+        .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128)))
+        .collect::<Vec<_>>();
+
+    let expected_hashes = transactions.iter().map(|tx| keccak::keccak_hash(tx)).collect::<Vec<_>>();
+
+    // Act
+    for (tx, expected_hash) in transactions.iter().zip(expected_hashes.iter()) {
+        let hash = reth_client.send_raw_transaction_bytes(tx).await.unwrap();
+        assert_eq!(hash, *expected_hash);
+    }
+
+    for hash in &expected_hashes {
+        let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap();
+        // A transaction still held in the forwarder queue has no block number.
+        dbg!(tx.block_number);
+        assert!(tx.block_number.is_none());
+    }
+
+    let transaction_sending = BitfinityTransactionSender::new(
+        queue,
+        bitfinity_evm_url,
+        Duration::from_millis(200),
+        10,
+        100,
+    );
+    transaction_sending.single_execution().await.unwrap();
+
+    for hash in &expected_hashes {
+        let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap();
+        // After sending, the mock server returns the transaction with a block number.
+        assert!(tx.block_number.is_some());
+    }
+}
+
+/// Waits until `n` transaction hashes appear in `received_txs`, with a three-second timeout.
+/// Returns `Some` with the collected hashes if at least `n` arrive before the timeout, otherwise `None`.
+async fn consume_received_txs(received_txs: &mut Receiver, n: usize) -> Option> { + let wait_future = async { + let mut txs = Vec::with_capacity(n); + while txs.len() < n { + let tx = received_txs.recv().await.unwrap(); + txs.push(tx); + } + txs + }; + + let wait_result = tokio::time::timeout(Duration::from_secs(3), wait_future).await; + wait_result.ok() +} + +fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned { + let mock = MockTransaction::legacy().with_gas_price(gas_price); + let transaction: Transaction = mock.into(); + + sign_tx_with_random_key_pair(transaction) +} + +fn sign_tx_with_random_key_pair(tx: Transaction) -> TransactionSigned { + let secp = Secp256k1::new(); + let key_pair = Keypair::new(&secp, &mut rand::thread_rng()); + sign_tx_with_key_pair(key_pair, tx) +} + +fn sign_tx_with_key_pair(key_pair: Keypair, tx: Transaction) -> TransactionSigned { + let signature = reth_primitives::sign_message( + B256::from_slice(&key_pair.secret_bytes()[..]), + tx.signature_hash(), + ) + .unwrap(); + TransactionSigned::new(tx, signature, Default::default()) +} + +/// Start a local reth node +async fn start_reth_node( + tasks: &TaskManager, + bitfinity_evm_url: Option, + import_data: Option, + queue: Option, +) -> ( + EthJsonRpcClient, + NodeHandle< + NodeAdapter< + FullNodeTypesAdapter< + EthereumNode, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, + >, + Components< + FullNodeTypesAdapter< + EthereumNode, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, + >, + reth_network::EthNetworkPrimitives, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + EthEvmConfig, + BasicBlockExecutorProvider, + Arc, + >, + >, + RpcAddOns< + NodeAdapter< + FullNodeTypesAdapter< + EthereumNode, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, + >, + Components< + FullNodeTypesAdapter< + EthereumNode, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, + >, + reth_network::EthNetworkPrimitives, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + EthEvmConfig, + BasicBlockExecutorProvider, + Arc, + >, + >, + EthApi< + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + NetworkHandle, + EthEvmConfig, + >, + EthereumEngineValidatorBuilder, + >, + >, +) { + // create node config + let mut node_config = + NodeConfig::test().dev().with_rpc(RpcServerArgs::default().with_http()).with_unused_ports(); + node_config.dev.dev = false; + + let mut chain = node_config.chain.as_ref().clone(); + chain.bitfinity_evm_url = bitfinity_evm_url.clone(); + let mut node_config = node_config.with_chain(chain); + + let database = if let Some(import_data) = import_data { + let data_dir = MaybePlatformPath::::from_str( + import_data.data_dir.data_dir().to_str().unwrap(), + ) + .unwrap(); + let mut data_dir_args = node_config.datadir.clone(); + data_dir_args.datadir = data_dir; + data_dir_args.static_files_path = Some(import_data.data_dir.static_files()); + 
node_config = node_config.with_datadir_args(data_dir_args);
+        node_config = node_config.with_chain(import_data.chain.clone());
+        import_data.database
+    } else {
+        let path = MaybePlatformPath::<DataDirPath>::from(tempdir_path());
+        node_config = node_config
+            .with_datadir_args(DatadirArgs { datadir: path.clone(), ..Default::default() });
+        let data_dir =
+            path.unwrap_or_chain_default(node_config.chain.chain, node_config.datadir.clone());
+        Arc::new(init_db(data_dir.db(), Default::default()).unwrap())
+    };
+
+    let exec = tasks.executor();
+    let node_handle = NodeBuilder::new(node_config)
+        .testing_node(exec)
+        .with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>()
+        .with_components(EthereumNode::components())
+        .with_add_ons(EthereumAddOns::default())
+        .on_rpc_started(|ctx, _| {
+            // Add custom forwarder with transactions priority queue.
+            let Some(queue) = queue else { return Ok(()) };
+            let forwarder = BitfinityTransactionsForwarder::new(queue);
+            ctx.registry.eth_api().set_bitfinity_tx_forwarder(forwarder);
+            Ok(())
+        })
+        .launch_with_fn(|builder| {
+            let launcher = EngineNodeLauncher::new(
+                builder.task_executor().clone(),
+                builder.config().datadir(),
+                TreeConfig::default(),
+            );
+            builder.launch_with(launcher)
+        })
+        .await
+        .unwrap();
+
+    let reth_address = node_handle.node.rpc_server_handle().http_local_addr().unwrap();
+    let addr_string = format!("http://{}", reth_address);
+
+    let client: EthJsonRpcClient<ReqwestClient> =
+        EthJsonRpcClient::new(ReqwestClient::new(addr_string));
+
+    (client, node_handle)
+}
+
+/// Start a local Eth server.
+/// Reth requests will be forwarded to this server.
+async fn mock_eth_server_start(methods: impl Into<Methods>) -> (ServerHandle, SocketAddr) {
+    let addr = SocketAddr::from(([127, 0, 0, 1], 0));
+    let server = Server::builder().build(addr).await.unwrap();
+
+    let mut module = RpcModule::new(());
+    module.merge(methods).unwrap();
+
+    let server_address = server.local_addr().unwrap();
+    let handle = server.start(module);
+
+    (handle, server_address)
+}
+
+/// Eth server mock for local testing
+pub mod eth_server {
+
+    use alloy_consensus::{Signed, TxEnvelope, TxLegacy};
+    use alloy_rlp::{Bytes, Decodable};
+    use alloy_rpc_types::Transaction;
+    use ethereum_json_rpc_client::CertifiedResult;
+    use jsonrpsee::{core::RpcResult, proc_macros::rpc};
+    use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1};
+    use reth_primitives::{sign_message, TransactionSigned};
+    use revm_primitives::{hex, Address, B256, U256};
+    use tokio::sync::{mpsc::Sender, Mutex};
+
+    #[rpc(server, namespace = "eth")]
+    pub trait Eth {
+        /// Returns the current gas price.
+        #[method(name = "gasPrice")]
+        async fn gas_price(&self) -> RpcResult<U256>;
+
+        /// Returns the current max priority fee per gas.
+        #[method(name = "maxPriorityFeePerGas")]
+        async fn max_priority_fee_per_gas(&self) -> RpcResult<U256>;
+
+        /// Sends a raw transaction.
+        #[method(name = "sendRawTransaction")]
+        async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
+
+        /// Returns transaction by hash.
+        #[method(name = "getTransactionByHash")]
+        async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult<Option<Transaction>>;
+
+        /// Returns the genesis balances.
+        #[method(name = "getGenesisBalances", aliases = ["ic_getGenesisBalances"])]
+        async fn get_genesis_balances(&self) -> RpcResult<Vec<(Address, U256)>>;
+
+        /// Returns the last certified block.
+ #[method(name = "getLastCertifiedBlock", aliases = ["ic_getLastCertifiedBlock"])] + async fn get_last_certified_block( + &self, + ) -> RpcResult>>; + } + + /// Eth server implementation for local testing + #[derive(Debug)] + pub struct EthImpl { + /// Current gas price + pub gas_price: u128, + + /// Current max priority fee per gas + pub max_priority_fee_per_gas: u128, + + /// List of received transactions. + pub received_txs: Mutex>, + + /// The mock will send transactions to the sender, if present. + pub txs_sender: Option>, + } + + impl EthImpl { + /// Create a new Eth server implementation + pub fn new(txs_sender: Option>) -> Self { + Self { + gas_price: rand::random(), + max_priority_fee_per_gas: rand::random(), + received_txs: Mutex::default(), + txs_sender, + } + } + } + + impl Default for EthImpl { + fn default() -> Self { + Self::new(None) + } + } + + #[async_trait::async_trait] + impl EthServer for EthImpl { + async fn gas_price(&self) -> RpcResult { + Ok(U256::from(self.gas_price)) + } + + async fn max_priority_fee_per_gas(&self) -> RpcResult { + Ok(U256::from(self.max_priority_fee_per_gas)) + } + + async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { + let decoded = hex::decode(&tx).unwrap(); + let tx = TransactionSigned::decode(&mut decoded.as_ref()).unwrap(); + let hash = tx.hash(); + self.received_txs.lock().await.push(hash); + if let Some(sender) = &self.txs_sender { + sender.send(hash).await.unwrap(); + } + Ok(hash) + } + + async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult> { + if !self.received_txs.lock().await.contains(&hash) { + return Ok(None); + } + + // If tx present, ruturn it with some block number. + let legacy = TxLegacy { + nonce: 42, + to: revm_primitives::TxKind::Create, + value: Default::default(), + gas_price: Default::default(), + input: Default::default(), + chain_id: Default::default(), + gas_limit: Default::default(), + }; + + let key_pair = Keypair::new(&Secp256k1::new(), &mut rand::thread_rng()); + let signature = + sign_message(B256::from_slice(&key_pair.secret_bytes()[..]), hash).unwrap(); + let envelope = TxEnvelope::Legacy(Signed::new_unchecked(legacy, signature, hash)); + let tx = Transaction { + block_hash: Some(B256::random()), + block_number: Some(42), + transaction_index: Some(42), + from: Address::random(), + effective_gas_price: None, + inner: envelope, + }; + + Ok(Some(tx)) + } + + async fn get_genesis_balances(&self) -> RpcResult> { + Ok(vec![ + (Address::from_slice(&[1u8; 20]), U256::from(10)), + (Address::from_slice(&[2u8; 20]), U256::from(20)), + (Address::from_slice(&[3u8; 20]), U256::from(30)), + ]) + } + + async fn get_last_certified_block( + &self, + ) -> RpcResult>> { + Ok(CertifiedResult { + data: Default::default(), + witness: vec![], + certificate: vec![1u8, 3, 11], + }) + } + } +} diff --git a/bin/reth/tests/commands/mod.rs b/bin/reth/tests/commands/mod.rs index ebff8c18ac8..08fc91702e8 100644 --- a/bin/reth/tests/commands/mod.rs +++ b/bin/reth/tests/commands/mod.rs @@ -2,6 +2,9 @@ //! bitfinity integration tests //! pub mod bitfinity_import_it; +pub mod bitfinity_import_it2; pub mod bitfinity_node_it; +pub mod bitfinity_node_it2; pub mod bitfinity_reset_evm_state_it; pub mod utils; +pub mod utils2; diff --git a/bin/reth/tests/commands/utils2.rs b/bin/reth/tests/commands/utils2.rs new file mode 100644 index 00000000000..f49b439c7af --- /dev/null +++ b/bin/reth/tests/commands/utils2.rs @@ -0,0 +1,300 @@ +//! +//! Utils for bitfinity integration tests +//! 
+use std::{ + fmt::{Debug, Display, Formatter}, + path::PathBuf, + str::FromStr, + sync::Arc, + time, +}; + +use alloy_eips::eip7685::Requests; +use alloy_primitives::BlockNumber; +use lightspeed_scheduler::JobExecutor; +use parking_lot::Mutex; +use reth::{ + args::{BitfinityImportArgs, IC_MAINNET_KEY}, + commands::bitfinity_import2::BitfinityImportCommand, + dirs::{ChainPath, DataDirPath, PlatformPath}, +}; +use reth_chainspec::ChainSpec; +use reth_db::{init_db, DatabaseEnv}; +use reth_downloaders::bitfinity_evm_client::BitfinityEvmClient; +use reth_errors::BlockExecutionError; +use reth_evm::execute::{BatchExecutor, BlockExecutionOutput, BlockExecutorProvider, Executor}; +use reth_node_api::NodeTypesWithDBAdapter; +use reth_node_ethereum::EthereumNode; +use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; +use reth_provider::{ + providers::{BlockchainProvider2, StaticFileProvider}, + BlockNumReader, ExecutionOutcome, ProviderError, ProviderFactory, +}; +use reth_prune::PruneModes; +use reth_tracing::{FileWorkerGuard, LayerInfo, LogFormat, RethTracer, Tracer}; +use revm_primitives::db::Database; +use tempfile::TempDir; +use tracing::{debug, info}; + +/// Local EVM canister ID for testing. +pub const LOCAL_EVM_CANISTER_ID: &str = "bkyz2-fmaaa-aaaaa-qaaaq-cai"; +/// EVM block extractor for devnet running on Digital Ocean. +pub const DEFAULT_EVM_DATASOURCE_URL: &str = + "https://block-extractor-testnet-1052151659755.europe-west9.run.app"; + +/// Initializes the logs for the tests. +pub fn init_logs() -> eyre::Result> { + let mut tracer = RethTracer::new(); + let stdout = LayerInfo::new( + LogFormat::Terminal, + "info".to_string(), + String::new(), + Some("always".to_string()), + ); + tracer = tracer.with_stdout(stdout); + + let guard = tracer.init()?; + Ok(guard) +} + +/// Type alias for the node types. +pub type NodeTypes = NodeTypesWithDBAdapter>; + +#[derive(Clone)] +/// Data needed for the import tests. +pub struct ImportData { + /// The chain spec. + pub chain: Arc, + /// The data directory. + pub data_dir: ChainPath, + /// The database. + pub database: Arc, + /// The provider factory. + pub provider_factory: ProviderFactory, + /// The blockchain provider. + pub blockchain_db: BlockchainProvider2, + /// The bitfinity import arguments. + pub bitfinity_args: BitfinityImportArgs, +} + +impl Debug for ImportData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ImportTempData").field("chain", &self.chain).finish() + } +} + +/// Imports blocks from the EVM node. +pub async fn import_blocks( + import_data: ImportData, + timeout: time::Duration, + shutdown_when_done: bool, +) -> JobExecutor { + let end_block = import_data.bitfinity_args.end_block.unwrap(); + let import = BitfinityImportCommand::new( + None, + import_data.data_dir, + import_data.chain, + import_data.bitfinity_args, + import_data.provider_factory.clone(), + import_data.blockchain_db, + ); + let (job_executor, _import_handle) = import.schedule_execution(None).await.unwrap(); + wait_until_local_block_imported(&import_data.provider_factory, end_block, timeout).await; + + if shutdown_when_done { + debug!("Stopping job executor"); + job_executor.stop(true).await.unwrap(); + debug!("Job executor stopped"); + } + + job_executor +} + +/// Initializes the database and the blockchain tree for the bitfinity import tests. +/// If a `data_dir` is provided, it will be used, otherwise a temporary directory will be created. 
+pub async fn bitfinity_import_config_data( + evm_datasource_url: &str, + backup_evm_datasource_url: Option, + data_dir: Option, +) -> eyre::Result<(TempDir, ImportData)> { + let chain = Arc::new( + BitfinityEvmClient::fetch_chain_spec_with_fallback( + evm_datasource_url.to_owned(), + backup_evm_datasource_url.clone(), + ) + .await?, + ); + + let temp_dir = TempDir::new().unwrap(); + + let data_dir = data_dir.unwrap_or_else(|| temp_dir.path().to_path_buf()); + let data_dir: PlatformPath = + PlatformPath::from_str(data_dir.as_os_str().to_str().unwrap())?; + let data_dir = ChainPath::new(data_dir, chain.chain, Default::default()); + + let db_path = data_dir.db(); + + let database = Arc::new(init_db(db_path, Default::default())?); + let provider_factory = ProviderFactory::new( + database.clone(), + chain.clone(), + StaticFileProvider::read_write(data_dir.static_files())?, + ); + + reth_db_common::init::init_genesis(&provider_factory)?; + + let blockchain_db = BlockchainProvider2::new(provider_factory.clone())?; + + let bitfinity_args = BitfinityImportArgs { + rpc_url: evm_datasource_url.to_string(), + send_raw_transaction_rpc_url: None, + end_block: Some(100), + import_interval: 1, + batch_size: 1000, + max_fetch_blocks: 10000, + evmc_principal: LOCAL_EVM_CANISTER_ID.to_string(), + ic_root_key: IC_MAINNET_KEY.to_string(), + backup_rpc_url: backup_evm_datasource_url, + max_retries: 3, + retry_delay_secs: 3, + tx_queue: false, + tx_queue_size: 1000, + send_queued_txs_period_secs: 3, + queued_txs_batch_size: 10, + queued_txs_per_execution_threshold: 100, + }; + + Ok(( + temp_dir, + ImportData { data_dir, database, chain, provider_factory, blockchain_db, bitfinity_args }, + )) +} + +/// Waits until the block is imported. +pub async fn wait_until_local_block_imported( + provider_factory: &ProviderFactory, + block: BlockNumber, + timeout: time::Duration, +) { + let now = std::time::Instant::now(); + loop { + let provider = provider_factory.provider().unwrap(); + let last_block = provider.last_block_number().unwrap(); + if last_block == block { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + assert!(now.elapsed() <= timeout, "Timeout waiting for the last block to be imported. Waiting for block: {} but last block found was {}", block, last_block) + } +} + +/// Returns the local dfx port. +pub fn get_dfx_local_port() -> u16 { + use std::process::Command; + let output = Command::new("dfx") + .arg("info") + .arg("replica-port") + .output() + .expect("failed to execute process"); + + let port = String::from_utf8_lossy(&output.stdout); + info!("dfx port: {}", port); + u16::from_str(port.trim()).unwrap() +} + +/// A [`BlockExecutorProvider`] that returns mocked execution results. 
+/// Original code taken from ./`crates/evm/src/test_utils.rs` +#[derive(Clone, Debug, Default)] +struct MockExecutorProvider { + exec_results: Arc>>, +} + +impl BlockExecutorProvider for MockExecutorProvider { + type Executor + Display>> = Self; + + type BatchExecutor + Display>> = Self; + + fn executor(&self, _: DB) -> Self::Executor + where + DB: Database + Display>, + { + self.clone() + } + + fn batch_executor(&self, _: DB) -> Self::BatchExecutor + where + DB: Database + Display>, + { + self.clone() + } + + type Primitives = EthPrimitives; +} + +impl Executor for MockExecutorProvider { + type Input<'a> = &'a BlockWithSenders; + type Output = BlockExecutionOutput; + type Error = BlockExecutionError; + + fn execute(self, _: Self::Input<'_>) -> Result { + let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = + self.exec_results.lock().pop().unwrap(); + Ok(BlockExecutionOutput { + state: bundle, + receipts: receipts.into_iter().flatten().flatten().collect(), + requests: Requests::new(requests.into_iter().flatten().collect()), + gas_used: 0, + }) + } + + fn execute_with_state_closure( + self, + _input: Self::Input<'_>, + _state: F, + ) -> Result + where + F: FnMut(&reth_revm::State), + { + let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = + self.exec_results.lock().pop().unwrap(); + Ok(BlockExecutionOutput { + state: bundle, + receipts: receipts.into_iter().flatten().flatten().collect(), + requests: Requests::new(requests.into_iter().flatten().collect()), + gas_used: 0, + }) + } + + fn execute_with_state_hook( + self, + _input: Self::Input<'_>, + _state_hook: F, + ) -> Result + where + F: reth_evm::system_calls::OnStateHook + 'static, + { + todo!() + } +} + +impl BatchExecutor for MockExecutorProvider { + type Input<'a> = &'a BlockWithSenders; + type Output = ExecutionOutcome; + type Error = BlockExecutionError; + + fn execute_and_verify_one(&mut self, _: Self::Input<'_>) -> Result<(), Self::Error> { + Ok(()) + } + + fn finalize(self) -> Self::Output { + self.exec_results.lock().pop().unwrap() + } + + fn set_tip(&mut self, _: BlockNumber) {} + + fn size_hint(&self) -> Option { + None + } + + fn set_prune_modes(&mut self, _prune_modes: PruneModes) {} +} From 1967902b2081ec3b8a432b0332322120811ebdc6 Mon Sep 17 00:00:00 2001 From: f3kilo Date: Mon, 20 Jan 2025 09:32:19 +0300 Subject: [PATCH 2/2] support only new version --- bin/reth/src/commands/bitfinity_import.rs | 11 +- bin/reth/src/commands/bitfinity_import2.rs | 287 ------- bin/reth/src/commands/mod.rs | 1 - bin/reth/src/main.rs | 45 +- .../tests/commands/bitfinity_import_it.rs | 2 +- .../tests/commands/bitfinity_import_it2.rs | 136 ---- bin/reth/tests/commands/bitfinity_node_it.rs | 75 +- bin/reth/tests/commands/bitfinity_node_it2.rs | 723 ------------------ bin/reth/tests/commands/mod.rs | 3 - bin/reth/tests/commands/utils.rs | 18 +- bin/reth/tests/commands/utils2.rs | 300 -------- 11 files changed, 66 insertions(+), 1535 deletions(-) delete mode 100644 bin/reth/src/commands/bitfinity_import2.rs delete mode 100644 bin/reth/tests/commands/bitfinity_import_it2.rs delete mode 100644 bin/reth/tests/commands/bitfinity_node_it2.rs delete mode 100644 bin/reth/tests/commands/utils2.rs diff --git a/bin/reth/src/commands/bitfinity_import.rs b/bin/reth/src/commands/bitfinity_import.rs index 55f9f9d9a41..0721266f2ed 100644 --- a/bin/reth/src/commands/bitfinity_import.rs +++ b/bin/reth/src/commands/bitfinity_import.rs @@ -21,14 +21,16 @@ use reth_node_core::{args::BitfinityImportArgs, dirs::ChainPath}; use 
reth_node_ethereum::{EthExecutorProvider, EthereumNode}; use reth_node_events::node::NodeEvent; use reth_primitives::{EthPrimitives, SealedHeader}; -use reth_provider::providers::BlockchainProvider; +use reth_provider::providers::BlockchainProvider2; use reth_provider::{ BlockNumReader, CanonChainTracker, ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, ProviderError, ProviderFactory, }; use reth_prune::PruneModes; use reth_stages::{ - prelude::*, stages::{ExecutionStage, SenderRecoveryStage}, ExecutionStageThresholds, Pipeline, StageSet + prelude::*, + stages::{ExecutionStage, SenderRecoveryStage}, + ExecutionStageThresholds, Pipeline, StageSet, }; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc, time::Duration}; @@ -52,7 +54,8 @@ pub struct BitfinityImportCommand { provider_factory: ProviderFactory>>, - blockchain_provider: BlockchainProvider>>, + blockchain_provider: + BlockchainProvider2>>, } /// Manually implement `Debug` for `ImportCommand` because `BlockchainProvider` doesn't implement it. @@ -77,7 +80,7 @@ impl BitfinityImportCommand { chain: Arc, bitfinity: BitfinityImportArgs, provider_factory: ProviderFactory>>, - blockchain_provider: BlockchainProvider< + blockchain_provider: BlockchainProvider2< NodeTypesWithDBAdapter>, >, ) -> Self { diff --git a/bin/reth/src/commands/bitfinity_import2.rs b/bin/reth/src/commands/bitfinity_import2.rs deleted file mode 100644 index 0721266f2ed..00000000000 --- a/bin/reth/src/commands/bitfinity_import2.rs +++ /dev/null @@ -1,287 +0,0 @@ -//! Command that initializes the node by importing a chain from a remote EVM node. - -use crate::{dirs::DataDirPath, version::SHORT_VERSION}; -use futures::{Stream, StreamExt}; -use lightspeed_scheduler::{job::Job, scheduler::Scheduler, JobExecutor}; -use reth_beacon_consensus::EthBeaconConsensus; -use reth_chainspec::ChainSpec; -use reth_config::{config::EtlConfig, Config}; -use reth_db::DatabaseEnv; - -use alloy_primitives::B256; -use reth_consensus::Consensus; -use reth_downloaders::{ - bitfinity_evm_client::{BitfinityEvmClient, CertificateCheckSettings, RpcClientConfig}, - bodies::bodies::BodiesDownloaderBuilder, - headers::reverse_headers::ReverseHeadersDownloaderBuilder, -}; -use reth_exex::ExExManagerHandle; -use reth_node_api::NodeTypesWithDBAdapter; -use reth_node_core::{args::BitfinityImportArgs, dirs::ChainPath}; -use reth_node_ethereum::{EthExecutorProvider, EthereumNode}; -use reth_node_events::node::NodeEvent; -use reth_primitives::{EthPrimitives, SealedHeader}; -use reth_provider::providers::BlockchainProvider2; -use reth_provider::{ - BlockNumReader, CanonChainTracker, ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, - ProviderError, ProviderFactory, -}; -use reth_prune::PruneModes; -use reth_stages::{ - prelude::*, - stages::{ExecutionStage, SenderRecoveryStage}, - ExecutionStageThresholds, Pipeline, StageSet, -}; -use reth_static_file::StaticFileProducer; -use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::sync::watch; -use tracing::{debug, info}; - -/// Syncs RLP encoded blocks from a file. -#[derive(Clone)] -pub struct BitfinityImportCommand { - config: Config, - - datadir: ChainPath, - - /// The chain this node is running. - /// - /// Possible values are either a built-in chain or the path to a chain specification file. 
- chain: Arc, - - /// Bitfinity Related Args - bitfinity: BitfinityImportArgs, - - provider_factory: ProviderFactory>>, - - blockchain_provider: - BlockchainProvider2>>, -} - -/// Manually implement `Debug` for `ImportCommand` because `BlockchainProvider` doesn't implement it. -impl std::fmt::Debug for BitfinityImportCommand { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ImportCommand") - .field("config", &self.config) - .field("datadir", &self.datadir) - .field("chain", &self.chain) - .field("bitfinity", &self.bitfinity) - .finish() - } -} - -type TypedPipeline = Pipeline>>; - -impl BitfinityImportCommand { - /// Create a new `ImportCommand` with the given arguments. - pub fn new( - config: Option, - datadir: ChainPath, - chain: Arc, - bitfinity: BitfinityImportArgs, - provider_factory: ProviderFactory>>, - blockchain_provider: BlockchainProvider2< - NodeTypesWithDBAdapter>, - >, - ) -> Self { - // add network name to data dir - let config_path = config.unwrap_or_else(|| datadir.config()); - - info!(target: "reth::cli - BitfinityImportCommand", path = ?config_path, "Configuration loaded"); - let mut config = Config::from_path(config_path) - .expect("Failed to load BitfinityImportCommand configuration"); - - // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to - if config.stages.etl.dir.is_none() { - config.stages.etl.dir = Some(EtlConfig::from_datadir(datadir.data_dir())); - } - - Self { config, datadir, chain, bitfinity, provider_factory, blockchain_provider } - } - - /// Schedule the import job and return a handle to it. - pub async fn schedule_execution( - self, - job_executor: Option, - ) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> { - info!(target: "reth::cli - BitfinityImportCommand", "reth {} starting", SHORT_VERSION); - - let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz); - - // Schedule the import job - { - let interval = Duration::from_secs(self.bitfinity.import_interval); - job_executor - .add_job_with_scheduler( - Scheduler::Interval { interval_duration: interval, execute_at_startup: true }, - Job::new("import", "block importer", None, move || { - let import = self.clone(); - Box::pin(async move { - import.single_execution().await?; - import.update_chain_info()?; - Ok(()) - }) - }), - ) - .await; - } - - let job_handle = job_executor.run().await?; - Ok((job_executor, job_handle)) - } - - /// Execute the import job. - async fn single_execution(&self) -> eyre::Result<()> { - let consensus = Arc::new(EthBeaconConsensus::new(self.chain.clone())); - debug!(target: "reth::cli - BitfinityImportCommand", "Consensus engine initialized"); - let provider_factory = self.provider_factory.clone(); - - // Get the local block number - let start_block = provider_factory.provider()?.last_block_number()? 
+ 1; - - debug!(target: "reth::cli - BitfinityImportCommand", "Starting block: {}", start_block); - - let rpc_config = RpcClientConfig { - primary_url: self.bitfinity.rpc_url.clone(), - backup_url: self.bitfinity.backup_rpc_url.clone(), - max_retries: self.bitfinity.max_retries, - retry_delay: Duration::from_secs(self.bitfinity.retry_delay_secs), - }; - - let remote_client = Arc::new( - BitfinityEvmClient::from_rpc_url( - rpc_config, - start_block, - self.bitfinity.end_block, - self.bitfinity.batch_size, - self.bitfinity.max_fetch_blocks, - Some(CertificateCheckSettings { - evmc_principal: self.bitfinity.evmc_principal.clone(), - ic_root_key: self.bitfinity.ic_root_key.clone(), - }), - ) - .await?, - ); - - // override the tip - let tip = if let Some(tip) = remote_client.tip() { - tip - } else { - debug!(target: "reth::cli - BitfinityImportCommand", "No tip found, skipping import"); - return Ok(()); - }; - - info!(target: "reth::cli - BitfinityImportCommand", "Chain blocks imported"); - - let (mut pipeline, _events) = self.build_import_pipeline( - &self.config, - provider_factory.clone(), - &consensus, - remote_client, - StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), - )?; - - // override the tip - pipeline.set_tip(tip); - debug!(target: "reth::cli - BitfinityImportCommand", ?tip, "Tip manually set"); - - // Run pipeline - debug!(target: "reth::cli - BitfinityImportCommand", "Starting sync pipeline"); - pipeline.run().await?; - - info!(target: "reth::cli - BitfinityImportCommand", "Finishing up"); - Ok(()) - } - - /// Update the chain info tracker with the latest header from the database. - fn update_chain_info(&self) -> eyre::Result<()> { - let provider = self.blockchain_provider.database_provider_ro()?; - let chain_info = provider.chain_info()?; - - match provider.header_by_number(chain_info.best_number)? 
{ - Some(header) => { - let sealed_header = header.seal(chain_info.best_hash); - let hash = sealed_header.seal(); - let sealed_header = SealedHeader::new(sealed_header.into_inner(), hash); - self.blockchain_provider.set_canonical_head(sealed_header.clone()); - self.blockchain_provider.set_finalized(sealed_header.clone()); - self.blockchain_provider.set_safe(sealed_header); - Ok(()) - } - None => Err(ProviderError::HeaderNotFound(chain_info.best_number.into()))?, - } - } - - fn build_import_pipeline( - &self, - config: &Config, - provider_factory: ProviderFactory>>, - consensus: &Arc, - remote_client: Arc, - static_file_producer: StaticFileProducer< - ProviderFactory>>, - >, - ) -> eyre::Result<(TypedPipeline, impl Stream>)> - where - C: Consensus + 'static, - { - if !remote_client.has_canonical_blocks() { - eyre::bail!("unable to import non canonical blocks"); - } - - let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) - .build(remote_client.clone(), consensus.clone()) - .into_task(); - - let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) - .build(remote_client.clone(), consensus.clone(), provider_factory.clone()) - .into_task(); - - let (tip_tx, tip_rx) = watch::channel(B256::ZERO); - let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec()); - - let max_block = remote_client.max_block().unwrap_or(0); - let pipeline = - Pipeline::>>::builder() - .with_tip_sender(tip_tx) - // we want to sync all blocks the file client provides or 0 if empty - .with_max_block(max_block) - .add_stages( - DefaultStages::new( - provider_factory.clone(), - tip_rx, - consensus.clone(), - header_downloader, - body_downloader, - executor.clone(), - config.stages.clone(), - PruneModes::default(), - ) - .set(SenderRecoveryStage { - commit_threshold: config.stages.sender_recovery.commit_threshold, - }) - .set(ExecutionStage::new( - executor, - ExecutionStageThresholds { - max_blocks: config.stages.execution.max_blocks, - max_changes: config.stages.execution.max_changes, - max_cumulative_gas: config.stages.execution.max_cumulative_gas, - max_duration: config.stages.execution.max_duration, - }, - config - .stages - .merkle - .clean_threshold - .max(config.stages.account_hashing.clean_threshold) - .max(config.stages.storage_hashing.clean_threshold), - config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), - ExExManagerHandle::empty(), - )), - ) - .build(provider_factory, static_file_producer); - - let events = pipeline.events().map(Into::into); - - Ok((pipeline, events)) - } -} diff --git a/bin/reth/src/commands/mod.rs b/bin/reth/src/commands/mod.rs index 1ab0e8cd38c..9380056c8b1 100644 --- a/bin/reth/src/commands/mod.rs +++ b/bin/reth/src/commands/mod.rs @@ -1,6 +1,5 @@ //! 
This contains all of the `reth` commands pub mod bitfinity_import; -pub mod bitfinity_import2; pub mod bitfinity_reset_evm_state; pub mod debug_cmd; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 2af41212866..73b810d0208 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -6,7 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne use std::{sync::Arc, time::Duration}; use clap::{Args, Parser}; -use reth::{bitfinity_tasks::send_txs::BitfinityTransactionSender, commands::bitfinity_import2::{self, BitfinityImportCommand}}; +use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_node_builder::{ engine_tree_config::{ @@ -105,7 +105,7 @@ fn main() { // Init bitfinity import let executor = { - let import = bitfinity_import2::BitfinityImportCommand::new( + let import = BitfinityImportCommand::new( config, datadir, chain, @@ -142,47 +142,6 @@ fn main() { true => { info!(target: "reth::cli", "Running with legacy engine"); let handle = builder.launch_node(EthereumNode::default()).await?; - let blockchain_provider = handle.node.provider.clone(); - let config = handle.node.config.config.clone(); - let chain = handle.node.chain_spec(); - let datadir = handle.node.data_dir.clone(); - let (provider_factory, bitfinity) = handle.bitfinity_import.clone().expect("Bitfinity import not configured"); - - - // Init bitfinity import - let executor = { - let import = BitfinityImportCommand::new( - config, - datadir, - chain, - bitfinity.clone(), - provider_factory, - blockchain_provider, - ); - let (executor, _import_handle) = import.schedule_execution(None).await?; - executor - }; - - if bitfinity.tx_queue { - let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(1000))); - - // Make EthApi handler move new txs to queue. - let queue_clone = Arc::clone(&queue); - handle.node.add_ons_handle.eth_api().set_bitfinity_tx_forwarder(BitfinityTransactionsForwarder::new(queue_clone)); - - // Run batch transaction sender. - let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url); - let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs); - let transaction_sending = BitfinityTransactionSender::new( - queue, - url, - period, - bitfinity.queued_txs_batch_size, - bitfinity.queued_txs_per_execution_threshold, - ); - let _sending_handle = transaction_sending.schedule_execution(Some(executor)).await?; - } - handle.node_exit_future.await } } diff --git a/bin/reth/tests/commands/bitfinity_import_it.rs b/bin/reth/tests/commands/bitfinity_import_it.rs index 9a173787a3e..2d967de33fd 100644 --- a/bin/reth/tests/commands/bitfinity_import_it.rs +++ b/bin/reth/tests/commands/bitfinity_import_it.rs @@ -1,5 +1,5 @@ //! -//! Integration tests for the bitfinity import command. +//! Integration tests for the bitfinity import command with BlockchainProvider2. //! These tests requires a running EVM node or EVM block extractor node at the specified URL. //! diff --git a/bin/reth/tests/commands/bitfinity_import_it2.rs b/bin/reth/tests/commands/bitfinity_import_it2.rs deleted file mode 100644 index 2d967de33fd..00000000000 --- a/bin/reth/tests/commands/bitfinity_import_it2.rs +++ /dev/null @@ -1,136 +0,0 @@ -//! -//! Integration tests for the bitfinity import command with BlockchainProvider2. -//! These tests requires a running EVM node or EVM block extractor node at the specified URL. -//! 
- -use super::utils::*; -use alloy_eips::BlockNumberOrTag; -use ethereum_json_rpc_client::{reqwest::ReqwestClient, EthJsonRpcClient}; -use reth_provider::{BlockNumReader, BlockReader, BlockReaderIdExt}; -use std::time::Duration; - -#[tokio::test] -async fn bitfinity_test_should_import_data_from_evm() { - // Arrange - let _log = init_logs(); - let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; - let (_temp_dir, mut import_data) = - bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); - - let end_block = 100; - import_data.bitfinity_args.end_block = Some(end_block); - import_data.bitfinity_args.batch_size = (end_block as usize) * 10; - - // Act - import_blocks(import_data.clone(), Duration::from_secs(20), false).await; - - // Assert - { - let provider = import_data.provider_factory.provider().unwrap(); - assert_eq!(end_block, provider.last_block_number().unwrap()); - - // create evm client - let evm_rpc_client = - EthJsonRpcClient::new(ReqwestClient::new(evm_datasource_url.to_string())); - - let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); - let local_block = provider.block_by_number(end_block).unwrap().unwrap(); - - assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); - assert_eq!(remote_block.state_root.0, local_block.state_root.0); - } -} - -#[tokio::test] -async fn bitfinity_test_should_import_with_small_batch_size() { - // Arrange - let _log = init_logs(); - let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; - let (_temp_dir, mut import_data) = - bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); - - let end_block = 101; - import_data.bitfinity_args.end_block = Some(end_block); - import_data.bitfinity_args.batch_size = 10; - - // Act - import_blocks(import_data.clone(), Duration::from_secs(20), false).await; - - // Assert - { - let provider = import_data.provider_factory.provider().unwrap(); - assert_eq!(end_block, provider.last_block_number().unwrap()); - - // create evm client - let evm_rpc_client = - EthJsonRpcClient::new(ReqwestClient::new(evm_datasource_url.to_string())); - - let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); - let local_block = provider.block_by_number(end_block).unwrap().unwrap(); - - assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); - assert_eq!(remote_block.state_root.0, local_block.state_root.0); - } -} - -#[tokio::test] -async fn bitfinity_test_finalized_and_safe_query_params_works() { - // Arrange - let _log = init_logs(); - let evm_datasource_url = DEFAULT_EVM_DATASOURCE_URL; - let (_temp_dir, mut import_data) = - bitfinity_import_config_data(evm_datasource_url, None, None).await.unwrap(); - - let end_block = 100; - import_data.bitfinity_args.end_block = Some(end_block); - import_data.bitfinity_args.batch_size = (end_block as usize) * 10; - - // Act - import_blocks(import_data.clone(), Duration::from_secs(20), true).await; - - let latest_block = import_data - .blockchain_db - .block_by_number_or_tag(BlockNumberOrTag::Finalized) - .unwrap() - .unwrap(); - assert_eq!(end_block, latest_block.number); - - let safe_block = - import_data.blockchain_db.block_by_number_or_tag(BlockNumberOrTag::Safe).unwrap().unwrap(); - assert_eq!(end_block, safe_block.number); -} - -#[tokio::test] -async fn bitfinity_test_should_import_data_from_evm_with_backup_rpc_url() { - // Arrange - let _log = init_logs(); - let evm_datasource_url = "https://fake_rpc_url"; - let backup_rpc_url = DEFAULT_EVM_DATASOURCE_URL; - - let 
(_temp_dir, mut import_data) = - bitfinity_import_config_data(evm_datasource_url, Some(backup_rpc_url.to_owned()), None) - .await - .unwrap(); - - let end_block = 100; - import_data.bitfinity_args.end_block = Some(end_block); - import_data.bitfinity_args.batch_size = (end_block as usize) * 10; - - // Act - import_blocks(import_data.clone(), Duration::from_secs(200), false).await; - - // Assert - { - let provider = import_data.provider_factory.provider().unwrap(); - assert_eq!(end_block, provider.last_block_number().unwrap()); - - // create evm client - let evm_rpc_client = EthJsonRpcClient::new(ReqwestClient::new(backup_rpc_url.to_string())); - - let remote_block = evm_rpc_client.get_block_by_number(end_block.into()).await.unwrap(); - let local_block = provider.block_by_number(end_block).unwrap().unwrap(); - - assert_eq!(remote_block.hash.0, local_block.header.hash_slow().0); - assert_eq!(remote_block.state_root.0, local_block.state_root.0); - } -} diff --git a/bin/reth/tests/commands/bitfinity_node_it.rs b/bin/reth/tests/commands/bitfinity_node_it.rs index d834a6429c6..83cb0a10187 100644 --- a/bin/reth/tests/commands/bitfinity_node_it.rs +++ b/bin/reth/tests/commands/bitfinity_node_it.rs @@ -1,5 +1,5 @@ //! -//! Integration tests for the bitfinity node command. +//! Integration tests for the bitfinity node command with BlockchainProvider2. //! use super::utils::*; @@ -17,20 +17,22 @@ use reth::{ dirs::{DataDirPath, MaybePlatformPath}, }; use reth_consensus::FullConsensus; +use reth_db::test_utils::TempDatabase; use reth_db::DatabaseEnv; use reth_db::{init_db, test_utils::tempdir_path}; use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; use reth_network::NetworkHandle; use reth_node_api::{FullNodeTypesAdapter, NodeTypesWithDBAdapter}; use reth_node_builder::components::Components; +use reth_node_builder::engine_tree_config::TreeConfig; use reth_node_builder::rpc::RpcAddOns; -use reth_node_builder::{NodeAdapter, NodeBuilder, NodeConfig, NodeHandle}; -use reth_node_ethereum::node::EthereumEngineValidatorBuilder; +use reth_node_builder::{EngineNodeLauncher, NodeAdapter, NodeBuilder, NodeConfig, NodeHandle}; +use reth_node_ethereum::node::{EthereumAddOns, EthereumEngineValidatorBuilder}; use reth_node_ethereum::{ BasicBlockExecutorProvider, EthEvmConfig, EthExecutionStrategyFactory, EthereumNode, }; use reth_primitives::{Transaction, TransactionSigned}; -use reth_provider::providers::BlockchainProvider; +use reth_provider::providers::BlockchainProvider2; use reth_rpc::EthApi; use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{ BitfinityTransactionsForwarder, SharedQueue, TransactionsPriorityQueue, @@ -396,21 +398,28 @@ async fn start_reth_node( NodeAdapter< FullNodeTypesAdapter< EthereumNode, - Arc, - BlockchainProvider>>, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, >, Components< FullNodeTypesAdapter< EthereumNode, - Arc, - BlockchainProvider>>, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, >, reth_network::EthNetworkPrimitives, Pool< TransactionValidationTaskExecutor< EthTransactionValidator< - BlockchainProvider< - NodeTypesWithDBAdapter>, + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, >, EthPooledTransaction, >, @@ -427,21 +436,28 @@ async fn start_reth_node( NodeAdapter< FullNodeTypesAdapter< EthereumNode, - Arc, - BlockchainProvider>>, + Arc>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, >, Components< FullNodeTypesAdapter< EthereumNode, - Arc, - BlockchainProvider>>, + Arc>, + 
BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, >, reth_network::EthNetworkPrimitives, Pool< TransactionValidationTaskExecutor< EthTransactionValidator< - BlockchainProvider< - NodeTypesWithDBAdapter>, + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, >, EthPooledTransaction, >, @@ -455,12 +471,17 @@ async fn start_reth_node( >, >, EthApi< - BlockchainProvider>>, + BlockchainProvider2< + NodeTypesWithDBAdapter>>, + >, Pool< TransactionValidationTaskExecutor< EthTransactionValidator< - BlockchainProvider< - NodeTypesWithDBAdapter>, + BlockchainProvider2< + NodeTypesWithDBAdapter< + EthereumNode, + Arc>, + >, >, EthPooledTransaction, >, @@ -504,10 +525,13 @@ async fn start_reth_node( Arc::new(init_db(data_dir.db(), Default::default()).unwrap()) }; + let exec = tasks.executor(); let node_handle = NodeBuilder::new(node_config) .with_database(database) - .with_launch_context(tasks.executor()) - .node(EthereumNode::default()) + .testing_node(exec) + .with_types_and_provider::>() + .with_components(EthereumNode::components()) + .with_add_ons(EthereumAddOns::default()) .on_rpc_started(|ctx, _| { // Add custom forwarder with transactions priority queue. let Some(queue) = queue else { return Ok(()) }; @@ -515,7 +539,14 @@ async fn start_reth_node( ctx.registry.eth_api().set_bitfinity_tx_forwarder(forwarder); Ok(()) }) - .launch() + .launch_with_fn(|builder| { + let launcher = EngineNodeLauncher::new( + builder.task_executor().clone(), + builder.config().datadir(), + TreeConfig::default(), + ); + builder.launch_with(launcher) + }) .await .unwrap(); diff --git a/bin/reth/tests/commands/bitfinity_node_it2.rs b/bin/reth/tests/commands/bitfinity_node_it2.rs deleted file mode 100644 index 0ffde13bdb9..00000000000 --- a/bin/reth/tests/commands/bitfinity_node_it2.rs +++ /dev/null @@ -1,723 +0,0 @@ -//! -//! Integration tests for the bitfinity node command with BlockchainProvider2. -//! 
- -use super::utils::*; -use did::keccak; -use eth_server::{EthImpl, EthServer}; -use ethereum_json_rpc_client::CertifiedResult; -use ethereum_json_rpc_client::{reqwest::ReqwestClient, EthJsonRpcClient}; -use jsonrpsee::{ - server::{Server, ServerHandle}, - Methods, RpcModule, -}; -use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; -use reth::{ - args::{DatadirArgs, RpcServerArgs}, - dirs::{DataDirPath, MaybePlatformPath}, -}; -use reth_consensus::FullConsensus; -use reth_db::test_utils::TempDatabase; -use reth_db::DatabaseEnv; -use reth_db::{init_db, test_utils::tempdir_path}; -use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; -use reth_network::NetworkHandle; -use reth_node_api::{FullNodeTypesAdapter, NodeTypesWithDBAdapter}; -use reth_node_builder::components::Components; -use reth_node_builder::engine_tree_config::TreeConfig; -use reth_node_builder::rpc::RpcAddOns; -use reth_node_builder::{EngineNodeLauncher, NodeAdapter, NodeBuilder, NodeConfig, NodeHandle}; -use reth_node_ethereum::node::{EthereumAddOns, EthereumEngineValidatorBuilder}; -use reth_node_ethereum::{ - BasicBlockExecutorProvider, EthEvmConfig, EthExecutionStrategyFactory, EthereumNode, -}; -use reth_primitives::{Transaction, TransactionSigned}; -use reth_provider::providers::BlockchainProvider2; -use reth_rpc::EthApi; -use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{ - BitfinityTransactionsForwarder, SharedQueue, TransactionsPriorityQueue, -}; -use reth_tasks::TaskManager; -use reth_transaction_pool::blobstore::DiskFileBlobStore; -use reth_transaction_pool::test_utils::MockTransaction; -use reth_transaction_pool::{ - CoinbaseTipOrdering, EthPooledTransaction, EthTransactionValidator, Pool, - TransactionValidationTaskExecutor, -}; -use revm_primitives::{Address, B256, U256}; -use std::time::Duration; -use std::{net::SocketAddr, str::FromStr, sync::Arc}; -use tokio::sync::mpsc::Receiver; -use tokio::sync::Mutex; - -#[tokio::test] -async fn bitfinity_test_should_start_local_reth_node() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - let (reth_client, _reth_node) = start_reth_node(&tasks, None, None, None).await; - - // Act & Assert - assert!(reth_client.get_chain_id().await.is_ok()); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::default(); - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; - - // Act - let result = reth_client.get_last_certified_block().await; - - // Assert - assert!(result.is_ok()); - - // Try with `eth_getLastCertifiedBlock` alias - let result: CertifiedResult> = reth_client - .single_request( - "eth_getLastCertifiedBlock".to_owned(), - ethereum_json_rpc_client::Params::None, - ethereum_json_rpc_client::Id::Num(1), - ) - .await - .unwrap(); - - assert_eq!(result.certificate, vec![1u8, 3, 11]); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_get_gas_price_requests() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::default(); - let gas_price = eth_server.gas_price; - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(&tasks, 
Some(format!("http://{}", eth_server_address)), None, None).await; - - // Act - let gas_price_result = reth_client.gas_price().await; - - // Assert - assert_eq!(gas_price_result.unwrap(), gas_price.into()); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::default(); - let max_priority_fee_per_gas = eth_server.max_priority_fee_per_gas; - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; - - // Act - let result = reth_client.max_priority_fee_per_gas().await; - - // Assert - assert_eq!(result.unwrap(), max_priority_fee_per_gas.into()); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_eth_get_genesis_balances() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::default(); - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; - - // Act - let result: Vec<(did::H160, did::U256)> = reth_client - .single_request( - "eth_getGenesisBalances".to_owned(), - ethereum_json_rpc_client::Params::None, - ethereum_json_rpc_client::Id::Num(1), - ) - .await - .unwrap(); - - // Assert - assert_eq!(result.len(), 3); - - assert_eq!(result[0].0, Address::from_slice(&[1u8; 20]).into()); - assert_eq!(result[0].1, U256::from(10).into()); - - assert_eq!(result[1].0, Address::from_slice(&[2u8; 20]).into()); - assert_eq!(result[1].1, U256::from(20).into()); - - assert_eq!(result[2].0, Address::from_slice(&[3u8; 20]).into()); - assert_eq!(result[2].1, U256::from(30).into()); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_ic_get_genesis_balances() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::default(); - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; - - // Act - let result = reth_client.get_genesis_balances().await.unwrap(); - - // Assert - assert_eq!(result.len(), 3); - - assert_eq!(result[0].0, Address::from_slice(&[1u8; 20]).into()); - assert_eq!(result[0].1, U256::from(10).into()); - - assert_eq!(result[1].0, Address::from_slice(&[2u8; 20]).into()); - assert_eq!(result[1].1, U256::from(20).into()); - - assert_eq!(result[2].0, Address::from_slice(&[3u8; 20]).into()); - assert_eq!(result[2].1, U256::from(30).into()); -} - -#[tokio::test] -async fn bitfinity_test_node_forward_send_raw_transaction_requests() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); - let eth_server = EthImpl::new(Some(tx_sender)); - - let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); - - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let bitfinity_evm_url = format!("http://{}", eth_server_address); - let (reth_client, _reth_node) = start_reth_node( - &tasks, - Some(format!("http://{}", eth_server_address)), - None, - Some(queue.clone()), - ) - .await; - - // 
Create a random transaction - let tx = transaction_with_gas_price(100); - let encoded = alloy_rlp::encode(&tx); - let expected_tx_hash = keccak::keccak_hash(&encoded); - - // Act - let result = reth_client.send_raw_transaction_bytes(&encoded).await.unwrap(); - - // Assert - assert_eq!(result, expected_tx_hash); - - let transaction_sending = BitfinityTransactionSender::new( - queue, - bitfinity_evm_url, - Duration::from_millis(200), - 10, - 100, - ); - transaction_sending.single_execution().await.unwrap(); - - let received_txs = consume_received_txs(&mut tx_receiver, 1).await.unwrap(); - - assert_eq!(received_txs[0], expected_tx_hash.0); -} - -#[tokio::test] -async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); - let eth_server = EthImpl::new(Some(tx_sender)); - - let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); - - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let bitfinity_evm_url = format!("http://{}", eth_server_address); - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await; - - const TXS_NUMBER: usize = 100; - - // Create a random transactions - let transactions = (1..=TXS_NUMBER) - .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128))) - .collect::>(); - - // Only highest price transactions should be sent. - let expected_hashes = - transactions.iter().rev().take(10).map(|tx| keccak::keccak_hash(tx)).collect::>(); - - // Act - for tx in &transactions { - reth_client.send_raw_transaction_bytes(tx).await.unwrap(); - } - - let transaction_sending = BitfinityTransactionSender::new( - queue.clone(), - bitfinity_evm_url, - Duration::from_millis(200), - 10, - 100, - ); - transaction_sending.single_execution().await.unwrap(); - - let received_txs = consume_received_txs(&mut tx_receiver, 10).await.unwrap(); - - // Check all queued transactions sent. - assert!(queue.lock().await.is_empty()); - - for expected_hash in expected_hashes.iter().rev() { - assert!(received_txs.contains(&expected_hash.0)); - } -} - -#[tokio::test] -async fn bitfinity_test_node_get_transaction_when_it_is_queued() { - // Arrange - let _log = init_logs(); - let tasks = TaskManager::current(); - - let eth_server = EthImpl::new(None); - - let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); - - let (_server, eth_server_address) = - mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let bitfinity_evm_url = format!("http://{}", eth_server_address); - let (reth_client, _reth_node) = - start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await; - - const TXS_NUMBER: usize = 10; - - // Create a random transactions - let transactions = (1..=TXS_NUMBER) - .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128))) - .collect::>(); - - let expected_hashes = transactions.iter().map(|tx| keccak::keccak_hash(tx)).collect::>(); - - // Act - for (tx, expected_hash) in transactions.iter().zip(expected_hashes.iter()) { - let hash = reth_client.send_raw_transaction_bytes(tx).await.unwrap(); - assert_eq!(hash, *expected_hash); - } - - for hash in &expected_hashes { - let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap(); - // Transaction in forwarder has NO block number. 
- dbg!(tx.block_number); - assert!(tx.block_number.is_none()); - } - - let transaction_sending = BitfinityTransactionSender::new( - queue, - bitfinity_evm_url, - Duration::from_millis(200), - 10, - 100, - ); - transaction_sending.single_execution().await.unwrap(); - - for hash in &expected_hashes { - let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap(); - // Transaction in mock has block number. - assert!(tx.block_number.is_some()); - } -} - -/// Waits until `n` transactions appear in `received_txs` with one second timeout. -/// Returns true if `received_txs` contains at least `n` transactions. -async fn consume_received_txs(received_txs: &mut Receiver, n: usize) -> Option> { - let wait_future = async { - let mut txs = Vec::with_capacity(n); - while txs.len() < n { - let tx = received_txs.recv().await.unwrap(); - txs.push(tx); - } - txs - }; - - let wait_result = tokio::time::timeout(Duration::from_secs(3), wait_future).await; - wait_result.ok() -} - -fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned { - let mock = MockTransaction::legacy().with_gas_price(gas_price); - let transaction: Transaction = mock.into(); - - sign_tx_with_random_key_pair(transaction) -} - -fn sign_tx_with_random_key_pair(tx: Transaction) -> TransactionSigned { - let secp = Secp256k1::new(); - let key_pair = Keypair::new(&secp, &mut rand::thread_rng()); - sign_tx_with_key_pair(key_pair, tx) -} - -fn sign_tx_with_key_pair(key_pair: Keypair, tx: Transaction) -> TransactionSigned { - let signature = reth_primitives::sign_message( - B256::from_slice(&key_pair.secret_bytes()[..]), - tx.signature_hash(), - ) - .unwrap(); - TransactionSigned::new(tx, signature, Default::default()) -} - -/// Start a local reth node -async fn start_reth_node( - tasks: &TaskManager, - bitfinity_evm_url: Option, - import_data: Option, - queue: Option, -) -> ( - EthJsonRpcClient, - NodeHandle< - NodeAdapter< - FullNodeTypesAdapter< - EthereumNode, - Arc>, - BlockchainProvider2< - NodeTypesWithDBAdapter>>, - >, - >, - Components< - FullNodeTypesAdapter< - EthereumNode, - Arc>, - BlockchainProvider2< - NodeTypesWithDBAdapter>>, - >, - >, - reth_network::EthNetworkPrimitives, - Pool< - TransactionValidationTaskExecutor< - EthTransactionValidator< - BlockchainProvider2< - NodeTypesWithDBAdapter< - EthereumNode, - Arc>, - >, - >, - EthPooledTransaction, - >, - >, - CoinbaseTipOrdering, - DiskFileBlobStore, - >, - EthEvmConfig, - BasicBlockExecutorProvider, - Arc, - >, - >, - RpcAddOns< - NodeAdapter< - FullNodeTypesAdapter< - EthereumNode, - Arc>, - BlockchainProvider2< - NodeTypesWithDBAdapter>>, - >, - >, - Components< - FullNodeTypesAdapter< - EthereumNode, - Arc>, - BlockchainProvider2< - NodeTypesWithDBAdapter>>, - >, - >, - reth_network::EthNetworkPrimitives, - Pool< - TransactionValidationTaskExecutor< - EthTransactionValidator< - BlockchainProvider2< - NodeTypesWithDBAdapter< - EthereumNode, - Arc>, - >, - >, - EthPooledTransaction, - >, - >, - CoinbaseTipOrdering, - DiskFileBlobStore, - >, - EthEvmConfig, - BasicBlockExecutorProvider, - Arc, - >, - >, - EthApi< - BlockchainProvider2< - NodeTypesWithDBAdapter>>, - >, - Pool< - TransactionValidationTaskExecutor< - EthTransactionValidator< - BlockchainProvider2< - NodeTypesWithDBAdapter< - EthereumNode, - Arc>, - >, - >, - EthPooledTransaction, - >, - >, - CoinbaseTipOrdering, - DiskFileBlobStore, - >, - NetworkHandle, - EthEvmConfig, - >, - EthereumEngineValidatorBuilder, - >, - >, -) { - // create node config - let mut node_config = - 
NodeConfig::test().dev().with_rpc(RpcServerArgs::default().with_http()).with_unused_ports(); - node_config.dev.dev = false; - - let mut chain = node_config.chain.as_ref().clone(); - chain.bitfinity_evm_url = bitfinity_evm_url.clone(); - let mut node_config = node_config.with_chain(chain); - - let database = if let Some(import_data) = import_data { - let data_dir = MaybePlatformPath::::from_str( - import_data.data_dir.data_dir().to_str().unwrap(), - ) - .unwrap(); - let mut data_dir_args = node_config.datadir.clone(); - data_dir_args.datadir = data_dir; - data_dir_args.static_files_path = Some(import_data.data_dir.static_files()); - node_config = node_config.with_datadir_args(data_dir_args); - node_config = node_config.with_chain(import_data.chain.clone()); - import_data.database - } else { - let path = MaybePlatformPath::::from(tempdir_path()); - node_config = node_config - .with_datadir_args(DatadirArgs { datadir: path.clone(), ..Default::default() }); - let data_dir = - path.unwrap_or_chain_default(node_config.chain.chain, node_config.datadir.clone()); - Arc::new(init_db(data_dir.db(), Default::default()).unwrap()) - }; - - let exec = tasks.executor(); - let node_handle = NodeBuilder::new(node_config) - .testing_node(exec) - .with_types_and_provider::>() - .with_components(EthereumNode::components()) - .with_add_ons(EthereumAddOns::default()) - .on_rpc_started(|ctx, _| { - // Add custom forwarder with transactions priority queue. - let Some(queue) = queue else { return Ok(()) }; - let forwarder = BitfinityTransactionsForwarder::new(queue); - ctx.registry.eth_api().set_bitfinity_tx_forwarder(forwarder); - Ok(()) - }) - .launch_with_fn(|builder| { - let launcher = EngineNodeLauncher::new( - builder.task_executor().clone(), - builder.config().datadir(), - TreeConfig::default(), - ); - builder.launch_with(launcher) - }) - .await - .unwrap(); - - let reth_address = node_handle.node.rpc_server_handle().http_local_addr().unwrap(); - let addr_string = format!("http://{}", reth_address); - - let client: EthJsonRpcClient = - EthJsonRpcClient::new(ReqwestClient::new(addr_string)); - - (client, node_handle) -} - -/// Start a local Eth server. -/// Reth requests will be forwarded to this server -async fn mock_eth_server_start(methods: impl Into) -> (ServerHandle, SocketAddr) { - let addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let server = Server::builder().build(addr).await.unwrap(); - - let mut module = RpcModule::new(()); - module.merge(methods).unwrap(); - - let server_address = server.local_addr().unwrap(); - let handle = server.start(module); - - (handle, server_address) -} - -/// Eth server mock for local testing -pub mod eth_server { - - use alloy_consensus::{Signed, TxEnvelope, TxLegacy}; - use alloy_rlp::{Bytes, Decodable}; - use alloy_rpc_types::Transaction; - use ethereum_json_rpc_client::CertifiedResult; - use jsonrpsee::{core::RpcResult, proc_macros::rpc}; - use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; - use reth_primitives::{sign_message, TransactionSigned}; - use revm_primitives::{hex, Address, B256, U256}; - use tokio::sync::{mpsc::Sender, Mutex}; - - #[rpc(server, namespace = "eth")] - pub trait Eth { - /// Returns the current gas price. - #[method(name = "gasPrice")] - async fn gas_price(&self) -> RpcResult; - - /// Returns the current max priority fee per gas. - #[method(name = "maxPriorityFeePerGas")] - async fn max_priority_fee_per_gas(&self) -> RpcResult; - - /// Sends a raw transaction. 
- #[method(name = "sendRawTransaction")] - async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult; - - /// Returns transaction by hash. - #[method(name = "getTransactionByHash")] - async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult>; - - /// Returns the genesis balances. - #[method(name = "getGenesisBalances", aliases = ["ic_getGenesisBalances"])] - async fn get_genesis_balances(&self) -> RpcResult>; - - /// Returns the last certified block. - #[method(name = "getLastCertifiedBlock", aliases = ["ic_getLastCertifiedBlock"])] - async fn get_last_certified_block( - &self, - ) -> RpcResult>>; - } - - /// Eth server implementation for local testing - #[derive(Debug)] - pub struct EthImpl { - /// Current gas price - pub gas_price: u128, - - /// Current max priority fee per gas - pub max_priority_fee_per_gas: u128, - - /// List of received transactions. - pub received_txs: Mutex>, - - /// The mock will send transactions to the sender, if present. - pub txs_sender: Option>, - } - - impl EthImpl { - /// Create a new Eth server implementation - pub fn new(txs_sender: Option>) -> Self { - Self { - gas_price: rand::random(), - max_priority_fee_per_gas: rand::random(), - received_txs: Mutex::default(), - txs_sender, - } - } - } - - impl Default for EthImpl { - fn default() -> Self { - Self::new(None) - } - } - - #[async_trait::async_trait] - impl EthServer for EthImpl { - async fn gas_price(&self) -> RpcResult { - Ok(U256::from(self.gas_price)) - } - - async fn max_priority_fee_per_gas(&self) -> RpcResult { - Ok(U256::from(self.max_priority_fee_per_gas)) - } - - async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { - let decoded = hex::decode(&tx).unwrap(); - let tx = TransactionSigned::decode(&mut decoded.as_ref()).unwrap(); - let hash = tx.hash(); - self.received_txs.lock().await.push(hash); - if let Some(sender) = &self.txs_sender { - sender.send(hash).await.unwrap(); - } - Ok(hash) - } - - async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult> { - if !self.received_txs.lock().await.contains(&hash) { - return Ok(None); - } - - // If tx present, ruturn it with some block number. - let legacy = TxLegacy { - nonce: 42, - to: revm_primitives::TxKind::Create, - value: Default::default(), - gas_price: Default::default(), - input: Default::default(), - chain_id: Default::default(), - gas_limit: Default::default(), - }; - - let key_pair = Keypair::new(&Secp256k1::new(), &mut rand::thread_rng()); - let signature = - sign_message(B256::from_slice(&key_pair.secret_bytes()[..]), hash).unwrap(); - let envelope = TxEnvelope::Legacy(Signed::new_unchecked(legacy, signature, hash)); - let tx = Transaction { - block_hash: Some(B256::random()), - block_number: Some(42), - transaction_index: Some(42), - from: Address::random(), - effective_gas_price: None, - inner: envelope, - }; - - Ok(Some(tx)) - } - - async fn get_genesis_balances(&self) -> RpcResult> { - Ok(vec![ - (Address::from_slice(&[1u8; 20]), U256::from(10)), - (Address::from_slice(&[2u8; 20]), U256::from(20)), - (Address::from_slice(&[3u8; 20]), U256::from(30)), - ]) - } - - async fn get_last_certified_block( - &self, - ) -> RpcResult>> { - Ok(CertifiedResult { - data: Default::default(), - witness: vec![], - certificate: vec![1u8, 3, 11], - }) - } - } -} diff --git a/bin/reth/tests/commands/mod.rs b/bin/reth/tests/commands/mod.rs index 08fc91702e8..ebff8c18ac8 100644 --- a/bin/reth/tests/commands/mod.rs +++ b/bin/reth/tests/commands/mod.rs @@ -2,9 +2,6 @@ //! bitfinity integration tests //! 
 pub mod bitfinity_import_it;
-pub mod bitfinity_import_it2;
 pub mod bitfinity_node_it;
-pub mod bitfinity_node_it2;
 pub mod bitfinity_reset_evm_state_it;
 pub mod utils;
-pub mod utils2;
diff --git a/bin/reth/tests/commands/utils.rs b/bin/reth/tests/commands/utils.rs
index b87e45377a8..dea88e299e8 100644
--- a/bin/reth/tests/commands/utils.rs
+++ b/bin/reth/tests/commands/utils.rs
@@ -18,8 +18,6 @@ use reth::{
     commands::bitfinity_import::BitfinityImportCommand,
     dirs::{ChainPath, DataDirPath, PlatformPath},
 };
-use reth_beacon_consensus::EthBeaconConsensus;
-use reth_blockchain_tree::{BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals};
 use reth_chainspec::ChainSpec;
 use reth_db::{init_db, DatabaseEnv};
 use reth_downloaders::bitfinity_evm_client::BitfinityEvmClient;
@@ -29,7 +27,7 @@ use reth_node_api::NodeTypesWithDBAdapter;
 use reth_node_ethereum::EthereumNode;
 use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt};
 use reth_provider::{
-    providers::{BlockchainProvider, StaticFileProvider},
+    providers::{BlockchainProvider2, StaticFileProvider},
     BlockNumReader, ExecutionOutcome, ProviderError, ProviderFactory,
 };
 use reth_prune::PruneModes;
@@ -74,7 +72,7 @@ pub struct ImportData {
     /// The provider factory.
     pub provider_factory: ProviderFactory<NodeTypes>,
     /// The blockchain provider.
-    pub blockchain_db: BlockchainProvider<NodeTypes>,
+    pub blockchain_db: BlockchainProvider2<NodeTypes>,
     /// The bitfinity import arguments.
     pub bitfinity_args: BitfinityImportArgs,
 }
@@ -145,17 +143,7 @@ pub async fn bitfinity_import_config_data(
 
     reth_db_common::init::init_genesis(&provider_factory)?;
 
-    let consensus = Arc::new(EthBeaconConsensus::new(chain.clone()));
-
-    let executor = MockExecutorProvider::default(); //EvmExecutorFac::new(self.chain.clone(), EthEvmConfig::default());
-
-    let blockchain_tree =
-        Arc::new(ShareableBlockchainTree::new(reth_blockchain_tree::BlockchainTree::new(
-            TreeExternals::new(provider_factory.clone(), consensus, executor),
-            BlockchainTreeConfig::default(),
-        )?));
-
-    let blockchain_db = BlockchainProvider::new(provider_factory.clone(), blockchain_tree)?;
+    let blockchain_db = BlockchainProvider2::new(provider_factory.clone())?;
 
     let bitfinity_args = BitfinityImportArgs {
         rpc_url: evm_datasource_url.to_string(),
diff --git a/bin/reth/tests/commands/utils2.rs b/bin/reth/tests/commands/utils2.rs
deleted file mode 100644
index f49b439c7af..00000000000
--- a/bin/reth/tests/commands/utils2.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-//!
-//! Utils for bitfinity integration tests
-//!
-use std::{ - fmt::{Debug, Display, Formatter}, - path::PathBuf, - str::FromStr, - sync::Arc, - time, -}; - -use alloy_eips::eip7685::Requests; -use alloy_primitives::BlockNumber; -use lightspeed_scheduler::JobExecutor; -use parking_lot::Mutex; -use reth::{ - args::{BitfinityImportArgs, IC_MAINNET_KEY}, - commands::bitfinity_import2::BitfinityImportCommand, - dirs::{ChainPath, DataDirPath, PlatformPath}, -}; -use reth_chainspec::ChainSpec; -use reth_db::{init_db, DatabaseEnv}; -use reth_downloaders::bitfinity_evm_client::BitfinityEvmClient; -use reth_errors::BlockExecutionError; -use reth_evm::execute::{BatchExecutor, BlockExecutionOutput, BlockExecutorProvider, Executor}; -use reth_node_api::NodeTypesWithDBAdapter; -use reth_node_ethereum::EthereumNode; -use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; -use reth_provider::{ - providers::{BlockchainProvider2, StaticFileProvider}, - BlockNumReader, ExecutionOutcome, ProviderError, ProviderFactory, -}; -use reth_prune::PruneModes; -use reth_tracing::{FileWorkerGuard, LayerInfo, LogFormat, RethTracer, Tracer}; -use revm_primitives::db::Database; -use tempfile::TempDir; -use tracing::{debug, info}; - -/// Local EVM canister ID for testing. -pub const LOCAL_EVM_CANISTER_ID: &str = "bkyz2-fmaaa-aaaaa-qaaaq-cai"; -/// EVM block extractor for devnet running on Digital Ocean. -pub const DEFAULT_EVM_DATASOURCE_URL: &str = - "https://block-extractor-testnet-1052151659755.europe-west9.run.app"; - -/// Initializes the logs for the tests. -pub fn init_logs() -> eyre::Result> { - let mut tracer = RethTracer::new(); - let stdout = LayerInfo::new( - LogFormat::Terminal, - "info".to_string(), - String::new(), - Some("always".to_string()), - ); - tracer = tracer.with_stdout(stdout); - - let guard = tracer.init()?; - Ok(guard) -} - -/// Type alias for the node types. -pub type NodeTypes = NodeTypesWithDBAdapter>; - -#[derive(Clone)] -/// Data needed for the import tests. -pub struct ImportData { - /// The chain spec. - pub chain: Arc, - /// The data directory. - pub data_dir: ChainPath, - /// The database. - pub database: Arc, - /// The provider factory. - pub provider_factory: ProviderFactory, - /// The blockchain provider. - pub blockchain_db: BlockchainProvider2, - /// The bitfinity import arguments. - pub bitfinity_args: BitfinityImportArgs, -} - -impl Debug for ImportData { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ImportTempData").field("chain", &self.chain).finish() - } -} - -/// Imports blocks from the EVM node. -pub async fn import_blocks( - import_data: ImportData, - timeout: time::Duration, - shutdown_when_done: bool, -) -> JobExecutor { - let end_block = import_data.bitfinity_args.end_block.unwrap(); - let import = BitfinityImportCommand::new( - None, - import_data.data_dir, - import_data.chain, - import_data.bitfinity_args, - import_data.provider_factory.clone(), - import_data.blockchain_db, - ); - let (job_executor, _import_handle) = import.schedule_execution(None).await.unwrap(); - wait_until_local_block_imported(&import_data.provider_factory, end_block, timeout).await; - - if shutdown_when_done { - debug!("Stopping job executor"); - job_executor.stop(true).await.unwrap(); - debug!("Job executor stopped"); - } - - job_executor -} - -/// Initializes the database and the blockchain tree for the bitfinity import tests. -/// If a `data_dir` is provided, it will be used, otherwise a temporary directory will be created. 
-pub async fn bitfinity_import_config_data( - evm_datasource_url: &str, - backup_evm_datasource_url: Option, - data_dir: Option, -) -> eyre::Result<(TempDir, ImportData)> { - let chain = Arc::new( - BitfinityEvmClient::fetch_chain_spec_with_fallback( - evm_datasource_url.to_owned(), - backup_evm_datasource_url.clone(), - ) - .await?, - ); - - let temp_dir = TempDir::new().unwrap(); - - let data_dir = data_dir.unwrap_or_else(|| temp_dir.path().to_path_buf()); - let data_dir: PlatformPath = - PlatformPath::from_str(data_dir.as_os_str().to_str().unwrap())?; - let data_dir = ChainPath::new(data_dir, chain.chain, Default::default()); - - let db_path = data_dir.db(); - - let database = Arc::new(init_db(db_path, Default::default())?); - let provider_factory = ProviderFactory::new( - database.clone(), - chain.clone(), - StaticFileProvider::read_write(data_dir.static_files())?, - ); - - reth_db_common::init::init_genesis(&provider_factory)?; - - let blockchain_db = BlockchainProvider2::new(provider_factory.clone())?; - - let bitfinity_args = BitfinityImportArgs { - rpc_url: evm_datasource_url.to_string(), - send_raw_transaction_rpc_url: None, - end_block: Some(100), - import_interval: 1, - batch_size: 1000, - max_fetch_blocks: 10000, - evmc_principal: LOCAL_EVM_CANISTER_ID.to_string(), - ic_root_key: IC_MAINNET_KEY.to_string(), - backup_rpc_url: backup_evm_datasource_url, - max_retries: 3, - retry_delay_secs: 3, - tx_queue: false, - tx_queue_size: 1000, - send_queued_txs_period_secs: 3, - queued_txs_batch_size: 10, - queued_txs_per_execution_threshold: 100, - }; - - Ok(( - temp_dir, - ImportData { data_dir, database, chain, provider_factory, blockchain_db, bitfinity_args }, - )) -} - -/// Waits until the block is imported. -pub async fn wait_until_local_block_imported( - provider_factory: &ProviderFactory, - block: BlockNumber, - timeout: time::Duration, -) { - let now = std::time::Instant::now(); - loop { - let provider = provider_factory.provider().unwrap(); - let last_block = provider.last_block_number().unwrap(); - if last_block == block { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - assert!(now.elapsed() <= timeout, "Timeout waiting for the last block to be imported. Waiting for block: {} but last block found was {}", block, last_block) - } -} - -/// Returns the local dfx port. -pub fn get_dfx_local_port() -> u16 { - use std::process::Command; - let output = Command::new("dfx") - .arg("info") - .arg("replica-port") - .output() - .expect("failed to execute process"); - - let port = String::from_utf8_lossy(&output.stdout); - info!("dfx port: {}", port); - u16::from_str(port.trim()).unwrap() -} - -/// A [`BlockExecutorProvider`] that returns mocked execution results. 
-/// Original code taken from ./`crates/evm/src/test_utils.rs` -#[derive(Clone, Debug, Default)] -struct MockExecutorProvider { - exec_results: Arc>>, -} - -impl BlockExecutorProvider for MockExecutorProvider { - type Executor + Display>> = Self; - - type BatchExecutor + Display>> = Self; - - fn executor(&self, _: DB) -> Self::Executor - where - DB: Database + Display>, - { - self.clone() - } - - fn batch_executor(&self, _: DB) -> Self::BatchExecutor - where - DB: Database + Display>, - { - self.clone() - } - - type Primitives = EthPrimitives; -} - -impl Executor for MockExecutorProvider { - type Input<'a> = &'a BlockWithSenders; - type Output = BlockExecutionOutput; - type Error = BlockExecutionError; - - fn execute(self, _: Self::Input<'_>) -> Result { - let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = - self.exec_results.lock().pop().unwrap(); - Ok(BlockExecutionOutput { - state: bundle, - receipts: receipts.into_iter().flatten().flatten().collect(), - requests: Requests::new(requests.into_iter().flatten().collect()), - gas_used: 0, - }) - } - - fn execute_with_state_closure( - self, - _input: Self::Input<'_>, - _state: F, - ) -> Result - where - F: FnMut(&reth_revm::State), - { - let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = - self.exec_results.lock().pop().unwrap(); - Ok(BlockExecutionOutput { - state: bundle, - receipts: receipts.into_iter().flatten().flatten().collect(), - requests: Requests::new(requests.into_iter().flatten().collect()), - gas_used: 0, - }) - } - - fn execute_with_state_hook( - self, - _input: Self::Input<'_>, - _state_hook: F, - ) -> Result - where - F: reth_evm::system_calls::OnStateHook + 'static, - { - todo!() - } -} - -impl BatchExecutor for MockExecutorProvider { - type Input<'a> = &'a BlockWithSenders; - type Output = ExecutionOutcome; - type Error = BlockExecutionError; - - fn execute_and_verify_one(&mut self, _: Self::Input<'_>) -> Result<(), Self::Error> { - Ok(()) - } - - fn finalize(self) -> Self::Output { - self.exec_results.lock().pop().unwrap() - } - - fn set_tip(&mut self, _: BlockNumber) {} - - fn size_hint(&self) -> Option { - None - } - - fn set_prune_modes(&mut self, _prune_modes: PruneModes) {} -}
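
For reference, a minimal usage sketch (not part of the patch) of the pieces wired together above, assuming only the constructors shown in the diffs (`BlockchainProvider2::new`, `BitfinityImportCommand::new`, `schedule_execution`); `provider_factory`, `datadir`, `chain`, and `bitfinity_args` are placeholder bindings built as in `bitfinity_import_config_data`:

// Illustrative sketch only; it mirrors `import_blocks` from the test utils.
// The v2 provider is built directly from the provider factory; no blockchain
// tree is needed, unlike the old `BlockchainProvider` setup removed above.
let blockchain_db = BlockchainProvider2::new(provider_factory.clone())?;

// Wire the import command against the same factory/provider pair and schedule
// the periodic import job; passing `None` for the config path falls back to
// `datadir.config()`.
let import = BitfinityImportCommand::new(
    None,
    datadir,
    chain,
    bitfinity_args,
    provider_factory.clone(),
    blockchain_db,
);
let (_job_executor, _import_handle) = import.schedule_execution(None).await?;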