diff --git a/dash-spv/benches/storage.rs b/dash-spv/benches/storage.rs index 52f98cd21..5677e6ddc 100644 --- a/dash-spv/benches/storage.rs +++ b/dash-spv/benches/storage.rs @@ -2,7 +2,7 @@ use std::time::Duration; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use dash_spv::{ - storage::{DiskStorageManager, StorageManager}, + storage::{BlockHeaderStorage, DiskStorageManager, StorageManager}, Hash, }; use dashcore::{block::Version, BlockHash, CompactTarget, Header}; diff --git a/dash-spv/examples/filter_sync.rs b/dash-spv/examples/filter_sync.rs index b1821d43d..6d0e7a395 100644 --- a/dash-spv/examples/filter_sync.rs +++ b/dash-spv/examples/filter_sync.rs @@ -28,8 +28,7 @@ async fn main() -> Result<(), Box> { let network_manager = PeerNetworkManager::new(&config).await?; // Create storage manager - let storage_manager = - DiskStorageManager::new("./.tmp/filter-sync-example-storage".into()).await?; + let storage_manager = DiskStorageManager::new("./.tmp/filter-sync-example-storage").await?; // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); diff --git a/dash-spv/examples/simple_sync.rs b/dash-spv/examples/simple_sync.rs index 920c8fca5..57e266fa2 100644 --- a/dash-spv/examples/simple_sync.rs +++ b/dash-spv/examples/simple_sync.rs @@ -24,8 +24,7 @@ async fn main() -> Result<(), Box> { let network_manager = PeerNetworkManager::new(&config).await?; // Create storage manager - let storage_manager = - DiskStorageManager::new("./.tmp/simple-sync-example-storage".into()).await?; + let storage_manager = DiskStorageManager::new("./.tmp/simple-sync-example-storage").await?; // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); diff --git a/dash-spv/examples/spv_with_wallet.rs b/dash-spv/examples/spv_with_wallet.rs index dc391f2e0..0a15e6d61 100644 --- a/dash-spv/examples/spv_with_wallet.rs +++ b/dash-spv/examples/spv_with_wallet.rs @@ -26,8 +26,7 @@ async fn main() -> Result<(), Box> { let network_manager = PeerNetworkManager::new(&config).await?; // Create storage manager - use disk storage for persistence - let storage_manager = - DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage".into()).await?; + let storage_manager = DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage").await?; // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); diff --git a/dash-spv/src/client/block_processor_test.rs b/dash-spv/src/client/block_processor_test.rs index a8330a2d2..5ec060acc 100644 --- a/dash-spv/src/client/block_processor_test.rs +++ b/dash-spv/src/client/block_processor_test.rs @@ -4,7 +4,7 @@ mod tests { use crate::client::block_processor::{BlockProcessingTask, BlockProcessor}; - use crate::storage::DiskStorageManager; + use crate::storage::{BlockHeaderStorage, DiskStorageManager}; use crate::types::{SpvEvent, SpvStats}; use dashcore::{blockdata::constants::genesis_block, Block, Network, Transaction}; diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 6b617bcda..4aefd9ec5 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -219,7 +219,7 @@ impl DashSpvClient::new(config.network))); //! //! // Create and start the client diff --git a/dash-spv/src/storage/blocks.rs b/dash-spv/src/storage/blocks.rs new file mode 100644 index 000000000..430cb17e4 --- /dev/null +++ b/dash-spv/src/storage/blocks.rs @@ -0,0 +1,180 @@ +//! Header storage operations for DiskStorageManager. + +use std::collections::HashMap; +use std::ops::Range; +use std::path::PathBuf; + +use async_trait::async_trait; +use dashcore::block::Header as BlockHeader; +use dashcore::BlockHash; +use tokio::sync::RwLock; + +use crate::error::StorageResult; +use crate::storage::io::atomic_write; +use crate::storage::segments::SegmentCache; +use crate::storage::PersistentStorage; +use crate::StorageError; + +#[async_trait] +pub trait BlockHeaderStorage { + async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()>; + + async fn store_headers_at_height( + &mut self, + headers: &[BlockHeader], + height: u32, + ) -> StorageResult<()>; + + async fn load_headers(&self, range: Range) -> StorageResult>; + + async fn get_header(&self, height: u32) -> StorageResult> { + if let Some(tip_height) = self.get_tip_height().await { + if height > tip_height { + return Ok(None); + } + } else { + return Ok(None); + } + + if let Some(start_height) = self.get_start_height().await { + if height < start_height { + return Ok(None); + } + } else { + return Ok(None); + } + + Ok(self.load_headers(height..height + 1).await?.first().copied()) + } + + async fn get_tip_height(&self) -> Option; + + async fn get_start_height(&self) -> Option; + + async fn get_stored_headers_len(&self) -> u32; + + async fn get_header_height_by_hash( + &self, + hash: &dashcore::BlockHash, + ) -> StorageResult>; +} + +pub struct PersistentBlockHeaderStorage { + block_headers: RwLock>, + header_hash_index: HashMap, +} + +impl PersistentBlockHeaderStorage { + const FOLDER_NAME: &str = "block_headers"; + const INDEX_FILE_NAME: &str = "index.dat"; +} + +#[async_trait] +impl PersistentStorage for PersistentBlockHeaderStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + let storage_path = storage_path.into(); + let segments_folder = storage_path.join(Self::FOLDER_NAME); + + let index_path = segments_folder.join(Self::INDEX_FILE_NAME); + + let mut block_headers = SegmentCache::load_or_new(&segments_folder).await?; + + let header_hash_index = match tokio::fs::read(&index_path) + .await + .ok() + .and_then(|content| bincode::deserialize(&content).ok()) + { + Some(index) => index, + _ => { + if segments_folder.exists() { + block_headers.build_block_index_from_segments().await? + } else { + HashMap::new() + } + } + }; + + Ok(Self { + block_headers: RwLock::new(block_headers), + header_hash_index, + }) + } + + async fn persist(&mut self, storage_path: impl Into + Send) -> StorageResult<()> { + let block_headers_folder = storage_path.into().join(Self::FOLDER_NAME); + let index_path = block_headers_folder.join(Self::INDEX_FILE_NAME); + + tokio::fs::create_dir_all(&block_headers_folder).await?; + + self.block_headers.write().await.persist(&block_headers_folder).await; + + let data = bincode::serialize(&self.header_hash_index) + .map_err(|e| StorageError::WriteFailed(format!("Failed to serialize index: {}", e)))?; + + atomic_write(&index_path, &data).await + } +} + +#[async_trait] +impl BlockHeaderStorage for PersistentBlockHeaderStorage { + async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { + let height = self.block_headers.read().await.next_height(); + self.store_headers_at_height(headers, height).await + } + + async fn store_headers_at_height( + &mut self, + headers: &[BlockHeader], + height: u32, + ) -> StorageResult<()> { + let mut height = height; + + let hashes = headers.iter().map(|header| header.block_hash()).collect::>(); + + self.block_headers.write().await.store_items_at_height(headers, height).await?; + + for hash in hashes { + self.header_hash_index.insert(hash, height); + height += 1; + } + + Ok(()) + } + + async fn load_headers(&self, range: Range) -> StorageResult> { + self.block_headers.write().await.get_items(range).await + } + + async fn get_tip_height(&self) -> Option { + self.block_headers.read().await.tip_height() + } + + async fn get_start_height(&self) -> Option { + self.block_headers.read().await.start_height() + } + + async fn get_stored_headers_len(&self) -> u32 { + let block_headers = self.block_headers.read().await; + + let start_height = if let Some(start_height) = block_headers.start_height() { + start_height + } else { + return 0; + }; + + let end_height = if let Some(end_height) = block_headers.tip_height() { + end_height + } else { + return 0; + }; + + end_height - start_height + 1 + } + + async fn get_header_height_by_hash( + &self, + hash: &dashcore::BlockHash, + ) -> StorageResult> { + Ok(self.header_hash_index.get(hash).copied()) + } +} diff --git a/dash-spv/src/storage/chainstate.rs b/dash-spv/src/storage/chainstate.rs new file mode 100644 index 000000000..c6c3b69af --- /dev/null +++ b/dash-spv/src/storage/chainstate.rs @@ -0,0 +1,101 @@ +use std::path::PathBuf; + +use async_trait::async_trait; + +use crate::{ + error::StorageResult, + storage::{io::atomic_write, PersistentStorage}, + ChainState, +}; + +#[async_trait] +pub trait ChainStateStorage { + async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()>; + + async fn load_chain_state(&self) -> StorageResult>; +} + +pub struct PersistentChainStateStorage { + storage_path: PathBuf, +} + +impl PersistentChainStateStorage { + const FOLDER_NAME: &str = "chainstate"; + const FILE_NAME: &str = "chainstate.json"; +} + +#[async_trait] +impl PersistentStorage for PersistentChainStateStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + Ok(PersistentChainStateStorage { + storage_path: storage_path.into(), + }) + } + + async fn persist(&mut self, _storage_path: impl Into + Send) -> StorageResult<()> { + // Current implementation persists data everytime data is stored + Ok(()) + } +} + +#[async_trait] +impl ChainStateStorage for PersistentChainStateStorage { + async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { + let state_data = serde_json::json!({ + "last_chainlock_height": state.last_chainlock_height, + "last_chainlock_hash": state.last_chainlock_hash, + "current_filter_tip": state.current_filter_tip, + "last_masternode_diff_height": state.last_masternode_diff_height, + "sync_base_height": state.sync_base_height, + }); + + let chainstate_folder = self.storage_path.join(Self::FOLDER_NAME); + let path = chainstate_folder.join(Self::FILE_NAME); + + tokio::fs::create_dir_all(chainstate_folder).await?; + + let json = state_data.to_string(); + atomic_write(&path, json.as_bytes()).await?; + + Ok(()) + } + + async fn load_chain_state(&self) -> StorageResult> { + let path = self.storage_path.join(Self::FOLDER_NAME).join(Self::FILE_NAME); + if !path.exists() { + return Ok(None); + } + + let content = tokio::fs::read_to_string(path).await?; + let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| { + crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e)) + })?; + + let state = ChainState { + last_chainlock_height: value + .get("last_chainlock_height") + .and_then(|v| v.as_u64()) + .map(|h| h as u32), + last_chainlock_hash: value + .get("last_chainlock_hash") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse().ok()), + current_filter_tip: value + .get("current_filter_tip") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse().ok()), + masternode_engine: None, + last_masternode_diff_height: value + .get("last_masternode_diff_height") + .and_then(|v| v.as_u64()) + .map(|h| h as u32), + sync_base_height: value + .get("sync_base_height") + .and_then(|v| v.as_u64()) + .map(|h| h as u32) + .unwrap_or(0), + }; + + Ok(Some(state)) + } +} diff --git a/dash-spv/src/storage/filters.rs b/dash-spv/src/storage/filters.rs new file mode 100644 index 000000000..0e4916805 --- /dev/null +++ b/dash-spv/src/storage/filters.rs @@ -0,0 +1,141 @@ +use std::{ops::Range, path::PathBuf}; + +use async_trait::async_trait; +use dashcore::hash_types::FilterHeader; +use tokio::sync::RwLock; + +use crate::{ + error::StorageResult, + storage::{segments::SegmentCache, PersistentStorage}, +}; + +#[async_trait] +pub trait FilterHeaderStorage { + async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()>; + + async fn load_filter_headers(&self, range: Range) -> StorageResult>; + + async fn get_filter_header(&self, height: u32) -> StorageResult> { + if let Some(tip_height) = self.get_filter_tip_height().await? { + if height > tip_height { + return Ok(None); + } + } else { + return Ok(None); + } + + if let Some(start_height) = self.get_filter_start_height().await { + if height < start_height { + return Ok(None); + } + } else { + return Ok(None); + } + + Ok(self.load_filter_headers(height..height + 1).await?.first().copied()) + } + + async fn get_filter_tip_height(&self) -> StorageResult>; + + async fn get_filter_start_height(&self) -> Option; +} + +#[async_trait] +pub trait FilterStorage { + async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()>; + + async fn load_filters(&self, range: Range) -> StorageResult>>; +} + +pub struct PersistentFilterHeaderStorage { + filter_headers: RwLock>, +} + +impl PersistentFilterHeaderStorage { + const FOLDER_NAME: &str = "filter_headers"; +} + +#[async_trait] +impl PersistentStorage for PersistentFilterHeaderStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + let storage_path = storage_path.into(); + let segments_folder = storage_path.join(Self::FOLDER_NAME); + + let filter_headers = SegmentCache::load_or_new(segments_folder).await?; + + Ok(Self { + filter_headers: RwLock::new(filter_headers), + }) + } + + async fn persist(&mut self, base_path: impl Into + Send) -> StorageResult<()> { + let filter_headers_folder = base_path.into().join(Self::FOLDER_NAME); + + tokio::fs::create_dir_all(&filter_headers_folder).await?; + + self.filter_headers.write().await.persist(&filter_headers_folder).await; + Ok(()) + } +} + +#[async_trait] +impl FilterHeaderStorage for PersistentFilterHeaderStorage { + async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> { + self.filter_headers.write().await.store_items(headers).await + } + + async fn load_filter_headers(&self, range: Range) -> StorageResult> { + self.filter_headers.write().await.get_items(range).await + } + + async fn get_filter_tip_height(&self) -> StorageResult> { + Ok(self.filter_headers.read().await.tip_height()) + } + + async fn get_filter_start_height(&self) -> Option { + self.filter_headers.read().await.start_height() + } +} + +pub struct PersistentFilterStorage { + filters: RwLock>>, +} + +impl PersistentFilterStorage { + const FOLDER_NAME: &str = "filters"; +} + +#[async_trait] +impl PersistentStorage for PersistentFilterStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + let storage_path = storage_path.into(); + let filters_folder = storage_path.join(Self::FOLDER_NAME); + + let filters = SegmentCache::load_or_new(filters_folder).await?; + + Ok(Self { + filters: RwLock::new(filters), + }) + } + + async fn persist(&mut self, storage_path: impl Into + Send) -> StorageResult<()> { + let storage_path = storage_path.into(); + let filters_folder = storage_path.join(Self::FOLDER_NAME); + + tokio::fs::create_dir_all(&filters_folder).await?; + + self.filters.write().await.persist(&filters_folder).await; + Ok(()) + } +} + +#[async_trait] +impl FilterStorage for PersistentFilterStorage { + async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> { + self.filters.write().await.store_items_at_height(&[filter.to_vec()], height).await + } + + async fn load_filters(&self, range: Range) -> StorageResult>> { + self.filters.write().await.get_items(range).await + } +} diff --git a/dash-spv/src/storage/headers.rs b/dash-spv/src/storage/headers.rs deleted file mode 100644 index 45ee02653..000000000 --- a/dash-spv/src/storage/headers.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! Header storage operations for DiskStorageManager. - -use std::collections::HashMap; -use std::path::Path; - -use dashcore::block::Header as BlockHeader; -use dashcore::BlockHash; - -use crate::error::StorageResult; -use crate::storage::io::atomic_write; -use crate::StorageError; - -use super::manager::DiskStorageManager; - -impl DiskStorageManager { - pub async fn store_headers_at_height( - &mut self, - headers: &[BlockHeader], - mut height: u32, - ) -> StorageResult<()> { - let hashes = headers.iter().map(|header| header.block_hash()).collect::>(); - - self.block_headers.write().await.store_items_at_height(headers, height).await?; - - // Update reverse index - let mut reverse_index = self.header_hash_index.write().await; - - for hash in hashes { - reverse_index.insert(hash, height); - height += 1; - } - - Ok(()) - } - - pub async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { - let height = self.block_headers.read().await.next_height(); - self.store_headers_at_height(headers, height).await - } - - /// Get header height by hash. - pub async fn get_header_height_by_hash(&self, hash: &BlockHash) -> StorageResult> { - Ok(self.header_hash_index.read().await.get(hash).copied()) - } -} - -/// Load index from file, if it fails it tries to build it from block -/// header segments and, if that also fails, it return an empty index. -/// -/// IO and deserialize errors are returned, the empty index is only built -/// if there is no persisted data to recreate it. -pub(super) async fn load_block_index( - manager: &DiskStorageManager, -) -> StorageResult> { - let index_path = manager.base_path.join("headers/index.dat"); - - if let Ok(content) = tokio::fs::read(&index_path).await { - bincode::deserialize(&content) - .map_err(|e| StorageError::ReadFailed(format!("Failed to deserialize index: {}", e))) - } else { - manager.block_headers.write().await.build_block_index_from_segments().await - } -} - -/// Save index to disk. -pub(super) async fn save_index_to_disk( - path: &Path, - index: &HashMap, -) -> StorageResult<()> { - let data = bincode::serialize(index) - .map_err(|e| StorageError::WriteFailed(format!("Failed to serialize index: {}", e)))?; - - atomic_write(path, &data).await -} diff --git a/dash-spv/src/storage/manager.rs b/dash-spv/src/storage/manager.rs deleted file mode 100644 index 96cb266a3..000000000 --- a/dash-spv/src/storage/manager.rs +++ /dev/null @@ -1,142 +0,0 @@ -//! Core DiskStorageManager struct and background worker implementation. - -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::RwLock; - -use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid}; - -use crate::error::{StorageError, StorageResult}; -use crate::storage::headers::load_block_index; -use crate::storage::segments::SegmentCache; -use crate::types::{MempoolState, UnconfirmedTransaction}; - -use super::lockfile::LockFile; - -/// Disk-based storage manager with segmented files and async background saving. -pub struct DiskStorageManager { - pub(super) base_path: PathBuf, - - // Segmented header storage - pub(super) block_headers: Arc>>, - pub(super) filter_headers: Arc>>, - pub(super) filters: Arc>>>, - - // Reverse index for O(1) lookups - pub(super) header_hash_index: Arc>>, - - // Background worker - pub(super) worker_handle: Option>, - - // Mempool storage - pub(super) mempool_transactions: Arc>>, - pub(super) mempool_state: Arc>>, - - // Lock file to prevent concurrent access from multiple processes. - _lock_file: LockFile, -} - -impl DiskStorageManager { - pub async fn new(base_path: PathBuf) -> StorageResult { - use std::fs; - - // Create directories if they don't exist - fs::create_dir_all(&base_path) - .map_err(|e| StorageError::WriteFailed(format!("Failed to create directory: {}", e)))?; - - // Acquire exclusive lock on the data directory - let lock_file = LockFile::new(base_path.join(".lock"))?; - - let headers_dir = base_path.join("headers"); - let filters_dir = base_path.join("filters"); - let state_dir = base_path.join("state"); - - fs::create_dir_all(&headers_dir).map_err(|e| { - StorageError::WriteFailed(format!("Failed to create headers directory: {}", e)) - })?; - fs::create_dir_all(&filters_dir).map_err(|e| { - StorageError::WriteFailed(format!("Failed to create filters directory: {}", e)) - })?; - fs::create_dir_all(&state_dir).map_err(|e| { - StorageError::WriteFailed(format!("Failed to create state directory: {}", e)) - })?; - - let mut storage = Self { - base_path: base_path.clone(), - block_headers: Arc::new(RwLock::new( - SegmentCache::load_or_new(base_path.clone()).await?, - )), - filter_headers: Arc::new(RwLock::new( - SegmentCache::load_or_new(base_path.clone()).await?, - )), - filters: Arc::new(RwLock::new(SegmentCache::load_or_new(base_path.clone()).await?)), - header_hash_index: Arc::new(RwLock::new(HashMap::new())), - worker_handle: None, - mempool_transactions: Arc::new(RwLock::new(HashMap::new())), - mempool_state: Arc::new(RwLock::new(None)), - _lock_file: lock_file, - }; - - // Load chain state to get sync_base_height - if let Ok(Some(state)) = storage.load_chain_state().await { - tracing::debug!("Loaded sync_base_height: {}", state.sync_base_height); - } - - // Start background worker that - // persists data when appropriate - storage.start_worker().await; - - // Rebuild index - let block_index = match load_block_index(&storage).await { - Ok(index) => index, - Err(e) => { - tracing::error!( - "An unexpected IO or deserialization error didn't allow the block index to be built: {}", - e - ); - HashMap::new() - } - }; - storage.header_hash_index = Arc::new(RwLock::new(block_index)); - - Ok(storage) - } - - #[cfg(test)] - pub async fn with_temp_dir() -> StorageResult { - use tempfile::TempDir; - - let temp_dir = TempDir::new()?; - Self::new(temp_dir.path().into()).await - } - - /// Start the background worker - pub(super) async fn start_worker(&mut self) { - let block_headers = Arc::clone(&self.block_headers); - let filter_headers = Arc::clone(&self.filter_headers); - let filters = Arc::clone(&self.filters); - - let worker_handle = tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_secs(5)); - - loop { - ticker.tick().await; - - block_headers.write().await.persist_evicted().await; - filter_headers.write().await.persist_evicted().await; - filters.write().await.persist_evicted().await; - } - }); - - self.worker_handle = Some(worker_handle); - } - - /// Stop the background worker without forcing a save. - pub(super) fn stop_worker(&mut self) { - if let Some(handle) = self.worker_handle.take() { - handle.abort(); - } - } -} diff --git a/dash-spv/src/storage/masternode.rs b/dash-spv/src/storage/masternode.rs new file mode 100644 index 000000000..d7ec1dd9f --- /dev/null +++ b/dash-spv/src/storage/masternode.rs @@ -0,0 +1,76 @@ +use std::path::PathBuf; + +use async_trait::async_trait; + +use crate::{ + error::StorageResult, + storage::{io::atomic_write, MasternodeState, PersistentStorage}, +}; + +#[async_trait] +pub trait MasternodeStateStorage { + async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()>; + + async fn load_masternode_state(&self) -> StorageResult>; +} + +pub struct PersistentMasternodeStateStorage { + storage_path: PathBuf, +} + +impl PersistentMasternodeStateStorage { + const FOLDER_NAME: &str = "masternodestate"; + const MASTERNODE_FILE_NAME: &str = "masternodestate.json"; +} + +#[async_trait] +impl PersistentStorage for PersistentMasternodeStateStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + Ok(PersistentMasternodeStateStorage { + storage_path: storage_path.into(), + }) + } + + async fn persist(&mut self, _storage_path: impl Into + Send) -> StorageResult<()> { + // Current implementation persists data everytime data is stored + Ok(()) + } +} + +#[async_trait] +impl MasternodeStateStorage for PersistentMasternodeStateStorage { + async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { + let masternodestate_folder = self.storage_path.join(Self::FOLDER_NAME); + let path = masternodestate_folder.join(Self::MASTERNODE_FILE_NAME); + + tokio::fs::create_dir_all(masternodestate_folder).await?; + + let json = serde_json::to_string_pretty(state).map_err(|e| { + crate::error::StorageError::Serialization(format!( + "Failed to serialize masternode state: {}", + e + )) + })?; + + atomic_write(&path, json.as_bytes()).await?; + Ok(()) + } + + async fn load_masternode_state(&self) -> StorageResult> { + let path = self.storage_path.join(Self::FOLDER_NAME).join(Self::MASTERNODE_FILE_NAME); + + if !path.exists() { + return Ok(None); + } + + let content = tokio::fs::read_to_string(path).await?; + let state = serde_json::from_str(&content).map_err(|e| { + crate::error::StorageError::Serialization(format!( + "Failed to deserialize masternode state: {}", + e + )) + })?; + + Ok(Some(state)) + } +} diff --git a/dash-spv/src/storage/metadata.rs b/dash-spv/src/storage/metadata.rs new file mode 100644 index 000000000..7707e41ab --- /dev/null +++ b/dash-spv/src/storage/metadata.rs @@ -0,0 +1,62 @@ +use std::path::PathBuf; + +use async_trait::async_trait; + +use crate::{ + error::StorageResult, + storage::{io::atomic_write, PersistentStorage}, +}; + +#[async_trait] +pub trait MetadataStorage { + async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()>; + + async fn load_metadata(&self, key: &str) -> StorageResult>>; +} + +pub struct PersistentMetadataStorage { + storage_path: PathBuf, +} + +impl PersistentMetadataStorage { + const FOLDER_NAME: &str = "metadata"; +} + +#[async_trait] +impl PersistentStorage for PersistentMetadataStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + Ok(PersistentMetadataStorage { + storage_path: storage_path.into(), + }) + } + + async fn persist(&mut self, _storage_path: impl Into + Send) -> StorageResult<()> { + // Current implementation persists data everytime data is stored + Ok(()) + } +} + +#[async_trait] +impl MetadataStorage for PersistentMetadataStorage { + async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { + let metadata_folder = self.storage_path.join(Self::FOLDER_NAME); + let path = metadata_folder.join(format!("{key}.dat")); + + tokio::fs::create_dir_all(metadata_folder).await?; + + atomic_write(&path, value).await?; + + Ok(()) + } + + async fn load_metadata(&self, key: &str) -> StorageResult>> { + let path = self.storage_path.join(Self::FOLDER_NAME).join(format!("{key}.dat")); + + if !path.exists() { + return Ok(None); + } + + let data = tokio::fs::read(path).await?; + Ok(Some(data)) + } +} diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs index baae5862f..6c212acbf 100644 --- a/dash-spv/src/storage/mod.rs +++ b/dash-spv/src/storage/mod.rs @@ -4,187 +4,563 @@ pub(crate) mod io; pub mod types; -mod headers; +mod blocks; +mod chainstate; +mod filters; mod lockfile; -mod manager; +mod masternode; +mod metadata; mod segments; -mod state; +mod transactions; use async_trait::async_trait; +use dashcore::hash_types::FilterHeader; +use dashcore::{Header as BlockHeader, Txid}; use std::collections::HashMap; use std::ops::Range; - -use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, Txid}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; use crate::error::StorageResult; -use crate::types::{ChainState, MempoolState, UnconfirmedTransaction}; +use crate::storage::blocks::PersistentBlockHeaderStorage; +use crate::storage::chainstate::PersistentChainStateStorage; +use crate::storage::filters::{PersistentFilterHeaderStorage, PersistentFilterStorage}; +use crate::storage::lockfile::LockFile; +use crate::storage::masternode::PersistentMasternodeStateStorage; +use crate::storage::metadata::PersistentMetadataStorage; +use crate::storage::transactions::PersistentTransactionStorage; +use crate::types::{MempoolState, UnconfirmedTransaction}; +use crate::ChainState; + +pub use crate::storage::blocks::BlockHeaderStorage; +pub use crate::storage::chainstate::ChainStateStorage; +pub use crate::storage::filters::FilterHeaderStorage; +pub use crate::storage::filters::FilterStorage; +pub use crate::storage::masternode::MasternodeStateStorage; +pub use crate::storage::metadata::MetadataStorage; +pub use crate::storage::transactions::TransactionStorage; -pub use manager::DiskStorageManager; pub use types::*; -/// Storage manager trait for abstracting data persistence. -/// -/// # Thread Safety -/// -/// This trait requires `Send + Sync` bounds to ensure thread safety, but uses `&mut self` -/// for mutation methods. This design choice provides several benefits: -/// -/// 1. **Simplified Implementation**: Storage backends don't need to implement interior -/// mutability patterns (like `Arc>` or `RwLock`) internally. -/// -/// 2. **Performance**: Avoids unnecessary locking overhead when the storage manager -/// is already protected by external synchronization. -/// -/// 3. **Flexibility**: Callers can choose the appropriate synchronization strategy -/// based on their specific use case (e.g., single-threaded, mutex-protected, etc.). -/// -/// ## Usage Pattern -/// -/// The typical usage pattern wraps the storage manager in an `Arc>` or similar: -/// -/// ```rust,no_run -/// # use std::sync::Arc; -/// # use tokio::sync::Mutex; -/// # use dash_spv::storage::DiskStorageManager; -/// # use dashcore::blockdata::block::Header as BlockHeader; -/// # -/// # async fn example() -> Result<(), Box> { -/// let storage: Arc> = Arc::new(Mutex::new(DiskStorageManager::new("./.tmp/example-storage".into()).await?)); -/// let headers: Vec = vec![]; // Your headers here -/// -/// // In async context: -/// let mut guard = storage.lock().await; -/// guard.store_headers(&headers).await?; -/// # Ok(()) -/// # } -/// ``` -/// -/// ## Implementation Requirements -/// -/// Implementations must ensure that: -/// - All operations are atomic at the logical level (e.g., all headers in a batch succeed or fail together) -/// - Read operations are consistent (no partial reads of in-progress writes) -/// - The implementation is safe to move between threads (`Send`) -/// - The implementation can be referenced from multiple threads (`Sync`) -/// -/// Note that the `&mut self` requirement means only one thread can be mutating the storage -/// at a time when using external synchronization, which naturally provides consistency. #[async_trait] -pub trait StorageManager: Send + Sync + 'static { - /// Store block headers. - async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()>; - - /// Load block headers in the given range. - async fn load_headers(&self, range: Range) -> StorageResult>; - - /// Get a specific header by blockchain height. - async fn get_header(&self, height: u32) -> StorageResult>; - - /// Get the current tip blockchain height. - async fn get_tip_height(&self) -> Option; - - async fn get_start_height(&self) -> Option; +pub trait PersistentStorage: Sized { + /// If the storage_path contains persisted data the storage will use it, if not, + /// a empty storage will be created. + async fn open(storage_path: impl Into + Send) -> StorageResult; - async fn get_stored_headers_len(&self) -> u32; + async fn persist(&mut self, storage_path: impl Into + Send) -> StorageResult<()>; +} - /// Store filter headers. - async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()>; +#[async_trait] +pub trait StorageManager: + BlockHeaderStorage + + FilterHeaderStorage + + FilterStorage + + TransactionStorage + + MetadataStorage + + ChainStateStorage + + MasternodeStateStorage + + Send + + Sync + + 'static +{ + /// Deletes in-disk and in-memory data + async fn clear(&mut self) -> StorageResult<()>; - /// Load filter headers in the given blockchain height range. - async fn load_filter_headers(&self, range: Range) -> StorageResult>; + /// Stops all background tasks and persists the data. + async fn shutdown(&mut self); +} - /// Get a specific filter header by blockchain height. - async fn get_filter_header(&self, height: u32) -> StorageResult>; +/// Disk-based storage manager with segmented files and async background saving. +/// Only one instance of DiskStorageManager working on the same storage path +/// can exist at a time. +pub struct DiskStorageManager { + storage_path: PathBuf, - /// Get the current filter tip blockchain height. - async fn get_filter_tip_height(&self) -> StorageResult>; + block_headers: Arc>, + filter_headers: Arc>, + filters: Arc>, + transactions: Arc>, + metadata: Arc>, + chainstate: Arc>, + masternodestate: Arc>, - /// Store masternode state. - async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()>; + // Background worker + worker_handle: Option>, - /// Load masternode state. - async fn load_masternode_state(&self) -> StorageResult>; + _lock_file: LockFile, +} - /// Store chain state. - async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()>; +impl DiskStorageManager { + pub async fn new(storage_path: impl Into + Send) -> StorageResult { + use std::fs; + + let storage_path = storage_path.into(); + let lock_file = { + let mut lock_file = storage_path.clone(); + lock_file.set_extension("lock"); + lock_file + }; + + fs::create_dir_all(&storage_path)?; + + let lock_file = LockFile::new(lock_file)?; + + let mut storage = Self { + storage_path: storage_path.clone(), + + block_headers: Arc::new(RwLock::new( + PersistentBlockHeaderStorage::open(&storage_path).await?, + )), + filter_headers: Arc::new(RwLock::new( + PersistentFilterHeaderStorage::open(&storage_path).await?, + )), + filters: Arc::new(RwLock::new(PersistentFilterStorage::open(&storage_path).await?)), + transactions: Arc::new(RwLock::new( + PersistentTransactionStorage::open(&storage_path).await?, + )), + metadata: Arc::new(RwLock::new(PersistentMetadataStorage::open(&storage_path).await?)), + chainstate: Arc::new(RwLock::new( + PersistentChainStateStorage::open(&storage_path).await?, + )), + masternodestate: Arc::new(RwLock::new( + PersistentMasternodeStateStorage::open(&storage_path).await?, + )), + + worker_handle: None, + + _lock_file: lock_file, + }; + + storage.start_worker().await; + + Ok(storage) + } + + #[cfg(test)] + pub async fn with_temp_dir() -> StorageResult { + use tempfile::TempDir; + + let temp_dir = TempDir::new()?; + Self::new(temp_dir.path()).await + } + + /// Start the background worker saving data every 5 seconds + async fn start_worker(&mut self) { + let block_headers = Arc::clone(&self.block_headers); + let filter_headers = Arc::clone(&self.filter_headers); + let filters = Arc::clone(&self.filters); + let transactions = Arc::clone(&self.transactions); + let metadata = Arc::clone(&self.metadata); + let chainstate = Arc::clone(&self.chainstate); + let masternodestate = Arc::clone(&self.masternodestate); + + let storage_path = self.storage_path.clone(); + + let worker_handle = tokio::spawn(async move { + let mut ticker = tokio::time::interval(Duration::from_secs(5)); + + loop { + ticker.tick().await; + + let _ = block_headers.write().await.persist(&storage_path).await; + let _ = filter_headers.write().await.persist(&storage_path).await; + let _ = filters.write().await.persist(&storage_path).await; + let _ = transactions.write().await.persist(&storage_path).await; + let _ = metadata.write().await.persist(&storage_path).await; + let _ = chainstate.write().await.persist(&storage_path).await; + let _ = masternodestate.write().await.persist(&storage_path).await; + } + }); + + self.worker_handle = Some(worker_handle); + } + + /// Stop the background worker without forcing a save. + fn stop_worker(&self) { + if let Some(handle) = &self.worker_handle { + handle.abort(); + } + } + + async fn persist(&self) { + let storage_path = &self.storage_path; + + let _ = self.block_headers.write().await.persist(storage_path).await; + let _ = self.filter_headers.write().await.persist(storage_path).await; + let _ = self.filters.write().await.persist(storage_path).await; + let _ = self.transactions.write().await.persist(storage_path).await; + let _ = self.metadata.write().await.persist(storage_path).await; + let _ = self.chainstate.write().await.persist(storage_path).await; + let _ = self.masternodestate.write().await.persist(storage_path).await; + } +} - /// Load chain state. - async fn load_chain_state(&self) -> StorageResult>; +#[async_trait] +impl StorageManager for DiskStorageManager { + async fn clear(&mut self) -> StorageResult<()> { + // First, stop the background worker to avoid races with file deletion + self.stop_worker(); + + // Remove all files and directories under storage_path + if self.storage_path.exists() { + // Best-effort removal; if concurrent files appear, retry once + match tokio::fs::remove_dir_all(&self.storage_path).await { + Ok(_) => {} + Err(e) + if e.kind() == std::io::ErrorKind::Other + || e.kind() == std::io::ErrorKind::DirectoryNotEmpty => + { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::fs::remove_dir_all(&self.storage_path).await?; + } + Err(e) => return Err(crate::error::StorageError::Io(e)), + } + tokio::fs::create_dir_all(&self.storage_path).await?; + } + + // Instantiate storages again once persisted data has been cleared + let storage_path = &self.storage_path; + + self.block_headers = + Arc::new(RwLock::new(PersistentBlockHeaderStorage::open(storage_path).await?)); + self.filter_headers = + Arc::new(RwLock::new(PersistentFilterHeaderStorage::open(storage_path).await?)); + self.filters = Arc::new(RwLock::new(PersistentFilterStorage::open(storage_path).await?)); + self.transactions = + Arc::new(RwLock::new(PersistentTransactionStorage::open(storage_path).await?)); + self.metadata = Arc::new(RwLock::new(PersistentMetadataStorage::open(storage_path).await?)); + self.chainstate = + Arc::new(RwLock::new(PersistentChainStateStorage::open(storage_path).await?)); + self.masternodestate = + Arc::new(RwLock::new(PersistentMasternodeStateStorage::open(storage_path).await?)); + + // Restart the background worker for future operations + self.start_worker().await; + + Ok(()) + } + + /// Shutdown the storage manager. + async fn shutdown(&mut self) { + self.stop_worker(); + + self.persist().await; + } +} - /// Store a compact filter at a blockchain height. - async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()>; +#[async_trait] +impl blocks::BlockHeaderStorage for DiskStorageManager { + async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { + self.block_headers.write().await.store_headers(headers).await + } - /// Load compact filters in the given blockchain height range. - async fn load_filters(&self, range: Range) -> StorageResult>>; + async fn store_headers_at_height( + &mut self, + headers: &[BlockHeader], + height: u32, + ) -> StorageResult<()> { + self.block_headers.write().await.store_headers_at_height(headers, height).await + } - /// Store metadata. - async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()>; + async fn load_headers(&self, range: Range) -> StorageResult> { + self.block_headers.read().await.load_headers(range).await + } - /// Load metadata. - async fn load_metadata(&self, key: &str) -> StorageResult>>; + async fn get_tip_height(&self) -> Option { + self.block_headers.read().await.get_tip_height().await + } - /// Clear all data. - async fn clear(&mut self) -> StorageResult<()>; + async fn get_start_height(&self) -> Option { + self.block_headers.read().await.get_start_height().await + } - /// Clear all filter headers and compact filters. - async fn clear_filters(&mut self) -> StorageResult<()>; + async fn get_stored_headers_len(&self) -> u32 { + self.block_headers.read().await.get_stored_headers_len().await + } - /// Get header height by block hash (reverse lookup). async fn get_header_height_by_hash( &self, hash: &dashcore::BlockHash, - ) -> StorageResult>; - - // UTXO methods removed - handled by external wallet - - /// Store a chain lock. - async fn store_chain_lock( - &mut self, - height: u32, - chain_lock: &dashcore::ChainLock, - ) -> StorageResult<()>; + ) -> StorageResult> { + self.block_headers.read().await.get_header_height_by_hash(hash).await + } +} - /// Load a chain lock by height. - async fn load_chain_lock(&self, height: u32) -> StorageResult>; +#[async_trait] +impl filters::FilterHeaderStorage for DiskStorageManager { + async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> { + self.filter_headers.write().await.store_filter_headers(headers).await + } + + async fn load_filter_headers(&self, range: Range) -> StorageResult> { + self.filter_headers.read().await.load_filter_headers(range).await + } + + async fn get_filter_tip_height(&self) -> StorageResult> { + self.filter_headers.read().await.get_filter_tip_height().await + } + + async fn get_filter_start_height(&self) -> Option { + self.filter_headers.read().await.get_filter_start_height().await + } +} - /// Get chain locks in a height range. - async fn get_chain_locks( - &self, - start_height: u32, - end_height: u32, - ) -> StorageResult>; +#[async_trait] +impl filters::FilterStorage for DiskStorageManager { + async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> { + self.filters.write().await.store_filter(height, filter).await + } + + async fn load_filters(&self, range: Range) -> StorageResult>> { + self.filters.read().await.load_filters(range).await + } +} - // Mempool storage methods - /// Store an unconfirmed transaction. +#[async_trait] +impl transactions::TransactionStorage for DiskStorageManager { async fn store_mempool_transaction( &mut self, txid: &Txid, tx: &UnconfirmedTransaction, - ) -> StorageResult<()>; + ) -> StorageResult<()> { + self.transactions.write().await.store_mempool_transaction(txid, tx).await + } - /// Remove a mempool transaction. - async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()>; + async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { + self.transactions.write().await.remove_mempool_transaction(txid).await + } - /// Get a mempool transaction. async fn get_mempool_transaction( &self, txid: &Txid, - ) -> StorageResult>; + ) -> StorageResult> { + self.transactions.read().await.get_mempool_transaction(txid).await + } - /// Get all mempool transactions. async fn get_all_mempool_transactions( &self, - ) -> StorageResult>; + ) -> StorageResult> { + self.transactions.read().await.get_all_mempool_transactions().await + } - /// Store the complete mempool state. - async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()>; + async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { + self.transactions.write().await.store_mempool_state(state).await + } - /// Load the mempool state. - async fn load_mempool_state(&self) -> StorageResult>; + async fn load_mempool_state(&self) -> StorageResult> { + self.transactions.read().await.load_mempool_state().await + } +} - /// Clear all mempool data. - async fn clear_mempool(&mut self) -> StorageResult<()>; +#[async_trait] +impl metadata::MetadataStorage for DiskStorageManager { + async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { + self.metadata.write().await.store_metadata(key, value).await + } + + async fn load_metadata(&self, key: &str) -> StorageResult>> { + self.metadata.read().await.load_metadata(key).await + } +} + +#[async_trait] +impl chainstate::ChainStateStorage for DiskStorageManager { + async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { + self.chainstate.write().await.store_chain_state(state).await + } + + async fn load_chain_state(&self) -> StorageResult> { + self.chainstate.read().await.load_chain_state().await + } +} + +#[async_trait] +impl masternode::MasternodeStateStorage for DiskStorageManager { + async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { + self.masternodestate.write().await.store_masternode_state(state).await + } + + async fn load_masternode_state(&self) -> StorageResult> { + self.masternodestate.read().await.load_masternode_state().await + } +} - /// Shutdown the storage manager - async fn shutdown(&mut self) -> StorageResult<()>; +#[cfg(test)] +mod tests { + use crate::ChainState; + + use super::*; + use dashcore::{block::Version, pow::CompactTarget, BlockHash, Header as BlockHeader}; + use dashcore_hashes::Hash; + use tempfile::TempDir; + + fn build_headers(count: usize) -> Vec { + let mut headers = Vec::with_capacity(count); + let mut prev_hash = BlockHash::from_byte_array([0u8; 32]); + + for i in 0..count { + let header = BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: prev_hash, + merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array( + [(i % 255) as u8; 32], + ) + .into(), + time: 1 + i as u32, + bits: CompactTarget::from_consensus(0x1d00ffff), + nonce: i as u32, + }; + prev_hash = header.block_hash(); + headers.push(header); + } + + headers + } + + #[tokio::test] + async fn test_load_headers() -> Result<(), Box> { + // Create a temporary directory for the test + let temp_dir = TempDir::new()?; + let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()) + .await + .expect("Unable to create storage"); + + // Create a test header + let test_header = BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: BlockHash::from_byte_array([1; 32]), + merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array([2; 32]).into(), + time: 12345, + bits: CompactTarget::from_consensus(0x1d00ffff), + nonce: 67890, + }; + + // Store just one header + storage.store_headers(&[test_header]).await?; + + let loaded_headers = storage.load_headers(0..1).await?; + + // Should only get back the one header we stored + assert_eq!(loaded_headers.len(), 1); + assert_eq!(loaded_headers[0], test_header); + + Ok(()) + } + + #[tokio::test] + async fn test_checkpoint_storage_indexing() -> StorageResult<()> { + use dashcore::TxMerkleNode; + use tempfile::tempdir; + + let temp_dir = tempdir().expect("Failed to create temp dir"); + let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; + + // Create test headers starting from checkpoint height + let checkpoint_height = 1_100_000; + let headers: Vec = (0..100) + .map(|i| BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: BlockHash::from_byte_array([i as u8; 32]), + merkle_root: TxMerkleNode::from_byte_array([(i + 1) as u8; 32]), + time: 1234567890 + i, + bits: CompactTarget::from_consensus(0x1a2b3c4d), + nonce: 67890 + i, + }) + .collect(); + + let mut base_state = ChainState::new(); + base_state.sync_base_height = checkpoint_height; + storage.store_chain_state(&base_state).await?; + + storage.store_headers_at_height(&headers, checkpoint_height).await?; + assert_eq!(storage.get_stored_headers_len().await, headers.len() as u32); + + // Verify headers are stored at correct blockchain heights + let header_at_base = storage.get_header(checkpoint_height).await?; + assert_eq!( + header_at_base.expect("Header at base blockchain height should exist"), + headers[0] + ); + + let header_at_ending = storage.get_header(checkpoint_height + 99).await?; + assert_eq!( + header_at_ending.expect("Header at ending blockchain height should exist"), + headers[99] + ); + + // Test the reverse index (hash -> blockchain height) + let hash_0 = headers[0].block_hash(); + let height_0 = storage.get_header_height_by_hash(&hash_0).await?; + assert_eq!( + height_0, + Some(checkpoint_height), + "Hash should map to blockchain height 1,100,000" + ); + + let hash_99 = headers[99].block_hash(); + let height_99 = storage.get_header_height_by_hash(&hash_99).await?; + assert_eq!( + height_99, + Some(checkpoint_height + 99), + "Hash should map to blockchain height 1,100,099" + ); + + // Store chain state to persist sync_base_height + let mut chain_state = ChainState::new(); + chain_state.sync_base_height = checkpoint_height; + storage.store_chain_state(&chain_state).await?; + + // Force save to disk + storage.persist().await; + + drop(storage); + + // Create a new storage instance to test index rebuilding + let storage2 = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; + + // Verify the index was rebuilt correctly + let height_after_rebuild = storage2.get_header_height_by_hash(&hash_0).await?; + assert_eq!( + height_after_rebuild, + Some(checkpoint_height), + "After index rebuild, hash should still map to blockchain height 1,100,000" + ); + + // Verify header can still be retrieved by blockchain height after reload + let header_after_reload = storage2.get_header(checkpoint_height).await?; + assert!( + header_after_reload.is_some(), + "Header at base blockchain height should exist after reload" + ); + assert_eq!(header_after_reload.unwrap(), headers[0]); + + Ok(()) + } + + #[tokio::test] + async fn test_shutdown_flushes_index() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let base_path = temp_dir.path().to_path_buf(); + let headers = build_headers(11_000); + let last_hash = headers.last().unwrap().block_hash(); + + { + let mut storage = DiskStorageManager::new(base_path.clone()).await?; + + storage.store_headers(&headers[..10_000]).await?; + storage.persist().await; + + storage.store_headers(&headers[10_000..]).await?; + storage.shutdown().await; + } + + let storage = DiskStorageManager::new(base_path).await?; + let height = storage.get_header_height_by_hash(&last_hash).await?; + assert_eq!(height, Some(10_999)); + + Ok(()) + } } diff --git a/dash-spv/src/storage/segments.rs b/dash-spv/src/storage/segments.rs index c33c669d1..9401ab722 100644 --- a/dash-spv/src/storage/segments.rs +++ b/dash-spv/src/storage/segments.rs @@ -20,35 +20,23 @@ use dashcore_hashes::Hash; use crate::{error::StorageResult, storage::io::atomic_write, StorageError}; pub trait Persistable: Sized + Encodable + Decodable + PartialEq + Clone { - const FOLDER_NAME: &'static str; const SEGMENT_PREFIX: &'static str = "segment"; const DATA_FILE_EXTENSION: &'static str = "dat"; - fn relative_disk_path(segment_id: u32) -> PathBuf { - format!( - "{}/{}_{:04}.{}", - Self::FOLDER_NAME, - Self::SEGMENT_PREFIX, - segment_id, - Self::DATA_FILE_EXTENSION - ) - .into() + fn segment_file_name(segment_id: u32) -> String { + format!("{}_{:04}.{}", Self::SEGMENT_PREFIX, segment_id, Self::DATA_FILE_EXTENSION) } fn sentinel() -> Self; } impl Persistable for Vec { - const FOLDER_NAME: &'static str = "filters"; - fn sentinel() -> Self { vec![] } } impl Persistable for BlockHeader { - const FOLDER_NAME: &'static str = "block_headers"; - fn sentinel() -> Self { Self { version: Version::from_consensus(i32::MAX), // Invalid version @@ -62,8 +50,6 @@ impl Persistable for BlockHeader { } impl Persistable for FilterHeader { - const FOLDER_NAME: &'static str = "filter_headers"; - fn sentinel() -> Self { FilterHeader::from_byte_array([0u8; 32]) } @@ -76,19 +62,17 @@ pub struct SegmentCache { evicted: HashMap>, tip_height: Option, start_height: Option, - base_path: PathBuf, + segments_dir: PathBuf, } impl SegmentCache { pub async fn build_block_index_from_segments( &mut self, ) -> StorageResult> { - let segments_dir = self.base_path.join(BlockHeader::FOLDER_NAME); + let entries = fs::read_dir(&self.segments_dir)?; let mut block_index = HashMap::new(); - let entries = fs::read_dir(&segments_dir)?; - for entry in entries.flatten() { let name = match entry.file_name().into_string() { Ok(s) => s, @@ -126,20 +110,19 @@ impl SegmentCache { impl SegmentCache { const MAX_ACTIVE_SEGMENTS: usize = 10; - pub async fn load_or_new(base_path: impl Into) -> StorageResult { - let base_path = base_path.into(); - let items_dir = base_path.join(I::FOLDER_NAME); + pub async fn load_or_new(segments_dir: impl Into) -> StorageResult { + let segments_dir = segments_dir.into(); let mut cache = Self { segments: HashMap::with_capacity(Self::MAX_ACTIVE_SEGMENTS), evicted: HashMap::new(), tip_height: None, start_height: None, - base_path, + segments_dir: segments_dir.clone(), }; // Building the metadata - if let Ok(entries) = fs::read_dir(&items_dir) { + if let Ok(entries) = fs::read_dir(&segments_dir) { let mut max_seg_id = None; let mut min_seg_id = None; @@ -179,7 +162,6 @@ impl SegmentCache { Ok(cache) } - /// Get the segment ID for a given storage index. #[inline] fn height_to_segment_id(height: u32) -> u32 { height / Segment::::ITEMS_PER_SEGMENT @@ -196,24 +178,6 @@ impl SegmentCache { height % Segment::::ITEMS_PER_SEGMENT } - pub fn clear_in_memory(&mut self) { - self.segments.clear(); - self.evicted.clear(); - self.tip_height = None; - } - - pub async fn clear_all(&mut self) -> StorageResult<()> { - self.clear_in_memory(); - - let persistence_dir = self.base_path.join(I::FOLDER_NAME); - if persistence_dir.exists() { - tokio::fs::remove_dir_all(&persistence_dir).await?; - } - tokio::fs::create_dir_all(&persistence_dir).await?; - - Ok(()) - } - async fn get_segment(&mut self, segment_id: &u32) -> StorageResult<&Segment> { let segment = self.get_segment_mut(segment_id).await?; Ok(&*segment) @@ -249,7 +213,7 @@ impl SegmentCache { let segment = if let Some(segment) = self.evicted.remove(segment_id) { segment } else { - Segment::load(&self.base_path, *segment_id).await? + Segment::load(&self.segments_dir, *segment_id).await? }; let segment = self.segments.entry(*segment_id).or_insert(segment); @@ -354,7 +318,8 @@ impl SegmentCache { height += 1; } - // Update cached tip height with blockchain height + // Update cached tip height and start height + // if needed self.tip_height = match self.tip_height { Some(current) => Some(current.max(height - 1)), None => Some(height - 1), @@ -368,22 +333,20 @@ impl SegmentCache { Ok(()) } - pub async fn persist_evicted(&mut self) { - for (_, segments) in self.evicted.iter_mut() { - if let Err(e) = segments.persist(&self.base_path).await { - tracing::error!("Failed to persist segment: {}", e); + pub async fn persist(&mut self, segments_dir: impl Into) { + let segments_dir = segments_dir.into(); + + for (id, segments) in self.evicted.iter_mut() { + if let Err(e) = segments.persist(&segments_dir).await { + tracing::error!("Failed to persist segment with id {id}: {e}"); } } self.evicted.clear(); - } - - pub async fn persist(&mut self) { - self.persist_evicted().await; - for (_, segments) in self.segments.iter_mut() { - if let Err(e) = segments.persist(&self.base_path).await { - tracing::error!("Failed to persist segment: {}", e); + for (id, segments) in self.segments.iter_mut() { + if let Err(e) = segments.persist(&segments_dir).await { + tracing::error!("Failed to persist segment with id {id}: {e}"); } } } @@ -464,7 +427,7 @@ impl Segment { pub async fn load(base_path: &Path, segment_id: u32) -> StorageResult { // Load segment from disk - let segment_path = base_path.join(I::relative_disk_path(segment_id)); + let segment_path = base_path.join(I::segment_file_name(segment_id)); let (items, state) = if segment_path.exists() { let file = File::open(&segment_path)?; @@ -498,12 +461,13 @@ impl Segment { Ok(Self::new(segment_id, items, state)) } - pub async fn persist(&mut self, base_path: &Path) -> StorageResult<()> { + pub async fn persist(&mut self, segments_dir: impl Into) -> StorageResult<()> { if self.state == SegmentState::Clean { return Ok(()); } - let path = base_path.join(I::relative_disk_path(self.segment_id)); + let segments_dir = segments_dir.into(); + let path = segments_dir.join(I::segment_file_name(self.segment_id)); if let Err(e) = fs::create_dir_all(path.parent().unwrap()) { return Err(StorageError::WriteFailed(format!("Failed to persist segment: {}", e))); @@ -631,25 +595,16 @@ mod tests { cache.store_items_at_height(&items, 10).await.expect("Failed to store items"); - cache.persist().await; + cache.persist(tmp_dir.path()).await; - cache.clear_in_memory(); - assert!(cache.segments.is_empty()); - assert!(cache.evicted.is_empty()); + let mut cache = SegmentCache::::load_or_new(tmp_dir.path()) + .await + .expect("Failed to load new segment_cache"); assert_eq!( - cache.get_items(10..20).await.expect("Failed to retrieve get irems from segment cache"), + cache.get_items(10..20).await.expect("Failed to get items from segment cache"), items ); - - cache.clear_all().await.expect("Failed to clean on-memory and on-disk data"); - assert!(cache.segments.is_empty()); - - let segment = cache.get_segment(&0).await.expect("Failed to create a new segment"); - - assert!(segment.first_valid_offset().is_none()); - assert!(segment.last_valid_offset().is_none()); - assert_eq!(segment.state, SegmentState::Dirty); } #[tokio::test] @@ -678,18 +633,25 @@ mod tests { cache.get_items(0..ITEMS_PER_SEGMENT - 1).await.expect("Failed to get items") ); + let items: Vec<_> = (0..ITEMS_PER_SEGMENT * 2 + ITEMS_PER_SEGMENT / 2) + .map(FilterHeader::new_test) + .collect(); + + cache.store_items(&items).await.expect("Failed to store items"); + assert_eq!( - items[0..(ITEMS_PER_SEGMENT + 1) as usize], - cache.get_items(0..ITEMS_PER_SEGMENT + 1).await.expect("Failed to get items") + items[0..ITEMS_PER_SEGMENT as usize], + cache.get_items(0..ITEMS_PER_SEGMENT).await.expect("Failed to get items") + ); + + assert_eq!( + items[0..(ITEMS_PER_SEGMENT - 1) as usize], + cache.get_items(0..ITEMS_PER_SEGMENT - 1).await.expect("Failed to get items") ); assert_eq!( - items[(ITEMS_PER_SEGMENT - 1) as usize - ..(ITEMS_PER_SEGMENT * 2 + ITEMS_PER_SEGMENT / 2) as usize], - cache - .get_items(ITEMS_PER_SEGMENT - 1..ITEMS_PER_SEGMENT * 2 + ITEMS_PER_SEGMENT / 2) - .await - .expect("Failed to get items") + items[0..(ITEMS_PER_SEGMENT + 1) as usize], + cache.get_items(0..ITEMS_PER_SEGMENT + 1).await.expect("Failed to get items") ); } diff --git a/dash-spv/src/storage/state.rs b/dash-spv/src/storage/state.rs deleted file mode 100644 index 31f5fdda8..000000000 --- a/dash-spv/src/storage/state.rs +++ /dev/null @@ -1,684 +0,0 @@ -//! State persistence and StorageManager trait implementation. - -use async_trait::async_trait; -use std::collections::HashMap; - -use dashcore::{block::Header as BlockHeader, BlockHash, Txid}; - -use crate::error::StorageResult; -use crate::storage::headers::save_index_to_disk; -use crate::storage::{MasternodeState, StorageManager}; -use crate::types::{ChainState, MempoolState, UnconfirmedTransaction}; - -use super::io::atomic_write; -use super::manager::DiskStorageManager; - -impl DiskStorageManager { - /// Store chain state to disk. - pub async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { - // Store other state as JSON - let state_data = serde_json::json!({ - "last_chainlock_height": state.last_chainlock_height, - "last_chainlock_hash": state.last_chainlock_hash, - "current_filter_tip": state.current_filter_tip, - "last_masternode_diff_height": state.last_masternode_diff_height, - "sync_base_height": state.sync_base_height, - }); - - let path = self.base_path.join("state/chain.json"); - let json = state_data.to_string(); - atomic_write(&path, json.as_bytes()).await?; - - Ok(()) - } - - /// Load chain state from disk. - pub async fn load_chain_state(&self) -> StorageResult> { - let path = self.base_path.join("state/chain.json"); - if !path.exists() { - return Ok(None); - } - - let content = tokio::fs::read_to_string(path).await?; - let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| { - crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e)) - })?; - - let state = ChainState { - last_chainlock_height: value - .get("last_chainlock_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32), - last_chainlock_hash: value - .get("last_chainlock_hash") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse().ok()), - current_filter_tip: value - .get("current_filter_tip") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse().ok()), - masternode_engine: None, - last_masternode_diff_height: value - .get("last_masternode_diff_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32), - sync_base_height: value - .get("sync_base_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32) - .unwrap_or(0), - }; - - Ok(Some(state)) - } - - /// Store masternode state. - pub async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { - let path = self.base_path.join("state/masternode.json"); - let json = serde_json::to_string_pretty(state).map_err(|e| { - crate::error::StorageError::Serialization(format!( - "Failed to serialize masternode state: {}", - e - )) - })?; - - atomic_write(&path, json.as_bytes()).await?; - Ok(()) - } - - /// Load masternode state. - pub async fn load_masternode_state(&self) -> StorageResult> { - let path = self.base_path.join("state/masternode.json"); - if !path.exists() { - return Ok(None); - } - - let content = tokio::fs::read_to_string(path).await?; - let state = serde_json::from_str(&content).map_err(|e| { - crate::error::StorageError::Serialization(format!( - "Failed to deserialize masternode state: {}", - e - )) - })?; - - Ok(Some(state)) - } - - /// Store a ChainLock. - pub async fn store_chain_lock( - &mut self, - height: u32, - chain_lock: &dashcore::ChainLock, - ) -> StorageResult<()> { - let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); - let data = bincode::serialize(chain_lock).map_err(|e| { - crate::error::StorageError::WriteFailed(format!( - "Failed to serialize chain lock: {}", - e - )) - })?; - - atomic_write(&path, &data).await?; - tracing::debug!("Stored chain lock at height {}", height); - Ok(()) - } - - /// Load a ChainLock. - pub async fn load_chain_lock(&self, height: u32) -> StorageResult> { - let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); - - if !path.exists() { - return Ok(None); - } - - let data = tokio::fs::read(&path).await?; - let chain_lock = bincode::deserialize(&data).map_err(|e| { - crate::error::StorageError::ReadFailed(format!( - "Failed to deserialize chain lock: {}", - e - )) - })?; - - Ok(Some(chain_lock)) - } - - /// Get ChainLocks in a height range. - pub async fn get_chain_locks( - &self, - start_height: u32, - end_height: u32, - ) -> StorageResult> { - let chainlocks_dir = self.base_path.join("chainlocks"); - - if !chainlocks_dir.exists() { - return Ok(Vec::new()); - } - - let mut chain_locks = Vec::new(); - let mut entries = tokio::fs::read_dir(&chainlocks_dir).await?; - - while let Some(entry) = entries.next_entry().await? { - let file_name = entry.file_name(); - let file_name_str = file_name.to_string_lossy(); - - // Parse height from filename - if let Some(height_str) = - file_name_str.strip_prefix("chainlock_").and_then(|s| s.strip_suffix(".bin")) - { - if let Ok(height) = height_str.parse::() { - if height >= start_height && height <= end_height { - let path = entry.path(); - let data = tokio::fs::read(&path).await?; - if let Ok(chain_lock) = bincode::deserialize(&data) { - chain_locks.push((height, chain_lock)); - } - } - } - } - } - - // Sort by height - chain_locks.sort_by_key(|(h, _)| *h); - Ok(chain_locks) - } - - /// Store metadata. - pub async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { - let path = self.base_path.join(format!("state/{}.dat", key)); - atomic_write(&path, value).await?; - Ok(()) - } - - /// Load metadata. - pub async fn load_metadata(&self, key: &str) -> StorageResult>> { - let path = self.base_path.join(format!("state/{}.dat", key)); - if !path.exists() { - return Ok(None); - } - - let data = tokio::fs::read(path).await?; - Ok(Some(data)) - } - - /// Clear all storage. - pub async fn clear(&mut self) -> StorageResult<()> { - // First, stop the background worker to avoid races with file deletion - self.stop_worker(); - - // Clear in-memory state - self.block_headers.write().await.clear_in_memory(); - self.filter_headers.write().await.clear_in_memory(); - self.filters.write().await.clear_in_memory(); - - self.header_hash_index.write().await.clear(); - self.mempool_transactions.write().await.clear(); - *self.mempool_state.write().await = None; - - // Remove all files and directories under base_path - if self.base_path.exists() { - // Best-effort removal; if concurrent files appear, retry once - match tokio::fs::remove_dir_all(&self.base_path).await { - Ok(_) => {} - Err(e) => { - // Retry once after a short delay to handle transient races - if e.kind() == std::io::ErrorKind::Other - || e.kind() == std::io::ErrorKind::DirectoryNotEmpty - { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - tokio::fs::remove_dir_all(&self.base_path).await?; - } else { - return Err(crate::error::StorageError::Io(e)); - } - } - } - tokio::fs::create_dir_all(&self.base_path).await?; - } - - // Recreate expected subdirectories - tokio::fs::create_dir_all(self.base_path.join("headers")).await?; - tokio::fs::create_dir_all(self.base_path.join("filters")).await?; - tokio::fs::create_dir_all(self.base_path.join("state")).await?; - - // Restart the background worker for future operations - self.start_worker().await; - - Ok(()) - } - - /// Shutdown the storage manager. - pub async fn shutdown(&mut self) { - self.stop_worker(); - - // Persist all dirty data - self.save_dirty().await; - } - - /// Save all dirty data. - pub(super) async fn save_dirty(&self) { - self.filter_headers.write().await.persist().await; - self.block_headers.write().await.persist().await; - self.filters.write().await.persist().await; - - let path = self.base_path.join("headers/index.dat"); - let index = self.header_hash_index.read().await; - if let Err(e) = save_index_to_disk(&path, &index).await { - tracing::error!("Failed to persist header index: {}", e); - } - } -} - -/// Mempool storage methods -impl DiskStorageManager { - /// Store a mempool transaction. - pub async fn store_mempool_transaction( - &mut self, - txid: &Txid, - tx: &UnconfirmedTransaction, - ) -> StorageResult<()> { - self.mempool_transactions.write().await.insert(*txid, tx.clone()); - Ok(()) - } - - /// Remove a mempool transaction. - pub async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { - self.mempool_transactions.write().await.remove(txid); - Ok(()) - } - - /// Get a mempool transaction. - pub async fn get_mempool_transaction( - &self, - txid: &Txid, - ) -> StorageResult> { - Ok(self.mempool_transactions.read().await.get(txid).cloned()) - } - - /// Get all mempool transactions. - pub async fn get_all_mempool_transactions( - &self, - ) -> StorageResult> { - Ok(self.mempool_transactions.read().await.clone()) - } - - /// Store mempool state. - pub async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { - *self.mempool_state.write().await = Some(state.clone()); - Ok(()) - } - - /// Load mempool state. - pub async fn load_mempool_state(&self) -> StorageResult> { - Ok(self.mempool_state.read().await.clone()) - } - - /// Clear mempool. - pub async fn clear_mempool(&mut self) -> StorageResult<()> { - self.mempool_transactions.write().await.clear(); - *self.mempool_state.write().await = None; - Ok(()) - } -} - -#[async_trait] -impl StorageManager for DiskStorageManager { - async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { - self.store_headers(headers).await - } - - async fn load_headers(&self, range: std::ops::Range) -> StorageResult> { - self.block_headers.write().await.get_items(range).await - } - - async fn get_header(&self, height: u32) -> StorageResult> { - if self.get_tip_height().await.is_none_or(|tip_height| height > tip_height) { - return Ok(None); - } - - if self.get_start_height().await.is_none_or(|start_height| height < start_height) { - return Ok(None); - } - - Ok(self.block_headers.write().await.get_items(height..height + 1).await?.first().copied()) - } - - async fn get_tip_height(&self) -> Option { - self.block_headers.read().await.tip_height() - } - - async fn get_start_height(&self) -> Option { - self.block_headers.read().await.start_height() - } - - async fn get_stored_headers_len(&self) -> u32 { - let headers_guard = self.block_headers.read().await; - let start_height = if let Some(start_height) = headers_guard.start_height() { - start_height - } else { - return 0; - }; - - let end_height = if let Some(end_height) = headers_guard.tip_height() { - end_height - } else { - return 0; - }; - - end_height - start_height + 1 - } - - async fn store_filter_headers( - &mut self, - headers: &[dashcore::hash_types::FilterHeader], - ) -> StorageResult<()> { - self.filter_headers.write().await.store_items(headers).await - } - - async fn load_filter_headers( - &self, - range: std::ops::Range, - ) -> StorageResult> { - self.filter_headers.write().await.get_items(range).await - } - - async fn get_filter_header( - &self, - height: u32, - ) -> StorageResult> { - Ok(self.filter_headers.write().await.get_items(height..height + 1).await?.first().copied()) - } - - async fn get_filter_tip_height(&self) -> StorageResult> { - Ok(self.filter_headers.read().await.tip_height()) - } - - async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { - Self::store_masternode_state(self, state).await - } - - async fn load_masternode_state(&self) -> StorageResult> { - Self::load_masternode_state(self).await - } - - async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { - Self::store_chain_state(self, state).await - } - - async fn load_chain_state(&self) -> StorageResult> { - Self::load_chain_state(self).await - } - - async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> { - self.filters.write().await.store_items_at_height(&[filter.to_vec()], height).await - } - - async fn load_filters(&self, range: std::ops::Range) -> StorageResult>> { - self.filters.write().await.get_items(range).await - } - - async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { - Self::store_metadata(self, key, value).await - } - - async fn load_metadata(&self, key: &str) -> StorageResult>> { - Self::load_metadata(self, key).await - } - - async fn clear(&mut self) -> StorageResult<()> { - Self::clear(self).await - } - - async fn clear_filters(&mut self) -> StorageResult<()> { - // Stop worker to prevent concurrent writes to filter directories - self.stop_worker(); - - // Clear in-memory and on-disk filter headers segments - self.filter_headers.write().await.clear_all().await?; - self.filters.write().await.clear_all().await?; - - // Restart background worker for future operations - self.start_worker().await; - - Ok(()) - } - - async fn get_header_height_by_hash(&self, hash: &BlockHash) -> StorageResult> { - Self::get_header_height_by_hash(self, hash).await - } - - async fn store_chain_lock( - &mut self, - height: u32, - chain_lock: &dashcore::ChainLock, - ) -> StorageResult<()> { - Self::store_chain_lock(self, height, chain_lock).await - } - - async fn load_chain_lock(&self, height: u32) -> StorageResult> { - Self::load_chain_lock(self, height).await - } - - async fn get_chain_locks( - &self, - start_height: u32, - end_height: u32, - ) -> StorageResult> { - Self::get_chain_locks(self, start_height, end_height).await - } - - async fn store_mempool_transaction( - &mut self, - txid: &Txid, - tx: &UnconfirmedTransaction, - ) -> StorageResult<()> { - Self::store_mempool_transaction(self, txid, tx).await - } - - async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { - Self::remove_mempool_transaction(self, txid).await - } - - async fn get_mempool_transaction( - &self, - txid: &Txid, - ) -> StorageResult> { - Self::get_mempool_transaction(self, txid).await - } - - async fn get_all_mempool_transactions( - &self, - ) -> StorageResult> { - Self::get_all_mempool_transactions(self).await - } - - async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { - Self::store_mempool_state(self, state).await - } - - async fn load_mempool_state(&self) -> StorageResult> { - Self::load_mempool_state(self).await - } - - async fn clear_mempool(&mut self) -> StorageResult<()> { - Self::clear_mempool(self).await - } - - async fn shutdown(&mut self) -> StorageResult<()> { - Self::shutdown(self).await; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use dashcore::{block::Version, pow::CompactTarget}; - use dashcore_hashes::Hash; - use tempfile::TempDir; - - fn build_headers(count: usize) -> Vec { - let mut headers = Vec::with_capacity(count); - let mut prev_hash = BlockHash::from_byte_array([0u8; 32]); - - for i in 0..count { - let header = BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: prev_hash, - merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array( - [(i % 255) as u8; 32], - ) - .into(), - time: 1 + i as u32, - bits: CompactTarget::from_consensus(0x1d00ffff), - nonce: i as u32, - }; - prev_hash = header.block_hash(); - headers.push(header); - } - - headers - } - - #[tokio::test] - async fn test_load_headers() -> Result<(), Box> { - // Create a temporary directory for the test - let temp_dir = TempDir::new()?; - let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()) - .await - .expect("Unable to create storage"); - - // Create a test header - let test_header = BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: BlockHash::from_byte_array([1; 32]), - merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array([2; 32]).into(), - time: 12345, - bits: CompactTarget::from_consensus(0x1d00ffff), - nonce: 67890, - }; - - // Store just one header - storage.store_headers(&[test_header]).await?; - - let loaded_headers = storage.load_headers(0..1).await?; - - // Should only get back the one header we stored - assert_eq!(loaded_headers.len(), 1); - assert_eq!(loaded_headers[0], test_header); - - Ok(()) - } - - #[tokio::test] - async fn test_checkpoint_storage_indexing() -> StorageResult<()> { - use dashcore::TxMerkleNode; - use tempfile::tempdir; - - let temp_dir = tempdir().expect("Failed to create temp dir"); - let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; - - // Create test headers starting from checkpoint height - let checkpoint_height = 1_100_000; - let headers: Vec = (0..100) - .map(|i| BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: BlockHash::from_byte_array([i as u8; 32]), - merkle_root: TxMerkleNode::from_byte_array([(i + 1) as u8; 32]), - time: 1234567890 + i, - bits: CompactTarget::from_consensus(0x1a2b3c4d), - nonce: 67890 + i, - }) - .collect(); - - let mut base_state = ChainState::new(); - base_state.sync_base_height = checkpoint_height; - storage.store_chain_state(&base_state).await?; - - storage.store_headers_at_height(&headers, checkpoint_height).await?; - assert_eq!(storage.get_stored_headers_len().await, headers.len() as u32); - - // Verify headers are stored at correct blockchain heights - let header_at_base = storage.get_header(checkpoint_height).await?; - assert_eq!( - header_at_base.expect("Header at base blockchain height should exist"), - headers[0] - ); - - let header_at_ending = storage.get_header(checkpoint_height + 99).await?; - assert_eq!( - header_at_ending.expect("Header at ending blockchain height should exist"), - headers[99] - ); - - // Test the reverse index (hash -> blockchain height) - let hash_0 = headers[0].block_hash(); - let height_0 = storage.get_header_height_by_hash(&hash_0).await?; - assert_eq!( - height_0, - Some(checkpoint_height), - "Hash should map to blockchain height 1,100,000" - ); - - let hash_99 = headers[99].block_hash(); - let height_99 = storage.get_header_height_by_hash(&hash_99).await?; - assert_eq!( - height_99, - Some(checkpoint_height + 99), - "Hash should map to blockchain height 1,100,099" - ); - - // Store chain state to persist sync_base_height - let mut chain_state = ChainState::new(); - chain_state.sync_base_height = checkpoint_height; - storage.store_chain_state(&chain_state).await?; - - // Force save to disk - storage.save_dirty().await; - - drop(storage); - - // Create a new storage instance to test index rebuilding - let storage2 = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; - - // Verify the index was rebuilt correctly - let height_after_rebuild = storage2.get_header_height_by_hash(&hash_0).await?; - assert_eq!( - height_after_rebuild, - Some(checkpoint_height), - "After index rebuild, hash should still map to blockchain height 1,100,000" - ); - - // Verify header can still be retrieved by blockchain height after reload - let header_after_reload = storage2.get_header(checkpoint_height).await?; - assert!( - header_after_reload.is_some(), - "Header at base blockchain height should exist after reload" - ); - assert_eq!(header_after_reload.unwrap(), headers[0]); - - Ok(()) - } - - #[tokio::test] - async fn test_shutdown_flushes_index() -> Result<(), Box> { - let temp_dir = TempDir::new()?; - let base_path = temp_dir.path().to_path_buf(); - let headers = build_headers(11_000); - let last_hash = headers.last().unwrap().block_hash(); - - { - let mut storage = DiskStorageManager::new(base_path.clone()).await?; - - storage.store_headers(&headers[..10_000]).await?; - storage.save_dirty().await; - - storage.store_headers(&headers[10_000..]).await?; - storage.shutdown().await; - } - - let storage = DiskStorageManager::new(base_path).await?; - let height = storage.get_header_height_by_hash(&last_hash).await?; - assert_eq!(height, Some(10_999)); - - Ok(()) - } -} diff --git a/dash-spv/src/storage/transactions.rs b/dash-spv/src/storage/transactions.rs new file mode 100644 index 000000000..480273c4c --- /dev/null +++ b/dash-spv/src/storage/transactions.rs @@ -0,0 +1,96 @@ +use std::{collections::HashMap, path::PathBuf}; + +use async_trait::async_trait; +use dashcore::Txid; + +use crate::{ + error::StorageResult, + storage::PersistentStorage, + types::{MempoolState, UnconfirmedTransaction}, +}; + +#[async_trait] +pub trait TransactionStorage { + async fn store_mempool_transaction( + &mut self, + txid: &Txid, + tx: &UnconfirmedTransaction, + ) -> StorageResult<()>; + + async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()>; + + async fn get_mempool_transaction( + &self, + txid: &Txid, + ) -> StorageResult>; + + async fn get_all_mempool_transactions( + &self, + ) -> StorageResult>; + + async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()>; + + async fn load_mempool_state(&self) -> StorageResult>; +} + +pub struct PersistentTransactionStorage { + mempool_transactions: HashMap, + mempool_state: Option, +} + +#[async_trait] +impl PersistentStorage for PersistentTransactionStorage { + async fn open(_storage_path: impl Into + Send) -> StorageResult { + let mempool_transactions = HashMap::new(); + let mempool_state = None; + + Ok(PersistentTransactionStorage { + mempool_transactions, + mempool_state, + }) + } + + async fn persist(&mut self, _storage_path: impl Into + Send) -> StorageResult<()> { + // This data is not currently being persisted + Ok(()) + } +} + +#[async_trait] +impl TransactionStorage for PersistentTransactionStorage { + async fn store_mempool_transaction( + &mut self, + txid: &Txid, + tx: &UnconfirmedTransaction, + ) -> StorageResult<()> { + self.mempool_transactions.insert(*txid, tx.clone()); + Ok(()) + } + + async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { + self.mempool_transactions.remove(txid); + Ok(()) + } + + async fn get_mempool_transaction( + &self, + txid: &Txid, + ) -> StorageResult> { + Ok(self.mempool_transactions.get(txid).cloned()) + } + + async fn get_all_mempool_transactions( + &self, + ) -> StorageResult> { + Ok(self.mempool_transactions.clone()) + } + + async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { + self.mempool_state = Some(state.clone()); + Ok(()) + } + + async fn load_mempool_state(&self) -> StorageResult> { + Ok(self.mempool_state.clone()) + } +} diff --git a/dash-spv/tests/edge_case_filter_sync_test.rs b/dash-spv/tests/edge_case_filter_sync_test.rs index c5d4760b5..f79b4052c 100644 --- a/dash-spv/tests/edge_case_filter_sync_test.rs +++ b/dash-spv/tests/edge_case_filter_sync_test.rs @@ -16,7 +16,7 @@ use dash_spv::{ client::ClientConfig, error::NetworkResult, network::NetworkManager, - storage::{DiskStorageManager, StorageManager}, + storage::{BlockHeaderStorage, DiskStorageManager, FilterHeaderStorage}, sync::filters::FilterSyncManager, }; use dashcore::{ diff --git a/dash-spv/tests/filter_header_verification_test.rs b/dash-spv/tests/filter_header_verification_test.rs index e8753411e..0d4b6de4a 100644 --- a/dash-spv/tests/filter_header_verification_test.rs +++ b/dash-spv/tests/filter_header_verification_test.rs @@ -19,7 +19,7 @@ use dash_spv::{ client::ClientConfig, error::{NetworkError, NetworkResult, SyncError}, network::NetworkManager, - storage::{DiskStorageManager, StorageManager}, + storage::{BlockHeaderStorage, DiskStorageManager, FilterHeaderStorage}, sync::filters::FilterSyncManager, types::PeerInfo, }; diff --git a/dash-spv/tests/header_sync_test.rs b/dash-spv/tests/header_sync_test.rs index b5a795644..8cfa3e473 100644 --- a/dash-spv/tests/header_sync_test.rs +++ b/dash-spv/tests/header_sync_test.rs @@ -5,7 +5,7 @@ use std::time::Duration; use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::PeerNetworkManager, - storage::{DiskStorageManager, StorageManager}, + storage::{BlockHeaderStorage, ChainStateStorage, DiskStorageManager}, sync::{HeaderSyncManager, ReorgConfig}, types::{ChainState, ValidationMode}, }; @@ -25,7 +25,7 @@ async fn test_basic_header_sync_from_genesis() { // Create fresh storage starting from empty state let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -48,7 +48,7 @@ async fn test_header_sync_continuation() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -83,7 +83,7 @@ async fn test_header_batch_processing() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -133,7 +133,7 @@ async fn test_header_sync_edge_cases() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -171,7 +171,7 @@ async fn test_header_chain_validation() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -209,7 +209,7 @@ async fn test_header_sync_performance() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -273,7 +273,7 @@ async fn test_header_sync_with_client_integration() { // Create storage manager let storage_manager = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -329,7 +329,7 @@ async fn test_header_storage_consistency() { let _ = env_logger::try_init(); let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); @@ -365,9 +365,8 @@ async fn test_header_storage_consistency() { #[tokio::test] async fn test_prepare_sync(sync_base_height: u32, header_count: usize) { let temp_dir = TempDir::new().expect("Failed to create temp dir"); - let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()) - .await - .expect("Failed to create storage"); + let mut storage = + DiskStorageManager::new(temp_dir.path()).await.expect("Failed to create storage"); let headers = create_test_header_chain(header_count); let expected_tip_hash = headers.last().unwrap().block_hash(); diff --git a/dash-spv/tests/integration_real_node_test.rs b/dash-spv/tests/integration_real_node_test.rs index f493a7abd..bb071e045 100644 --- a/dash-spv/tests/integration_real_node_test.rs +++ b/dash-spv/tests/integration_real_node_test.rs @@ -6,10 +6,11 @@ use std::net::SocketAddr; use std::time::{Duration, Instant}; +use dash_spv::storage::BlockHeaderStorage; use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::{NetworkManager, PeerNetworkManager}, - storage::{DiskStorageManager, StorageManager}, + storage::DiskStorageManager, types::ValidationMode, }; use dashcore::Network; @@ -36,8 +37,7 @@ async fn create_test_client( // Create storage manager let storage_manager = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) - .await?; + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()).await?; // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); @@ -200,10 +200,9 @@ async fn test_real_header_sync_up_to_10k() { config.peers.push(peer_addr); // Create fresh storage and client - let storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) - .await - .expect("Failed to create tmp storage"); + let storage = DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) + .await + .expect("Failed to create tmp storage"); // Verify starting from empty state assert_eq!(storage.get_tip_height().await, None); @@ -414,10 +413,9 @@ async fn test_real_header_chain_continuity() { config.peers.push(peer_addr); - let storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) - .await - .expect("Failed to create tmp storage"); + let storage = DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) + .await + .expect("Failed to create tmp storage"); let mut client = create_test_client(config).await.expect("Failed to create SPV client"); diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index f7fdc24ef..4868293fb 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -190,7 +190,7 @@ async fn test_max_peer_limit() { // Create storage manager let storage_manager = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); diff --git a/dash-spv/tests/reverse_index_test.rs b/dash-spv/tests/reverse_index_test.rs index 2b161641b..e09d3097e 100644 --- a/dash-spv/tests/reverse_index_test.rs +++ b/dash-spv/tests/reverse_index_test.rs @@ -1,4 +1,4 @@ -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager, StorageManager}; use dashcore::block::Header as BlockHeader; use dashcore_hashes::Hash; use std::path::PathBuf; @@ -49,7 +49,7 @@ async fn test_reverse_index_disk_storage() { #[tokio::test] async fn test_clear_clears_index() { let mut storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage"); diff --git a/dash-spv/tests/rollback_test.rs b/dash-spv/tests/rollback_test.rs index 7634648c6..5a985e830 100644 --- a/dash-spv/tests/rollback_test.rs +++ b/dash-spv/tests/rollback_test.rs @@ -5,7 +5,7 @@ #![cfg(feature = "skip_mock_implementation_incomplete")] -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager, FilterHeaderStorage}; use dashcore::{ block::{Header as BlockHeader, Version}, pow::CompactTarget, diff --git a/dash-spv/tests/segmented_storage_debug.rs b/dash-spv/tests/segmented_storage_debug.rs index a26bec774..1b10dd97d 100644 --- a/dash-spv/tests/segmented_storage_debug.rs +++ b/dash-spv/tests/segmented_storage_debug.rs @@ -1,6 +1,6 @@ //! Debug test for segmented storage. -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager, StorageManager}; use dashcore::block::{Header as BlockHeader, Version}; use dashcore::pow::CompactTarget; use dashcore::BlockHash; diff --git a/dash-spv/tests/segmented_storage_test.rs b/dash-spv/tests/segmented_storage_test.rs index a9bcf4917..ebdce3b1c 100644 --- a/dash-spv/tests/segmented_storage_test.rs +++ b/dash-spv/tests/segmented_storage_test.rs @@ -1,6 +1,9 @@ //! Tests for segmented disk storage implementation. -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{ + BlockHeaderStorage, DiskStorageManager, FilterHeaderStorage, FilterStorage, MetadataStorage, + StorageManager, +}; use dashcore::block::{Header as BlockHeader, Version}; use dashcore::hash_types::FilterHeader; use dashcore::pow::CompactTarget; diff --git a/dash-spv/tests/simple_header_test.rs b/dash-spv/tests/simple_header_test.rs index 26c46d065..2676dcb5d 100644 --- a/dash-spv/tests/simple_header_test.rs +++ b/dash-spv/tests/simple_header_test.rs @@ -3,7 +3,7 @@ use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::PeerNetworkManager, - storage::{DiskStorageManager, StorageManager}, + storage::{BlockHeaderStorage, DiskStorageManager}, types::ValidationMode, }; use dashcore::Network; @@ -51,10 +51,9 @@ async fn test_simple_header_sync() { config.peers.push(peer_addr); // Create fresh storage - let storage = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) - .await - .expect("Failed to create tmp storage"); + let storage = DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) + .await + .expect("Failed to create tmp storage"); // Verify starting from empty state assert_eq!(storage.get_tip_height().await, None); diff --git a/dash-spv/tests/simple_segmented_test.rs b/dash-spv/tests/simple_segmented_test.rs index 327c08779..9cea06a35 100644 --- a/dash-spv/tests/simple_segmented_test.rs +++ b/dash-spv/tests/simple_segmented_test.rs @@ -1,6 +1,6 @@ //! Simple test without background saving. -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager}; use dashcore::block::{Header as BlockHeader, Version}; use dashcore::pow::CompactTarget; use dashcore::BlockHash; diff --git a/dash-spv/tests/storage_consistency_test.rs b/dash-spv/tests/storage_consistency_test.rs index a5640bf74..cdd166442 100644 --- a/dash-spv/tests/storage_consistency_test.rs +++ b/dash-spv/tests/storage_consistency_test.rs @@ -3,7 +3,7 @@ //! These tests are designed to expose the storage bug where get_tip_height() //! returns a value but get_header() at that height returns None. -use dash_spv::storage::{DiskStorageManager, StorageManager}; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager, StorageManager}; use dashcore::block::{Header as BlockHeader, Version}; use dashcore::pow::CompactTarget; use dashcore::BlockHash; diff --git a/dash-spv/tests/storage_test.rs b/dash-spv/tests/storage_test.rs index 89db5da24..79833d09b 100644 --- a/dash-spv/tests/storage_test.rs +++ b/dash-spv/tests/storage_test.rs @@ -1,9 +1,7 @@ //! Integration tests for storage layer functionality. -use dash_spv::{ - storage::{DiskStorageManager, StorageManager}, - StorageError, -}; +use dash_spv::error::StorageError; +use dash_spv::storage::{BlockHeaderStorage, DiskStorageManager, StorageManager}; use dashcore::{block::Header as BlockHeader, block::Version}; use dashcore_hashes::Hash; use tempfile::TempDir; @@ -88,12 +86,19 @@ async fn test_disk_storage_concurrent_access_blocked() { async fn test_disk_storage_lock_file_lifecycle() { let temp_dir = TempDir::new().expect("Failed to create temp directory"); let path = temp_dir.path().to_path_buf(); - let lock_path = path.join(".lock"); + let lock_path = { + let mut lock_file = path.clone(); + lock_file.set_extension("lock"); + lock_file + }; // Lock file created when storage opens { - let _storage = DiskStorageManager::new(path.clone()).await.unwrap(); + let mut storage = DiskStorageManager::new(path.clone()).await.unwrap(); assert!(lock_path.exists(), "Lock file should exist while storage is open"); + + storage.clear().await.expect("Failed to clear the storage"); + assert!(lock_path.exists(), "Lock file should exist after storage is cleared"); } // Lock file removed when storage drops diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index a13b5b574..8dd8d5c1b 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -22,7 +22,7 @@ async fn create_test_client( // Create storage manager let storage_manager = - DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path().into()) + DiskStorageManager::new(TempDir::new().expect("Failed to create tmp dir").path()) .await .expect("Failed to create tmp storage");