From 71913383664dbc05ed8e5fdea400b3ca81afb23e Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Wed, 30 Apr 2025 11:59:23 -0500 Subject: [PATCH 1/6] htlcswitch: add AttemptStore interface Preperatory refactor to allow for future alteration of the store backing the Switch. --- htlcswitch/interfaces.go | 29 +++++++++++++++++++++++++++++ htlcswitch/payment_result.go | 14 +++++++------- htlcswitch/payment_result_test.go | 18 +++++++++--------- htlcswitch/switch.go | 16 ++++++++-------- htlcswitch/switch_test.go | 2 +- 5 files changed, 54 insertions(+), 25 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 4739afff6ec..26c8f4941ce 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -530,3 +530,32 @@ type AuxTrafficShaper interface { // meaning that it's from a custom channel. IsCustomHTLC(htlcRecords lnwire.CustomRecords) bool } + +// AttemptStore defines the interface for initializing, managing, and tracking +// the lifecycle of HTLC payment attempts. It supports both local and remote +// payment lifecycle controllers. +// +// NOTE: This store is designed to be managed by a *single* logical dispatcher +// (e.g., a ChannelRouter instance, or a remote payment orchestrator) at a time +// to avoid collisions of attemptIDs and ensure an unambiguous source of truth +// for payment state. Concurrent usage by multiple dispatchers is NOT supported. +type AttemptStore interface { + // StoreResult stores the result of a given payment attempt (identified + // by attemptID). This will be called when a result is received from the + // network. + StoreResult(attemptID uint64, result *networkResult) error + + // GetResult returns the network result for the specified attempt ID if + // it's available. + GetResult(attemptID uint64) (*networkResult, error) + + // SubscribeResult subscribes to be notified when a result for a + // specific attempt ID becomes available. It returns a channel that will + // receive the result. + SubscribeResult(attemptID uint64) (<-chan *networkResult, error) + + // CleanStore removes all attempt results from the store except for + // those listed in the keepPids map. This allows for a "delete all + // except" approach to cleanup. + CleanStore(keepPids map[uint64]struct{}) error +} diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index db959e2d1a2..50b463c2687 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -104,9 +104,9 @@ func newNetworkResultStore(db kvdb.Backend) *networkResultStore { } } -// storeResult stores the networkResult for the given attemptID, and notifies +// StoreResult stores the networkResult for the given attemptID, and notifies // any subscribers. -func (store *networkResultStore) storeResult(attemptID uint64, +func (store *networkResultStore) StoreResult(attemptID uint64, result *networkResult) error { // We get a mutex for this attempt ID. This is needed to ensure @@ -152,9 +152,9 @@ func (store *networkResultStore) storeResult(attemptID uint64, return nil } -// subscribeResult is used to get the HTLC attempt result for the given attempt +// SubscribeResult is used to get the HTLC attempt result for the given attempt // ID. It returns a channel on which the result will be delivered when ready. -func (store *networkResultStore) subscribeResult(attemptID uint64) ( +func (store *networkResultStore) SubscribeResult(attemptID uint64) ( <-chan *networkResult, error) { // We get a mutex for this payment ID. This is needed to ensure @@ -214,7 +214,7 @@ func (store *networkResultStore) subscribeResult(attemptID uint64) ( // getResult attempts to immediately fetch the result for the given pid from // the store. If no result is available, ErrPaymentIDNotFound is returned. -func (store *networkResultStore) getResult(pid uint64) ( +func (store *networkResultStore) GetResult(pid uint64) ( *networkResult, error) { var result *networkResult @@ -253,12 +253,12 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { return deserializeNetworkResult(r) } -// cleanStore removes all entries from the store, except the payment IDs given. +// CleanStore removes all entries from the store, except the payment IDs given. // NOTE: Since every result not listed in the keep map will be deleted, care // should be taken to ensure no new payment attempts are being made // concurrently while this process is ongoing, as its result might end up being // deleted. -func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error { +func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error { return kvdb.Update(store.backend, func(tx kvdb.RwTx) error { networkResults, err := tx.CreateTopLevelBucket( networkResultStoreBucketKey, diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index f6def146528..9120fd9a771 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -118,7 +118,7 @@ func TestNetworkResultStore(t *testing.T) { // Subscribe to 2 of them. var subs []<-chan *networkResult for i := uint64(0); i < 2; i++ { - sub, err := store.subscribeResult(i) + sub, err := store.SubscribeResult(i) if err != nil { t.Fatalf("unable to subscribe: %v", err) } @@ -127,7 +127,7 @@ func TestNetworkResultStore(t *testing.T) { // Store three of them. for i := uint64(0); i < 3; i++ { - err := store.storeResult(i, results[i]) + err := store.StoreResult(i, results[i]) if err != nil { t.Fatalf("unable to store result: %v", err) } @@ -144,7 +144,7 @@ func TestNetworkResultStore(t *testing.T) { // Let the third one subscribe now. THe result should be received // immediately. - sub, err := store.subscribeResult(2) + sub, err := store.SubscribeResult(2) require.NoError(t, err, "unable to subscribe") select { case <-sub: @@ -154,22 +154,22 @@ func TestNetworkResultStore(t *testing.T) { // Try fetching the result directly for the non-stored one. This should // fail. - _, err = store.getResult(3) + _, err = store.GetResult(3) if err != ErrPaymentIDNotFound { t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) } // Add the result and try again. - err = store.storeResult(3, results[3]) + err = store.StoreResult(3, results[3]) require.NoError(t, err, "unable to store result") - _, err = store.getResult(3) + _, err = store.GetResult(3) require.NoError(t, err, "unable to get result") // Since we don't delete results from the store (yet), make sure we // will get subscriptions for all of them. for i := uint64(0); i < numResults; i++ { - sub, err := store.subscribeResult(i) + sub, err := store.SubscribeResult(i) if err != nil { t.Fatalf("unable to subscribe: %v", err) } @@ -187,12 +187,12 @@ func TestNetworkResultStore(t *testing.T) { 1: {}, } // Finally, delete the result. - err = store.cleanStore(toKeep) + err = store.CleanStore(toKeep) require.NoError(t, err) // Payment IDs 0 and 1 should be found, 2 and 3 should be deleted. for i := uint64(0); i < numResults; i++ { - _, err = store.getResult(i) + _, err = store.GetResult(i) if i <= 1 { require.NoError(t, err, "unable to get result") } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index a3aae809b93..6e93efed769 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -255,12 +255,12 @@ type Switch struct { // service was initialized with. cfg *Config - // networkResults stores the results of payments initiated by the user. + // attemptStore stores the results of payments initiated by the user. // The store is used to later look up the payments and notify the // user of the result when they are complete. Each payment attempt // should be given a unique integer ID when it is created, otherwise // results might be overwritten. - networkResults *networkResultStore + attemptStore AttemptStore // circuits is storage for payment circuits which are used to // forward the settle/fail htlc updates back to the add htlc initiator. @@ -380,7 +380,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), linkStopIndex: make(map[lnwire.ChannelID]chan struct{}), - networkResults: newNetworkResultStore(cfg.DB), + attemptStore: newNetworkResultStore(cfg.DB), htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), @@ -438,7 +438,7 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro // HasAttemptResult reads the network result store to fetch the specified // attempt. Returns true if the attempt result exists. func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) { - _, err := s.networkResults.getResult(attemptID) + _, err := s.attemptStore.GetResult(attemptID) if err == nil { return true, nil } @@ -475,7 +475,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, // is already available. // Assumption: no one will add this attempt ID other than the caller. if s.circuits.LookupCircuit(inKey) == nil { - res, err := s.networkResults.getResult(attemptID) + res, err := s.attemptStore.GetResult(attemptID) if err != nil { return nil, err } @@ -485,7 +485,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, } else { // The HTLC was committed to the circuits, subscribe for a // result. - nChan, err = s.networkResults.subscribeResult(attemptID) + nChan, err = s.attemptStore.SubscribeResult(attemptID) if err != nil { return nil, err } @@ -537,7 +537,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, // preiodically to let the switch clean up payment results that we have // handled. func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error { - return s.networkResults.cleanStore(keepPids) + return s.attemptStore.CleanStore(keepPids) } // SendHTLC is used by other subsystems which aren't belong to htlc switch @@ -964,7 +964,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { // Store the result to the db. This will also notify subscribers about // the result. - if err := s.networkResults.storeResult(attemptID, n); err != nil { + if err := s.attemptStore.StoreResult(attemptID, n); err != nil { log.Errorf("Unable to store attempt result for pid=%v: %v", attemptID, err) return diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index e8176aaeb59..4f9fdf6f522 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -3106,7 +3106,7 @@ func TestSwitchGetAttemptResult(t *testing.T) { isResolution: true, } - err = s.networkResults.storeResult(paymentID, n) + err = s.attemptStore.StoreResult(paymentID, n) require.NoError(t, err, "unable to store result") // The result should be available. From 885448f9b38e08b8c8f440619aaa4089a295828f Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Fri, 12 Dec 2025 12:21:26 -0500 Subject: [PATCH 2/6] htlcswitch: implement InitAttempt for idempotent dispatch This commit adds the `InitAttempt` method to the `AttemptStore` interface. This persists the intent to dispatch an HTLC before it is sent to the network, creating a durable record that acts as an idempotency key. This allows for safe, idempotent retries from clients, as subsequent calls for the same attempt ID will be rejected. A new `pending` message type to represent an HTLC that has been initialized but not yet dispatched serves as a placeholder in the durable store. A new error, `ErrAttemptResultPending`, is also introduced to more accurately communicate the state of an attempt. This allows callers to distinguish between an unknown attempt and one that is pending but has not yet received a final result. --- htlcswitch/interfaces.go | 19 ++++ htlcswitch/payment_result.go | 147 ++++++++++++++++++++++++++++-- htlcswitch/payment_result_test.go | 101 ++++++++++++++++++++ htlcswitch/switch.go | 37 ++++++-- 4 files changed, 286 insertions(+), 18 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 26c8f4941ce..e5ee03f7643 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -540,6 +540,15 @@ type AuxTrafficShaper interface { // to avoid collisions of attemptIDs and ensure an unambiguous source of truth // for payment state. Concurrent usage by multiple dispatchers is NOT supported. type AttemptStore interface { + // InitAttempt persists the intent to dispatch a payment attempt, + // creating a durable record that serves as an idempotency key. This + // method should be called before the HTLC is dispatched to the network. + // + // NOTE: This method provides a guarantee that only one HTLC can be + // initialized for a given attempt ID until the ID is explicitly cleaned + // from the store. + InitAttempt(attemptID uint64) error + // StoreResult stores the result of a given payment attempt (identified // by attemptID). This will be called when a result is received from the // network. @@ -547,11 +556,21 @@ type AttemptStore interface { // GetResult returns the network result for the specified attempt ID if // it's available. + // + // NOTE: This method should return ErrAttemptResultPending for attempts + // that have been initialized via InitAttempt but for which a final + // result (settle/fail) has not yet been stored. ErrPaymentIDNotFound is + // returned for attempts that are unknown. GetResult(attemptID uint64) (*networkResult, error) // SubscribeResult subscribes to be notified when a result for a // specific attempt ID becomes available. It returns a channel that will // receive the result. + // + // NOTE: For backwards compatibility, the returned channel should only + // receive a value for attempts that have a final result (settle/fail). + // Subscribers should *not* be notified of the initial pending state + // created by InitAttempt. SubscribeResult(attemptID uint64) (<-chan *networkResult, error) // CleanStore removes all attempt results from the store except for diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index 50b463c2687..c009613bb5f 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -26,6 +26,18 @@ var ( // ErrPaymentIDAlreadyExists is returned if we try to write a pending // payment whose paymentID already exists. ErrPaymentIDAlreadyExists = errors.New("paymentID already exists") + + // ErrAttemptResultPending is returned if we try to get a result + // for a pending payment whose result is not yet available. + ErrAttemptResultPending = errors.New( + "attempt result not yet available", + ) +) + +const ( + // pendingHtlcMsgType is a custom message type used to represent a + // pending HTLC in the network result store. + pendingHtlcMsgType lnwire.MessageType = 32768 ) // PaymentResult wraps a decoded result received from the network after a @@ -90,9 +102,10 @@ type networkResultStore struct { results map[uint64][]chan *networkResult resultsMtx sync.Mutex - // attemptIDMtx is a multimutex used to make sure the database and - // result subscribers map is consistent for each attempt ID in case of - // concurrent callers. + // attemptIDMtx is a multimutex used to serialize operations for a + // given attempt ID. It ensures InitAttempt's idempotency by protecting + // its read-then-write sequence from concurrent calls, and it maintains + // consistency between the database state and result subscribers. attemptIDMtx *multimutex.Mutex[uint64] } @@ -104,6 +117,94 @@ func newNetworkResultStore(db kvdb.Backend) *networkResultStore { } } +// InitAttempt initializes the payment attempt with the given attemptID. +// +// If any record (even a pending result placeholder) already exists in the +// store, this method returns ErrPaymentIDAlreadyExists. This guarantees that +// only one HTLC will be initialized and dispatched for a given attempt ID until +// the ID is explicitly cleaned from attempt store. +// +// NOTE: This is part of the AttemptStore interface. Subscribed clients do not +// receive notice of this initialization. +func (store *networkResultStore) InitAttempt(attemptID uint64) error { + // We get a mutex for this attempt ID to serialize init, store, and + // subscribe operations. This is needed to ensure consistency between + // the database state and the subscribers in case of concurrent calls. + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) + + err := kvdb.Update(store.backend, func(tx kvdb.RwTx) error { + // Check if any attempt by this ID is already initialized or + // whether a result for the attempt exists in the store. + existingResult, err := fetchResult(tx, attemptID) + if err != nil && !errors.Is(err, ErrPaymentIDNotFound) { + return err + } + + // If the result is already in-progress, return an error + // indicating that the attempt already exists. + if existingResult != nil { + log.Warnf("Already initialized attempt for ID=%v", + attemptID) + + return ErrPaymentIDAlreadyExists + } + + // Create an empty networkResult to serve as place holder until + // a result from the network is received. + // + // TODO(calvin): When migrating to native SQL storage, replace + // this custom message placeholder with a proper status enum or + // struct to represent the pending state of an attempt. + pendingMsg, err := lnwire.NewCustom(pendingHtlcMsgType, nil) + if err != nil { + // This should not happen with a static message type, + // but if it does, it's an internal error that prevents + // a definitive outcome, so we must treat it as + // ambiguous. + return err + } + inProgressResult := &networkResult{ + msg: pendingMsg, + unencrypted: true, + isResolution: false, + } + + var b bytes.Buffer + err = serializeNetworkResult(&b, inProgressResult) + if err != nil { + return err + } + + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) + + // Mark an attempt with this ID as having been seen by storing + // the pending placeholder. No network result is available yet, + // so we do not notify subscribers. + bucket, err := tx.CreateTopLevelBucket( + networkResultStoreBucketKey, + ) + if err != nil { + return err + } + + return bucket.Put(attemptIDBytes[:], b.Bytes()) + }, func() { + // No need to reset existingResult here as it's scoped to the + // transaction. + }) + + if err != nil { + return err + } + + log.Debugf("Initialized attempt for local payment with attemptID=%v", + attemptID) + + return nil +} + // StoreResult stores the networkResult for the given attemptID, and notifies // any subscribers. func (store *networkResultStore) StoreResult(attemptID uint64, @@ -194,11 +295,21 @@ func (store *networkResultStore) SubscribeResult(attemptID uint64) ( return nil, err } - // If the result was found, we can send it on the result channel - // imemdiately. + // If a result is back from the network, we can send it on the result + // channel immediately. If the result is still our initialized place + // holder, then treat it as not yet available. if result != nil { - resultChan <- result - return resultChan, nil + if result.msg.MsgType() != pendingHtlcMsgType { + log.Debugf("Obtained full result for attemptID=%v", + attemptID) + + resultChan <- result + + return resultChan, nil + } + + log.Debugf("Awaiting result (settle/fail) for attemptID=%v", + attemptID) } // Otherwise we store the result channel for when the result is @@ -212,8 +323,13 @@ func (store *networkResultStore) SubscribeResult(attemptID uint64) ( return resultChan, nil } -// getResult attempts to immediately fetch the result for the given pid from -// the store. If no result is available, ErrPaymentIDNotFound is returned. +// GetResult attempts to immediately fetch the *final* network result for the +// given attempt ID from the store. +// +// NOTE: This method will return ErrAttemptResultPending for attempts +// that have been initialized via InitAttempt but for which a final result +// (settle/fail) has not yet been stored. ErrPaymentIDNotFound is returned +// for attempts that are unknown. func (store *networkResultStore) GetResult(pid uint64) ( *networkResult, error) { @@ -221,7 +337,18 @@ func (store *networkResultStore) GetResult(pid uint64) ( err := kvdb.View(store.backend, func(tx kvdb.RTx) error { var err error result, err = fetchResult(tx, pid) - return err + if err != nil { + return err + } + + // If the attempt is still in-flight, treat it as not yet + // available to preserve existing expectation for the behavior + // of this method. + if result.msg.MsgType() == pendingHtlcMsgType { + return ErrAttemptResultPending + } + + return nil }, func() { result = nil }) diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index 9120fd9a771..4b8a0453fe7 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -4,6 +4,8 @@ import ( "bytes" "math/rand" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -200,4 +202,103 @@ func TestNetworkResultStore(t *testing.T) { t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) } } + + t.Run("InitAttempt duplicate prevention", func(t *testing.T) { + var id uint64 = 100 + + // Fetch the result directly. We expect to observe + // ErrPaymentIDNotFound since we've not yet initialized this + // payment attempt. + _, err = store.GetResult(id) + require.ErrorIs(t, err, ErrPaymentIDNotFound, + "expected not found error for uninitialized attempt") + + // First initialization should succeed. + err := store.InitAttempt(id) + require.NoError(t, err, "unexpected InitAttempt failure") + + // Now, try fetching the result directly. We expect to observe + // ErrAttemptResultPending since it's initialized but not + // yet finalized. + _, err = store.GetResult(id) + require.ErrorIs(t, err, ErrAttemptResultPending, + "expected result unavailable error for pending attempt") + + // Subscribe for the result following the initialization. No + // result should be received immediately as StoreResult has not + // yet updated the initialized attempt into a finalized attempt. + sub, err := store.SubscribeResult(id) + require.NoError(t, err, "unable to subscribe") + select { + case <-sub: + t.Fatalf("unexpected non-final result notification " + + "received") + case <-time.After(1 * time.Second): + } + + // Second initialization should fail (already initialized). + // Try initializing an attempt with the same ID before a full + // settle or fail result is back from the network (simulated by + // StoreResult). + err = store.InitAttempt(id) + require.ErrorIs(t, err, ErrPaymentIDAlreadyExists, + "expected duplicate InitAttempt to fail") + + // Store a result to simulate a full settle or fail HTLC result + // coming back from the network. + netResult := &networkResult{ + msg: &lnwire.UpdateFulfillHTLC{}, + unencrypted: true, + isResolution: true, + } + err = store.StoreResult(id, netResult) + require.NoError(t, err, "unable to store result after init") + + // Try InitAttempt again — still should fail even after a full + // result is back from the network. + err = store.InitAttempt(id) + require.ErrorIs(t, err, ErrPaymentIDAlreadyExists, + "expected InitAttempt to fail after result stored") + + // Now we confirm that the subscriber is notified of the final + // attempt result. + select { + case <-sub: + case <-time.After(1 * time.Second): + t.Fatalf("failed to receive final (settle/fail) result") + } + + // Verify that ID can be re-used only after explicit deletion of + // attempts via CleanStore or a DeleteAttempts style method. + err = store.CleanStore(map[uint64]struct{}{}) + require.NoError(t, err) + + // Now InitAttempt should succeed again. + err = store.InitAttempt(id) + require.NoError(t, err, "InitAttempt should succeed after "+ + "cleanup") + }) + + t.Run("Concurrent InitAttempt", func(t *testing.T) { + var ( + id uint64 = 999 + wg sync.WaitGroup + success atomic.Int32 + ) + + // Launch 10 concurrent routines trying to init the same ID. + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + if err := store.InitAttempt(id); err == nil { + success.Add(1) + } + }() + } + wg.Wait() + + // Only exactly one call should succeed. + require.EqualValues(t, 1, success.Load()) + }) } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 6e93efed769..8c3d788a94a 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -443,11 +443,16 @@ func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) { return true, nil } - if !errors.Is(err, ErrPaymentIDNotFound) { - return false, err + // If we have not heard of this attempt ID, or have dispatched the + // attempt but have not yet received the final (settle/fail) result, + // then we return a nil error. + if errors.Is(err, ErrPaymentIDNotFound) || + errors.Is(err, ErrAttemptResultPending) { + + return false, nil } - return false, nil + return false, err } // GetAttemptResult returns the result of the HTLC attempt with the given @@ -476,13 +481,29 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, // Assumption: no one will add this attempt ID other than the caller. if s.circuits.LookupCircuit(inKey) == nil { res, err := s.attemptStore.GetResult(attemptID) - if err != nil { + switch { + // We have a final result, we can send it immediately. + case err == nil: + c := make(chan *networkResult, 1) + c <- res + nChan = c + + // The attempt is known, but the result is not yet available, + // so we fall through to subscribe. + case errors.Is(err, ErrAttemptResultPending): + log.Tracef("Attempt %d known, but result not yet "+ + "available. Subscribing for result.", attemptID) + + // If the error is anything else, we return it to the caller. + default: return nil, err } - c := make(chan *networkResult, 1) - c <- res - nChan = c - } else { + } + + // If nChan is still nil, it means we need to subscribe. This happens + // if the circuit was found, or if GetResult told us the result is not + // yet available. + if nChan == nil { // The HTLC was committed to the circuits, subscribe for a // result. nChan, err = s.attemptStore.SubscribeResult(attemptID) From 344c9a7e2cd1a92edb76ab9ff04a59591e089192 Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Fri, 12 Dec 2025 12:47:44 -0500 Subject: [PATCH 3/6] htlcswitch: implement FailPendingAttempt and FetchPendingAttempts This commit adds two key methods for managing the lifecycle of payment attempts: - `FailPendingAttempt`: Provides a synchronous mechanism to roll back an initialized attempt to a terminal `FAILED` state. This is crucial for preventing orphaned attempts when pre-dispatch validation fails within the `SendHTLC` logic. - `FetchPendingAttempts`: Allows the switch to query for any attempts that were initialized but not actually delivered to the network. This enables a robust cleanup routine on startup to resolve orphaned payments that could result from a node crash. --- htlcswitch/interfaces.go | 20 ++++ htlcswitch/payment_result.go | 181 ++++++++++++++++++++++++++++-- htlcswitch/payment_result_test.go | 103 +++++++++++++++++ htlcswitch/switch_test.go | 52 +++++++++ 4 files changed, 347 insertions(+), 9 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index e5ee03f7643..88ae4407825 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -554,6 +554,26 @@ type AttemptStore interface { // network. StoreResult(attemptID uint64, result *networkResult) error + // FailPendingAttempt transitions an initialized attempt from an + // initialized to a failed state and records the provided reason. This + // is the synchronous rollback mechanism for attempts that fail before + // being committed to the forwarding engine. It returns an error if the + // underlying storage fails. + FailPendingAttempt(attemptID uint64, reason *LinkError) error + + // FetchPendingAttempts returns a list of all attempt IDs that are + // currently in the pending state. + // + // NOTE: This function is primarily used by the Switch's startup logic + // to identify and clean up internally orphaned payment attempts. These + // occur when an attempt is initialized via InitAttempt but a crash + // prevents its full dispatch to the network (e.g., between InitAttempt + // and CommitCircuits). By fetching these pending attempts, the Switch + // can transition their state to FAILED, preventing external callers + // from indefinitely hanging on GetAttemptResult for an un-dispatched + // attempt. + FetchPendingAttempts() ([]uint64, error) + // GetResult returns the network result for the specified attempt ID if // it's available. // diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index c009613bb5f..29f06650bde 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "sync" @@ -218,7 +219,19 @@ func (store *networkResultStore) StoreResult(attemptID uint64, log.Debugf("Storing result for attemptID=%v", attemptID) - // Serialize the payment result. + if err := store.storeResult(attemptID, result); err != nil { + return err + } + + store.notifySubscribers(attemptID, result) + + return nil +} + +// storeResult persists the given result to the database. +func (store *networkResultStore) storeResult(attemptID uint64, + result *networkResult) error { + var b bytes.Buffer if err := serializeNetworkResult(&b, result); err != nil { return err @@ -227,7 +240,7 @@ func (store *networkResultStore) StoreResult(attemptID uint64, var attemptIDBytes [8]byte binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) - err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error { + return kvdb.Batch(store.backend, func(tx kvdb.RwTx) error { networkResults, err := tx.CreateTopLevelBucket( networkResultStoreBucketKey, ) @@ -237,20 +250,20 @@ func (store *networkResultStore) StoreResult(attemptID uint64, return networkResults.Put(attemptIDBytes[:], b.Bytes()) }) - if err != nil { - return err - } +} + +// notifySubscribers notifies any subscribers of the final result for the given +// attempt ID. +func (store *networkResultStore) notifySubscribers(attemptID uint64, + result *networkResult) { - // Now that the result is stored in the database, we can notify any - // active subscribers. store.resultsMtx.Lock() for _, res := range store.results[attemptID] { res <- result } + delete(store.results, attemptID) store.resultsMtx.Unlock() - - return nil } // SubscribeResult is used to get the HTLC attempt result for the given attempt @@ -424,3 +437,153 @@ func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error { return nil }, func() {}) } + +// FetchPendingAttempts returns a list of all attempt IDs that are currently in +// the pending state. +// +// NOTE: This function is NOT safe for concurrent access. +func (store *networkResultStore) FetchPendingAttempts() ([]uint64, error) { + var pending []uint64 + err := kvdb.View(store.backend, func(tx kvdb.RTx) error { + bucket := tx.ReadBucket(networkResultStoreBucketKey) + if bucket == nil { + return nil + } + + return bucket.ForEach(func(k, v []byte) error { + // If the key is not 8 bytes, it's not a valid attempt + // ID. + if len(k) != 8 { + log.Warnf("Found invalid key of length %d in "+ + "network result store", len(k)) + + return nil + } + + // Deserialize the result to check its type. + r := bytes.NewReader(v) + result, err := deserializeNetworkResult(r) + if err != nil { + // If we can't deserialize, we'll log it and + // continue. The result will be removed by a + // call to CleanStore. + log.Warnf("Unable to deserialize result for "+ + "key %x: %v", k, err) + + return nil + } + + // If the result is a pending result, add the attempt + // ID to our list. + if result.msg.MsgType() == pendingHtlcMsgType { + attemptID := binary.BigEndian.Uint64(k) + pending = append(pending, attemptID) + } + + return nil + }) + }, func() { + pending = nil + }) + if err != nil { + return nil, err + } + + return pending, nil +} + +// FailPendingAttempt transitions an initialized attempt from a `pending` to a +// `failed` state, recording the provided reason. This ensures that attempts +// which fail before being committed to the forwarding engine are properly +// finalized. This transition unblocks any subscribers that might be waiting on +// a final outcome for an initialized but un-dispatched attempt. +// +// NOTE: This method is specifically for attempts that have been initialized +// via InitAttempt but fail *before* being dispatched to the network. Normal +// failures (e.g., from an HTLC being failed on-chain or by a peer) are +// recorded via the StoreResult method and should not use FailPendingAttempt. +func (store *networkResultStore) FailPendingAttempt(attemptID uint64, + linkErr *LinkError) error { + + // We get a mutex for this attempt ID to ensure consistency between the + // database state and the subscribers in case of concurrent calls. + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) + + // First, create the failure result. + failureResult, err := newInternalFailureResult(linkErr) + if err != nil { + return fmt.Errorf("failed to create failure message for "+ + "attempt %d: %w", attemptID, err) + } + + var b bytes.Buffer + if err := serializeNetworkResult(&b, failureResult); err != nil { + return fmt.Errorf("failed to serialize failure result: %w", err) + } + serializedFailureResult := b.Bytes() + + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) + + err = kvdb.Update(store.backend, func(tx kvdb.RwTx) error { + // Verify that the attempt exists and is in the pending + // state, otherwise we should not fail it. + existingResult, err := fetchResult(tx, attemptID) + if err != nil { + return err + } + + if existingResult.msg.MsgType() != pendingHtlcMsgType { + return fmt.Errorf("attempt %d not in pending state", + attemptID) + } + + // Write the failure result to the store to unblock any + // subscribers awaiting a final result. + bucket, err := tx.CreateTopLevelBucket( + networkResultStoreBucketKey, + ) + if err != nil { + return err + } + + return bucket.Put(attemptIDBytes[:], serializedFailureResult) + }, func() { + }) + + if err != nil { + return fmt.Errorf("failed to fail pending attempt %d: %w", + attemptID, err) + } + + // Lastly, update any subscribers which may be waiting on the result + // of this attempt. + store.notifySubscribers(attemptID, failureResult) + + return nil +} + +// newInternalFailureResult creates a networkResult representing a terminal, +// internally generated failure. +func newInternalFailureResult(linkErr *LinkError) (*networkResult, error) { + // First, we need to serialize the wire message from our link error + // into a byte slice. This is what the downstream parsers expect. + var reasonBytes bytes.Buffer + wireMsg := linkErr.WireMessage() + if err := lnwire.EncodeFailure(&reasonBytes, wireMsg, 0); err != nil { + return nil, err + } + + // We'll create a synthetic UpdateFailHTLC to represent this internal + // failure, following the pattern used by the contract resolver. + failMsg := &lnwire.UpdateFailHTLC{ + Reason: lnwire.OpaqueReason(reasonBytes.Bytes()), + } + + return &networkResult{ + msg: failMsg, + // This is a local failure. + unencrypted: true, + }, nil +} diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index 4b8a0453fe7..6ea42f0d141 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -302,3 +302,106 @@ func TestNetworkResultStore(t *testing.T) { require.EqualValues(t, 1, success.Load()) }) } + +// TestNetworkResultStoreFailAndFetch tests the FailPendingAttempt and +// FetchPendingAttempts methods of the networkResultStore. +func TestNetworkResultStoreFailAndFetch(t *testing.T) { + t.Parallel() + + db := channeldb.OpenForTesting(t, t.TempDir()) + store := newNetworkResultStore(db) + + // Test FetchPendingAttempts on an empty store. + pending, err := store.FetchPendingAttempts() + require.NoError(t, err, "fetch on empty store failed") + require.Empty(t, pending, "expected no pending attempts on empty store") + + // Initialize some attempts. + require.NoError(t, store.InitAttempt(1)) + require.NoError(t, store.InitAttempt(2)) + require.NoError(t, store.InitAttempt(3)) + + // Test FetchPendingAttempts with active pending attempts. + pending, err = store.FetchPendingAttempts() + require.NoError(t, err, "fetch with pending failed") + require.ElementsMatch(t, []uint64{1, 2, 3}, pending, + "unexpected pending attempts") + + // Test FailPendingAttempt. + failReason := NewLinkError(&lnwire.FailTemporaryNodeFailure{}) + err = store.FailPendingAttempt(2, failReason) + require.NoError(t, err, "FailPendingAttempt failed") + + // Verify that the failed attempt is no longer pending. + pending, err = store.FetchPendingAttempts() + require.NoError(t, err, "fetch after fail failed") + require.ElementsMatch(t, []uint64{1, 3}, pending, + "failed attempt should not be pending") + + // Verify that GetResult now returns the correct failure. + result, err := store.GetResult(2) + require.NoError(t, err, "GetResult for failed attempt failed") + require.NotNil(t, result, "result should not be nil") + + failMsg, ok := result.msg.(*lnwire.UpdateFailHTLC) + require.True(t, ok, "expected an UpdateFailHTLC message") + + // Decode the reason and check that it matches our original failure. + reason, err := lnwire.DecodeFailure( + bytes.NewReader(failMsg.Reason), 0, + ) + require.NoError(t, err, "unable to decode failure reason") + + _, ok = reason.(*lnwire.FailTemporaryNodeFailure) + require.True(t, ok, "expected temporary node failure") + + // Test misuse of FailPendingAttempt. + t.Run("FailPendingAttempt misuse", func(t *testing.T) { + // Fail a non-existent attempt. + err := store.FailPendingAttempt(999, + NewLinkError(&lnwire.FailTemporaryNodeFailure{}), + ) + require.Error(t, err, + "expected error when failing non-existent attempt") + require.Contains(t, err.Error(), "not found", + "expected not found error") + + // Initialize and settle an attempt, then confirm that + // FailPendingAttempt cannot overwrite the successful result. + var id uint64 = 100 + require.NoError(t, store.InitAttempt(id), "init attempt failed") + settleResult := &networkResult{ + msg: &lnwire.UpdateFulfillHTLC{}, + unencrypted: true, + } + require.NoError(t, store.StoreResult(id, settleResult), + "store settle result failed") + + err = store.FailPendingAttempt(id, NewLinkError( + &lnwire.FailTemporaryNodeFailure{}), + ) + require.Error(t, err, + "expected error when failing settled attempt") + + // Initialize and then store a HTLC failure result from the + // network. FailPendingAttempt should not overwrite the real + // failure reason. + var id2 uint64 = 101 + require.NoError(t, store.InitAttempt(id2), + "init attempt 2 failed") + + failResult := &networkResult{ + msg: &lnwire.UpdateFailHTLC{}, + unencrypted: true, + } + require.NoError(t, store.StoreResult(id2, failResult), + "store fail result failed") + + err = store.FailPendingAttempt( + id2, + NewLinkError(&lnwire.FailTemporaryNodeFailure{}), + ) + require.Error(t, err, + "expected error when failing already failed attempt") + }) +} diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 4f9fdf6f522..8a332735e80 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -5554,3 +5554,55 @@ func testSwitchAliasInterceptFail(t *testing.T, zeroConf bool) { require.NoError(t, interceptSwitch.Stop()) } + +// TestSwitchFailOrphanedAttempt demonstrates that a caller to GetAttemptResult +// will hang if an attempt is orphaned in a state where it has been initialized +// but not actually dispatched. We then verify that a this scenario is resolved +// via FailPendingAttempt storing a final failed result, thereby unblocking the +// GetAttemptResult subscriber. +func TestSwitchFailOrphanedAttempt(t *testing.T) { + t.Parallel() + + s, err := initSwitchWithTempDB(t, 0) + require.NoError(t, err) + require.NoError(t, s.Start()) + t.Cleanup(func() { require.NoError(t, s.Stop()) }) + + // Manually initialize an attempt in the store. This simulates the "bad" + // state where an attempt is pending but has no corresponding circuit. + attemptID := uint64(1) + err = s.attemptStore.InitAttempt(attemptID) + require.NoError(t, err, "unable to initialize attempt") + + // Now, call GetAttemptResult. This function returns a channel that will + // deliver the result. + resChan, err := s.GetAttemptResult( + attemptID, lntypes.Hash{}, nil, + ) + require.NoError(t, err) + + // We expect the call to hang. We'll use a timeout to verify that no + // result is received from the channel. + select { + case <-resChan: + t.Fatalf("received result unexpectedly, should have hung") + case <-time.After(100 * time.Millisecond): + // This is the expected path, the call has "hung" for 100ms. + } + + // Now, simulate the fix by manually calling FailPendingAttempt. + err = s.attemptStore.FailPendingAttempt(attemptID, NewLinkError( + &lnwire.FailTemporaryNodeFailure{}), + ) + require.NoError(t, err, "unable to fail attempt") + + // The channel should now un-block and deliver the final failed + // result. + select { + case result := <-resChan: + // We expect a result with an error, indicating failure. + require.Error(t, result.Error, "expected a failed result") + case <-time.After(1 * time.Second): + t.Fatalf("did not receive result after manual failure") + } +} From ce16a0e273bf086eaf327f210d23145a17589b3e Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Mon, 8 Dec 2025 21:06:08 -0500 Subject: [PATCH 4/6] htlcswitch: add ambiguous init err to attempt store This allows the Switch to communicate to a caller whether the status of an HTLC dispatch for a given attempt ID is ambiguous. This can occur when an attempt to initialize an attempt (via InitAttempt) itself fails. This is the key we use to provide idempotence or protection against duplicate processing so that we can offer "at most once" semantics when processing htlc dispatch requests. If this InitAttempt call itself fails, then we cannot definitively know if the attempt is not in-flight, or if it was successfully processed by a prior request. Callers may need to be updated to explicitly handle this new ambiguous scenario. --- htlcswitch/payment_result.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index 29f06650bde..80870fc021a 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -33,6 +33,15 @@ var ( ErrAttemptResultPending = errors.New( "attempt result not yet available", ) + + // ErrAmbiguousAttemptInit is returned when any internal error (eg: + // with db read or write) occurs during an InitAttempt call that + // prevents a definitive outcome. This indicates that the state of the + // payment attempt's inititialization is unknown. Callers should retry + // the operation to resolve the ambiguity. + ErrAmbiguousAttemptInit = errors.New( + "ambiguous result for payment attempt registration", + ) ) const ( @@ -125,6 +134,11 @@ func newNetworkResultStore(db kvdb.Backend) *networkResultStore { // only one HTLC will be initialized and dispatched for a given attempt ID until // the ID is explicitly cleaned from attempt store. // +// If any unexpected internal error occurs (such as a database read or write +// failure), it will be wrapped in ErrAmbiguousAttemptInit. This signals +// to the caller that the state of the registration is uncertain and that the +// operation MUST be retried to resolve the ambiguity. +// // NOTE: This is part of the AttemptStore interface. Subscribed clients do not // receive notice of this initialization. func (store *networkResultStore) InitAttempt(attemptID uint64) error { @@ -197,7 +211,16 @@ func (store *networkResultStore) InitAttempt(attemptID uint64) error { }) if err != nil { - return err + if errors.Is(err, ErrPaymentIDAlreadyExists) { + return ErrPaymentIDAlreadyExists + } + + // If any unexpected internal error occurs (such as a database + // failure), it will be wrapped in ErrAmbiguousAttemptInit. + // This signals to the caller that the state of the attempt + // initialization is uncertain and that the operation MUST be + // retried to resolve the ambiguity. + return fmt.Errorf("%w: %w", ErrAmbiguousAttemptInit, err) } log.Debugf("Initialized attempt for local payment with attemptID=%v", From b52c6299ce18adf8b94779a5a1484e9216e83981 Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Mon, 1 Dec 2025 14:55:03 -0600 Subject: [PATCH 5/6] htlcswitch: cleanup orphaned payment attempts on startup If the Switch crashes or restarts, an HTLC attempt can be left in an inconsistent state, causing callers awaiting a final result via GetAttemptResult to hang indefinitely. This commit expands the startup cleanup routine to handle two types of these "orphaned" attempts: 1. Initialized attempts that exist in the result store but have no corresponding circuit in the CircuitMap. 2. Attempts with a "half-open" circuit, where a circuit was committed but the HTLC was never handed off to the outgoing link. These orphaned attempts are now proactively failed on startup. This ensures that all payments reach a definitive outcome, preventing hangs and allowing local or remote clients to safely retry. --- htlcswitch/switch.go | 89 ++++++++++++++++++++++++++ htlcswitch/switch_test.go | 129 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 8c3d788a94a..a32ef38fc36 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1792,6 +1792,13 @@ func (s *Switch) Start() error { log.Infof("HTLC Switch starting") + // Before starting the main event loop, we'll check for any orphaned + // HTLC attempts that may have been left behind by a previous crash. + if err := s.cleanupOrphanedAttempts(); err != nil { + return fmt.Errorf("failed to cleanup orphaned attempts: %w", + err) + } + blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err @@ -1817,6 +1824,88 @@ func (s *Switch) Start() error { return nil } +// cleanupOrphanedAttempts is a helper function that is called on startup to +// clean up any orphaned HTLC attempts. An orphaned attempt is one that has +// been initialized in the attempt store but for which no corresponding circuit +// exists in the circuit map. This can happen if the node crashes after +// initializing an attempt but before committing the circuit. +func (s *Switch) cleanupOrphanedAttempts() error { + pending, err := s.attemptStore.FetchPendingAttempts() + if err != nil { + return fmt.Errorf("failed to fetch pending attempts: %w", err) + } + + if len(pending) == 0 { + return nil + } + + log.Infof("Found %d pending HTLC attempts, checking for orphans", + len(pending)) + + for _, attemptID := range pending { + // For each pending attempt, we check if a corresponding circuit + // exists. + inKey := CircuitKey{ + ChanID: hop.Source, + HtlcID: attemptID, + } + circuit := s.circuits.LookupCircuit(inKey) + + // If no circuit exists, this is an orphan from a crash + // between InitAttempt and CommitCircuits. We'll fail it with + // a temporary node failure. + if circuit == nil { + log.Warnf("Found orphaned HTLC attempt with id %d "+ + "(no circuit), failing", attemptID) + + err := s.attemptStore.FailPendingAttempt( + attemptID, + NewLinkError( + &lnwire.FailTemporaryNodeFailure{}, + ), + ) + if err != nil { + log.Errorf("Unable to fail orphaned attempt "+ + "%d: %v", attemptID, err) + } + + continue + } + + // If a circuit *does* exist, we must perform a second check. + // If the circuit is still "half-open" (it has not been + // assigned a keystone by the outgoing link), then it's an + // orphan from a crash between CommitCircuits and the handoff + // to the link. We must also fail this to prevent a hang. + // + // TODO(calvin): cleanup of dangling circuits for locally + // initated payments as described here: + // https://github.com/lightningnetwork/lnd/issues/10423. + if !circuit.HasKeystone() { + log.Warnf("Found orphaned HTLC attempt with id %d "+ + "(half-open circuit), failing", attemptID) + + err := s.attemptStore.FailPendingAttempt( + attemptID, + NewLinkError( + &lnwire.FailTemporaryNodeFailure{}, + ), + ) + if err != nil { + log.Errorf("Unable to fail orphaned attempt "+ + "%d: %v", attemptID, err) + } + + continue + } + + // If the circuit exists and is fully open, it's a legitimate + // in-flight HTLC that will be resumed by the router. + } + + return nil +} + // reforwardResolutions fetches the set of resolution messages stored on-disk // and reforwards them if their circuits are still open. If the circuits have // been deleted, then we will delete the resolution message from the database. diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 8a332735e80..0653d1d9bed 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -1,6 +1,7 @@ package htlcswitch import ( + "bytes" "crypto/rand" "crypto/sha256" "errors" @@ -5606,3 +5607,131 @@ func TestSwitchFailOrphanedAttempt(t *testing.T) { t.Fatalf("did not receive result after manual failure") } } + +// TestSwitchOrphanCleanup tests that the switch's startup procedure will +// correctly identify and clean up any orphaned attempts. This includes both +// simple pending orphans (from a crash after InitAttempt) and "half-open" +// circuit orphans (from a crash after CommitCircuits). +func TestSwitchOrphanCleanup(t *testing.T) { + t.Parallel() + + const attemptID = uint64(1) + + testCases := []struct { + name string + setupOrphan func(t *testing.T, s *Switch, cdb *channeldb.DB) + }{ + { + name: "pre-commit orphan", + setupOrphan: func(t *testing.T, s *Switch, + _ *channeldb.DB) { + + // Manually initialize an attempt to simulate + // the state of the database if the node crashed + // after InitAttempt but before CommitCircuits. + err := s.attemptStore.InitAttempt(attemptID) + require.NoError(t, err, "unable to init") + }, + }, + { + name: "half-open circuit orphan", + setupOrphan: func(t *testing.T, s *Switch, + _ *channeldb.DB) { + + // Manually initialize an attempt and commit a + // circuit to simulate the state of the database + // if the node crashed after CommitCircuits but + // before the packet was handed off to the link. + err := s.attemptStore.InitAttempt(attemptID) + require.NoError(t, err, "unable to init") + + htlc := &lnwire.UpdateAddHTLC{ + PaymentHash: lntypes.Hash{0x01}, + } + + // We'll create a dummy outgoing channel ID to + // attach to the circuit. The existence of an + // outgoingChanID is what makes this a + // "half-open" orphan, as the switch knows it + // was intended to be forwarded somewhere. + const outgoingChanID = 123 + packet := &htlcPacket{ + incomingChanID: hop.Source, + incomingHTLCID: attemptID, + outgoingChanID: lnwire. + NewShortChanIDFromInt( + outgoingChanID, + ), + htlc: htlc, + amount: htlc.Amount, + } + + circuit := newPaymentCircuit( + &htlc.PaymentHash, packet, + ) + _, err = s.circuits.CommitCircuits(circuit) + require.NoError(t, err, "commit failed") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Create a temporary database path that will persist + // across restarts. + tempPath := t.TempDir() + + // First, we'll create a database and a switch + // instance. + cdb1 := channeldb.OpenForTesting(t, tempPath) + s1, err := initSwitchWithDB(0, cdb1) + require.NoError(t, err) + require.NoError(t, s1.Start()) + + // Call the specific setup function for this test case + // to create the desired orphan state. + tc.setupOrphan(t, s1, cdb1) + + // We must stop the switch and close its database to + // ensure the state is flushed to disk at tempPath. + require.NoError(t, s1.Stop()) + require.NoError(t, cdb1.Close()) + + // Now, we'll create a new database instance from the + // same path and a new switch. This simulates a node + // restart. + cdb2 := channeldb.OpenForTesting(t, tempPath) + t.Cleanup(func() { cdb2.Close() }) + + s2, err := initSwitchWithDB(0, cdb2) + require.NoError(t, err) + require.NoError(t, s2.Start()) + t.Cleanup(func() { require.NoError(t, s2.Stop()) }) + + // After startup, we query the store for our orphaned + // attempt ID. We expect to find a final FAILED + // result, as the janitor should have cleaned it up. + result, err := s2.attemptStore.GetResult(attemptID) + require.NoError(t, err, "expected final result") + require.NotNil(t, result, "result should not be nil") + + // The result should be a failure message. + failMsg, ok := result.msg.(*lnwire.UpdateFailHTLC) + require.True(t, ok, "expected fail message") + + // The cleanup routine uses a generic failure reason. + require.True(t, result.unencrypted, "unencrypted") + reason, err := lnwire.DecodeFailure( + bytes.NewReader(failMsg.Reason), 0, + ) + require.NoError(t, err, "unable to decode failure") + + // We expect a FailTemporaryNodeFailure. + _, ok = reason.(*lnwire.FailTemporaryNodeFailure) + require.True(t, ok, "expected temp node failure") + }) + } +} From 03fa01c8ef15cc58e9942767e3cb01b4c0410f39 Mon Sep 17 00:00:00 2001 From: Calvin Zachman Date: Tue, 28 Oct 2025 20:21:04 -0400 Subject: [PATCH 6/6] docs: update v0.21 release notes --- docs/release-notes/release-notes-0.21.0.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index 30b7263de56..86c934b9135 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -48,6 +48,17 @@ ## Functional Enhancements +* Introduced a new `AttemptStore` interface within `htlcswitch`, and expanded + its `kvdb` implementation, `networkResultStore`. A [new `InitAttempt` method](https://github.com/lightningnetwork/lnd/pull/10049), + which serves as a "durable write of intent" or "write-ahead log" to checkpoint + an attempt in a new `PENDING` state prior to dispatch, now provides the + foundational durable storage required for external tracking of the HTLC + attempt lifecycle. This is a preparatory step that enables a future + idempotent `switchrpc.SendOnion` RPC, which will offer "at most once" + processing of htlc dispatch requests for remote clients. Care was taken to + avoid modifications to the existing flows for dispatching local payments, + preserving the existing battle-tested logic. + ## RPC Additions * [Added support for coordinator-based MuSig2 signing