@@ -1120,6 +1120,10 @@ where
 
 	let mut futures = Joiner::new();
 
+	// Capture the pending count before persisting. Only this many writes will be
+	// flushed afterward, so that updates arriving after persist aren't included.
+	let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
+
 	if channel_manager.get_cm().get_and_clear_needs_persistence() {
 		log_trace!(logger, "Persisting ChannelManager...");
@@ -1317,6 +1321,10 @@ where
 		res?;
 	}
 
+	// Flush monitor operations that were pending before we persisted. New updates
+	// that arrived after are left for the next iteration.
+	let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
+
 	match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
 		sleeper(ONION_MESSAGE_HANDLER_TIMER)
 	}) {
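Together, these first two hunks bracket every ChannelManager persist in the async event loop: the deferred monitor-write count is snapshotted before the write, and at most that many operations are flushed after it. Below is a minimal, self-contained sketch of why the snapshot matters, using a hypothetical `DeferredMonitorQueue` stand-in rather than LDK's actual `ChainMonitor` API: a write enqueued while the persist is in flight stays queued until the next iteration.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for the deferred-write queue; not LDK's API.
struct DeferredMonitorQueue {
	ops: VecDeque<String>, // queued monitor writes, oldest first
}

impl DeferredMonitorQueue {
	fn pending_operation_count(&self) -> usize {
		self.ops.len()
	}

	/// Flushes at most `count` of the oldest queued operations.
	fn flush(&mut self, count: usize) {
		for op in self.ops.drain(..count.min(self.ops.len())) {
			println!("flushing monitor write: {op}");
		}
	}
}

fn main() {
	let mut queue = DeferredMonitorQueue {
		ops: VecDeque::from(["update A".to_owned(), "update B".to_owned()]),
	};

	// 1) Snapshot the queue depth *before* persisting the ChannelManager.
	let pending = queue.pending_operation_count();

	// 2) Persist the ChannelManager (elided). A monitor write can land mid-persist...
	queue.ops.push_back("update C (arrived mid-persist)".to_owned());

	// 3) ...but only the snapshotted prefix is flushed; "update C" waits for the
	//    next loop iteration, after the next ChannelManager persist.
	queue.flush(pending);
	assert_eq!(queue.pending_operation_count(), 1);
}
```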
@@ -1373,6 +1381,7 @@ where
 	// After we exit, ensure we persist the ChannelManager one final time - this avoids
 	// some races where users quit while channel updates were in-flight, with
 	// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+	let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
 	kv_store
 		.write(
 			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1390,10 @@ where
 			channel_manager.get_cm().encode(),
 		)
 		.await?;
+
+	// Flush monitor operations that were pending before final persistence.
+	let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
+
 	if let Some(ref scorer) = scorer {
 		kv_store
 			.write(
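The shutdown path applies the same snapshot/flush bracket around the final ChannelManager write, so the operations flushed here are exactly those that were pending before the last persist, consistent with the race described in the comment above.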
@@ -1684,6 +1697,11 @@ impl BackgroundProcessor {
 				channel_manager.get_cm().timer_tick_occurred();
 				last_freshness_call = Instant::now();
 			}
+
+			// Capture the pending count before persisting. Only this many writes will be
+			// flushed afterward, so that updates arriving after persist aren't included.
+			let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
+
 			if channel_manager.get_cm().get_and_clear_needs_persistence() {
 				log_trace!(logger, "Persisting ChannelManager...");
 				(kv_store.write(
@@ -1695,6 +1713,10 @@ impl BackgroundProcessor {
 				log_trace!(logger, "Done persisting ChannelManager.");
 			}
 
+			// Flush monitor operations that were pending before we persisted. New
+			// updates that arrived after are left for the next iteration.
+			let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
+
 			if let Some(liquidity_manager) = liquidity_manager.as_ref() {
 				log_trace!(logger, "Persisting LiquidityManager...");
 				let _ = liquidity_manager.get_lm().persist().map_err(|e| {
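This is the synchronous `BackgroundProcessor` mirror of the async loop above. Note that the flush `Result` is discarded (`let _ =`) in both variants; presumably a failed flush leaves the operations queued for a later pass rather than tearing down the processor.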
@@ -1809,12 +1831,17 @@ impl BackgroundProcessor {
 			// After we exit, ensure we persist the ChannelManager one final time - this avoids
 			// some races where users quit while channel updates were in-flight, with
 			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+			let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
 			kv_store.write(
 				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
 				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
 				CHANNEL_MANAGER_PERSISTENCE_KEY,
 				channel_manager.get_cm().encode(),
 			)?;
+
+			// Flush monitor operations that were pending before final persistence.
+			let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
+
 			if let Some(ref scorer) = scorer {
 				kv_store.write(
 					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1923,10 @@ mod tests {
 	use bitcoin::transaction::{Transaction, TxOut};
 	use bitcoin::{Amount, ScriptBuf, Txid};
 	use core::sync::atomic::{AtomicBool, Ordering};
+	use lightning::chain::chainmonitor;
 	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
 	use lightning::chain::transaction::OutPoint;
-	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
+	use lightning::chain::{BestBlock, Confirm, Filter};
 	use lightning::events::{Event, PathFailure, ReplayEvent};
 	use lightning::ln::channelmanager;
 	use lightning::ln::channelmanager::{
@@ -2444,6 +2472,7 @@ mod tests {
 			Arc::clone(&kv_store),
 			Arc::clone(&keys_manager),
 			keys_manager.get_peer_storage_key(),
+			true,
 		));
 		let best_block = BestBlock::from_network(network);
 		let params = ChainParameters { network, best_block };
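The new `true` argument to the test `ChainMonitor` constructor is, presumably, what opts these tests into queueing monitor writes as deferred operations (to be released via `flush`) rather than persisting them inline; the diff doesn't show the parameter's name.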
@@ -2567,6 +2596,8 @@ mod tests {
 		(persist_dir, nodes)
 	}
 
+	/// Opens a channel between two nodes without a running `BackgroundProcessor`,
+	/// so deferred monitor operations are flushed manually at each step.
 	macro_rules! open_channel {
 		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
 			begin_open_channel!($node_a, $node_b, $channel_value);
@@ -2582,19 +2613,33 @@ mod tests {
 				tx.clone(),
 			)
 			.unwrap();
+			// funding_transaction_generated does not call watch_channel, so no
+			// deferred op is queued and FundingCreated is available immediately.
 			let msg_a = get_event_msg!(
 				$node_a,
 				MessageSendEvent::SendFundingCreated,
 				$node_b.node.get_our_node_id()
 			);
 			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
+			// Flush node_b's new monitor (watch_channel) so it releases the
+			// FundingSigned message.
+			$node_b
+				.chain_monitor
+				.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger)
+				.unwrap();
 			get_event!($node_b, Event::ChannelPending);
 			let msg_b = get_event_msg!(
 				$node_b,
 				MessageSendEvent::SendFundingSigned,
 				$node_a.node.get_our_node_id()
 			);
 			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
+			// Flush node_a's new monitor (watch_channel) queued by
+			// handle_funding_signed.
+			$node_a
+				.chain_monitor
+				.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger)
+				.unwrap();
 			get_event!($node_a, Event::ChannelPending);
 			tx
 		}};
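Since no background processor runs inside `open_channel!`, nothing races the snapshot, and each `flush` call can simply pass the full `pending_operation_count()`; here that covers the single deferred `watch_channel` write each side queues during the funding handshake.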
@@ -2720,6 +2765,20 @@ mod tests {
 		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
 	}
 
+	/// Waits until the background processor has flushed all pending deferred monitor
+	/// operations for the given node. Panics if the pending count does not reach zero
+	/// within `EVENT_DEADLINE`.
+	fn wait_for_flushed(chain_monitor: &ChainMonitor) {
+		let start = std::time::Instant::now();
+		while chain_monitor.pending_operation_count() > 0 {
+			assert!(
+				start.elapsed() < EVENT_DEADLINE,
+				"Pending monitor operations were not flushed within deadline"
+			);
+			std::thread::sleep(Duration::from_millis(10));
+		}
+	}
+
 	#[test]
 	fn test_background_processor() {
 		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3060,11 +3119,22 @@ mod tests {
 			.node
 			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
 			.unwrap();
+		// funding_transaction_generated does not call watch_channel, so no deferred op is
+		// queued and the FundingCreated message is available immediately.
 		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
 		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
+		// Node 1 has no bg processor, so flush its new monitor (watch_channel) manually
+		// to release its events and FundingSigned.
+		nodes[1]
+			.chain_monitor
+			.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger)
+			.unwrap();
 		get_event!(nodes[1], Event::ChannelPending);
 		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
 		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
+		// Wait for the bg processor to flush the new monitor (watch_channel) queued by
+		// handle_funding_signed.
+		wait_for_flushed(&nodes[0].chain_monitor);
 		channel_pending_recv
 			.recv_timeout(EVENT_DEADLINE)
 			.expect("ChannelPending not handled within deadline");
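Note the asymmetry in this test: node 1 has no background processor and must flush its monitor queue by hand, while node 0's writes are flushed by its running processor, so the test can only poll for completion via `wait_for_flushed` rather than flushing directly.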
@@ -3125,6 +3195,9 @@ mod tests {
 				error_message.to_string(),
 			)
 			.unwrap();
+		// Wait for the bg processor to flush the monitor update triggered by force close
+		// so the commitment tx is broadcast.
+		wait_for_flushed(&nodes[0].chain_monitor);
 		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
 		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
 