diff --git a/.github/workflows/cln-integration.yml b/.github/workflows/cln-integration.yml index 32e7b74c0..d7624181b 100644 --- a/.github/workflows/cln-integration.yml +++ b/.github/workflows/cln-integration.yml @@ -27,4 +27,4 @@ jobs: socat -d -d UNIX-LISTEN:/tmp/lightning-rpc,reuseaddr,fork TCP:127.0.0.1:9937& - name: Run CLN integration tests - run: RUSTFLAGS="--cfg cln_test" cargo test --test integration_tests_cln + run: RUSTFLAGS="--cfg cln_test --cfg cycle_tests" cargo test --test integration_tests_cln diff --git a/.github/workflows/lnd-integration.yml b/.github/workflows/lnd-integration.yml index f913e92ad..894209c4b 100644 --- a/.github/workflows/lnd-integration.yml +++ b/.github/workflows/lnd-integration.yml @@ -51,6 +51,6 @@ jobs: - name: Run LND integration tests run: LND_CERT_PATH=$LND_DATA_DIR/tls.cert LND_MACAROON_PATH=$LND_DATA_DIR/data/chain/bitcoin/regtest/admin.macaroon - RUSTFLAGS="--cfg lnd_test" cargo test --test integration_tests_lnd -- --exact --show-output + RUSTFLAGS="--cfg lnd_test --cfg cycle_tests" cargo test --test integration_tests_lnd -- --exact --show-output env: LND_DATA_DIR: ${{ env.LND_DATA_DIR }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 661703ded..1ccade444 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,11 +80,11 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | - RUSTFLAGS="--cfg no_download" cargo test + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test - name: Test with UniFFI support on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest' && matrix.build-uniffi" run: | - RUSTFLAGS="--cfg no_download" cargo test --features uniffi + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test --features uniffi doc: name: Documentation diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 8473ed413..b5c4e9a0b 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -45,4 +45,4 @@ jobs: cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" RUSTFLAGS="--cfg vss_test" cargo test io::vss_store - RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + RUSTFLAGS="--cfg vss_test --cfg cycle_tests" cargo test --test integration_tests_vss diff --git a/Cargo.toml b/Cargo.toml index cc2a4b194..c6ac55c61 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-rapid-gossip-sync = { git = 
"https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "a9ad849a0eb7b155a688d713de6d9010cb48f073" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std", "_test_utils"] } +lightning = { git = 
"https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } @@ -124,6 +124,7 @@ check-cfg = [ "cfg(tokio_unstable)", "cfg(cln_test)", "cfg(lnd_test)", + "cfg(cycle_tests)", ] [[bench]] diff --git a/benches/payments.rs b/benches/payments.rs index ba69e046d..4114f69cb 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -8,7 +8,7 @@ use bitcoin::hex::DisplayHex; use bitcoin::Amount; use common::{ expect_channel_ready_event, generate_blocks_and_wait, premine_and_distribute_funds, - setup_bitcoind_and_electrsd, setup_two_nodes_with_store, TestChainSource, + setup_bitcoind_and_electrsd, setup_two_nodes_with_store, TestChainSource, TestNode, }; use criterion::{criterion_group, criterion_main, Criterion}; use ldk_node::{Event, Node}; @@ -18,7 +18,7 @@ use tokio::task::{self}; use crate::common::open_channel_push_amt; -fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { +fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { let mut preimage_bytes = [0u8; 32]; rand::rng().fill_bytes(&mut preimage_bytes); let preimage = PaymentPreimage(preimage_bytes); @@ -61,7 +61,7 @@ fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { }); } -async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Duration { +async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Duration { let start = Instant::now(); let total_payments = 1000; diff --git a/src/builder.rs b/src/builder.rs index 187f780d2..f661d2da1 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -20,7 +20,7 @@ use bitcoin::key::Secp256k1; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; -use lightning::chain::{chainmonitor, BestBlock, Watch}; +use lightning::chain::{chainmonitor, BestBlock}; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; @@ -33,7 +33,7 @@ use lightning::routing::scoring::{ }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -51,7 +51,7 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; -use crate::gossip::GossipSource; +use crate::gossip::{GossipSource, RuntimeSpawner}; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, @@ -72,8 +72,9 @@ use crate::peer_store::PeerStore; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, + KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, + 
SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1051,10 +1052,20 @@ fn build_with_store_internal( } } + let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); + let fee_estimator = Arc::new(OnchainFeeEstimator::new()); + + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (payment_store_res, node_metris_res) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + ) + }); + // Initialize the status fields. - let node_metrics = match runtime - .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) - { + let node_metrics = match node_metris_res { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1065,23 +1076,20 @@ fn build_with_store_internal( } }, }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); - let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = - match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = match payment_store_res { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1261,8 +1269,9 @@ fn build_with_store_internal( )); let peer_storage_key = keys_manager.get_peer_storage_key(); - let persister = Arc::new(Persister::new( + let monitor_reader = Arc::new(AsyncPersister::new( Arc::clone(&kv_store), + RuntimeSpawner::new(Arc::clone(&runtime)), Arc::clone(&logger), PERSISTER_MAX_PENDING_UPDATES, Arc::clone(&keys_manager), @@ -1271,8 +1280,18 @@ fn build_with_store_internal( Arc::clone(&fee_estimator), )); + // Read ChannelMonitors and the NetworkGraph + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (monitor_read_res, network_graph_res) = runtime.block_on(async move { + tokio::join!( + monitor_reader.read_all_channel_monitors_with_updates_parallel(), + read_network_graph(&*kv_store_ref, logger_ref), + ) + }); + // Read ChannelMonitor state from store - let channel_monitors = match persister.read_all_channel_monitors_with_updates() { + let channel_monitors = match monitor_read_res { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1284,6 +1303,16 @@ fn build_with_store_internal( }, }; + let persister = Arc::new(Persister::new( + Arc::clone(&kv_store), + Arc::clone(&logger), + PERSISTER_MAX_PENDING_UPDATES, + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + )); + // Initialize the 
ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&chain_source)), @@ -1296,9 +1325,7 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = match runtime - .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) - { + let network_graph = match network_graph_res { Ok(graph) => Arc::new(graph), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1310,9 +1337,42 @@ fn build_with_store_internal( }, }; - let local_scorer = match runtime.block_on(async { - read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await - }) { + // Read various smaller LDK and ldk-node objects from the store + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let network_graph_ref = Arc::clone(&network_graph); + let output_sweeper_future = read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store_ref), + Arc::clone(&logger_ref), + ); + let ( + scorer_res, + external_scores_res, + channel_manager_bytes_res, + sweeper_bytes_res, + event_queue_res, + peer_info_res, + ) = runtime.block_on(async move { + tokio::join!( + read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)), + read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)), + KVStore::read( + &*kv_store_ref, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ), + output_sweeper_future, + read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + ) + }); + + let local_scorer = match scorer_res { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1328,9 +1388,7 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. 
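The builder hunks above replace a series of sequential `runtime.block_on(...)` calls with a single `block_on` over `tokio::join!`, so independent KVStore reads (payments, node metrics, channel monitors, network graph, scorer, sweeper, event queue, peer info) are driven concurrently during startup. A minimal sketch of that pattern, assuming the `tokio` crate with its `rt` and `macros` features; `load_payments` and `load_metrics` are hypothetical stand-ins for the async store reads, not the real ldk-node helpers:

    use std::sync::Arc;

    // Hypothetical async store reads standing in for functions like
    // `read_payments` / `read_node_metrics` in builder.rs.
    async fn load_payments(store: Arc<Vec<u8>>) -> Result<usize, std::io::Error> {
        Ok(store.len())
    }

    async fn load_metrics(store: Arc<Vec<u8>>) -> Result<usize, std::io::Error> {
        Ok(store.len())
    }

    fn main() -> Result<(), std::io::Error> {
        let runtime = tokio::runtime::Builder::new_current_thread().build()?;
        let store = Arc::new(vec![0u8; 32]);

        // Clone whatever the `async move` block needs, then drive both reads
        // with one `block_on`; `tokio::join!` polls the futures concurrently.
        let store_a = Arc::clone(&store);
        let store_b = Arc::clone(&store);
        let (payments_res, metrics_res) = runtime
            .block_on(async move { tokio::join!(load_payments(store_a), load_metrics(store_b)) });

        // Each result is still matched individually afterwards, as in the builder code.
        let _payments = payments_res?;
        let _metrics = metrics_res?;
        Ok(())
    }

Batching the reads this way bounds startup I/O latency by the slowest read rather than by the sum of all of them.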
- match runtime.block_on(async { - read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await - }) { + match external_scores_res { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1383,12 +1441,7 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(reader) = KVStoreSync::read( - &*kv_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ) { + if let Ok(reader) = channel_manager_bytes_res { let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1481,8 +1534,12 @@ fn build_with_store_internal( let gossip_source = match gossip_source_config { GossipSourceConfig::P2PNetwork => { - let p2p_source = - Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger))); + let p2p_source = Arc::new(GossipSource::new_p2p( + Arc::clone(&network_graph), + Arc::clone(&chain_source), + Arc::clone(&runtime), + Arc::clone(&logger), + )); // Reset the RGS sync timestamp in case we somehow switch gossip sources { @@ -1597,34 +1654,19 @@ fn build_with_store_internal( Arc::clone(&keys_manager), )); - let peer_manager_clone = Arc::clone(&peer_manager); - + let peer_manager_clone = Arc::downgrade(&peer_manager); hrn_resolver.register_post_queue_action(Box::new(move || { - peer_manager_clone.process_events(); + if let Some(upgraded_pointer) = peer_manager_clone.upgrade() { + upgraded_pointer.process_events(); + } })); - liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); - - gossip_source.set_gossip_verifier( - Arc::clone(&chain_source), - Arc::clone(&peer_manager), - Arc::clone(&runtime), - ); + liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::downgrade(&peer_manager))); let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match runtime.block_on(async { - read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .await - }) { + let output_sweeper = match sweeper_bytes_res { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1645,9 +1687,7 @@ fn build_with_store_internal( }, }; - let event_queue = match runtime - .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let event_queue = match event_queue_res { Ok(event_queue) => Arc::new(event_queue), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1659,9 +1699,7 @@ fn build_with_store_internal( }, }; - let peer_store = match runtime - .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let peer_store = match peer_info_res { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { diff --git a/src/gossip.rs b/src/gossip.rs index 563d9e1ea..6fd3fb039 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -17,7 +17,7 @@ use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup}; 
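Several of the hunks above also swap long-lived `Arc<PeerManager>` handles for `Weak` ones: the HRN resolver's post-queue action and the liquidity source now receive `Arc::downgrade(&peer_manager)` and call `upgrade()` at use time, so those callbacks no longer keep the peer manager alive and cannot participate in an `Arc` reference cycle. A minimal sketch of that pattern, using a hypothetical `PeerManagerLike` type rather than the real one:

    use std::sync::{Arc, Weak};

    // Hypothetical stand-in for the peer manager held by a long-lived callback.
    struct PeerManagerLike;

    impl PeerManagerLike {
        fn process_events(&self) {}
    }

    fn main() {
        let peer_manager = Arc::new(PeerManagerLike);

        // Downgrade before moving the handle into the closure so the closure
        // does not extend the peer manager's lifetime.
        let weak_pm: Weak<PeerManagerLike> = Arc::downgrade(&peer_manager);
        let post_queue_action = move || {
            if let Some(pm) = weak_pm.upgrade() {
                pm.process_events();
            }
        };

        post_queue_action();
        // The closure holds no strong reference, so the count stays at one and
        // dropping the last `Arc` turns the callback into a no-op.
        assert_eq!(Arc::strong_count(&peer_manager), 1);
        drop(peer_manager);
        post_queue_action();
    }

In the same spirit, the `GossipVerifier` is now built inside `GossipSource::new_p2p` from just the UTXO source and a runtime spawner, removing the old `set_gossip_verifier` step that handed the verifier both the gossip sync and the peer manager.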
+use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; pub(crate) enum GossipSource { @@ -33,12 +33,15 @@ pub(crate) enum GossipSource { } impl GossipSource { - pub fn new_p2p(network_graph: Arc, logger: Arc) -> Self { - let gossip_sync = Arc::new(P2PGossipSync::new( - network_graph, - None::>, - Arc::clone(&logger), - )); + pub fn new_p2p( + network_graph: Arc, chain_source: Arc, runtime: Arc, + logger: Arc, + ) -> Self { + let verifier = chain_source.as_utxo_source().map(|utxo_source| { + Arc::new(GossipVerifier::new(Arc::new(utxo_source), RuntimeSpawner::new(runtime))) + }); + + let gossip_sync = Arc::new(P2PGossipSync::new(network_graph, verifier, logger)); Self::P2PNetwork { gossip_sync } } @@ -62,27 +65,6 @@ impl GossipSource { } } - pub(crate) fn set_gossip_verifier( - &self, chain_source: Arc, peer_manager: Arc, - runtime: Arc, - ) { - match self { - Self::P2PNetwork { gossip_sync } => { - if let Some(utxo_source) = chain_source.as_utxo_source() { - let spawner = RuntimeSpawner::new(Arc::clone(&runtime)); - let gossip_verifier = Arc::new(GossipVerifier::new( - Arc::new(utxo_source), - spawner, - Arc::clone(gossip_sync), - peer_manager, - )); - gossip_sync.add_utxo_lookup(Some(gossip_verifier)); - } - }, - _ => (), - } - } - pub async fn update_rgs_snapshot(&self) -> Result { match self { Self::P2PNetwork { gossip_sync: _, .. } => Ok(0), @@ -144,7 +126,17 @@ impl RuntimeSpawner { } impl FutureSpawner for RuntimeSpawner { - fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn_cancellable_background_task(future); + type E = tokio::sync::oneshot::error::RecvError; + type SpawnedFutureResult = tokio::sync::oneshot::Receiver; + fn spawn + Send + 'static>( + &self, future: F, + ) -> Self::SpawnedFutureResult { + let (result, output) = tokio::sync::oneshot::channel(); + self.runtime.spawn_cancellable_background_task(async move { + // We don't care if the send works or not, if the receiver is dropped its not our + // problem. 
+ let _ = result.send(future.await); + }); + output } } diff --git a/src/lib.rs b/src/lib.rs index d9bca4551..cd234798b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1761,6 +1761,12 @@ impl Node { Error::PersistenceFailed }) } + + #[cfg(cycle_tests)] + /// Fetch a reference to the inner NetworkGraph, for Arc cycle detection + pub fn fetch_ref(&self) -> std::sync::Weak { + Arc::downgrade(&self.network_graph) + } } impl Drop for Node { diff --git a/src/liquidity.rs b/src/liquidity.rs index 74e6098dd..2151110b6 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::ops::Deref; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::Duration; use bitcoin::hashes::{sha256, Hash}; @@ -291,7 +291,7 @@ where lsps2_service: Option, wallet: Arc, channel_manager: Arc, - peer_manager: RwLock>>, + peer_manager: RwLock>>, keys_manager: Arc, liquidity_manager: Arc, config: Arc, @@ -302,7 +302,7 @@ impl LiquiditySource where L::Target: LdkLogger, { - pub(crate) fn set_peer_manager(&self, peer_manager: Arc) { + pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { *self.peer_manager.write().unwrap() = Some(peer_manager); } @@ -715,8 +715,8 @@ where return; }; - let init_features = if let Some(peer_manager) = - self.peer_manager.read().unwrap().as_ref() + let init_features = if let Some(Some(peer_manager)) = + self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) { // Fail if we're not connected to the prospective channel partner. if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { diff --git a/src/logger.rs b/src/logger.rs index 4eaefad74..6a98b92f2 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -171,6 +171,14 @@ impl LogWriter for Writer { } } +#[cfg(cycle_tests)] +/// Our log writer +pub struct Logger { + /// Specifies the logger's writer. + writer: Writer, +} + +#[cfg(not(cycle_tests))] pub(crate) struct Logger { /// Specifies the logger's writer. 
writer: Writer, @@ -195,10 +203,12 @@ impl Logger { Ok(Self { writer: Writer::FileWriter { file_path, max_log_level } }) } + /// Creates a new logger writing to the `log` crate pub fn new_log_facade() -> Self { Self { writer: Writer::LogFacadeWriter } } + /// Creates a new logger writing to an underlying [`LogWriter`] pub fn new_custom_writer(log_writer: Arc) -> Self { Self { writer: Writer::CustomWriter(log_writer) } } diff --git a/src/types.rs b/src/types.rs index 5e9cd74c9..f1112fcf8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,7 +23,9 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync, +}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -185,6 +187,16 @@ impl DynStoreTrait for DynStoreWrapper } } +pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< + Arc, + RuntimeSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub type Persister = MonitorUpdatingPersister< Arc, Arc, @@ -254,7 +266,7 @@ pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; -pub(crate) type UtxoLookup = GossipVerifier, Arc>; +pub(crate) type UtxoLookup = GossipVerifier>; pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 96f58297c..5a725812d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod logging; use std::collections::{HashMap, HashSet}; use std::env; use std::future::Future; +use std::ops::Deref; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -261,10 +262,51 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { TestConfig { node_config, ..Default::default() } } +pub struct TestNode { + #[cfg(feature = "uniffi")] + inner: Option>, + #[cfg(not(feature = "uniffi"))] + inner: Option, +} + #[cfg(feature = "uniffi")] -type TestNode = Arc; +impl From> for TestNode { + fn from(inner: Arc) -> Self { + Self { inner: Some(inner) } + } +} + #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +impl From for TestNode { + fn from(inner: Node) -> Self { + Self { inner: Some(inner) } + } +} + +impl Deref for TestNode { + type Target = Node; + fn deref(&self) -> &Node { + #[cfg(feature = "uniffi")] + { + self.inner.as_ref().unwrap().deref() + } + #[cfg(not(feature = "uniffi"))] + { + &self.inner.as_ref().unwrap() + } + } +} + +#[cfg(cycle_tests)] +impl Drop for TestNode { + fn drop(&mut self) { + if !std::thread::panicking() { + let graph_ref = (**self).fetch_ref(); + self.inner.take(); + assert_eq!(graph_ref.strong_count(), 0); + } + } +} #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -430,7 +472,7 @@ pub(crate) fn setup_node_for_async_payments( node.start().unwrap(); assert!(node.status().is_running); assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); - node + node.into() } pub(crate) async fn generate_blocks_and_wait( diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4d2a17422..99743cad5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ 
-23,8 +23,8 @@ use common::{ expect_splice_pending_event, generate_blocks_and_wait, open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, - setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, - TestSyncStore, + setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestNode, + TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::liquidity::LSPS2ServiceConfig; @@ -154,15 +154,15 @@ async fn multi_hop_sending() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); // Setup and fund 5 nodes - let mut nodes = Vec::new(); + let mut nodes: Vec = Vec::new(); for _ in 0..5 { let config = random_config(true); let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = builder.build(config.node_entropy.into()).unwrap(); + let node: TestNode = builder.build(config.node_entropy.into()).unwrap().into(); node.start().unwrap(); - nodes.push(node); + nodes.push(node.into()); } let addresses = nodes.iter().map(|n| n.onchain_payment().new_address().unwrap()).collect(); @@ -1730,7 +1730,8 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); @@ -1740,13 +1741,15 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr, None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr = service_node.onchain_payment().new_address().unwrap(); @@ -2047,7 +2050,8 @@ async fn lsps2_client_trusts_lsp() { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); let service_addr = 
service_node.listening_addresses().unwrap().first().unwrap().clone(); @@ -2056,14 +2060,16 @@ async fn lsps2_client_trusts_lsp() { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr_onchain = service_node.onchain_payment().new_address().unwrap(); @@ -2220,7 +2226,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); @@ -2230,7 +2237,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); @@ -2238,7 +2246,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr_onchain = service_node.onchain_payment().new_address().unwrap(); diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 54912b358..2184052e9 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -24,28 +24,30 @@ async fn channel_full_cycle_with_vss_store() { let mut builder_a = Builder::from_config(config_a.node_config); builder_a.set_chain_source_esplora(esplora_url.clone(), None); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); - let node_a = builder_a + let node_a: common::TestNode = builder_a .build_with_vss_store_and_fixed_headers( config_a.node_entropy, vss_base_url.clone(), "node_1_store".to_string(), HashMap::new(), ) - .unwrap(); + .unwrap() + .into(); node_a.start().unwrap(); println!("\n== Node B =="); let config_b = 
common::random_config(true); let mut builder_b = Builder::from_config(config_b.node_config); builder_b.set_chain_source_esplora(esplora_url.clone(), None); - let node_b = builder_b + let node_b: common::TestNode = builder_b .build_with_vss_store_and_fixed_headers( config_b.node_entropy, vss_base_url, "node_2_store".to_string(), HashMap::new(), ) - .unwrap(); + .unwrap() + .into(); node_b.start().unwrap(); common::do_channel_full_cycle(
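The test-harness changes route every test node through the new `TestNode` wrapper, whose `Drop` impl (compiled in via the `--cfg cycle_tests` RUSTFLAGS added to the CI workflows above) takes a `Weak` reference to the node's `NetworkGraph`, drops the node, and asserts that the strong count reaches zero, so any leftover `Arc` cycle fails the test. A minimal sketch of that drop-and-assert technique, with a hypothetical `Inner`/`Handle` pair standing in for the real node types:

    use std::sync::{Arc, Weak};

    // Hypothetical graph-like resource the wrapped object holds onto.
    struct Inner {
        graph: Arc<Vec<u8>>,
    }

    struct Handle {
        inner: Option<Inner>,
    }

    impl Handle {
        fn graph_ref(&self) -> Weak<Vec<u8>> {
            Arc::downgrade(&self.inner.as_ref().unwrap().graph)
        }
    }

    impl Drop for Handle {
        fn drop(&mut self) {
            if !std::thread::panicking() {
                // Grab a weak handle, drop the wrapped object, then verify no
                // strong references (i.e. no Arc cycles) survived it.
                let graph_ref = self.graph_ref();
                self.inner.take();
                assert_eq!(graph_ref.strong_count(), 0);
            }
        }
    }

    fn main() {
        let handle = Handle { inner: Some(Inner { graph: Arc::new(vec![0u8; 8]) }) };
        drop(handle); // Passes: nothing else holds a strong reference to the graph.
    }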