Skip to content

Commit 17f80ba

Browse files
committed
Add PendingPaymentStore to track pending payments and replaced txids
Track pending payments with their replaced/conflicting transaction IDs in a separate store. Pending payments are created here on WalletEvent::TxUnconfirmed, then removed once they reach ANTI_REORG_DELAY confirmations. This avoids scanning the entire payment store and enables efficient cleanup.
1 parent 08cbd8b commit 17f80ba

File tree

5 files changed

+180
-1
lines changed

5 files changed

+180
-1
lines changed

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
7878
///
7979
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
8080
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
81+
82+
/// The pending payment information will be persisted under this prefix.
83+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments";
84+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::io::{
4646
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4747
};
4848
use crate::logger::{log_error, LdkLogger, Logger};
49+
use crate::payment::PendingPaymentDetails;
4950
use crate::peer_store::PeerStore;
5051
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
5152
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
@@ -626,6 +627,83 @@ pub(crate) fn read_bdk_wallet_change_set(
626627
Ok(Some(change_set))
627628
}
628629

630+
/// Read previously persisted pending payments information from the store.
631+
pub(crate) async fn read_pending_payments<L: Deref>(
632+
kv_store: &DynStore, logger: L,
633+
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
634+
where
635+
L::Target: LdkLogger,
636+
{
637+
let mut res = Vec::new();
638+
639+
let mut stored_keys = KVStore::list(
640+
&*kv_store,
641+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
642+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
643+
)
644+
.await?;
645+
646+
const BATCH_SIZE: usize = 50;
647+
648+
let mut set = tokio::task::JoinSet::new();
649+
650+
// Fill JoinSet with tasks if possible
651+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
652+
if let Some(next_key) = stored_keys.pop() {
653+
let fut = KVStore::read(
654+
&*kv_store,
655+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
656+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
657+
&next_key,
658+
);
659+
set.spawn(fut);
660+
debug_assert!(set.len() <= BATCH_SIZE);
661+
}
662+
}
663+
664+
while let Some(read_res) = set.join_next().await {
665+
// Exit early if we get an IO error.
666+
let reader = read_res
667+
.map_err(|e| {
668+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
669+
set.abort_all();
670+
e
671+
})?
672+
.map_err(|e| {
673+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
674+
set.abort_all();
675+
e
676+
})?;
677+
678+
// Refill set for every finished future, if we still have something to do.
679+
if let Some(next_key) = stored_keys.pop() {
680+
let fut = KVStore::read(
681+
&*kv_store,
682+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
683+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
684+
&next_key,
685+
);
686+
set.spawn(fut);
687+
debug_assert!(set.len() <= BATCH_SIZE);
688+
}
689+
690+
// Handle result.
691+
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
692+
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
693+
std::io::Error::new(
694+
std::io::ErrorKind::InvalidData,
695+
"Failed to deserialize PendingPaymentDetails",
696+
)
697+
})?;
698+
res.push(pending_payment);
699+
}
700+
701+
debug_assert!(set.is_empty());
702+
debug_assert!(stored_keys.is_empty());
703+
704+
Ok(res)
705+
}
706+
629707
#[cfg(test)]
630708
mod tests {
631709
use super::read_or_generate_seed_file;

src/payment/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
1111
mod bolt11;
1212
mod bolt12;
1313
mod onchain;
14+
pub(crate) mod pending_payment_store;
1415
mod spontaneous;
1516
pub(crate) mod store;
1617
mod unified;
1718

1819
pub use bolt11::Bolt11Payment;
1920
pub use bolt12::Bolt12Payment;
2021
pub use onchain::OnchainPayment;
22+
pub use pending_payment_store::PendingPaymentDetails;
2123
pub use spontaneous::SpontaneousPayment;
2224
pub use store::{
2325
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use bitcoin::Txid;
9+
use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId};
10+
11+
use crate::{
12+
data_store::{StorableObject, StorableObjectUpdate},
13+
payment::{store::PaymentDetailsUpdate, PaymentDetails},
14+
};
15+
16+
/// Represents a pending payment
17+
#[derive(Clone, Debug, PartialEq, Eq)]
18+
pub struct PendingPaymentDetails {
19+
/// The full payment details
20+
pub details: PaymentDetails,
21+
/// Transaction IDs that have replaced or conflict with this payment.
22+
pub conflicting_txids: Vec<Txid>,
23+
}
24+
25+
impl PendingPaymentDetails {
26+
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
27+
Self { details, conflicting_txids }
28+
}
29+
30+
/// Convert to finalized payment for the main payment store
31+
pub fn into_payment_details(self) -> PaymentDetails {
32+
self.details
33+
}
34+
}
35+
36+
impl_writeable_tlv_based!(PendingPaymentDetails, {
37+
(0, details, required),
38+
(2, conflicting_txids, optional_vec),
39+
});
40+
41+
#[derive(Clone, Debug, PartialEq, Eq)]
42+
pub(crate) struct PendingPaymentDetailsUpdate {
43+
pub id: PaymentId,
44+
pub payment_update: Option<PaymentDetailsUpdate>,
45+
pub conflicting_txids: Option<Vec<Txid>>,
46+
}
47+
48+
impl StorableObject for PendingPaymentDetails {
49+
type Id = PaymentId;
50+
type Update = PendingPaymentDetailsUpdate;
51+
52+
fn id(&self) -> Self::Id {
53+
self.details.id
54+
}
55+
56+
fn update(&mut self, update: &Self::Update) -> bool {
57+
let mut updated = false;
58+
59+
// Update the underlying payment details if present
60+
if let Some(payment_update) = &update.payment_update {
61+
updated |= self.details.update(payment_update);
62+
}
63+
64+
if let Some(new_conflicting_txids) = &update.conflicting_txids {
65+
if &self.conflicting_txids != new_conflicting_txids {
66+
self.conflicting_txids = new_conflicting_txids.clone();
67+
updated = true;
68+
}
69+
}
70+
71+
updated
72+
}
73+
74+
fn to_update(&self) -> Self::Update {
75+
self.into()
76+
}
77+
}
78+
79+
impl StorableObjectUpdate<PendingPaymentDetails> for PendingPaymentDetailsUpdate {
80+
fn id(&self) -> <PendingPaymentDetails as StorableObject>::Id {
81+
self.id
82+
}
83+
}
84+
85+
impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
86+
fn from(value: &PendingPaymentDetails) -> Self {
87+
Self {
88+
id: value.id(),
89+
payment_update: Some(value.details.to_update()),
90+
conflicting_txids: Some(value.conflicting_txids.clone()),
91+
}
92+
}
93+
}

src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::data_store::DataStore;
3939
use crate::fee_estimator::OnchainFeeEstimator;
4040
use crate::logger::Logger;
4141
use crate::message_handler::NodeCustomMessageHandler;
42-
use crate::payment::PaymentDetails;
42+
use crate::payment::{PaymentDetails, PendingPaymentDetails};
4343
use crate::runtime::RuntimeSpawner;
4444

4545
/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
@@ -621,3 +621,5 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
621621
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
622622
}
623623
}
624+
625+
pub(crate) type PendingPaymentStore = DataStore<PendingPaymentDetails, Arc<Logger>>;

0 commit comments

Comments
 (0)