Skip to content

Commit 4800a47

Browse files
joostjagerclaude
andcommitted
Add AChainMonitor trait and use it in background processor
Add a new `AChainMonitor` trait following the same pattern as `AChannelManager`. This trait provides associated types for all generic parameters of `ChainMonitor` and a `get_cm()` method to access the underlying `ChainMonitor`. Update the background processor to use `AChainMonitor` trait bounds instead of spelling out the full `ChainMonitor` generic parameters. This simplifies the function signatures by removing 5-6 explicit generic parameters (CF, T, F, P, ES) per function. This is preparation for adding a flush method to the AChainMonitor trait. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 35ab03f commit 4800a47

File tree

2 files changed

+118
-43
lines changed

2 files changed

+118
-43
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 58 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ mod fwd_batch;
3030

3131
use fwd_batch::BatchDelay;
3232

33+
#[cfg(not(c_bindings))]
3334
use lightning::chain;
35+
#[cfg(not(c_bindings))]
3436
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35-
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
37+
use lightning::chain::chainmonitor::AChainMonitor;
3638
#[cfg(feature = "std")]
3739
use lightning::events::EventHandler;
3840
#[cfg(feature = "std")]
@@ -50,9 +52,9 @@ use lightning::onion_message::messenger::AOnionMessenger;
5052
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
5153
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
5254
use lightning::routing::utxo::UtxoLookup;
53-
use lightning::sign::{
54-
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
55-
};
55+
#[cfg(not(c_bindings))]
56+
use lightning::sign::EntropySource;
57+
use lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender};
5658
#[cfg(not(c_bindings))]
5759
use lightning::util::async_poll::MaybeSend;
5860
use lightning::util::logger::Logger;
@@ -118,6 +120,7 @@ use alloc::vec::Vec;
118120
///
119121
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
120122
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
123+
/// [`ChainMonitor::rebroadcast_pending_claims`]: lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims
121124
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
122125
/// [`Event`]: lightning::events::Event
123126
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
@@ -923,16 +926,11 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
923926
pub async fn process_events_async<
924927
'a,
925928
UL: UtxoLookup,
926-
CF: chain::Filter,
927-
T: BroadcasterInterface,
928-
F: FeeEstimator,
929929
G: Deref<Target = NetworkGraph<L>>,
930930
L: Logger,
931-
P: Deref,
932931
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
933932
EventHandler: Fn(Event) -> EventHandlerFuture,
934-
ES: EntropySource,
935-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
933+
M: Deref,
936934
CM: Deref,
937935
OM: Deref,
938936
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -942,7 +940,17 @@ pub async fn process_events_async<
942940
D: Deref,
943941
O: OutputSpender,
944942
K: KVStore,
945-
OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
943+
OS: Deref<
944+
Target = OutputSweeper<
945+
<M::Target as AChainMonitor>::Broadcaster,
946+
D,
947+
<M::Target as AChainMonitor>::FeeEstimator,
948+
<M::Target as AChainMonitor>::Filter,
949+
K,
950+
L,
951+
O,
952+
>,
953+
>,
946954
S: Deref<Target = SC>,
947955
SC: for<'b> WriteableScore<'b>,
948956
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -955,7 +963,7 @@ pub async fn process_events_async<
955963
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
956964
) -> Result<(), lightning::io::Error>
957965
where
958-
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
966+
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
959967
CM::Target: AChannelManager,
960968
OM::Target: AOnionMessenger,
961969
PM::Target: APeerManager,
@@ -1004,7 +1012,7 @@ where
10041012
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
10051013
channel_manager.get_cm().timer_tick_occurred();
10061014
log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
1007-
chain_monitor.rebroadcast_pending_claims();
1015+
chain_monitor.get_cm().rebroadcast_pending_claims();
10081016

10091017
let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
10101018
let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
@@ -1022,7 +1030,7 @@ where
10221030

10231031
loop {
10241032
channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
1025-
chain_monitor.process_pending_events_async(async_event_handler).await;
1033+
chain_monitor.get_cm().process_pending_events_async(async_event_handler).await;
10261034
if let Some(om) = &onion_messenger {
10271035
om.get_om().process_pending_events_async(async_event_handler).await
10281036
}
@@ -1072,7 +1080,7 @@ where
10721080
let fut = Selector {
10731081
a: sleeper(sleep_delay),
10741082
b: channel_manager.get_cm().get_event_or_persistence_needed_future(),
1075-
c: chain_monitor.get_update_future(),
1083+
c: chain_monitor.get_cm().get_update_future(),
10761084
d: om_fut,
10771085
e: lm_fut,
10781086
f: gv_fut,
@@ -1164,7 +1172,7 @@ where
11641172
};
11651173
if archive_timer_elapsed {
11661174
log_trace!(logger, "Archiving stale ChannelMonitors.");
1167-
chain_monitor.archive_fully_resolved_channel_monitors();
1175+
chain_monitor.get_cm().archive_fully_resolved_channel_monitors();
11681176
have_archived = true;
11691177
log_trace!(logger, "Archived stale ChannelMonitors.");
11701178
}
@@ -1354,7 +1362,7 @@ where
13541362
match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
13551363
Some(false) => {
13561364
log_trace!(logger, "Rebroadcasting monitor's pending claims");
1357-
chain_monitor.rebroadcast_pending_claims();
1365+
chain_monitor.get_cm().rebroadcast_pending_claims();
13581366
},
13591367
Some(true) => break,
13601368
None => {},
@@ -1416,16 +1424,11 @@ fn check_and_reset_sleeper<
14161424
/// synchronous background persistence.
14171425
pub async fn process_events_async_with_kv_store_sync<
14181426
UL: UtxoLookup,
1419-
CF: chain::Filter,
1420-
T: BroadcasterInterface,
1421-
F: FeeEstimator,
14221427
G: Deref<Target = NetworkGraph<L>>,
14231428
L: Logger,
1424-
P: Deref,
14251429
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
14261430
EventHandler: Fn(Event) -> EventHandlerFuture,
1427-
ES: EntropySource,
1428-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1431+
M: Deref,
14291432
CM: Deref,
14301433
OM: Deref,
14311434
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1435,7 +1438,17 @@ pub async fn process_events_async_with_kv_store_sync<
14351438
D: Deref,
14361439
O: OutputSpender,
14371440
K: Deref,
1438-
OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1441+
OS: Deref<
1442+
Target = OutputSweeperSync<
1443+
<M::Target as AChainMonitor>::Broadcaster,
1444+
D,
1445+
<M::Target as AChainMonitor>::FeeEstimator,
1446+
<M::Target as AChainMonitor>::Filter,
1447+
K,
1448+
L,
1449+
O,
1450+
>,
1451+
>,
14391452
S: Deref<Target = SC>,
14401453
SC: for<'b> WriteableScore<'b>,
14411454
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1448,7 +1461,7 @@ pub async fn process_events_async_with_kv_store_sync<
14481461
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
14491462
) -> Result<(), lightning::io::Error>
14501463
where
1451-
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1464+
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
14521465
CM::Target: AChannelManager,
14531466
OM::Target: AOnionMessenger,
14541467
PM::Target: APeerManager,
@@ -1523,20 +1536,10 @@ impl BackgroundProcessor {
15231536
pub fn start<
15241537
'a,
15251538
UL: 'static + UtxoLookup,
1526-
CF: 'static + chain::Filter,
1527-
T: 'static + BroadcasterInterface,
1528-
F: 'static + FeeEstimator + Send,
15291539
G: 'static + Deref<Target = NetworkGraph<L>>,
15301540
L: 'static + Deref + Send,
1531-
P: 'static + Deref,
15321541
EH: 'static + EventHandler + Send,
1533-
ES: 'static + EntropySource + Send,
1534-
M: 'static
1535-
+ Deref<
1536-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1537-
>
1538-
+ Send
1539-
+ Sync,
1542+
M: 'static + Deref + Send + Sync,
15401543
CM: 'static + Deref + Send,
15411544
OM: 'static + Deref + Send,
15421545
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
@@ -1548,15 +1551,27 @@ impl BackgroundProcessor {
15481551
D: 'static + Deref,
15491552
O: 'static + OutputSpender,
15501553
K: 'static + Deref + Send,
1551-
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1554+
OS: 'static
1555+
+ Deref<
1556+
Target = OutputSweeperSync<
1557+
<M::Target as AChainMonitor>::Broadcaster,
1558+
D,
1559+
<M::Target as AChainMonitor>::FeeEstimator,
1560+
<M::Target as AChainMonitor>::Filter,
1561+
K,
1562+
L,
1563+
O,
1564+
>,
1565+
>
1566+
+ Send,
15521567
>(
15531568
kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
15541569
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
15551570
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
15561571
) -> Self
15571572
where
15581573
L::Target: 'static + Logger,
1559-
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1574+
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
15601575
CM::Target: AChannelManager,
15611576
OM::Target: AOnionMessenger,
15621577
PM::Target: APeerManager,
@@ -1596,7 +1611,7 @@ impl BackgroundProcessor {
15961611
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
15971612
channel_manager.get_cm().timer_tick_occurred();
15981613
log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
1599-
chain_monitor.rebroadcast_pending_claims();
1614+
chain_monitor.get_cm().rebroadcast_pending_claims();
16001615

16011616
let mut last_freshness_call = Instant::now();
16021617
let mut last_onion_message_handler_call = Instant::now();
@@ -1615,7 +1630,7 @@ impl BackgroundProcessor {
16151630

16161631
loop {
16171632
channel_manager.get_cm().process_pending_events(&event_handler);
1618-
chain_monitor.process_pending_events(&event_handler);
1633+
chain_monitor.get_cm().process_pending_events(&event_handler);
16191634
if let Some(om) = &onion_messenger {
16201635
om.get_om().process_pending_events(&event_handler)
16211636
};
@@ -1648,7 +1663,7 @@ impl BackgroundProcessor {
16481663
let gv_fut = gossip_sync.validation_completion_future();
16491664
let always_futures = [
16501665
channel_manager.get_cm().get_event_or_persistence_needed_future(),
1651-
chain_monitor.get_update_future(),
1666+
chain_monitor.get_cm().get_update_future(),
16521667
];
16531668
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut);
16541669
let sleeper = Sleeper::from_futures(futures);
@@ -1701,7 +1716,7 @@ impl BackgroundProcessor {
17011716
let archive_timer_elapsed = last_archive_call.elapsed() > archive_timer;
17021717
if archive_timer_elapsed {
17031718
log_trace!(logger, "Archiving stale ChannelMonitors.");
1704-
chain_monitor.archive_fully_resolved_channel_monitors();
1719+
chain_monitor.get_cm().archive_fully_resolved_channel_monitors();
17051720
have_archived = true;
17061721
last_archive_call = Instant::now();
17071722
log_trace!(logger, "Archived stale ChannelMonitors.");
@@ -1786,7 +1801,7 @@ impl BackgroundProcessor {
17861801
}
17871802
if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
17881803
log_trace!(logger, "Rebroadcasting monitor's pending claims");
1789-
chain_monitor.rebroadcast_pending_claims();
1804+
chain_monitor.get_cm().rebroadcast_pending_claims();
17901805
last_rebroadcast_call = Instant::now();
17911806
}
17921807
}

lightning/src/chain/chainmonitor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1488,6 +1488,66 @@ where
14881488
}
14891489
}
14901490

1491+
/// A trivial trait which describes any [`ChainMonitor`].
1492+
///
1493+
/// This is not exported to bindings users as general cover traits aren't useful in other
1494+
/// languages.
1495+
pub trait AChainMonitor {
1496+
/// A type implementing [`EcdsaChannelSigner`].
1497+
type Signer: EcdsaChannelSigner + Sized;
1498+
/// A type implementing [`chain::Filter`].
1499+
type Filter: chain::Filter;
1500+
/// A type implementing [`BroadcasterInterface`].
1501+
type Broadcaster: BroadcasterInterface;
1502+
/// A type implementing [`FeeEstimator`].
1503+
type FeeEstimator: FeeEstimator;
1504+
/// A type implementing [`Logger`].
1505+
type Logger: Logger;
1506+
/// A type that derefs to [`Persist`].
1507+
type Persister: Deref<Target = Self::PersisterTarget>;
1508+
/// The target of [`Self::Persister`].
1509+
type PersisterTarget: Persist<Self::Signer> + ?Sized;
1510+
/// A type implementing [`EntropySource`].
1511+
type EntropySource: EntropySource;
1512+
/// Returns a reference to the actual [`ChainMonitor`] object.
1513+
fn get_cm(
1514+
&self,
1515+
) -> &ChainMonitor<
1516+
Self::Signer,
1517+
Self::Filter,
1518+
Self::Broadcaster,
1519+
Self::FeeEstimator,
1520+
Self::Logger,
1521+
Self::Persister,
1522+
Self::EntropySource,
1523+
>;
1524+
}
1525+
1526+
impl<
1527+
ChannelSigner: EcdsaChannelSigner,
1528+
C: chain::Filter,
1529+
T: BroadcasterInterface,
1530+
F: FeeEstimator,
1531+
L: Logger,
1532+
P: Deref,
1533+
ES: EntropySource,
1534+
> AChainMonitor for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
1535+
where
1536+
P::Target: Persist<ChannelSigner>,
1537+
{
1538+
type Signer = ChannelSigner;
1539+
type Filter = C;
1540+
type Broadcaster = T;
1541+
type FeeEstimator = F;
1542+
type Logger = L;
1543+
type Persister = P;
1544+
type PersisterTarget = P::Target;
1545+
type EntropySource = ES;
1546+
fn get_cm(&self) -> &ChainMonitor<ChannelSigner, C, T, F, L, P, ES> {
1547+
self
1548+
}
1549+
}
1550+
14911551
#[cfg(test)]
14921552
mod tests {
14931553
use crate::chain::channelmonitor::ANTI_REORG_DELAY;

0 commit comments

Comments
 (0)