@@ -347,7 +347,13 @@ impl Builder {
347347 EsploraBlockchain :: from_client ( tx_sync. client ( ) . clone ( ) , BDK_CLIENT_STOP_GAP )
348348 . with_concurrency ( BDK_CLIENT_CONCURRENCY ) ;
349349
350- let wallet = Arc :: new ( Wallet :: new ( blockchain, bdk_wallet, Arc :: clone ( & logger) ) ) ;
350+ let runtime = Arc :: new ( RwLock :: new ( None ) ) ;
351+ let wallet = Arc :: new ( Wallet :: new (
352+ blockchain,
353+ bdk_wallet,
354+ Arc :: clone ( & runtime) ,
355+ Arc :: clone ( & logger) ,
356+ ) ) ;
351357
352358 let kv_store = Arc :: new ( FilesystemStore :: new ( ldk_data_dir. clone ( ) . into ( ) ) ) ;
353359
@@ -556,10 +562,11 @@ impl Builder {
556562 }
557563 } ;
558564
559- let running = RwLock :: new ( None ) ;
565+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
560566
561567 Node {
562- running,
568+ runtime,
569+ stop_running,
563570 config,
564571 wallet,
565572 tx_sync,
@@ -579,18 +586,12 @@ impl Builder {
579586 }
580587}
581588
582- /// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped
583- /// upon [`Node::stop()`].
584- struct Runtime {
585- tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
586- stop_runtime : Arc < AtomicBool > ,
587- }
588-
589589/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
590590///
591591/// Needs to be initialized and instantiated through [`Builder::build`].
592592pub struct Node {
593- running : RwLock < Option < Runtime > > ,
593+ runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
594+ stop_running : Arc < AtomicBool > ,
594595 config : Arc < Config > ,
595596 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
596597 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -616,49 +617,15 @@ impl Node {
616617 /// a thread-safe manner.
617618 pub fn start ( & self ) -> Result < ( ) , Error > {
618619 // Acquire a run lock and hold it until we're setup.
619- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
620- if run_lock . is_some ( ) {
620+ let mut runtime_lock = self . runtime . write ( ) . unwrap ( ) ;
621+ if runtime_lock . is_some ( ) {
621622 // We're already running.
622623 return Err ( Error :: AlreadyRunning ) ;
623624 }
624625
625- let runtime = self . setup_runtime ( ) ?;
626- * run_lock = Some ( runtime) ;
627- Ok ( ( ) )
628- }
626+ let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
629627
630- /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
631- ///
632- /// After this returns most API methods will return [`Error::NotRunning`].
633- pub fn stop ( & self ) -> Result < ( ) , Error > {
634- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
635- if run_lock. is_none ( ) {
636- return Err ( Error :: NotRunning ) ;
637- }
638-
639- let runtime = run_lock. as_ref ( ) . unwrap ( ) ;
640-
641- // Stop the runtime.
642- runtime. stop_runtime . store ( true , Ordering :: Release ) ;
643-
644- // Stop disconnect peers.
645- self . peer_manager . disconnect_all_peers ( ) ;
646-
647- // Drop the held runtimes.
648- self . wallet . drop_runtime ( ) ;
649-
650- // Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
651- * run_lock = None ;
652- Ok ( ( ) )
653- }
654-
655- fn setup_runtime ( & self ) -> Result < Runtime , Error > {
656- let tokio_runtime =
657- Arc :: new ( tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ) ;
658-
659- self . wallet . set_runtime ( Arc :: clone ( & tokio_runtime) ) ;
660-
661- let stop_runtime = Arc :: new ( AtomicBool :: new ( false ) ) ;
628+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
662629
663630 let event_handler = Arc :: new ( EventHandler :: new (
664631 Arc :: clone ( & self . wallet ) ,
@@ -667,7 +634,7 @@ impl Node {
667634 Arc :: clone ( & self . network_graph ) ,
668635 Arc :: clone ( & self . keys_manager ) ,
669636 Arc :: clone ( & self . payment_store ) ,
670- Arc :: clone ( & tokio_runtime ) ,
637+ Arc :: clone ( & self . runtime ) ,
671638 Arc :: clone ( & self . logger ) ,
672639 Arc :: clone ( & self . config ) ,
673640 ) ) ;
@@ -678,7 +645,7 @@ impl Node {
678645 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
679646 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
680647 let sync_logger = Arc :: clone ( & self . logger ) ;
681- let stop_sync = Arc :: clone ( & stop_runtime ) ;
648+ let stop_sync = Arc :: clone ( & stop_running ) ;
682649
683650 std:: thread:: spawn ( move || {
684651 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
@@ -709,8 +676,8 @@ impl Node {
709676 } ) ;
710677
711678 let sync_logger = Arc :: clone ( & self . logger ) ;
712- let stop_sync = Arc :: clone ( & stop_runtime ) ;
713- tokio_runtime . spawn ( async move {
679+ let stop_sync = Arc :: clone ( & stop_running ) ;
680+ runtime . spawn ( async move {
714681 loop {
715682 if stop_sync. load ( Ordering :: Acquire ) {
716683 return ;
@@ -737,10 +704,10 @@ impl Node {
737704 if let Some ( listening_address) = & self . config . listening_address {
738705 // Setup networking
739706 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
740- let stop_listen = Arc :: clone ( & stop_runtime ) ;
707+ let stop_listen = Arc :: clone ( & stop_running ) ;
741708 let listening_address = listening_address. clone ( ) ;
742709
743- tokio_runtime . spawn ( async move {
710+ runtime . spawn ( async move {
744711 let listener =
745712 tokio:: net:: TcpListener :: bind ( listening_address) . await . expect (
746713 "Failed to bind to listen address/port - is something else already listening on it?" ,
@@ -767,8 +734,8 @@ impl Node {
767734 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
768735 let connect_logger = Arc :: clone ( & self . logger ) ;
769736 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
770- let stop_connect = Arc :: clone ( & stop_runtime ) ;
771- tokio_runtime . spawn ( async move {
737+ let stop_connect = Arc :: clone ( & stop_running ) ;
738+ runtime . spawn ( async move {
772739 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
773740 loop {
774741 if stop_connect. load ( Ordering :: Acquire ) {
@@ -808,7 +775,7 @@ impl Node {
808775 let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
809776 let background_logger = Arc :: clone ( & self . logger ) ;
810777 let background_scorer = Arc :: clone ( & self . scorer ) ;
811- let stop_background_processing = Arc :: clone ( & stop_runtime ) ;
778+ let stop_background_processing = Arc :: clone ( & stop_running ) ;
812779 let sleeper = move |d| {
813780 let stop = Arc :: clone ( & stop_background_processing) ;
814781 Box :: pin ( async move {
@@ -821,7 +788,7 @@ impl Node {
821788 } )
822789 } ;
823790
824- tokio_runtime . spawn ( async move {
791+ runtime . spawn ( async move {
825792 process_events_async (
826793 background_persister,
827794 |e| background_event_handler. handle_event ( e) ,
@@ -838,7 +805,23 @@ impl Node {
838805 . expect ( "Failed to process events" ) ;
839806 } ) ;
840807
841- Ok ( Runtime { tokio_runtime, stop_runtime } )
808+ * runtime_lock = Some ( runtime) ;
809+ Ok ( ( ) )
810+ }
811+
812+ /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
813+ ///
814+ /// After this returns most API methods will return [`Error::NotRunning`].
815+ pub fn stop ( & self ) -> Result < ( ) , Error > {
816+ let runtime = self . runtime . write ( ) . unwrap ( ) . take ( ) . ok_or ( Error :: NotRunning ) ?;
817+ // Stop the runtime.
818+ self . stop_running . store ( true , Ordering :: Release ) ;
819+
820+ // Stop disconnect peers.
821+ self . peer_manager . disconnect_all_peers ( ) ;
822+
823+ runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
824+ Ok ( ( ) )
842825 }
843826
844827 /// Blocks until the next event is available.
@@ -888,12 +871,11 @@ impl Node {
888871 pub fn connect (
889872 & self , node_id : PublicKey , address : SocketAddr , permanently : bool ,
890873 ) -> Result < ( ) , Error > {
891- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
892- if runtime_lock . is_none ( ) {
874+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
875+ if rt_lock . is_none ( ) {
893876 return Err ( Error :: NotRunning ) ;
894877 }
895-
896- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
878+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
897879
898880 let peer_info = PeerInfo { pubkey : node_id, address } ;
899881
@@ -905,7 +887,7 @@ impl Node {
905887 let con_pm = Arc :: clone ( & self . peer_manager ) ;
906888
907889 tokio:: task:: block_in_place ( move || {
908- runtime. tokio_runtime . block_on ( async move {
890+ runtime. block_on ( async move {
909891 let res =
910892 connect_peer_if_necessary ( con_peer_pubkey, con_peer_addr, con_pm, con_logger)
911893 . await ;
@@ -931,8 +913,8 @@ impl Node {
931913 /// Will also remove the peer from the peer store, i.e., after this has been called we won't
932914 /// try to reconnect on restart.
933915 pub fn disconnect ( & self , counterparty_node_id : & PublicKey ) -> Result < ( ) , Error > {
934- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
935- if runtime_lock . is_none ( ) {
916+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
917+ if rt_lock . is_none ( ) {
936918 return Err ( Error :: NotRunning ) ;
937919 }
938920
@@ -962,12 +944,11 @@ impl Node {
962944 & self , node_id : PublicKey , address : SocketAddr , channel_amount_sats : u64 ,
963945 push_to_counterparty_msat : Option < u64 > , announce_channel : bool ,
964946 ) -> Result < ( ) , Error > {
965- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
966- if runtime_lock . is_none ( ) {
947+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
948+ if rt_lock . is_none ( ) {
967949 return Err ( Error :: NotRunning ) ;
968950 }
969-
970- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
951+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
971952
972953 let cur_balance = self . wallet . get_balance ( ) ?;
973954 if cur_balance. get_spendable ( ) < channel_amount_sats {
@@ -985,7 +966,7 @@ impl Node {
985966 let con_pm = Arc :: clone ( & self . peer_manager ) ;
986967
987968 tokio:: task:: block_in_place ( move || {
988- runtime. tokio_runtime . block_on ( async move {
969+ runtime. block_on ( async move {
989970 let res =
990971 connect_peer_if_necessary ( con_peer_pubkey, con_peer_addr, con_pm, con_logger)
991972 . await ;
@@ -1040,10 +1021,12 @@ impl Node {
10401021 ///
10411022 /// Note that the wallets will be also synced regularly in the background.
10421023 pub fn sync_wallets ( & self ) -> Result < ( ) , Error > {
1043- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
1044- if runtime_lock . is_none ( ) {
1024+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1025+ if rt_lock . is_none ( ) {
10451026 return Err ( Error :: NotRunning ) ;
10461027 }
1028+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
1029+
10471030 let wallet = Arc :: clone ( & self . wallet ) ;
10481031 let tx_sync = Arc :: clone ( & self . tx_sync ) ;
10491032 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
@@ -1054,7 +1037,6 @@ impl Node {
10541037 & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
10551038 ] ;
10561039
1057- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
10581040 tokio:: task:: block_in_place ( move || {
10591041 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
10601042 async move {
@@ -1079,7 +1061,7 @@ impl Node {
10791061
10801062 let sync_logger = Arc :: clone ( & self . logger ) ;
10811063 tokio:: task:: block_in_place ( move || {
1082- runtime. tokio_runtime . block_on ( async move {
1064+ runtime. block_on ( async move {
10831065 let now = Instant :: now ( ) ;
10841066 match tx_sync. sync ( confirmables) . await {
10851067 Ok ( ( ) ) => {
@@ -1114,7 +1096,8 @@ impl Node {
11141096
11151097 /// Send a payement given an invoice.
11161098 pub fn send_payment ( & self , invoice : & Invoice ) -> Result < PaymentHash , Error > {
1117- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1099+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1100+ if rt_lock. is_none ( ) {
11181101 return Err ( Error :: NotRunning ) ;
11191102 }
11201103
@@ -1180,7 +1163,8 @@ impl Node {
11801163 pub fn send_payment_using_amount (
11811164 & self , invoice : & Invoice , amount_msat : u64 ,
11821165 ) -> Result < PaymentHash , Error > {
1183- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1166+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1167+ if rt_lock. is_none ( ) {
11841168 return Err ( Error :: NotRunning ) ;
11851169 }
11861170
@@ -1268,7 +1252,8 @@ impl Node {
12681252 pub fn send_spontaneous_payment (
12691253 & self , amount_msat : u64 , node_id : & PublicKey ,
12701254 ) -> Result < PaymentHash , Error > {
1271- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1255+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1256+ if rt_lock. is_none ( ) {
12721257 return Err ( Error :: NotRunning ) ;
12731258 }
12741259
0 commit comments