diff --git a/Cargo.lock b/Cargo.lock index 8b570cf2cd3..362a2ad5c32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7235,6 +7235,7 @@ dependencies = [ "reth-db", "reth-db-api", "reth-db-common", + "reth-discv5", "reth-downloaders", "reth-engine-util", "reth-errors", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index ca31910c019..b6fcda2834c 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -100,9 +100,10 @@ similar-asserts.workspace = true async-channel.workspace = true candid.workspace = true did.workspace = true +revm-primitives.workspace = true evm-canister-client = { workspace = true, features = ["ic-agent-client"] } lightspeed_scheduler = { workspace = true, features = ["tracing"] } -# rlp = { workspace = true } +ethereum-json-rpc-client = { workspace = true, features = ["reqwest"] } [dev-dependencies] tempfile.workspace = true @@ -110,16 +111,16 @@ tempfile.workspace = true # bitfinity dev dependencies async-trait = { workspace = true } dirs.workspace = true -ethereum-json-rpc-client = { workspace = true, features = ["reqwest"] } jsonrpsee = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } reth-primitives = { workspace = true, features = ["test-utils"] } reth-trie = { workspace = true, features = ["test-utils"] } -revm-primitives.workspace = true +reth-transaction-pool = { workspace = true, features = ["test-utils"] } reth-db-common.workspace = true reth-db = { workspace = true, features = ["mdbx", "test-utils"] } serial_test.workspace = true +reth-discv5.workspace = true [features] default = ["jemalloc"] diff --git a/bin/reth/src/bitfinity_tasks.rs b/bin/reth/src/bitfinity_tasks.rs new file mode 100644 index 00000000000..66032569f2b --- /dev/null +++ b/bin/reth/src/bitfinity_tasks.rs @@ -0,0 +1,3 @@ +//! This module contains tasks to run in parallel with reth node. + +pub mod send_txs; diff --git a/bin/reth/src/bitfinity_tasks/send_txs.rs b/bin/reth/src/bitfinity_tasks/send_txs.rs new file mode 100644 index 00000000000..a65a7ea174e --- /dev/null +++ b/bin/reth/src/bitfinity_tasks/send_txs.rs @@ -0,0 +1,153 @@ +//! Utils for raw transaction batching. + +use std::time::Duration; + +use did::H256; +use ethereum_json_rpc_client::reqwest::ReqwestClient; +use ethereum_json_rpc_client::{EthJsonRpcClient, Id, Params}; +use eyre::eyre; +use futures::future::join_all; +use lightspeed_scheduler::job::Job; +use lightspeed_scheduler::scheduler::Scheduler; +use lightspeed_scheduler::JobExecutor; +use reth_node_core::version::SHORT_VERSION; +use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::SharedQueue; +use revm_primitives::{hex, U256}; +use tracing::{info, trace, warn}; + +/// Periodically sends transactions from priority queue. +#[derive(Debug, Clone)] +pub struct BitfinityTransactionSender { + queue: SharedQueue, + rpc_url: String, + period: Duration, + batch_size: usize, + txs_per_execution_threshold: usize, +} + +impl BitfinityTransactionSender { + /// Creates new instance of the transaction sender. + pub const fn new( + queue: SharedQueue, + rpc_url: String, + period: Duration, + batch_size: usize, + txs_per_execution_threshold: usize, + ) -> Self { + Self { queue, rpc_url, period, batch_size, txs_per_execution_threshold } + } + + /// Schedule the transaction sending job and return a handle to it. + pub async fn schedule_execution( + self, + job_executor: Option, + ) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> { + info!(target: "reth::cli - BitfinityTransactionSender", "reth {} starting", SHORT_VERSION); + + let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz); + + // Schedule the import job + { + let interval = + Scheduler::Interval { interval_duration: self.period, execute_at_startup: true }; + job_executor + .add_job_with_scheduler( + interval, + Job::new("send transactions", "bitfinity tx sending", None, move || { + let tx_sender = self.clone(); + Box::pin(async move { + tx_sender.single_execution().await?; + Ok(()) + }) + }), + ) + .await; + } + + let job_handle = job_executor.run().await?; + Ok((job_executor, job_handle)) + } + + /// Execute the transaction sending job. + pub async fn single_execution(&self) -> eyre::Result<()> { + let mut to_send = self.get_transactions_to_send().await; + let batch_size = self.batch_size.max(1); + let mut send_futures = vec![]; + + loop { + let last_idx = batch_size.min(to_send.len()); + if last_idx == 0 { + break; + } + + let to_send_batch: Vec<_> = to_send.drain(..last_idx).collect(); + + let send_future = async move { + let result = match to_send_batch.len() { + 0 => return, + 1 => self.send_single_tx(&to_send_batch[0].1).await, + _ => self.send_txs_batch(&to_send_batch).await, + }; + + if let Err(e) = result { + warn!("Failed to send transactions to EVM: {e}"); + } + }; + send_futures.push(send_future); + } + + join_all(send_futures).await; + + Ok(()) + } + + async fn get_transactions_to_send(&self) -> Vec<(U256, Vec)> { + let mut batch = Vec::with_capacity(self.txs_per_execution_threshold); + let mut queue = self.queue.lock().await; + let txs_to_pop = self.txs_per_execution_threshold.max(1); // if batch size is zero, take at least one tx. + + for _ in 0..txs_to_pop { + let Some(entry) = queue.pop_tx_with_highest_price() else { + break; + }; + + batch.push(entry); + } + + batch + } + + async fn send_single_tx(&self, to_send: &[u8]) -> Result<(), eyre::Error> { + let client = self.get_client()?; + let hash = client + .send_raw_transaction_bytes(to_send) + .await + .map_err(|e| eyre!("failed to send single transaction: {e}"))?; + + trace!("Single transaction with hash {hash} sent."); + + Ok(()) + } + + async fn send_txs_batch(&self, to_send: &[(U256, Vec)]) -> Result<(), eyre::Error> { + let client = self.get_client()?; + + let params = + to_send.iter().map(|(_, raw)| (Params::Array(vec![hex::encode(raw).into()]), Id::Null)); + let max_batch_size = usize::MAX; + let hashes = client + .batch_request::("eth_sendRawTransaction".into(), params, max_batch_size) + .await + .map_err(|e| eyre!("failed to send single transaction: {e}"))?; + + trace!("Raw transactions batch sent. Hashes: {hashes:?}"); + + Ok(()) + } + + fn get_client(&self) -> eyre::Result> { + let client = EthJsonRpcClient::new(ReqwestClient::new(self.rpc_url.clone())); + + Ok(client) + } +} diff --git a/bin/reth/src/commands/bitfinity_import.rs b/bin/reth/src/commands/bitfinity_import.rs index f6a6a714843..55f9f9d9a41 100644 --- a/bin/reth/src/commands/bitfinity_import.rs +++ b/bin/reth/src/commands/bitfinity_import.rs @@ -99,10 +99,11 @@ impl BitfinityImportCommand { /// Schedule the import job and return a handle to it. pub async fn schedule_execution( self, + job_executor: Option, ) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> { info!(target: "reth::cli - BitfinityImportCommand", "reth {} starting", SHORT_VERSION); - let job_executor = JobExecutor::new_with_local_tz(); + let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz); // Schedule the import job { diff --git a/bin/reth/src/commands/mod.rs b/bin/reth/src/commands/mod.rs index cb32bdd5f3f..9380056c8b1 100644 --- a/bin/reth/src/commands/mod.rs +++ b/bin/reth/src/commands/mod.rs @@ -1,5 +1,5 @@ //! This contains all of the `reth` commands -pub mod bitfinity_reset_evm_state; pub mod bitfinity_import; +pub mod bitfinity_reset_evm_state; pub mod debug_cmd; diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 53c592063ec..14bc2d9fb89 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -27,6 +27,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +pub mod bitfinity_tasks; pub mod cli; pub mod commands; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index f7d9bf62499..88d13499ea7 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -3,7 +3,10 @@ #[global_allocator] static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); +use std::{sync::Arc, time::Duration}; + use clap::{Args, Parser}; +use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_node_builder::{ engine_tree_config::{ @@ -13,7 +16,9 @@ use reth_node_builder::{ }; use reth_node_ethereum::node::EthereumAddOns; use reth_provider::providers::BlockchainProvider2; +use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{BitfinityTransactionsForwarder, TransactionsPriorityQueue}; use reth_tracing::tracing::warn; +use tokio::sync::Mutex; use tracing::info; /// Parameters for configuring the engine @@ -102,19 +107,41 @@ fn main() { let datadir = handle.node.data_dir.clone(); let (provider_factory, bitfinity) = handle.bitfinity_import.clone().expect("Bitfinity import not configured"); + // Init bitfinity import - { + let executor = { let import = BitfinityImportCommand::new( config, datadir, chain, - bitfinity, + bitfinity.clone(), provider_factory, blockchain_provider, ); - let _import_handle = import.schedule_execution().await?; + let (executor, _import_handle) = import.schedule_execution(None).await?; + executor }; + if bitfinity.tx_queue { + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(1000))); + + // Make EthApi handler move new txs to queue. + let queue_clone = Arc::clone(&queue); + handle.node.add_ons_handle.eth_api().set_bitfinity_tx_forwarder(BitfinityTransactionsForwarder::new(queue_clone)); + + // Run batch transaction sender. + let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url); + let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs); + let transaction_sending = BitfinityTransactionSender::new( + queue, + url, + period, + bitfinity.queued_txs_batch_size, + bitfinity.queued_txs_per_execution_threshold, + ); + let _sending_handle = transaction_sending.schedule_execution(Some(executor)).await?; + } + handle.node_exit_future.await } } diff --git a/bin/reth/tests/bitfinity_it.rs b/bin/reth/tests/bitfinity_it.rs index 9b836391f55..dbe5e9821c8 100644 --- a/bin/reth/tests/bitfinity_it.rs +++ b/bin/reth/tests/bitfinity_it.rs @@ -1,4 +1,4 @@ //! //! bitfinity integration tests -//! -pub mod commands; \ No newline at end of file +//! +pub mod commands; diff --git a/bin/reth/tests/commands/bitfinity_node_it.rs b/bin/reth/tests/commands/bitfinity_node_it.rs index 8c9de89f861..d834a6429c6 100644 --- a/bin/reth/tests/commands/bitfinity_node_it.rs +++ b/bin/reth/tests/commands/bitfinity_node_it.rs @@ -11,7 +11,7 @@ use jsonrpsee::{ server::{Server, ServerHandle}, Methods, RpcModule, }; -use rand::RngCore; +use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender; use reth::{ args::{DatadirArgs, RpcServerArgs}, dirs::{DataDirPath, MaybePlatformPath}, @@ -19,6 +19,7 @@ use reth::{ use reth_consensus::FullConsensus; use reth_db::DatabaseEnv; use reth_db::{init_db, test_utils::tempdir_path}; +use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; use reth_network::NetworkHandle; use reth_node_api::{FullNodeTypesAdapter, NodeTypesWithDBAdapter}; use reth_node_builder::components::Components; @@ -28,22 +29,31 @@ use reth_node_ethereum::node::EthereumEngineValidatorBuilder; use reth_node_ethereum::{ BasicBlockExecutorProvider, EthEvmConfig, EthExecutionStrategyFactory, EthereumNode, }; +use reth_primitives::{Transaction, TransactionSigned}; use reth_provider::providers::BlockchainProvider; use reth_rpc::EthApi; +use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{ + BitfinityTransactionsForwarder, SharedQueue, TransactionsPriorityQueue, +}; use reth_tasks::TaskManager; use reth_transaction_pool::blobstore::DiskFileBlobStore; +use reth_transaction_pool::test_utils::MockTransaction; use reth_transaction_pool::{ CoinbaseTipOrdering, EthPooledTransaction, EthTransactionValidator, Pool, TransactionValidationTaskExecutor, }; -use revm_primitives::{hex, Address, U256}; +use revm_primitives::{Address, B256, U256}; +use std::time::Duration; use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; #[tokio::test] async fn bitfinity_test_should_start_local_reth_node() { // Arrange let _log = init_logs(); - let (reth_client, _reth_node) = start_reth_node(None, None).await; + let tasks = TaskManager::current(); + let (reth_client, _reth_node) = start_reth_node(&tasks, None, None, None).await; // Act & Assert assert!(reth_client.get_chain_id().await.is_ok()); @@ -53,12 +63,13 @@ async fn bitfinity_test_should_start_local_reth_node() { async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; // Act let result = reth_client.get_last_certified_block().await; @@ -83,13 +94,14 @@ async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { async fn bitfinity_test_node_forward_get_gas_price_requests() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let gas_price = eth_server.gas_price; let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; // Act let gas_price_result = reth_client.gas_price().await; @@ -102,13 +114,14 @@ async fn bitfinity_test_node_forward_get_gas_price_requests() { async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let max_priority_fee_per_gas = eth_server.max_priority_fee_per_gas; let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; // Act let result = reth_client.max_priority_fee_per_gas().await; @@ -121,12 +134,13 @@ async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() { async fn bitfinity_test_node_forward_eth_get_genesis_balances() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; // Act let result: Vec<(did::H160, did::U256)> = reth_client @@ -155,12 +169,13 @@ async fn bitfinity_test_node_forward_eth_get_genesis_balances() { async fn bitfinity_test_node_forward_ic_get_genesis_balances() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + start_reth_node(&tasks, Some(format!("http://{}", eth_server_address)), None, None).await; // Act let result = reth_client.get_genesis_balances().await.unwrap(); @@ -182,42 +197,291 @@ async fn bitfinity_test_node_forward_ic_get_genesis_balances() { async fn bitfinity_test_node_forward_send_raw_transaction_requests() { // Arrange let _log = init_logs(); + let tasks = TaskManager::current(); + + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); + + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); - let eth_server = EthImpl::new(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; - let (reth_client, _reth_node) = - start_reth_node(Some(format!("http://{}", eth_server_address)), None).await; + let bitfinity_evm_url = format!("http://{}", eth_server_address); + let (reth_client, _reth_node) = start_reth_node( + &tasks, + Some(format!("http://{}", eth_server_address)), + None, + Some(queue.clone()), + ) + .await; // Create a random transaction - let mut tx = [0u8; 256]; - rand::thread_rng().fill_bytes(&mut tx); - let expected_tx_hash = keccak::keccak_hash(format!("0x{}", hex::encode(tx)).as_bytes()); + let tx = transaction_with_gas_price(100); + let encoded = alloy_rlp::encode(&tx); + let expected_tx_hash = keccak::keccak_hash(&encoded); // Act - let result = reth_client.send_raw_transaction_bytes(&tx).await; + let result = reth_client.send_raw_transaction_bytes(&encoded).await.unwrap(); // Assert - assert_eq!(result.unwrap(), expected_tx_hash); + assert_eq!(result, expected_tx_hash); + + let transaction_sending = BitfinityTransactionSender::new( + queue, + bitfinity_evm_url, + Duration::from_millis(200), + 10, + 100, + ); + transaction_sending.single_execution().await.unwrap(); + + let received_txs = consume_received_txs(&mut tx_receiver, 1).await.unwrap(); + + assert_eq!(received_txs[0], expected_tx_hash.0); +} + +#[tokio::test] +async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); + + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); + + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let bitfinity_evm_url = format!("http://{}", eth_server_address); + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await; + + const TXS_NUMBER: usize = 100; + + // Create a random transactions + let transactions = (1..=TXS_NUMBER) + .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128))) + .collect::>(); + + // Only highest price transactions should be sent. + let expected_hashes = + transactions.iter().rev().take(10).map(|tx| keccak::keccak_hash(tx)).collect::>(); + + // Act + for tx in &transactions { + reth_client.send_raw_transaction_bytes(tx).await.unwrap(); + } + + let transaction_sending = BitfinityTransactionSender::new( + queue.clone(), + bitfinity_evm_url, + Duration::from_millis(200), + 10, + 100, + ); + transaction_sending.single_execution().await.unwrap(); + + let received_txs = consume_received_txs(&mut tx_receiver, 10).await.unwrap(); + + // Check all queued transactions sent. + assert!(queue.lock().await.is_empty()); + + for expected_hash in expected_hashes.iter().rev() { + assert!(received_txs.contains(&expected_hash.0)); + } +} + +#[tokio::test] +async fn bitfinity_test_node_get_transaction_when_it_is_queued() { + // Arrange + let _log = init_logs(); + let tasks = TaskManager::current(); + + let eth_server = EthImpl::new(None); + + let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10))); + + let (_server, eth_server_address) = + mock_eth_server_start(EthServer::into_rpc(eth_server)).await; + let bitfinity_evm_url = format!("http://{}", eth_server_address); + let (reth_client, _reth_node) = + start_reth_node(&tasks, Some(bitfinity_evm_url.clone()), None, Some(queue.clone())).await; + + const TXS_NUMBER: usize = 10; + + // Create a random transactions + let transactions = (1..=TXS_NUMBER) + .map(|i| alloy_rlp::encode(transaction_with_gas_price(100 * i as u128))) + .collect::>(); + + let expected_hashes = transactions.iter().map(|tx| keccak::keccak_hash(tx)).collect::>(); + + // Act + for (tx, expected_hash) in transactions.iter().zip(expected_hashes.iter()) { + let hash = reth_client.send_raw_transaction_bytes(tx).await.unwrap(); + assert_eq!(hash, *expected_hash); + } + + for hash in &expected_hashes { + let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap(); + // Transaction in forwarder has NO block number. + dbg!(tx.block_number); + assert!(tx.block_number.is_none()); + } + + let transaction_sending = BitfinityTransactionSender::new( + queue, + bitfinity_evm_url, + Duration::from_millis(200), + 10, + 100, + ); + transaction_sending.single_execution().await.unwrap(); + + for hash in &expected_hashes { + let tx = reth_client.get_transaction_by_hash(hash.clone()).await.unwrap().unwrap(); + // Transaction in mock has block number. + assert!(tx.block_number.is_some()); + } +} + +/// Waits until `n` transactions appear in `received_txs` with one second timeout. +/// Returns true if `received_txs` contains at least `n` transactions. +async fn consume_received_txs(received_txs: &mut Receiver, n: usize) -> Option> { + let wait_future = async { + let mut txs = Vec::with_capacity(n); + while txs.len() < n { + let tx = received_txs.recv().await.unwrap(); + txs.push(tx); + } + txs + }; + + let wait_result = tokio::time::timeout(Duration::from_secs(3), wait_future).await; + wait_result.ok() +} + +fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned { + let mock = MockTransaction::legacy().with_gas_price(gas_price); + let transaction: Transaction = mock.into(); + + sign_tx_with_random_key_pair(transaction) +} + +fn sign_tx_with_random_key_pair(tx: Transaction) -> TransactionSigned { + let secp = Secp256k1::new(); + let key_pair = Keypair::new(&secp, &mut rand::thread_rng()); + sign_tx_with_key_pair(key_pair, tx) +} + +fn sign_tx_with_key_pair(key_pair: Keypair, tx: Transaction) -> TransactionSigned { + let signature = reth_primitives::sign_message( + B256::from_slice(&key_pair.secret_bytes()[..]), + tx.signature_hash(), + ) + .unwrap(); + TransactionSigned::new(tx, signature, Default::default()) } /// Start a local reth node async fn start_reth_node( + tasks: &TaskManager, bitfinity_evm_url: Option, import_data: Option, + queue: Option, ) -> ( EthJsonRpcClient, - NodeHandle, BlockchainProvider>>>, Components, BlockchainProvider>>>, reth_network::EthNetworkPrimitives, Pool>>, EthPooledTransaction>>, CoinbaseTipOrdering, DiskFileBlobStore>, EthEvmConfig, BasicBlockExecutorProvider, Arc>>, RpcAddOns, BlockchainProvider>>>, Components, BlockchainProvider>>>, reth_network::EthNetworkPrimitives, Pool>>, EthPooledTransaction>>, CoinbaseTipOrdering, DiskFileBlobStore>, EthEvmConfig, BasicBlockExecutorProvider, Arc>>, EthApi>>, Pool>>, EthPooledTransaction>>, CoinbaseTipOrdering, DiskFileBlobStore>, NetworkHandle, EthEvmConfig>, EthereumEngineValidatorBuilder>>, + NodeHandle< + NodeAdapter< + FullNodeTypesAdapter< + EthereumNode, + Arc, + BlockchainProvider>>, + >, + Components< + FullNodeTypesAdapter< + EthereumNode, + Arc, + BlockchainProvider>>, + >, + reth_network::EthNetworkPrimitives, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider< + NodeTypesWithDBAdapter>, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + EthEvmConfig, + BasicBlockExecutorProvider, + Arc, + >, + >, + RpcAddOns< + NodeAdapter< + FullNodeTypesAdapter< + EthereumNode, + Arc, + BlockchainProvider>>, + >, + Components< + FullNodeTypesAdapter< + EthereumNode, + Arc, + BlockchainProvider>>, + >, + reth_network::EthNetworkPrimitives, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider< + NodeTypesWithDBAdapter>, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + EthEvmConfig, + BasicBlockExecutorProvider, + Arc, + >, + >, + EthApi< + BlockchainProvider>>, + Pool< + TransactionValidationTaskExecutor< + EthTransactionValidator< + BlockchainProvider< + NodeTypesWithDBAdapter>, + >, + EthPooledTransaction, + >, + >, + CoinbaseTipOrdering, + DiskFileBlobStore, + >, + NetworkHandle, + EthEvmConfig, + >, + EthereumEngineValidatorBuilder, + >, + >, ) { - let tasks = TaskManager::current(); - // create node config let mut node_config = NodeConfig::test().dev().with_rpc(RpcServerArgs::default().with_http()).with_unused_ports(); node_config.dev.dev = false; let mut chain = node_config.chain.as_ref().clone(); - chain.bitfinity_evm_url = bitfinity_evm_url; + chain.bitfinity_evm_url = bitfinity_evm_url.clone(); let mut node_config = node_config.with_chain(chain); let database = if let Some(import_data) = import_data { @@ -243,14 +507,23 @@ async fn start_reth_node( let node_handle = NodeBuilder::new(node_config) .with_database(database) .with_launch_context(tasks.executor()) - .launch_node(EthereumNode::default()) + .node(EthereumNode::default()) + .on_rpc_started(|ctx, _| { + // Add custom forwarder with transactions priority queue. + let Some(queue) = queue else { return Ok(()) }; + let forwarder = BitfinityTransactionsForwarder::new(queue); + ctx.registry.eth_api().set_bitfinity_tx_forwarder(forwarder); + Ok(()) + }) + .launch() .await .unwrap(); let reth_address = node_handle.node.rpc_server_handle().http_local_addr().unwrap(); + let addr_string = format!("http://{}", reth_address); let client: EthJsonRpcClient = - EthJsonRpcClient::new(ReqwestClient::new(format!("http://{}", reth_address))); + EthJsonRpcClient::new(ReqwestClient::new(addr_string)); (client, node_handle) } @@ -273,11 +546,15 @@ async fn mock_eth_server_start(methods: impl Into) -> (ServerHandle, So /// Eth server mock for local testing pub mod eth_server { - use alloy_rlp::Bytes; - use did::keccak; + use alloy_consensus::{Signed, TxEnvelope, TxLegacy}; + use alloy_rlp::{Bytes, Decodable}; + use alloy_rpc_types::Transaction; use ethereum_json_rpc_client::CertifiedResult; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; - use revm_primitives::{Address, B256, U256}; + use reth_discv5::discv5::enr::secp256k1::{Keypair, Secp256k1}; + use reth_primitives::{sign_message, TransactionSigned}; + use revm_primitives::{hex, Address, B256, U256}; + use tokio::sync::{mpsc::Sender, Mutex}; #[rpc(server, namespace = "eth")] pub trait Eth { @@ -293,6 +570,10 @@ pub mod eth_server { #[method(name = "sendRawTransaction")] async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult; + /// Returns transaction by hash. + #[method(name = "getTransactionByHash")] + async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult>; + /// Returns the genesis balances. #[method(name = "getGenesisBalances", aliases = ["ic_getGenesisBalances"])] async fn get_genesis_balances(&self) -> RpcResult>; @@ -309,20 +590,32 @@ pub mod eth_server { pub struct EthImpl { /// Current gas price pub gas_price: u128, + /// Current max priority fee per gas pub max_priority_fee_per_gas: u128, + + /// List of received transactions. + pub received_txs: Mutex>, + + /// The mock will send transactions to the sender, if present. + pub txs_sender: Option>, } impl EthImpl { /// Create a new Eth server implementation - pub fn new() -> Self { - Self { gas_price: rand::random(), max_priority_fee_per_gas: rand::random() } + pub fn new(txs_sender: Option>) -> Self { + Self { + gas_price: rand::random(), + max_priority_fee_per_gas: rand::random(), + received_txs: Mutex::default(), + txs_sender, + } } } impl Default for EthImpl { fn default() -> Self { - Self::new() + Self::new(None) } } @@ -337,8 +630,46 @@ pub mod eth_server { } async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { - let hash = keccak::keccak_hash(&tx); - Ok(hash.into()) + let decoded = hex::decode(&tx).unwrap(); + let tx = TransactionSigned::decode(&mut decoded.as_ref()).unwrap(); + let hash = tx.hash(); + self.received_txs.lock().await.push(hash); + if let Some(sender) = &self.txs_sender { + sender.send(hash).await.unwrap(); + } + Ok(hash) + } + + async fn get_transaction_by_hash(&self, hash: B256) -> RpcResult> { + if !self.received_txs.lock().await.contains(&hash) { + return Ok(None); + } + + // If tx present, ruturn it with some block number. + let legacy = TxLegacy { + nonce: 42, + to: revm_primitives::TxKind::Create, + value: Default::default(), + gas_price: Default::default(), + input: Default::default(), + chain_id: Default::default(), + gas_limit: Default::default(), + }; + + let key_pair = Keypair::new(&Secp256k1::new(), &mut rand::thread_rng()); + let signature = + sign_message(B256::from_slice(&key_pair.secret_bytes()[..]), hash).unwrap(); + let envelope = TxEnvelope::Legacy(Signed::new_unchecked(legacy, signature, hash)); + let tx = Transaction { + block_hash: Some(B256::random()), + block_number: Some(42), + transaction_index: Some(42), + from: Address::random(), + effective_gas_price: None, + inner: envelope, + }; + + Ok(Some(tx)) } async fn get_genesis_balances(&self) -> RpcResult> { diff --git a/bin/reth/tests/commands/mod.rs b/bin/reth/tests/commands/mod.rs index 3dd1b918b02..ebff8c18ac8 100644 --- a/bin/reth/tests/commands/mod.rs +++ b/bin/reth/tests/commands/mod.rs @@ -1,6 +1,6 @@ //! //! bitfinity integration tests -//! +//! pub mod bitfinity_import_it; pub mod bitfinity_node_it; pub mod bitfinity_reset_evm_state_it; diff --git a/bin/reth/tests/commands/utils.rs b/bin/reth/tests/commands/utils.rs index 3f2b102f22a..b87e45377a8 100644 --- a/bin/reth/tests/commands/utils.rs +++ b/bin/reth/tests/commands/utils.rs @@ -1,6 +1,6 @@ //! //! Utils for bitfinity integration tests -//! +//! use std::{ fmt::{Debug, Display, Formatter}, path::PathBuf, @@ -24,9 +24,7 @@ use reth_chainspec::ChainSpec; use reth_db::{init_db, DatabaseEnv}; use reth_downloaders::bitfinity_evm_client::BitfinityEvmClient; use reth_errors::BlockExecutionError; -use reth_evm::execute::{ - BatchExecutor, BlockExecutionOutput, BlockExecutorProvider, Executor, -}; +use reth_evm::execute::{BatchExecutor, BlockExecutionOutput, BlockExecutorProvider, Executor}; use reth_node_api::NodeTypesWithDBAdapter; use reth_node_ethereum::EthereumNode; use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; @@ -64,7 +62,6 @@ pub fn init_logs() -> eyre::Result> { /// Type alias for the node types. pub type NodeTypes = NodeTypesWithDBAdapter>; - #[derive(Clone)] /// Data needed for the import tests. pub struct ImportData { @@ -103,7 +100,7 @@ pub async fn import_blocks( import_data.provider_factory.clone(), import_data.blockchain_db, ); - let (job_executor, _import_handle) = import.schedule_execution().await.unwrap(); + let (job_executor, _import_handle) = import.schedule_execution(None).await.unwrap(); wait_until_local_block_imported(&import_data.provider_factory, end_block, timeout).await; if shutdown_when_done { @@ -172,6 +169,11 @@ pub async fn bitfinity_import_config_data( backup_rpc_url: backup_evm_datasource_url, max_retries: 3, retry_delay_secs: 3, + tx_queue: false, + tx_queue_size: 1000, + send_queued_txs_period_secs: 3, + queued_txs_batch_size: 10, + queued_txs_per_execution_threshold: 100, }; Ok(( diff --git a/bitfinity.md b/bitfinity.md index e687051e77a..9fa7a6f06fa 100644 --- a/bitfinity.md +++ b/bitfinity.md @@ -34,7 +34,7 @@ reth node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth With cargo: ```sh -cargo run -p reth -- node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth,net,trace,txpool,web3" --disable-discovery --ipcdisable --no-persist-peers -r https://orca-app-5yyst.ondigitalocean.app -i 30 -b 100 --max-fetch-blocks 5000 --log.file.directory ./target/logs --datadir ./target/reth +cargo run -p reth -- node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth,net,trace,txpool,web3" --disable-discovery --ipcdisable --no-persist-peers -r https://block-extractor-testnet-1052151659755.europe-west9.run.app -i 30 -b 100 --max-fetch-blocks 5000 --log.file.directory ./target/logs --datadir ./target/reth ``` You can query the node using the JSON-RPC API. For example, to get the block number, you can use the following command: diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index 2ebf3cfe827..fe8948187cb 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -185,7 +185,7 @@ where // TODO Check difficulty increment between parent and self // Ace age did increment it by some formula that we need to follow. - // self.validate_against_parent_gas_limit(header, parent)?; + // self.validate_against_parent_gas_limit(header, parent)?; // Disabled by Bitfinity validate_against_parent_eip1559_base_fee( header.header(), diff --git a/crates/net/downloaders/src/bitfinity_evm_client.rs b/crates/net/downloaders/src/bitfinity_evm_client.rs index 4c529c71920..8e058d64ec5 100644 --- a/crates/net/downloaders/src/bitfinity_evm_client.rs +++ b/crates/net/downloaders/src/bitfinity_evm_client.rs @@ -12,7 +12,7 @@ use itertools::Either; use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use reth_chainspec::{ once_cell_set, BaseFeeParams, BaseFeeParamsKind, Chain, ChainHardforks, ChainSpec, - EthereumHardfork, + EthereumHardfork, ForkCondition, }; use alloy_rlp::Decodable; @@ -24,7 +24,7 @@ use reth_network_p2p::{ priority::Priority, }; use reth_network_peers::PeerId; -use reth_primitives::{BlockBody, ForkCondition, Header, TransactionSigned}; +use reth_primitives::{BlockBody, Header, TransactionSigned}; use serde_json::json; use std::{self, cmp::min, collections::HashMap}; diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index dadd056e76c..def5ef39ede 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -397,7 +397,11 @@ where full_node.config.debug.terminate, ), node: full_node, - bitfinity_import: Some((ctx.provider_factory().clone(), ctx.node_config().bitfinity_import_arg.clone()))}; + bitfinity_import: Some(( + ctx.provider_factory().clone(), + ctx.node_config().bitfinity_import_arg.clone(), + )), + }; Ok(handle) } diff --git a/crates/node/core/src/args/bitfinity_args.rs b/crates/node/core/src/args/bitfinity_args.rs index 029d2079aab..7b78e39eca2 100644 --- a/crates/node/core/src/args/bitfinity_args.rs +++ b/crates/node/core/src/args/bitfinity_args.rs @@ -56,6 +56,37 @@ pub struct BitfinityImportArgs { /// Root key for the IC network #[arg(long, value_name = "IC_ROOT_KEY", default_value = IC_MAINNET_KEY)] pub ic_root_key: String, + + /// Enable transactions priority queue + /// Default: true + #[arg(long, value_name = "TX_PRIORITY_QUEUE", default_value = "true")] + pub tx_queue: bool, + + /// Transactions priority queue will contain this much transactions at max. + /// Default: 1000 + #[arg(long, value_name = "TX_PRIORITY_QUEUE_SIZE", default_value = "1000")] + pub tx_queue_size: usize, + + /// Time period to send transactions batch from the priority queue. + /// Do nothing, if `tx_queue` is disabled. + /// Default: 3 + #[arg(long, value_name = "SEND_QUEUED_TXS_PERIOD_SECS", default_value = "3")] + pub send_queued_txs_period_secs: u64, + + /// Send queued transactions by batches with this number of entries. + /// If set to 0 or 1, no batching is used. + /// Do nothing, if `tx_queue` is disabled. + /// Default: 10 + #[arg(long, value_name = "QUEUED_TXS_BATCH_SIZE", default_value = "10")] + pub queued_txs_batch_size: usize, + + /// If transaction sender sent more queued transactions at single execution, + /// it will wait for next execution to continue. + /// If set to 0, transaction sender will try to empty queue at each execution. + /// Do nothing, if `tx_queue` is disabled. + /// Default: 500 + #[arg(long, value_name = "QUEUED_TXS_PER_EXECUTION", default_value = "500")] + pub queued_txs_per_execution_threshold: usize, } /// Bitfinity Related Args diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index f131ae7f9b3..bee59a647e9 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -1795,6 +1795,16 @@ mod tests { assert_eq!(decoded, TxKind::Create); } + #[test] + fn test_decode_create_goerli() { + // test that an example create tx from goerli decodes properly + let tx_bytes = hex!("b901f202f901ee05228459682f008459682f11830209bf8080b90195608060405234801561001057600080fd5b50610175806100206000396000f3fe608060405234801561001057600080fd5b506004361061002b5760003560e01c80630c49c36c14610030575b600080fd5b61003861004e565b604051610045919061011d565b60405180910390f35b60606020600052600f6020527f68656c6c6f2073746174656d696e64000000000000000000000000000000000060405260406000f35b600081519050919050565b600082825260208201905092915050565b60005b838110156100be5780820151818401526020810190506100a3565b838111156100cd576000848401525b50505050565b6000601f19601f8301169050919050565b60006100ef82610084565b6100f9818561008f565b93506101098185602086016100a0565b610112816100d3565b840191505092915050565b6000602082019050818103600083015261013781846100e4565b90509291505056fea264697066735822122051449585839a4ea5ac23cae4552ef8a96b64ff59d0668f76bfac3796b2bdbb3664736f6c63430008090033c080a0136ebffaa8fc8b9fda9124de9ccb0b1f64e90fbd44251b4c4ac2501e60b104f9a07eb2999eec6d185ef57e91ed099afb0a926c5b536f0155dd67e537c7476e1471"); + + let decoded = TransactionSigned::decode(&mut &tx_bytes[..]).unwrap(); + assert_eq!(tx_bytes.len(), decoded.length()); + assert_eq!(tx_bytes, &alloy_rlp::encode(decoded)[..]); + } + #[test] fn test_decode_recover_mainnet_tx() { // random mainnet tx diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index 2120db192ef..afcf5524eb6 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -514,10 +514,20 @@ where hash: B256, ) -> RpcResult>> { trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionByHash"); - Ok(EthTransactions::transaction_by_hash(self, hash) + // Ok(EthTransactions::transaction_by_hash(self, hash).await?.map(Into::into)) + + let mut tx_opt = EthTransactions::transaction_by_hash(self, hash) .await? .map(|tx| tx.into_transaction(self.tx_resp_builder())) - .transpose()?) + .transpose()?; + if tx_opt.is_none() { + tx_opt = BitfinityEvmRpc::btf_transaction_by_hash(self, hash) + .await? + .map(|tx| tx.into_transaction(self.tx_resp_builder())) + .transpose()?; + } + + Ok(tx_opt) } /// Handler for: `eth_getRawTransactionByBlockHashAndIndex` @@ -706,7 +716,7 @@ where /// Handler for: `eth_gasPrice` async fn gas_price(&self) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_gasPrice"); - Ok(BitfinityEvmRpc::gas_price(self).await?) + Ok(BitfinityEvmRpc::btf_gas_price(self).await?) } /// Handler for: `eth_getAccount` @@ -722,7 +732,7 @@ where /// Handler for: `eth_maxPriorityFeePerGas` async fn max_priority_fee_per_gas(&self) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_maxPriorityFeePerGas"); - Ok(BitfinityEvmRpc::max_priority_fee_per_gas(self).await?) + Ok(BitfinityEvmRpc::btf_max_priority_fee_per_gas(self).await?) } /// Handler for: `eth_blobBaseFee` @@ -790,7 +800,7 @@ where async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { trace!(target: "rpc::eth", ?tx, "Serving eth_sendRawTransaction"); // Ok(EthTransactions::send_raw_transaction(self, tx).await?) - BitfinityEvmRpc::send_raw_transaction(self, tx).await + BitfinityEvmRpc::btf_send_raw_transaction(self, tx).await } /// Handler for: `eth_sign` diff --git a/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs index dd18583d5bf..1e93180341c 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_evm_rpc.rs @@ -2,22 +2,35 @@ use std::sync::Arc; +use alloy_network::TransactionResponse; +use alloy_rlp::Decodable; use did::{Block, H256}; use ethereum_json_rpc_client::CertifiedResult; use ethereum_json_rpc_client::{reqwest::ReqwestClient, EthJsonRpcClient}; use futures::Future; use jsonrpsee::core::RpcResult; use reth_chainspec::ChainSpec; -use reth_rpc_server_types::result::internal_rpc_err; +use reth_primitives::{RecoveredTx, TransactionSigned}; +use reth_primitives_traits::SignedTransaction; +use reth_rpc_eth_types::TransactionSource; +use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err}; use revm_primitives::{Address, Bytes, B256, U256}; +use super::bitfinity_tx_forwarder::BitfinityTransactionsForwarder; + /// Proxy to the Bitfinity EVM RPC. pub trait BitfinityEvmRpc { + /// Transaction type. + type Transaction: SignedTransaction; + + /// Returns transaction forwarder. + fn bitfinity_transaction_forwarder(&self) -> Option; + /// Returns the `ChainSpec`. fn chain_spec(&self) -> Arc; /// Forwards `eth_gasPrice` calls to the Bitfinity EVM. - fn gas_price(&self) -> impl Future> + Send { + fn btf_gas_price(&self) -> impl Future> + Send { let chain_spec = self.chain_spec(); async move { let (rpc_url, client) = get_client(&chain_spec)?; @@ -33,8 +46,86 @@ pub trait BitfinityEvmRpc { } } + /// Returns transaction from forwarder or query it from EVM RPC. + fn btf_transaction_by_hash( + &self, + hash: B256, + ) -> impl Future>>> + Send { + let chain_spec = self.chain_spec(); + let forwarder = self.bitfinity_transaction_forwarder(); + + async move { + // Try to find transaction in transaction forwarder. + if let Some(forwarder) = forwarder { + if let Some(raw_tx) = forwarder.get_transaction_by_hash(hash).await { + let self_tx = + ::decode(&mut raw_tx.as_ref()) + .map_err(|e| { + internal_rpc_err(format!( + "failed to decode BitfinityEvmRpc::Transaction from tx forwarder: {e}" + )) + })?; + + let signer = self_tx.recover_signer().ok_or_else(|| { + internal_rpc_err( + "failed to recover signer from decoded BitfinityEvmRpc::Transaction", + ) + })?; + let recovered_tx = RecoveredTx::new_unchecked(self_tx, signer); + return Ok(Some(TransactionSource::Pool(recovered_tx))); + } + }; + + // If transaction is not found in forwarder, query it from EVM rpc. + let (rpc_url, client) = get_client(&chain_spec)?; + let Some(tx) = client.get_transaction_by_hash(hash.into()).await.map_err(|e| { + internal_rpc_err(format!( + "failed to forward eth_transactionByHash request to {}: {}", + rpc_url, e + )) + })? + else { + return Ok(None); + }; + + let alloy_tx: alloy_rpc_types_eth::Transaction = tx.try_into().map_err(|e| { + internal_rpc_err(format!( + "failed to convert did::Transaction into alloy_rpc_types::Transaction: {e}" + )) + })?; + let encoded = alloy_rlp::encode(&alloy_tx.inner); + let self_tx = + ::decode(&mut encoded.as_ref()) + .map_err(|e| internal_rpc_err(format!("failed to decode BitfinityEvmRpc::Transaction from received did::Transaction: {e}")))?; + + let signer = self_tx.recover_signer().ok_or_else(|| { + internal_rpc_err( + "failed to recover signer from decoded BitfinityEvmRpc::Transaction", + ) + })?; + let recovered_tx = RecoveredTx::new_unchecked(self_tx, signer); + + let block_params = alloy_tx + .block_number() + .zip(alloy_tx.transaction_index()) + .zip(alloy_tx.block_hash()); + let tx_source = match block_params { + Some(((block_number, index), block_hash)) => TransactionSource::Block { + transaction: recovered_tx, + index, + block_hash, + block_number, + base_fee: None, + }, + None => TransactionSource::Pool(recovered_tx), + }; + + Ok(Some(tx_source)) + } + } + /// Forwards `eth_maxPriorityFeePerGas` calls to the Bitfinity EVM - fn max_priority_fee_per_gas(&self) -> impl Future> + Send { + fn btf_max_priority_fee_per_gas(&self) -> impl Future> + Send { let chain_spec = self.chain_spec(); async move { let (rpc_url, client) = get_client(&chain_spec)?; @@ -51,9 +142,22 @@ pub trait BitfinityEvmRpc { } /// Forwards `eth_sendRawTransaction` calls to the Bitfinity EVM - fn send_raw_transaction(&self, tx: Bytes) -> impl Future> + Send { + fn btf_send_raw_transaction(&self, tx: Bytes) -> impl Future> + Send { let chain_spec = self.chain_spec(); + let forwarder = self.bitfinity_transaction_forwarder(); + async move { + if let Some(forwarder) = forwarder { + let typed_tx = TransactionSigned::decode(&mut tx.as_ref()).map_err(|e| { + invalid_params_rpc_err(format!( + "failed to decode eth_sendRawTransaction input {tx}: {e}" + )) + })?; + let hash = typed_tx.hash(); + forwarder.forward_raw_transaction(&tx).await?; + return Ok(hash); + }; + let (rpc_url, client) = get_client(&chain_spec)?; let tx_hash = client.send_raw_transaction_bytes(&tx).await.map_err(|e| { diff --git a/crates/rpc/rpc-eth-api/src/helpers/bitfinity_tx_forwarder.rs b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_tx_forwarder.rs new file mode 100644 index 00000000000..e66de22f41e --- /dev/null +++ b/crates/rpc/rpc-eth-api/src/helpers/bitfinity_tx_forwarder.rs @@ -0,0 +1,227 @@ +//! Module with instruments for transactions sorting, batching and sending. + +use std::{collections::BTreeSet, sync::Arc}; + +use alloy_consensus::Transaction; +use alloy_primitives::Uint; +use alloy_rlp::Decodable; +use reth_primitives::TransactionSigned; +use reth_rpc_eth_types::{EthApiError, EthResult}; +use revm_primitives::{HashMap, B256, U256}; +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +/// Alias for multithread transactions priority queue. +pub type SharedQueue = Arc>; + +/// Forwarder to push transactions to the priority queue. +#[derive(Debug, Clone)] +pub struct BitfinityTransactionsForwarder { + queue: SharedQueue, +} + +impl BitfinityTransactionsForwarder { + /// Creates new forwarder with the given parameters. + pub const fn new(queue: SharedQueue) -> Self { + Self { queue } + } + + /// Adds raw tx to the priority queue. + pub async fn forward_raw_transaction(&self, raw: &[u8]) -> EthResult<()> { + let typed_tx = TransactionSigned::decode(&mut (&raw[..])).map_err(|e| { + warn!("Failed to decode signed transaction in the BitfinityTransactionsForwarder: {e}"); + EthApiError::FailedToDecodeSignedTransaction + })?; + + debug!("Pushing tx with hash {:?} to priority queue", typed_tx.hash); + let gas_price = typed_tx.effective_gas_price(None); + + self.queue.lock().await.push(typed_tx.hash(), Uint::from(gas_price), raw.to_vec()); + + Ok(()) + } + + /// Returns transaction from priority queue, if present. + pub async fn get_transaction_by_hash(&self, hash: B256) -> Option> { + self.queue.lock().await.get(&hash) + } +} + +/// Priority queue to get transactions sorted by gas price. +#[derive(Debug)] +pub struct TransactionsPriorityQueue { + priority: BTreeSet, + transactions: HashMap>, + size_limit: usize, +} + +impl TransactionsPriorityQueue { + /// Creates new instance of the queue with the given limil. + pub fn new(size_limit: usize) -> Self { + Self { priority: BTreeSet::default(), transactions: HashMap::default(), size_limit } + } + + /// Adds the tx with the given gas price. + pub fn push(&mut self, hash: B256, gas_price: U256, tx: Vec) { + let key = TxKey { gas_price, hash }; + self.priority.insert(key); + self.transactions.insert(hash, tx); + + if self.len() > self.size_limit { + self.pop_tx_with_lowest_price(); + } + } + + /// Returns raw transaction if it is present in the queue. + pub fn get(&self, hash: &B256) -> Option> { + self.transactions.get(hash).cloned() + } + + /// Returns tx with highest gas price, if present. + pub fn pop_tx_with_highest_price(&mut self) -> Option<(U256, Vec)> { + let tx_key = self.priority.pop_last()?; + let Some(tx) = self.transactions.remove(&tx_key.hash) else { + warn!("Transaction key present in priority queue, but not found in transactions map."); + return None; + }; + + Some((tx_key.gas_price, tx)) + } + + /// Returns tx with lowest gas price, if present. + pub fn pop_tx_with_lowest_price(&mut self) -> Option<(U256, Vec)> { + let tx_key = self.priority.pop_first()?; + let Some(tx) = self.transactions.remove(&tx_key.hash) else { + warn!("Transaction key present in priority queue, but not found in transactions map."); + return None; + }; + + Some((tx_key.gas_price, tx)) + } + + /// Number of transactions in the queue. + pub fn len(&self) -> usize { + self.transactions.len() + } + + /// Change size limit of the queue. + pub fn set_size_limit(&mut self, new_limit: usize) { + self.size_limit = new_limit; + + while self.len() > self.size_limit { + self.pop_tx_with_lowest_price(); + } + } + + /// Returns true if length == 0. + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } +} + +/// This struct will sort transactions by gas price, +/// but if it is equal, the key will still be different due to hash difference. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +struct TxKey { + gas_price: U256, + hash: B256, +} + +#[cfg(test)] +mod tests { + use std::sync::OnceLock; + + use super::TransactionsPriorityQueue; + use alloy_consensus::Transaction; + use reth_primitives::TransactionSigned; + use reth_transaction_pool::test_utils::MockTransaction; + use reth_transaction_pool::PoolTransaction; + use revm_primitives::{PrimitiveSignature, U256}; + + #[test] + fn test_pop_order() { + let mut queue = TransactionsPriorityQueue::new(10); + let tx1 = transaction_with_gas_price(100); + let tx2 = transaction_with_gas_price(300); + let tx3 = transaction_with_gas_price(200); + + let tx1_bytes = alloy_rlp::encode(&tx1); + let tx2_bytes = alloy_rlp::encode(&tx2); + let tx3_bytes = alloy_rlp::encode(&tx3); + + queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes.clone()); + queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes.clone()); + queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes.clone()); + + let expected_order = [tx2_bytes, tx3_bytes, tx1_bytes]; + for expected_tx in expected_order { + let popped_tx = queue.pop_tx_with_highest_price().unwrap().1; + assert_eq!(popped_tx, expected_tx); + } + + assert!(queue.is_empty()) + } + + #[test] + fn test_size_limit_should_shrink_tx_with_lowest_price() { + let mut queue = TransactionsPriorityQueue::new(2); + let tx1 = transaction_with_gas_price(100); + let tx2 = transaction_with_gas_price(300); + let tx3 = transaction_with_gas_price(200); + + let tx1_bytes = alloy_rlp::encode(&tx1); + let tx2_bytes = alloy_rlp::encode(&tx2); + let tx3_bytes = alloy_rlp::encode(&tx3); + + queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes); + queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes.clone()); + queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes.clone()); + + let expected_order = [tx2_bytes, tx3_bytes]; + for expected_tx in expected_order { + let popped_tx = queue.pop_tx_with_highest_price().unwrap().1; + assert_eq!(popped_tx, expected_tx); + } + + assert!(queue.is_empty()) + } + + #[test] + fn test_get_transaction_from_queue() { + let mut queue = TransactionsPriorityQueue::new(100); + let tx1 = transaction_with_gas_price(100); + let tx2 = transaction_with_gas_price(300); + let tx3 = transaction_with_gas_price(200); + + let tx1_bytes = alloy_rlp::encode(&tx1); + let tx2_bytes = alloy_rlp::encode(&tx2); + let tx3_bytes = alloy_rlp::encode(&tx3); + + queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes); + queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes); + queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes); + + let hashes = [tx1.hash(), tx2.hash(), tx3.hash()]; + for hash in hashes { + assert!(queue.get(&hash).is_some()); + } + + assert_eq!(queue.len(), 3); + + let tx4 = transaction_with_gas_price(400); + assert!(queue.get(&tx4.hash()).is_none()); + } + + fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned { + let tx = MockTransaction::legacy().with_gas_price(gas_price).rng_hash(); + + let hash = OnceLock::new(); + hash.get_or_init(|| *tx.hash()); + + TransactionSigned { + hash, + signature: PrimitiveSignature::test_signature(), + transaction: tx.into(), + } + } +} diff --git a/crates/rpc/rpc-eth-api/src/helpers/mod.rs b/crates/rpc/rpc-eth-api/src/helpers/mod.rs index 6981a7a5f9b..c533d4c53b7 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/mod.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/mod.rs @@ -15,6 +15,8 @@ //! all the `Eth` traits, e.g. `reth_rpc::EthApi`. pub mod bitfinity_evm_rpc; +pub mod bitfinity_tx_forwarder; + pub mod block; pub mod blocking_task; pub mod call; diff --git a/crates/rpc/rpc-eth-api/src/types.rs b/crates/rpc/rpc-eth-api/src/types.rs index 2da1bdac281..6f570350019 100644 --- a/crates/rpc/rpc-eth-api/src/types.rs +++ b/crates/rpc/rpc-eth-api/src/types.rs @@ -11,7 +11,10 @@ use reth_provider::{ProviderTx, ReceiptProvider, TransactionsProvider}; use reth_rpc_types_compat::TransactionCompat; use reth_transaction_pool::{PoolTransaction, TransactionPool}; -use crate::{AsEthApiError, FromEthApiError, FromEvmError, RpcNodeCore}; +use crate::{ + helpers::bitfinity_evm_rpc::BitfinityEvmRpc, AsEthApiError, FromEthApiError, FromEvmError, + RpcNodeCore, +}; /// Network specific `eth` API types. pub trait EthApiTypes: Send + Sync + Clone { @@ -61,7 +64,7 @@ where Transaction = RpcTransaction, Error = RpcError, >, - >, + > + BitfinityEvmRpc::Transaction>, { } @@ -77,6 +80,6 @@ impl FullEthApiTypes for T where Transaction = RpcTransaction, Error = RpcError, >, - > + > + BitfinityEvmRpc::Transaction> { } diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index cf5753a34c1..ca8ef9078b3 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -1,7 +1,7 @@ //! Implementation of the [`jsonrpsee`] generated [`EthApiServer`](crate::EthApi) trait //! Handles RPC requests for the `eth_` namespace. -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use alloy_consensus::BlockHeader; use alloy_eips::BlockNumberOrTag; @@ -14,7 +14,7 @@ use reth_provider::{ ProviderReceipt, }; use reth_rpc_eth_api::{ - helpers::{EthSigner, SpawnBlocking}, + helpers::{bitfinity_tx_forwarder::BitfinityTransactionsForwarder, EthSigner, SpawnBlocking}, node::RpcNodeCoreExt, EthApiTypes, RpcNodeCore, }; @@ -48,6 +48,9 @@ pub struct EthApi { pub(super) inner: Arc>, /// Transaction RPC response builder. pub tx_resp_builder: EthTxBuilder, + + /// Bitfinity transaction forwarder. + pub bitfinity_tx_forwarder: Arc>, } impl Clone for EthApi @@ -55,7 +58,11 @@ where Provider: BlockReader, { fn clone(&self) -> Self { - Self { inner: self.inner.clone(), tx_resp_builder: EthTxBuilder } + Self { + inner: self.inner.clone(), + tx_resp_builder: EthTxBuilder, + bitfinity_tx_forwarder: self.bitfinity_tx_forwarder.clone(), + } } } @@ -95,7 +102,19 @@ where proof_permits, ); - Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder } + Self { + inner: Arc::new(inner), + tx_resp_builder: EthTxBuilder, + bitfinity_tx_forwarder: Arc::new(OnceLock::new()), + } + } + + /// Sets bitfinity tx forwarder to handle new raw transactions. + pub fn set_bitfinity_tx_forwarder( + &self, + bitfinity_tx_forwarder: BitfinityTransactionsForwarder, + ) { + self.bitfinity_tx_forwarder.get_or_init(|| bitfinity_tx_forwarder); } } @@ -138,7 +157,11 @@ where ctx.config.proof_permits, ); - Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder } + Self { + inner: Arc::new(inner), + tx_resp_builder: EthTxBuilder, + bitfinity_tx_forwarder: Arc::new(OnceLock::new()), + } } } diff --git a/crates/rpc/rpc/src/eth/helpers/bitfinity_evm_rpc.rs b/crates/rpc/rpc/src/eth/helpers/bitfinity_evm_rpc.rs index c5a7151d1c5..69ce1d7bd08 100644 --- a/crates/rpc/rpc/src/eth/helpers/bitfinity_evm_rpc.rs +++ b/crates/rpc/rpc/src/eth/helpers/bitfinity_evm_rpc.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use reth_provider::{BlockReader, ChainSpecProvider}; -use reth_rpc_eth_api::helpers::bitfinity_evm_rpc::BitfinityEvmRpc; +use reth_rpc_eth_api::helpers::{ + bitfinity_evm_rpc::BitfinityEvmRpc, bitfinity_tx_forwarder::BitfinityTransactionsForwarder, +}; use crate::EthApi; @@ -12,6 +14,12 @@ impl BitfinityEvmRpc where Provider: BlockReader + ChainSpecProvider, { + type Transaction = Provider::Transaction; + + fn bitfinity_transaction_forwarder(&self) -> Option { + self.bitfinity_tx_forwarder.get().cloned() + } + fn chain_spec(&self) -> Arc { self.inner.provider().chain_spec() } diff --git a/docker-compose.yml b/docker-compose.yml index cd04856e6ca..01488cd7020 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,6 @@ services: dockerfile: ./Dockerfile ports: - '8080:8080' - command: node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth,net,trace,txpool,web3" --disable-discovery --ipcdisable --no-persist-peers -r https://orca-app-5yyst.ondigitalocean.app -i 10 -b 100 --datadir /reth/data + command: node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth,net,trace,txpool,web3" --disable-discovery --ipcdisable --no-persist-peers -r https://block-extractor-testnet-1052151659755.europe-west9.run.app -i 10 -b 100 --datadir /reth/data volumes: - ./target/reth:/reth