Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
713006f
txs priority queue
F3kilo Nov 21, 2024
5faa2d3
added forwarder to the node
F3kilo Nov 21, 2024
c359295
transaction sender
F3kilo Nov 22, 2024
2d87ebb
tests
F3kilo Nov 25, 2024
67fe67d
single tx test with queue
F3kilo Nov 25, 2024
119312f
test transactions order
F3kilo Nov 25, 2024
690b162
test fixed
F3kilo Nov 27, 2024
5289ab1
tests fixed
F3kilo Nov 27, 2024
3a5b76d
clear global state
F3kilo Nov 27, 2024
4502c54
build fixed
F3kilo Nov 27, 2024
d26ccc7
remove global txs list
F3kilo Nov 29, 2024
3e1a018
move send txs code to tasks module
F3kilo Nov 29, 2024
992f6d1
better cli args types + use channel
F3kilo Nov 29, 2024
b4ca64a
get transactions from forwarder, reth, and evmc
F3kilo Dec 3, 2024
b589177
better tests + clippy fixes
F3kilo Dec 4, 2024
ccbec99
query transactions test
F3kilo Dec 5, 2024
b2a6db0
taks manager live long enough
F3kilo Dec 6, 2024
01c73c6
fixed block number bug
F3kilo Dec 6, 2024
6afe3f5
increased timeout to fix test
F3kilo Dec 6, 2024
170cecc
increase timeout in another test
F3kilo Dec 6, 2024
da1747b
higher timeout for bitfinity_test_reset_should_extract_all_accounts_data
F3kilo Dec 9, 2024
76b59b3
much more timeout
F3kilo Dec 9, 2024
bc30706
send tx batches concurrently
F3kilo Dec 10, 2024
78ccfba
use newer evm-sdk
F3kilo Dec 16, 2024
a0da333
Merge branch 'bitfinity-archive-node' into send_raw_transaction_pool
F3kilo Jan 6, 2025
79dd6b8
conflicts solved + generic tx in BitfinityEvmRpc
F3kilo Jan 13, 2025
3f1457c
Merge branch 'bitfinity-archive-node' into send_raw_transaction_pool
F3kilo Jan 13, 2025
ae0eb85
tx forwarder as EthApi module
F3kilo Jan 14, 2025
82c8696
put tx forwarder into EthApi
F3kilo Jan 14, 2025
31537d0
compilation fixed
F3kilo Jan 14, 2025
297e6b2
tests fixed
F3kilo Jan 14, 2025
7228809
revert unnecessary changes in reth files
F3kilo Jan 16, 2025
e4cbd25
re-apply bitfinity changes
F3kilo Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,27 @@ similar-asserts.workspace = true
async-channel.workspace = true
candid.workspace = true
did.workspace = true
revm-primitives.workspace = true
evm-canister-client = { workspace = true, features = ["ic-agent-client"] }
lightspeed_scheduler = { workspace = true, features = ["tracing"] }
# rlp = { workspace = true }
ethereum-json-rpc-client = { workspace = true, features = ["reqwest"] }

[dev-dependencies]
tempfile.workspace = true

# bitfinity dev dependencies
async-trait = { workspace = true }
dirs.workspace = true
ethereum-json-rpc-client = { workspace = true, features = ["reqwest"] }
jsonrpsee = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
reth-primitives = { workspace = true, features = ["test-utils"] }
reth-trie = { workspace = true, features = ["test-utils"] }
revm-primitives.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
reth-db-common.workspace = true
reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
serial_test.workspace = true
reth-discv5.workspace = true

[features]
default = ["jemalloc"]
Expand Down
3 changes: 3 additions & 0 deletions bin/reth/src/bitfinity_tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! This module contains tasks to run in parallel with reth node.

pub mod send_txs;
153 changes: 153 additions & 0 deletions bin/reth/src/bitfinity_tasks/send_txs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! Utils for raw transaction batching.

use std::time::Duration;

use did::H256;
use ethereum_json_rpc_client::reqwest::ReqwestClient;
use ethereum_json_rpc_client::{EthJsonRpcClient, Id, Params};
use eyre::eyre;
use futures::future::join_all;
use lightspeed_scheduler::job::Job;
use lightspeed_scheduler::scheduler::Scheduler;
use lightspeed_scheduler::JobExecutor;
use reth_node_core::version::SHORT_VERSION;
use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::SharedQueue;
use revm_primitives::{hex, U256};
use tracing::{info, trace, warn};

/// Periodically sends transactions from priority queue.
#[derive(Debug, Clone)]
pub struct BitfinityTransactionSender {
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: usize,
txs_per_execution_threshold: usize,
}

impl BitfinityTransactionSender {
/// Creates new instance of the transaction sender.
pub const fn new(
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: usize,
txs_per_execution_threshold: usize,
) -> Self {
Self { queue, rpc_url, period, batch_size, txs_per_execution_threshold }
}

/// Schedule the transaction sending job and return a handle to it.
pub async fn schedule_execution(
self,
job_executor: Option<JobExecutor>,
) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> {
info!(target: "reth::cli - BitfinityTransactionSender", "reth {} starting", SHORT_VERSION);

let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz);

// Schedule the import job
{
let interval =
Scheduler::Interval { interval_duration: self.period, execute_at_startup: true };
job_executor
.add_job_with_scheduler(
interval,
Job::new("send transactions", "bitfinity tx sending", None, move || {
let tx_sender = self.clone();
Box::pin(async move {
tx_sender.single_execution().await?;
Ok(())
})
}),
)
.await;
}

let job_handle = job_executor.run().await?;
Ok((job_executor, job_handle))
}

/// Execute the transaction sending job.
pub async fn single_execution(&self) -> eyre::Result<()> {
let mut to_send = self.get_transactions_to_send().await;
let batch_size = self.batch_size.max(1);
let mut send_futures = vec![];

loop {
let last_idx = batch_size.min(to_send.len());
if last_idx == 0 {
break;
}

let to_send_batch: Vec<_> = to_send.drain(..last_idx).collect();

let send_future = async move {
let result = match to_send_batch.len() {
0 => return,
1 => self.send_single_tx(&to_send_batch[0].1).await,
_ => self.send_txs_batch(&to_send_batch).await,
};

if let Err(e) = result {
warn!("Failed to send transactions to EVM: {e}");
}
};
send_futures.push(send_future);
}

join_all(send_futures).await;

Ok(())
}

async fn get_transactions_to_send(&self) -> Vec<(U256, Vec<u8>)> {
let mut batch = Vec::with_capacity(self.txs_per_execution_threshold);
let mut queue = self.queue.lock().await;
let txs_to_pop = self.txs_per_execution_threshold.max(1); // if batch size is zero, take at least one tx.

for _ in 0..txs_to_pop {
let Some(entry) = queue.pop_tx_with_highest_price() else {
break;
};

batch.push(entry);
}

batch
}

async fn send_single_tx(&self, to_send: &[u8]) -> Result<(), eyre::Error> {
let client = self.get_client()?;
let hash = client
.send_raw_transaction_bytes(to_send)
.await
.map_err(|e| eyre!("failed to send single transaction: {e}"))?;

trace!("Single transaction with hash {hash} sent.");

Ok(())
}

async fn send_txs_batch(&self, to_send: &[(U256, Vec<u8>)]) -> Result<(), eyre::Error> {
let client = self.get_client()?;

let params =
to_send.iter().map(|(_, raw)| (Params::Array(vec![hex::encode(raw).into()]), Id::Null));
let max_batch_size = usize::MAX;
let hashes = client
.batch_request::<H256>("eth_sendRawTransaction".into(), params, max_batch_size)
.await
.map_err(|e| eyre!("failed to send single transaction: {e}"))?;

trace!("Raw transactions batch sent. Hashes: {hashes:?}");

Ok(())
}

fn get_client(&self) -> eyre::Result<EthJsonRpcClient<ReqwestClient>> {
let client = EthJsonRpcClient::new(ReqwestClient::new(self.rpc_url.clone()));

Ok(client)
}
}
3 changes: 2 additions & 1 deletion bin/reth/src/commands/bitfinity_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ impl BitfinityImportCommand {
/// Schedule the import job and return a handle to it.
pub async fn schedule_execution(
self,
job_executor: Option<JobExecutor>,
) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> {
info!(target: "reth::cli - BitfinityImportCommand", "reth {} starting", SHORT_VERSION);

let job_executor = JobExecutor::new_with_local_tz();
let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz);

// Schedule the import job
{
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! This contains all of the `reth` commands

pub mod bitfinity_reset_evm_state;
pub mod bitfinity_import;
pub mod bitfinity_reset_evm_state;
pub mod debug_cmd;
1 change: 1 addition & 0 deletions bin/reth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub mod bitfinity_tasks;
pub mod cli;
pub mod commands;

Expand Down
33 changes: 30 additions & 3 deletions bin/reth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();

use std::{sync::Arc, time::Duration};

use clap::{Args, Parser};
use reth::bitfinity_tasks::send_txs::BitfinityTransactionSender;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::{
engine_tree_config::{
Expand All @@ -13,7 +16,9 @@ use reth_node_builder::{
};
use reth_node_ethereum::node::EthereumAddOns;
use reth_provider::providers::BlockchainProvider2;
use reth_rpc_api::eth::helpers::bitfinity_tx_forwarder::{BitfinityTransactionsForwarder, TransactionsPriorityQueue};
use reth_tracing::tracing::warn;
use tokio::sync::Mutex;
use tracing::info;

/// Parameters for configuring the engine
Expand Down Expand Up @@ -102,19 +107,41 @@ fn main() {
let datadir = handle.node.data_dir.clone();
let (provider_factory, bitfinity) = handle.bitfinity_import.clone().expect("Bitfinity import not configured");


// Init bitfinity import
{
let executor = {
let import = BitfinityImportCommand::new(
config,
datadir,
chain,
bitfinity,
bitfinity.clone(),
provider_factory,
blockchain_provider,
);
let _import_handle = import.schedule_execution().await?;
let (executor, _import_handle) = import.schedule_execution(None).await?;
executor
};

if bitfinity.tx_queue {
let queue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(1000)));

// Make EthApi handler move new txs to queue.
let queue_clone = Arc::clone(&queue);
handle.node.add_ons_handle.eth_api().set_bitfinity_tx_forwarder(BitfinityTransactionsForwarder::new(queue_clone));

// Run batch transaction sender.
let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url);
let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs);
let transaction_sending = BitfinityTransactionSender::new(
queue,
url,
period,
bitfinity.queued_txs_batch_size,
bitfinity.queued_txs_per_execution_threshold,
);
let _sending_handle = transaction_sending.schedule_execution(Some(executor)).await?;
}

handle.node_exit_future.await
}
}
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/tests/bitfinity_it.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//!
//! bitfinity integration tests
//!
pub mod commands;
//!
pub mod commands;
Loading
Loading