diff --git a/Cargo.toml b/Cargo.toml index 1711e063f9e..4997b28504f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,6 +137,9 @@ members = [ "examples/exex/op-bridge/", "testing/ef-tests/", "testing/testing-utils", + + # bitfinity + "bin/reth/src/bitfinity_readonly.rs", ] default-members = ["bin/reth"] diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index caf21dd0c74..ec0ce84a267 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -9,6 +9,10 @@ repository.workspace = true description = "Reth node implementation" default-run = "reth" +[[bin]] +name = "reth-readonly" +path = "src/bitfinity_readonly.rs" + [lints] workspace = true diff --git a/bin/reth/src/bitfinity_readonly.rs b/bin/reth/src/bitfinity_readonly.rs new file mode 100644 index 00000000000..47a1543d171 --- /dev/null +++ b/bin/reth/src/bitfinity_readonly.rs @@ -0,0 +1,179 @@ +//! Bitfinity Readonly Node + +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; +use tokio::time; + +use reth::blockchain_tree::BlockchainTreeEngine; +use reth_db::DatabaseEnv; +use reth_errors::ProviderError; +use reth_provider::providers::BlockchainProvider; +use reth_provider::{ + BlockNumReader, BlockReader, CanonChainTracker, DatabaseProviderFactory, HeaderProvider, + HeaderSyncGapProvider, +}; +use reth_stages::Pipeline; +// We use jemalloc for performance reasons. +#[cfg(all(feature = "jemalloc", unix))] +#[global_allocator] +static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + +#[cfg(all(feature = "optimism", not(test)))] +compile_error!("Cannot build the `reth` binary with the `optimism` feature flag enabled. Did you mean to build `op-reth`?"); + +#[cfg(not(feature = "optimism"))] +fn main() { + use lightspeed_scheduler::job::Job; + use lightspeed_scheduler::scheduler::Scheduler; + use lightspeed_scheduler::JobExecutor; + use reth::cli::Cli; + + use reth_node_ethereum::EthereumNode; + + reth::sigsegv_handler::install(); + + // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided. + if std::env::var_os("RUST_BACKTRACE").is_none() { + std::env::set_var("RUST_BACKTRACE", "1"); + } + + if let Err(err) = Cli::parse_args().run(|builder, _| async { + let handle = builder.launch_readonly_node(EthereumNode::default()).await?; + + // Schedule for refetching the chain info + let job_executor = JobExecutor::new_with_local_tz(); + + // Schedule the import job + { + let interval = Duration::from_secs(5); + job_executor + .add_job_with_scheduler( + Scheduler::Interval { interval_duration: interval, execute_at_startup: false }, + Job::new("update_chain_info", "update chain info", None, move || { + let blockchain_provider = handle.node.provider.clone(); + Box::pin(async move { + start_chain_info_updater(blockchain_provider) + .expect("Failed to update chain info"); + + Ok(()) + }) + }), + ) + .await; + } + + let _job_executor = job_executor.run().await.expect("Failed to run job executor"); + + handle.node_exit_future.await + }) { + eprintln!("Error: {err:?}"); + std::process::exit(1); + } +} + +fn start_chain_info_updater( + blockchain_provider: BlockchainProvider>, +) -> eyre::Result<()> { + // Get a read-only database provider + let provider = blockchain_provider.database_provider_ro()?; + + // Get the current chain info from database + let chain_info = provider.chain_info()?; + + // Get the current provider's chain info + let current_info = blockchain_provider.chain_info()?; + + // Only update if the database has a newer block + if chain_info.best_number > current_info.best_number { + println!( + "Found new blocks: current={} new={}", + current_info.best_number, chain_info.best_number + ); + + // First, update the block hashes and clear any buffered blocks + if let Ok(updated_hashes) = blockchain_provider.update_block_hashes_and_clear_buffered() { + println!("Updated block hashes in the tree"); + + // Now walk through all new blocks and update the provider's state + for block_number in (current_info.best_number + 1)..=chain_info.best_number { + if let Ok(Some(header)) = provider.header_by_number(block_number) { + let block_hash = header.hash_slow(); + + // Try to get all the block data + match (provider.block(block_hash), provider.receipt(block_number)) { + (Ok(Some(block)), Ok(Some(receipts))) => { + println!( + "Processing block {}: hash={:?} txns={}", + block_number, + block_hash, + block.body.len() + ); + + // Get the sealed header for chain info updates + if let Ok(Some(sealed_header)) = provider.sealed_header(block_number) { + // Update the chain info tracker + blockchain_provider.set_canonical_head(sealed_header.clone()); + blockchain_provider.set_finalized(sealed_header.clone()); + blockchain_provider.set_safe(sealed_header); + + // Get a read-write provider to update the database view + if let Ok(mut provider_rw) = + blockchain_provider.database_provider_rw() + { + // Update block tables + if let Err(e) = provider_rw.insert_block(block.clone(), None) { + println!( + "Failed to insert block {}: {:?}", + block_number, e + ); + continue; + } + + // Update receipts + if let Err(e) = + provider_rw.insert_receipts(block_number, receipts) + { + println!( + "Failed to insert receipts for block {}: {:?}", + block_number, e + ); + continue; + } + + // Commit changes + if let Err(e) = provider_rw.commit() { + println!( + "Failed to commit changes for block {}: {:?}", + block_number, e + ); + continue; + } + + println!( + "Successfully updated state for block {}", + block_number + ); + } + } + } + _ => { + println!("Incomplete data for block {}, skipping", block_number); + continue; + } + } + } + } + + // Verify the final state + if let Ok(final_info) = blockchain_provider.chain_info() { + println!( + "Final provider state: number={} hash={:?}", + final_info.best_number, final_info.best_hash + ); + } + } + } + + Ok(()) +} diff --git a/bin/reth/src/commands/node/mod.rs b/bin/reth/src/commands/node/mod.rs index a9368d36113..9cb6d85d1b5 100644 --- a/bin/reth/src/commands/node/mod.rs +++ b/bin/reth/src/commands/node/mod.rs @@ -1,9 +1,8 @@ //! Main node command for launching a node use crate::args::{ - utils::parse_socket_address, - DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs, - RpcServerArgs, TxPoolArgs, + utils::parse_socket_address, DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, NetworkArgs, + PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs, }; use clap::{value_parser, Args, Parser}; use reth_cli_runner::CliContext; @@ -32,7 +31,6 @@ pub struct NodeCommand { // required = false, // )] // pub chain: Arc, - /// Enable Prometheus metrics. /// /// The metrics will be served at the given interface and port. @@ -188,7 +186,11 @@ impl NodeCommand { let db_path = data_dir.db(); tracing::info!(target: "reth::cli", path = ?db_path, "Opening database"); - let database = Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics()); + let database = if node_config.bitfinity_import_arg.readonly { + Arc::new(reth_db::open_db_read_only(&db_path.clone(), self.db.database_args())?) + } else { + Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics()) + }; if with_unused_ports { node_config = node_config.with_unused_ports(); diff --git a/bin/reth/tests/commands/utils.rs b/bin/reth/tests/commands/utils.rs index b2fa8338932..2640a350166 100644 --- a/bin/reth/tests/commands/utils.rs +++ b/bin/reth/tests/commands/utils.rs @@ -153,6 +153,7 @@ pub async fn bitfinity_import_config_data( backup_rpc_url: backup_evm_datasource_url, max_retries: 3, retry_delay_secs: 3, + readonly: false, }; Ok(( diff --git a/crates/node/builder/src/builder/bitfinity_builder.rs b/crates/node/builder/src/builder/bitfinity_builder.rs new file mode 100644 index 00000000000..ba093d15a97 --- /dev/null +++ b/crates/node/builder/src/builder/bitfinity_builder.rs @@ -0,0 +1,58 @@ +use reth_db::database::Database; +use reth_db::database_metrics::{DatabaseMetadata, DatabaseMetrics}; +use reth_node_api::NodeTypes; + +use crate::bitfinity_launch::{DefaultReadOnlyNodeLauncher, ReadOnlyLaunchNode}; +use crate::components::NodeComponentsBuilder; +use crate::{Node, NodeHandle}; + +use super::{ + NodeAdapter, NodeBuilder, NodeBuilderWithComponents, RethFullAdapter, WithLaunchContext, +}; + +impl WithLaunchContext> +where + DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static, +{ + /// Launches a preconfigured [Node] + /// + /// This bootstraps the node internals, creates all the components with the given [Node] + /// + /// Returns a [`NodeHandle`] that can be used to interact with the node. + pub async fn launch_readonly_node( + self, + node: N, + ) -> eyre::Result< + NodeHandle< + NodeAdapter< + RethFullAdapter, + >>::Components, + >, + >, + > + where + N: Node>, + { + self.node(node).launch_readonly_node().await + } +} + +impl WithLaunchContext, CB>> +where + DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static, + T: NodeTypes, + CB: NodeComponentsBuilder>, +{ + /// Launches the node and returns a handle to it. + pub async fn launch_readonly_node( + self, + ) -> eyre::Result, CB::Components>>> { + let Self { builder, task_executor } = self; + + let launcher = DefaultReadOnlyNodeLauncher::new(task_executor, builder.config.datadir()); + + builder + .launch_with_fn(|builder| async move { launcher.launch_readonly_node(builder).await }) + .await + } +} diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index 72e56b71a3e..1c041fb9c67 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -41,6 +41,7 @@ pub use states::*; use std::sync::Arc; mod states; +pub mod bitfinity_builder; /// The adapter type for a reth node with the builtin provider type // Note: we need to hardcode this because custom components might depend on it in associated types. diff --git a/crates/node/builder/src/launch/bitfinity_common.rs b/crates/node/builder/src/launch/bitfinity_common.rs new file mode 100644 index 00000000000..f38d093a792 --- /dev/null +++ b/crates/node/builder/src/launch/bitfinity_common.rs @@ -0,0 +1,133 @@ +//! Helper types that can be used by launchers. + +use crate::{ + components::{NodeComponents, NodeComponentsBuilder}, + hooks::OnComponentInitializedHook, + BuilderContext, NodeAdapter, +}; +use backon::{ConstantBuilder, Retryable}; +use eyre::Context; +use rayon::ThreadPoolBuilder; +use reth_auto_seal_consensus::MiningMode; +use reth_beacon_consensus::EthBeaconConsensus; +use reth_blockchain_tree::{ + noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, + TreeExternals, +}; +use reth_chainspec::{Chain, ChainSpec}; +use reth_config::{config::EtlConfig, PruneConfig}; +use reth_consensus::Consensus; +use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; +use reth_db_common::init::{init_genesis, InitDatabaseError}; +use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; +use reth_evm::noop::NoopBlockExecutorProvider; +use reth_network_p2p::headers::client::HeadersClient; +use reth_node_api::FullNodeTypes; +use reth_node_core::{ + dirs::{ChainPath, DataDirPath}, + node_config::NodeConfig, +}; +use reth_primitives::{BlockNumber, Head, B256}; +use reth_provider::{ + providers::{BlockchainProvider, StaticFileProvider}, + CanonStateNotificationSender, ProviderFactory, StaticFileProviderFactory, +}; +use reth_prune::{PruneModes, PrunerBuilder}; +use reth_rpc_builder::config::RethRpcServerConfig; +use reth_rpc_layer::JwtSecret; +use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget}; +use reth_static_file::StaticFileProducer; +use reth_tasks::TaskExecutor; +use reth_tracing::tracing::{debug, error, info, warn}; +use std::{marker::PhantomData, sync::Arc, thread::available_parallelism}; +use tokio::sync::{ + mpsc::{unbounded_channel, Receiver, UnboundedSender}, + oneshot, watch, +}; + +use super::common::{Attached, LaunchContextWith, WithConfigs}; + +impl LaunchContextWith> +where + DB: Database + Clone + 'static, +{ + /// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check + /// between the database and static files. **It may execute a pipeline unwind if it fails this + /// check.** + pub async fn create_readonly_provider_factory(&self) -> eyre::Result> { + let factory = ProviderFactory::new( + self.right().clone(), + self.chain_spec(), + StaticFileProvider::read_only(self.data_dir().static_files())?, + ) + .with_static_files_metrics(); + + let has_receipt_pruning = + self.toml_config().prune.as_ref().map_or(false, |a| a.has_receipts_pruning()); + + info!(target: "reth::cli", "Verifying storage consistency."); + + // Check for consistency between database and static files. If it fails, it unwinds to + // the first block that's consistent between database and static files. + if let Some(unwind_target) = factory + .static_file_provider() + .check_consistency(&factory.provider()?, has_receipt_pruning)? + { + // Highly unlikely to happen, and given its destructive nature, it's better to panic + // instead. + assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0"); + + info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + + let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); + + // Builds an unwind-only pipeline + let pipeline = Pipeline::builder() + .add_stages(DefaultStages::new( + factory.clone(), + tip_rx, + Arc::new(EthBeaconConsensus::new(self.chain_spec())), + NoopHeaderDownloader::default(), + NoopBodiesDownloader::default(), + NoopBlockExecutorProvider::default(), + self.toml_config().stages.clone(), + self.prune_modes().unwrap_or_default(), + )) + .build( + factory.clone(), + StaticFileProducer::new( + factory.clone(), + self.prune_modes().unwrap_or_default(), + ), + ); + + // Unwinds to block + let (tx, rx) = oneshot::channel(); + + // Pipeline should be run as blocking and panic if it fails. + self.task_executor().spawn_critical_blocking( + "pipeline task", + Box::pin(async move { + let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await; + let _ = tx.send(result); + }), + ); + rx.await??; + } + + Ok(factory) + } + + /// Creates a new [`ProviderFactory`] and attaches it to the launch context. + pub async fn with_readonly_provider_factory( + self, + ) -> eyre::Result>>> { + let factory = self.create_readonly_provider_factory().await?; + let ctx = LaunchContextWith { + inner: self.inner, + attachment: self.attachment.map_right(|_| factory), + }; + + Ok(ctx) + } +} diff --git a/crates/node/builder/src/launch/bitfinity_launch.rs b/crates/node/builder/src/launch/bitfinity_launch.rs new file mode 100644 index 00000000000..12696a35a4c --- /dev/null +++ b/crates/node/builder/src/launch/bitfinity_launch.rs @@ -0,0 +1,402 @@ +//! Abstraction for launching a node. + +use crate::ExExLauncher; +use crate::{ + builder::{NodeAdapter, NodeAddOns, NodeTypesAdapter}, + components::{NodeComponents, NodeComponentsBuilder}, + hooks::NodeHooks, + node::FullNode, + NodeBuilderWithComponents, NodeHandle, +}; +use futures::{future::Either, stream, stream_select, StreamExt}; +use reth_beacon_consensus::{ + hooks::{EngineHooks, PruneHook, StaticFileHook}, + BeaconConsensusEngine, +}; +use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; +use reth_engine_util::EngineMessageStreamExt; +use reth_exex::ExExManagerHandle; +use reth_network::NetworkEvents; +use reth_node_api::FullNodeTypes; +use reth_node_core::{ + dirs::{ChainPath, DataDirPath}, + exit::NodeExitFuture, + version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA}, +}; +use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; +use reth_primitives::format_ether; +use reth_provider::providers::BlockchainProvider; +use reth_rpc_engine_api::EngineApi; +use reth_rpc_types::engine::ClientVersionV1; +use reth_tasks::TaskExecutor; +use reth_tracing::tracing::{debug, info}; +use reth_transaction_pool::TransactionPool; +use std::{future::Future, sync::Arc}; +use tokio::sync::{mpsc::unbounded_channel, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use super::LaunchContext; + +/// A general purpose trait that launches a new node of any kind. +/// +/// Acts as a node factory. +/// +/// This is essentially the launch logic for a node. +/// +/// See also [`DefaultNodeLauncher`] and [`NodeBuilderWithComponents::launch_with`] +pub trait ReadOnlyLaunchNode { + /// The node type that is created. + type Node; + + /// Create and return a new node asynchronously. + fn launch_readonly_node(self, target: Target) -> impl Future> + Send; +} + +impl ReadOnlyLaunchNode for F +where + F: FnOnce(Target) -> Fut + Send, + Fut: Future> + Send, +{ + type Node = Node; + + fn launch_readonly_node(self, target: Target) -> impl Future> + Send { + self(target) + } +} + +/// The default launcher for a node. +#[derive(Debug)] +pub struct DefaultReadOnlyNodeLauncher { + /// The task executor for the node. + pub ctx: LaunchContext, +} + +impl DefaultReadOnlyNodeLauncher { + /// Create a new instance of the default node launcher. + pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath) -> Self { + Self { ctx: LaunchContext::new(task_executor, data_dir) } + } +} + +impl ReadOnlyLaunchNode> for DefaultReadOnlyNodeLauncher +where + T: FullNodeTypes::DB>>, + CB: NodeComponentsBuilder, +{ + type Node = NodeHandle>; + + async fn launch_readonly_node( + self, + target: NodeBuilderWithComponents, + ) -> eyre::Result { + let Self { ctx } = self; + let NodeBuilderWithComponents { + adapter: NodeTypesAdapter { database }, + components_builder, + add_ons: NodeAddOns { hooks, rpc, exexs: installed_exex }, + config, + } = target; + let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; + + // setup the launch context + let ctx = ctx + .with_configured_globals() + // load the toml config + .with_loaded_toml_config(config).await? + // add resolved peers + .with_resolved_peers().await? + // attach the database + .attach(database.clone()) + // ensure certain settings take effect + .with_adjusted_configs() + .with_readonly_provider_factory().await? + .inspect(|_| { + info!(target: "reth::cli", "Readonly Provider Factory opened"); + }) + .with_prometheus().await? + .inspect(|this| { + debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); + }) + .with_genesis()? + .inspect(|this| { + info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); + }) + .with_metrics() + // passing FullNodeTypes as type parameter here so that we can build + // later the components. + .with_blockchain_db::().await? + .with_components(components_builder, on_component_initialized).await?; + + // spawn exexs + let exex_manager_handle = ExExLauncher::new( + ctx.head(), + ctx.node_adapter().clone(), + installed_exex, + ctx.configs().clone(), + ) + .launch() + .await; + + // create pipeline + let network_client = ctx.components().network().fetch_client().await?; + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + + let node_config = ctx.node_config(); + let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + .maybe_skip_fcu(node_config.debug.skip_fcu) + .maybe_skip_new_payload(node_config.debug.skip_new_payload) + // Store messages _after_ skipping so that `replay-engine` command + // would replay only the messages that were observed by the engine + // during this run. + .maybe_store_messages(node_config.debug.engine_api_store.clone()); + + let max_block = ctx.max_block(network_client.clone()).await?; + let mut hooks = EngineHooks::new(); + + let static_file_producer = ctx.static_file_producer(); + let static_file_producer_events = static_file_producer.lock().events(); + hooks.add(StaticFileHook::new( + static_file_producer.clone(), + Box::new(ctx.task_executor().clone()), + )); + info!(target: "reth::cli", "StaticFileProducer initialized"); + + // Configure the pipeline + let pipeline_exex_handle = + exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); + let (pipeline, client) = if ctx.is_dev() { + info!(target: "reth::cli", "Starting Reth in dev mode"); + + for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() { + info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx, address.to_string(), format_ether(alloc.balance)); + } + + // install auto-seal + let mining_mode = + ctx.dev_mining_mode(ctx.components().pool().pending_transactions_listener()); + info!(target: "reth::cli", mode=%mining_mode, "configuring dev mining mode"); + + let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new( + ctx.chain_spec(), + ctx.blockchain_db().clone(), + ctx.components().pool().clone(), + consensus_engine_tx.clone(), + mining_mode, + ctx.components().block_executor().clone(), + ) + .build(); + + let pipeline = crate::setup::build_networked_pipeline( + &ctx.toml_config().stages, + client.clone(), + ctx.consensus(), + ctx.provider_factory().clone(), + ctx.task_executor(), + ctx.sync_metrics_tx(), + ctx.prune_config(), + max_block, + static_file_producer, + ctx.components().block_executor().clone(), + pipeline_exex_handle, + ) + .await?; + + let pipeline_events = pipeline.events(); + task.set_pipeline_events(pipeline_events); + debug!(target: "reth::cli", "Spawning auto mine task"); + ctx.task_executor().spawn(Box::pin(task)); + + (pipeline, Either::Left(client)) + } else { + let pipeline = crate::setup::build_networked_pipeline( + &ctx.toml_config().stages, + network_client.clone(), + ctx.consensus(), + ctx.provider_factory().clone(), + ctx.task_executor(), + ctx.sync_metrics_tx(), + ctx.prune_config(), + max_block, + static_file_producer, + ctx.components().block_executor().clone(), + pipeline_exex_handle, + ) + .await?; + + (pipeline, Either::Right(network_client.clone())) + }; + + let pipeline_events = pipeline.events(); + + let initial_target = ctx.node_config().debug.tip; + + let mut pruner_builder = + ctx.pruner_builder().max_reorg_depth(ctx.tree_config().max_reorg_depth() as usize); + if let Some(exex_manager_handle) = &exex_manager_handle { + pruner_builder = + pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); + } + + let pruner = pruner_builder.build(ctx.provider_factory().clone()); + + let pruner_events = pruner.events(); + info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); + hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone()))); + + // Configure the consensus engine + let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( + client, + pipeline, + ctx.blockchain_db().clone(), + Box::new(ctx.task_executor().clone()), + Box::new(ctx.components().network().clone()), + max_block, + ctx.components().payload_builder().clone(), + initial_target, + reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, + consensus_engine_tx, + Box::pin(consensus_engine_stream), + hooks, + )?; + info!(target: "reth::cli", "Consensus engine initialized"); + + let events = stream_select!( + ctx.components().network().event_listener().map(Into::into), + beacon_engine_handle.event_listener().map(Into::into), + pipeline_events.map(Into::into), + if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + }, + pruner_events.map(Into::into), + static_file_producer_events.map(Into::into), + ); + ctx.task_executor().spawn_critical( + "events task", + node::handle_events( + Some(ctx.components().network().clone()), + Some(ctx.head().number), + events, + database, + ), + ); + + let client = ClientVersionV1 { + code: CLIENT_CODE, + name: NAME_CLIENT.to_string(), + version: CARGO_PKG_VERSION.to_string(), + commit: VERGEN_GIT_SHA.to_string(), + }; + let engine_api = EngineApi::new( + ctx.blockchain_db().clone(), + ctx.chain_spec(), + beacon_engine_handle, + ctx.components().payload_builder().clone().into(), + Box::new(ctx.task_executor().clone()), + client, + ); + info!(target: "reth::cli", "Engine API handler initialized"); + + // extract the jwt secret from the args if possible + let jwt_secret = ctx.auth_jwt_secret()?; + + // Start RPC servers + let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers( + ctx.node_adapter().clone(), + engine_api, + ctx.node_config(), + jwt_secret, + rpc, + ) + .await?; + + // in dev mode we generate 20 random dev-signer accounts + if ctx.is_dev() { + rpc_registry.eth_api().with_dev_accounts(); + } + + // Run consensus engine to completion + let (tx, rx) = oneshot::channel(); + info!(target: "reth::cli", "Starting consensus engine"); + ctx.task_executor().spawn_critical_blocking("consensus engine", async move { + let res = beacon_consensus_engine.await; + let _ = tx.send(res); + }); + + if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { + info!(target: "reth::cli", "Using etherscan as consensus client"); + + let chain = ctx.node_config().chain.chain; + let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { + // If URL isn't provided, use default Etherscan URL for the chain if it is known + chain + .etherscan_urls() + .map(|urls| urls.0.to_string()) + .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) + })?; + + let block_provider = EtherscanBlockProvider::new( + etherscan_url, + chain.etherscan_api_key().ok_or_else(|| { + eyre::eyre!( + "etherscan api key not found for rpc consensus client for chain: {chain}" + ) + })?, + ); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("etherscan consensus client", async move { + rpc_consensus_client.run::().await + }); + } + + if let Some(rpc_ws_url) = ctx.node_config().debug.rpc_consensus_ws.clone() { + info!(target: "reth::cli", "Using rpc provider as consensus client"); + + let block_provider = RpcBlockProvider::new(rpc_ws_url); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("rpc consensus client", async move { + rpc_consensus_client.run::().await + }); + } + + let full_node = FullNode { + evm_config: ctx.components().evm_config().clone(), + block_executor: ctx.components().block_executor().clone(), + pool: ctx.components().pool().clone(), + network: ctx.components().network().clone(), + provider: ctx.node_adapter().provider.clone(), + payload_builder: ctx.components().payload_builder().clone(), + task_executor: ctx.task_executor().clone(), + rpc_server_handles, + rpc_registry, + config: ctx.node_config().clone(), + data_dir: ctx.data_dir().clone(), + }; + // Notify on node started + on_node_started.on_event(full_node.clone())?; + + let handle = NodeHandle { + node_exit_future: NodeExitFuture::new( + async { Ok(rx.await??) }, + full_node.config.debug.terminate, + ), + node: full_node, + bitfinity_import: Some(( + ctx.provider_factory().clone(), + ctx.node_config().bitfinity_import_arg.clone(), + )), + }; + + Ok(handle) + } +} diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 4077137c58f..f5cd6902332 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -34,7 +34,9 @@ use std::{future::Future, sync::Arc}; use tokio::sync::{mpsc::unbounded_channel, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; +mod bitfinity_common; pub mod common; +pub mod bitfinity_launch; pub use common::LaunchContext; mod exex; pub use exex::ExExLauncher; @@ -111,7 +113,6 @@ where .attach(database.clone()) // ensure certain settings take effect .with_adjusted_configs() - // Create the provider factory .with_provider_factory().await? .inspect(|_| { info!(target: "reth::cli", "Database opened"); diff --git a/crates/node/core/src/args/bitfinity_args.rs b/crates/node/core/src/args/bitfinity_args.rs index 97489ff823a..dcbc7016b0d 100644 --- a/crates/node/core/src/args/bitfinity_args.rs +++ b/crates/node/core/src/args/bitfinity_args.rs @@ -56,6 +56,10 @@ pub struct BitfinityImportArgs { /// Root key for the IC network #[arg(long, value_name = "IC_ROOT_KEY", default_value = IC_MAINNET_KEY)] pub ic_root_key: String, + + /// Readonly mode + #[arg(long("ro"), default_value = "false")] + pub readonly: bool, } /// Bitfinity Related Args diff --git a/docker-compose.yml b/docker-compose.yml index cd04856e6ca..884a5bacfad 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,4 +10,4 @@ services: - '8080:8080' command: node -vvv --http --http.port 8080 --http.addr 0.0.0.0 --http.api "debug,eth,net,trace,txpool,web3" --disable-discovery --ipcdisable --no-persist-peers -r https://orca-app-5yyst.ondigitalocean.app -i 10 -b 100 --datadir /reth/data volumes: - - ./target/reth:/reth + - ./target/reth:/reth:ro