6 changes: 6 additions & 0 deletions Dockerfile.build
@@ -0,0 +1,6 @@
FROM rust:1.87.0-slim

RUN apt update && apt install -y clang build-essential libssl-dev pkg-config

WORKDIR /app

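This builder image is deliberately bare: no sources are copied in and no ENTRYPOINT is set, so it is presumably used with the project mounted at /app. A hedged sketch of that workflow (the image tag and mount are assumptions, not taken from this PR):

docker build -f Dockerfile.build -t reth-builder .
docker run --rm -v "$PWD":/app reth-builder cargo build --release
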
2 changes: 1 addition & 1 deletion bin/reth/src/cli/mod.rs
@@ -313,7 +313,7 @@ mod tests {
}

// /// Tests that the log directory is parsed correctly when using the node command. It's
// /// always tied to the specific chain's name.
// #[test]
// fn parse_logs_path_node() {
// let mut reth = Cli::try_parse_args_from(["reth", "node"]).unwrap();
303 changes: 289 additions & 14 deletions bin/reth/src/commands/bitfinity_reset_evm_state.rs
@@ -2,6 +2,7 @@

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

@@ -10,17 +11,33 @@ use clap::Parser;
use alloy_rlp::Encodable as _; // brings `encode` into scope for dumping blocks below
use did::evm_state::EvmResetState;
use did::{AccountInfoMap, RawAccountInfo, H160, H256};
use evm_canister_client::{CanisterClient, EvmCanisterClient, IcAgentClient};
use reth_cli_commands::common::{AccessRights, EnvironmentArgs};
use reth_cli_commands::import::build_import_pipeline;
use reth_config::Config;
use reth_db::cursor::DbCursorRO;
use reth_db::mdbx::tx::Tx;
use reth_db::mdbx::RO;
use reth_db::transaction::DbTx;
use reth_db::{init_db, tables, DatabaseEnv};
use reth_downloaders::bitfinity_evm_client::BitfinityEvmClient;
use reth_downloaders::file_client::ChunkedFileReader;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_api::{BlockTy, NodeTypesWithDBAdapter};
use reth_node_core::args::{BitfinityResetEvmStateArgs, DatabaseArgs, DatadirArgs};
use reth_node_core::dirs::{DataDirPath, MaybePlatformPath};
use reth_node_core::version::SHORT_VERSION;
use reth_node_ethereum::consensus::EthBeaconConsensus;
use reth_node_ethereum::{EthExecutorProvider, EthereumNode};
use reth_primitives::StorageEntry;
use reth_provider::providers::StaticFileProvider;
use reth_provider::{
BlockNumReader, BlockReader, ChainSpecProvider as _, DatabaseProvider, HeaderProvider as _,
ProviderFactory, StageCheckpointReader as _,
};
use reth_prune::PruneModes;
use reth_stages::StageId;
use reth_static_file::StaticFileProducer;
use tokio::io::AsyncWriteExt as _;
use tracing::{debug, error, info, trace, warn};

/// `ProviderFactory` type alias.
@@ -45,6 +62,15 @@ pub struct BitfinityResetEvmStateCommandBuilder {
pub bitfinity: BitfinityResetEvmStateArgs,
}

/// Configuration for dumping and re-importing blocks into the working directory, up to a given end block, during the reset process
#[derive(Debug)]
pub struct ImportUpToConfig {
block_dir: PathBuf,
config_path: PathBuf,
provider_factory: BitfinityResetEvmProviderFactory,
end_block: u64,
}

impl BitfinityResetEvmStateCommandBuilder {
/// Build the command
pub async fn build(self) -> eyre::Result<BitfinityResetEvmStateCommand> {
@@ -64,26 +90,65 @@ impl BitfinityResetEvmStateCommandBuilder {
);
let executor = Arc::new(EvmCanisterResetStateExecutor::new(evm_client));

let chain_spec = chain.chain;
let data_dir = self.datadir.unwrap_or_chain_default(chain_spec, DatadirArgs::default());
let db_path = data_dir.db();
let db: Arc<DatabaseEnv> = Arc::new(init_db(db_path.clone(), Default::default())?);
let provider_factory: BitfinityResetEvmProviderFactory = ProviderFactory::new(
db,
chain.clone(),
StaticFileProvider::read_write(data_dir.static_files())?,
);

let wrkdir =
self.bitfinity.workdir.unwrap_or_chain_default(chain_spec, DatadirArgs::default());

// init genesis for workdir
EnvironmentArgs::<EthereumChainSpecParser> {
db: DatabaseArgs::default(),
datadir: DatadirArgs {
datadir: self.bitfinity.workdir,
static_files_path: Some(wrkdir.static_files()),
},
config: Some(data_dir.config()),
chain: chain.clone(),
}
.init::<EthereumNode>(AccessRights::RW)?;
info!(target: "reth::cli", "Initialized workdir with genesis block");

let import_up_to_config = if let (Some(block_dir), Some(end_block)) =
(self.bitfinity.block_dir, self.bitfinity.end_block)
{
let wrkdir_db_path = wrkdir.db();
let wrkdir_config_path = wrkdir.config();
let wrkdir_db: Arc<DatabaseEnv> =
Arc::new(init_db(wrkdir_db_path.clone(), Default::default())?);
let wrkdir_provider_factory: BitfinityResetEvmProviderFactory = ProviderFactory::new(
wrkdir_db,
chain,
StaticFileProvider::read_write(wrkdir.static_files())?,
);

Some(ImportUpToConfig {
block_dir,
end_block,
config_path: wrkdir_config_path,
provider_factory: wrkdir_provider_factory,
})
} else {
warn!(target: "reth::cli", "No block directory or end block specified; skipping block dump and re-import");
None
};

Ok(BitfinityResetEvmStateCommand::new(
provider_factory,
executor,
self.bitfinity.parallel_requests,
self.bitfinity.max_request_bytes,
self.bitfinity.max_account_request_bytes,
import_up_to_config,
))
}
}
@@ -96,6 +161,9 @@ pub struct BitfinityResetEvmStateCommand {
parallel_requests: usize,
max_request_bytes: usize,
max_account_request_bytes: usize,
/// Configuration used when an end block is specified.
/// It is required because the blocks must be dumped and reprocessed before syncing the EVM.
import_up_to: Option<ImportUpToConfig>,
}

impl BitfinityResetEvmStateCommand {
@@ -106,20 +174,227 @@ impl BitfinityResetEvmStateCommand {
parallel_requests: usize,
max_request_bytes: usize,
max_account_request_bytes: usize,
import_up_to: Option<ImportUpToConfig>,
) -> Self {
Self {
provider_factory,
executor,
parallel_requests: parallel_requests.max(1),
max_request_bytes,
max_account_request_bytes,
import_up_to,
}
}

/// Execute the command
pub async fn execute(&self) -> eyre::Result<()> {
// check whether an end block is specified
if let Some(config) = self.import_up_to.as_ref() {
// dump and re-import blocks up to the end block
info!(target: "reth::cli", "End block specified: {}", config.end_block);
self.import_up_to(config).await?;

// reset using the working directory's provider
self.reset(config.provider_factory.provider()?).await?;
} else {
// no end block: reset using the last block already in the data dir
info!(target: "reth::cli", "No end block specified, resetting to the last block in the database");
self.reset(self.provider_factory.provider()?).await?;
}

Ok(())
}
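
For orientation, a hedged sketch of how this command is driven end to end; `builder` stands for a BitfinityResetEvmStateCommandBuilder already populated from parsed CLI arguments (the driver itself is illustrative, not code from this PR):

// Assumed driver, not part of this diff.
let command = builder.build().await?; // prepares datadir, workdir genesis, and providers
command.execute().await?; // dumps and re-imports blocks when an end block is set, then resets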

async fn dump_database_blocks(
&self,
import_up_to: &ImportUpToConfig,
) -> eyre::Result<Vec<PathBuf>> {
let mut block_paths = BTreeMap::new();
let provider = self.provider_factory.provider()?;
let tx_ref = provider.tx_ref();

info!(target: "reth::cli",
"Dumping blocks from database up to block {}",
import_up_to.end_block
);

// Dump all blocks in the database
let mut cursor = tx_ref.cursor_read::<tables::HeaderNumbers>()?;
while let Some((_, block_number)) = cursor.next()? {
if block_number > import_up_to.end_block || block_number == 0 {
// HeaderNumbers is keyed by block hash, so iteration is not ordered by
// block number; skip out-of-range blocks (and never dump genesis).
continue;
}
info!(target: "reth::cli", "Dumping block {}", block_number);
let block = provider.block_by_number(block_number)?;
if let Some(block) = block {
let file_path = import_up_to.block_dir.join(format!("block_{}.rlp", block_number));

// RLP-encode the block and write it to its own file
let mut buf = Vec::new();
block.encode(&mut buf);
let mut file = tokio::fs::File::create(&file_path).await?;
file.write_all(&buf).await?;
file.flush().await?;

info!(target: "reth::cli",
"Dumped block {} to file: {}",
block_number,
file_path.display()
);
block_paths.insert(block_number, file_path);
}
}

// collect the file paths in ascending block-number order
let block_paths = block_paths.values().cloned().collect();

Ok(block_paths)
}
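
The BTreeMap is what restores ordering here: the hash-keyed cursor yields block numbers in arbitrary order, while values() walks keys in ascending order. A minimal standalone sketch of that property (plain std, not reth API):

use std::collections::BTreeMap;

fn main() {
    // Insert in the arbitrary order a hash-keyed cursor might produce.
    let mut paths = BTreeMap::new();
    paths.insert(3u64, "block_3.rlp");
    paths.insert(1u64, "block_1.rlp");
    paths.insert(2u64, "block_2.rlp");
    // values() iterates in ascending key order: block_1, block_2, block_3.
    let ordered: Vec<&str> = paths.values().copied().collect();
    assert_eq!(ordered, vec!["block_1.rlp", "block_2.rlp", "block_3.rlp"]);
}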

/// Dump the blocks and import them into the working directory database, up to the configured end block
async fn import_up_to(&self, import_up_to: &ImportUpToConfig) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);

// dump files
let block_paths = self.dump_database_blocks(import_up_to).await?;

// set up consensus for the import pipeline
let consensus =
Arc::new(EthBeaconConsensus::new(import_up_to.provider_factory.chain_spec()));

info!(target: "reth::cli",
"Starting import of chain file from: {}",
import_up_to.block_dir.display()
);

let mut total_decoded_blocks = 0;
let mut total_decoded_txns = 0;

for block_file in block_paths {
info!(target: "reth::cli",
"Importing chain file: {}",
block_file.display()
);
let mut reader = ChunkedFileReader::new(&block_file, Some(8192)).await?;

let mut sealed_header = import_up_to
.provider_factory
.sealed_header(import_up_to.provider_factory.last_block_number()?)?
.expect("should have genesis");

let config = Config::from_path(&import_up_to.config_path)
.expect("Failed to load import pipeline configuration");

while let Some(file_client) = reader
.next_chunk::<BlockTy<EthereumNode>>(consensus.clone(), Some(sealed_header))
.await?
{
// create a new FileClient from chunk read from file
debug!(target: "reth::cli",
"Importing chain file chunk"
);

let tip = file_client.tip().ok_or(eyre::eyre!("file client has no tip"))?;
debug!(target: "reth::cli", "Chain file chunk read");

total_decoded_blocks += file_client.headers_len();
total_decoded_txns += file_client.total_transactions();

let producer = StaticFileProducer::new(
import_up_to.provider_factory.clone(),
PruneModes::default(),
);

debug!(target: "reth::cli",
"Total decoded blocks: {}, total decoded transactions: {}",
total_decoded_blocks,
total_decoded_txns
);

let executor =
EthExecutorProvider::ethereum(import_up_to.provider_factory.chain_spec());

debug!("executor ready");

let (mut pipeline, events) = build_import_pipeline(
&config,
import_up_to.provider_factory.clone(),
&consensus,
Arc::new(file_client),
producer,
false,
executor,
)?;

debug!(target: "reth::cli", "Import pipeline built successfully");

// override the tip
pipeline.set_tip(tip);
debug!(target: "reth::cli", ?tip, "Tip manually set");

// use a short-lived provider so no read transaction stays open
// across the pipeline run below
let latest_block_number = import_up_to
.provider_factory
.provider()?
.get_stage_checkpoint(StageId::Finish)?
.map(|ch: reth_stages::StageCheckpoint| ch.block_number);

debug!(target: "reth::cli",
"Latest block number: {:?}",
latest_block_number
);

tokio::spawn(reth_node_events::node::handle_events(
None,
latest_block_number,
events,
));

// Run pipeline
debug!(target: "reth::cli", "Starting sync pipeline");
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}
// re-open a fresh read transaction to observe the newly imported blocks
let last_block_number = import_up_to.provider_factory.provider()?.last_block_number()?;

sealed_header = import_up_to
.provider_factory
.sealed_header(last_block_number)?
.expect("header of the last imported block should be present");
}
}

let provider = import_up_to.provider_factory.provider()?;

let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
let total_imported_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;

info!(target: "reth::cli",
total_imported_blocks,
total_imported_txns,
"Chain file imported"
);

// recheck latest block with provider
let last_block_number = provider.last_block_number()?;
info!(target: "reth::cli",
"Last block number after import: {}",
last_block_number
);

Ok(())
}
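
Since each dump file holds one plain RLP-encoded block, reading one back is the mirror of the dump step above. A hedged sketch of the round-trip, assuming reth_primitives::Block's alloy-rlp Decodable impl (the helper name is illustrative, not from this PR):

use alloy_rlp::Decodable as _;

// Hypothetical helper: re-decode a previously dumped block file.
fn read_dumped_block(path: &std::path::Path) -> eyre::Result<reth_primitives::Block> {
    let bytes = std::fs::read(path)?;
    // decode() consumes the buffer slice as it reads the payload
    let block = reth_primitives::Block::decode(&mut bytes.as_slice())?;
    Ok(block)
}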

/// Reset the EVM state using the given database provider
pub async fn reset(
&self,
mut provider: DatabaseProvider<
Tx<RO>,
NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>,
>,
) -> eyre::Result<()> {
let last_block_number = provider.last_block_number()?;

let last_block =
provider.block_by_number(last_block_number)?.expect("Block should be present");
