diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md
index 30b7263de56..86c934b9135 100644
--- a/docs/release-notes/release-notes-0.21.0.md
+++ b/docs/release-notes/release-notes-0.21.0.md
@@ -48,6 +48,17 @@
 
 ## Functional Enhancements
 
+* Introduced a new `AttemptStore` interface within `htlcswitch` and expanded
+  its `kvdb` implementation, `networkResultStore`. A [new `InitAttempt` method](https://github.com/lightningnetwork/lnd/pull/10049),
+  which serves as a "durable write of intent" or "write-ahead log" to checkpoint
+  an attempt in a new `PENDING` state prior to dispatch, now provides the
+  foundational durable storage required for external tracking of the HTLC
+  attempt lifecycle. This is a preparatory step that enables a future
+  idempotent `switchrpc.SendOnion` RPC, which will offer "at most once"
+  processing of HTLC dispatch requests for remote clients. Care was taken to
+  avoid modifications to the existing flows for dispatching local payments,
+  preserving that battle-tested logic.
+
 ## RPC Additions
 
 * [Added support for coordinator-based MuSig2 signing
diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go
index 4739afff6ec..88ae4407825 100644
--- a/htlcswitch/interfaces.go
+++ b/htlcswitch/interfaces.go
@@ -530,3 +530,71 @@ type AuxTrafficShaper interface {
 	// meaning that it's from a custom channel.
 	IsCustomHTLC(htlcRecords lnwire.CustomRecords) bool
 }
+
+// AttemptStore defines the interface for initializing, managing, and tracking
+// the lifecycle of HTLC payment attempts. It supports both local and remote
+// payment lifecycle controllers.
+//
+// NOTE: This store is designed to be managed by a *single* logical dispatcher
+// (e.g., a ChannelRouter instance, or a remote payment orchestrator) at a time
+// to avoid collisions of attemptIDs and ensure an unambiguous source of truth
+// for payment state. Concurrent usage by multiple dispatchers is NOT supported.
+type AttemptStore interface {
+	// InitAttempt persists the intent to dispatch a payment attempt,
+	// creating a durable record that serves as an idempotency key. This
+	// method should be called before the HTLC is dispatched to the network.
+	//
+	// NOTE: This method provides a guarantee that only one HTLC can be
+	// initialized for a given attempt ID until the ID is explicitly cleaned
+	// from the store.
+	InitAttempt(attemptID uint64) error
+
+	// StoreResult stores the result of a given payment attempt (identified
+	// by attemptID). This will be called when a result is received from the
+	// network.
+	StoreResult(attemptID uint64, result *networkResult) error
+
+	// FailPendingAttempt transitions an attempt from the initialized
+	// (pending) state to a failed state and records the provided reason.
+	// This is the synchronous rollback mechanism for attempts that fail
+	// before being committed to the forwarding engine. It returns an error
+	// if the underlying storage fails.
+	FailPendingAttempt(attemptID uint64, reason *LinkError) error
+
+	// FetchPendingAttempts returns a list of all attempt IDs that are
+	// currently in the pending state.
+	//
+	// NOTE: This function is primarily used by the Switch's startup logic
+	// to identify and clean up internally orphaned payment attempts. These
+	// occur when an attempt is initialized via InitAttempt but a crash
+	// prevents its full dispatch to the network (e.g., between InitAttempt
+	// and CommitCircuits). By fetching these pending attempts, the Switch
+	// can transition their state to FAILED, preventing external callers
+	// from indefinitely hanging on GetAttemptResult for an un-dispatched
+	// attempt.
+	FetchPendingAttempts() ([]uint64, error)
+
+	// GetResult returns the network result for the specified attempt ID if
+	// it's available.
+	//
+	// NOTE: This method should return ErrAttemptResultPending for attempts
+	// that have been initialized via InitAttempt but for which a final
+	// result (settle/fail) has not yet been stored. ErrPaymentIDNotFound is
+	// returned for attempts that are unknown.
+	GetResult(attemptID uint64) (*networkResult, error)
+
+	// SubscribeResult subscribes to be notified when a result for a
+	// specific attempt ID becomes available. It returns a channel that will
+	// receive the result.
+	//
+	// NOTE: For backwards compatibility, the returned channel should only
+	// receive a value for attempts that have a final result (settle/fail).
+	// Subscribers should *not* be notified of the initial pending state
+	// created by InitAttempt.
+	SubscribeResult(attemptID uint64) (<-chan *networkResult, error)
+
+	// CleanStore removes all attempt results from the store except for
+	// those listed in the keepPids map. This allows for a "delete all
+	// except" approach to cleanup.
+	CleanStore(keepPids map[uint64]struct{}) error
+}
diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go
index db959e2d1a2..80870fc021a 100644
--- a/htlcswitch/payment_result.go
+++ b/htlcswitch/payment_result.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"errors"
+	"fmt"
 	"io"
 	"sync"
 
@@ -26,6 +27,27 @@
 	// ErrPaymentIDAlreadyExists is returned if we try to write a pending
 	// payment whose paymentID already exists.
 	ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
+
+	// ErrAttemptResultPending is returned if we try to get a result
+	// for a pending payment whose result is not yet available.
+	ErrAttemptResultPending = errors.New(
+		"attempt result not yet available",
+	)
+
+	// ErrAmbiguousAttemptInit is returned when any internal error (e.g.,
+	// with a db read or write) occurs during an InitAttempt call that
+	// prevents a definitive outcome. This indicates that the state of the
+	// payment attempt's initialization is unknown. Callers should retry
+	// the operation to resolve the ambiguity.
+	ErrAmbiguousAttemptInit = errors.New(
+		"ambiguous result for payment attempt registration",
+	)
+)
+
+const (
+	// pendingHtlcMsgType is a custom message type used to represent a
+	// pending HTLC in the network result store.
+	pendingHtlcMsgType lnwire.MessageType = 32768
 )
 
 // PaymentResult wraps a decoded result received from the network after a
@@ -90,9 +112,10 @@ type networkResultStore struct {
 	results    map[uint64][]chan *networkResult
 	resultsMtx sync.Mutex
 
-	// attemptIDMtx is a multimutex used to make sure the database and
-	// result subscribers map is consistent for each attempt ID in case of
-	// concurrent callers.
+	// attemptIDMtx is a multimutex used to serialize operations for a
+	// given attempt ID. It ensures InitAttempt's idempotency by protecting
+	// its read-then-write sequence from concurrent calls, and it maintains
+	// consistency between the database state and result subscribers.
 	attemptIDMtx *multimutex.Mutex[uint64]
 }
 
@@ -104,9 +127,111 @@ func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
 	}
 }
 
-// storeResult stores the networkResult for the given attemptID, and notifies
+// InitAttempt initializes the payment attempt with the given attemptID.
+//
+// If any record (even a pending result placeholder) already exists in the
+// store, this method returns ErrPaymentIDAlreadyExists. This guarantees that
+// only one HTLC will be initialized and dispatched for a given attempt ID
+// until the ID is explicitly cleaned from the attempt store.
+//
+// If any unexpected internal error occurs (such as a database read or write
+// failure), it will be wrapped in ErrAmbiguousAttemptInit. This signals
+// to the caller that the state of the registration is uncertain and that the
+// operation MUST be retried to resolve the ambiguity.
+//
+// NOTE: This is part of the AttemptStore interface. Subscribed clients do not
+// receive notice of this initialization.
+func (store *networkResultStore) InitAttempt(attemptID uint64) error {
+	// We get a mutex for this attempt ID to serialize init, store, and
+	// subscribe operations. This is needed to ensure consistency between
+	// the database state and the subscribers in case of concurrent calls.
+	store.attemptIDMtx.Lock(attemptID)
+	defer store.attemptIDMtx.Unlock(attemptID)
+
+	err := kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
+		// Check if any attempt by this ID is already initialized or
+		// whether a result for the attempt exists in the store.
+		existingResult, err := fetchResult(tx, attemptID)
+		if err != nil && !errors.Is(err, ErrPaymentIDNotFound) {
+			return err
+		}
+
+		// If the result is already in-progress, return an error
+		// indicating that the attempt already exists.
+		if existingResult != nil {
+			log.Warnf("Already initialized attempt for ID=%v",
+				attemptID)
+
+			return ErrPaymentIDAlreadyExists
+		}
+
+		// Create an empty networkResult to serve as a placeholder
+		// until a result from the network is received.
+		//
+		// TODO(calvin): When migrating to native SQL storage, replace
+		// this custom message placeholder with a proper status enum or
+		// struct to represent the pending state of an attempt.
+		pendingMsg, err := lnwire.NewCustom(pendingHtlcMsgType, nil)
+		if err != nil {
+			// This should not happen with a static message type,
+			// but if it does, it's an internal error that prevents
+			// a definitive outcome, so we must treat it as
+			// ambiguous.
+			return err
+		}
+		inProgressResult := &networkResult{
+			msg:          pendingMsg,
+			unencrypted:  true,
+			isResolution: false,
+		}
+
+		var b bytes.Buffer
+		err = serializeNetworkResult(&b, inProgressResult)
+		if err != nil {
+			return err
+		}
+
+		var attemptIDBytes [8]byte
+		binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
+
+		// Mark an attempt with this ID as having been seen by storing
+		// the pending placeholder. No network result is available yet,
+		// so we do not notify subscribers.
+		bucket, err := tx.CreateTopLevelBucket(
+			networkResultStoreBucketKey,
+		)
+		if err != nil {
+			return err
+		}
+
+		return bucket.Put(attemptIDBytes[:], b.Bytes())
+	}, func() {
+		// No need to reset existingResult here as it's scoped to the
+		// transaction.
+	})
+
+	if err != nil {
+		if errors.Is(err, ErrPaymentIDAlreadyExists) {
+			return ErrPaymentIDAlreadyExists
+		}
+
+		// If any unexpected internal error occurs (such as a database
+		// failure), it will be wrapped in ErrAmbiguousAttemptInit.
+		// This signals to the caller that the state of the attempt
+		// initialization is uncertain and that the operation MUST be
+		// retried to resolve the ambiguity.
+		return fmt.Errorf("%w: %w", ErrAmbiguousAttemptInit, err)
+	}
+
+	log.Debugf("Initialized attempt for local payment with attemptID=%v",
+		attemptID)
+
+	return nil
+}
+
+// StoreResult stores the networkResult for the given attemptID, and notifies
 // any subscribers.
-func (store *networkResultStore) storeResult(attemptID uint64,
+func (store *networkResultStore) StoreResult(attemptID uint64,
 	result *networkResult) error {
 
 	// We get a mutex for this attempt ID. This is needed to ensure
@@ -117,7 +242,19 @@ func (store *networkResultStore) storeResult(attemptID uint64,
 
 	log.Debugf("Storing result for attemptID=%v", attemptID)
 
-	// Serialize the payment result.
+	if err := store.storeResult(attemptID, result); err != nil {
+		return err
+	}
+
+	store.notifySubscribers(attemptID, result)
+
+	return nil
+}
+
+// storeResult persists the given result to the database.
+func (store *networkResultStore) storeResult(attemptID uint64,
+	result *networkResult) error {
+
 	var b bytes.Buffer
 	if err := serializeNetworkResult(&b, result); err != nil {
 		return err
@@ -126,7 +263,7 @@
 	var attemptIDBytes [8]byte
 	binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
 
-	err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
+	return kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
 		networkResults, err := tx.CreateTopLevelBucket(
 			networkResultStoreBucketKey,
 		)
@@ -136,25 +273,25 @@
 		return networkResults.Put(attemptIDBytes[:], b.Bytes())
 	})
-	if err != nil {
-		return err
-	}
+}
+
+// notifySubscribers notifies any subscribers of the final result for the given
+// attempt ID.
+func (store *networkResultStore) notifySubscribers(attemptID uint64,
+	result *networkResult) {
 
-	// Now that the result is stored in the database, we can notify any
-	// active subscribers.
 	store.resultsMtx.Lock()
 	for _, res := range store.results[attemptID] {
 		res <- result
 	}
+	delete(store.results, attemptID)
 	store.resultsMtx.Unlock()
-
-	return nil
 }
 
-// subscribeResult is used to get the HTLC attempt result for the given attempt
+// SubscribeResult is used to get the HTLC attempt result for the given attempt
 // ID. It returns a channel on which the result will be delivered when ready.
-func (store *networkResultStore) subscribeResult(attemptID uint64) (
+func (store *networkResultStore) SubscribeResult(attemptID uint64) (
 	<-chan *networkResult, error) {
 
 	// We get a mutex for this payment ID. This is needed to ensure
@@ -194,11 +331,21 @@ func (store *networkResultStore) subscribeResult(attemptID uint64) (
 		return nil, err
 	}
 
-	// If the result was found, we can send it on the result channel
-	// imemdiately.
+	// If a result is back from the network, we can send it on the result
+	// channel immediately. If the result is still our initialized
+	// placeholder, then treat it as not yet available.
 	if result != nil {
-		resultChan <- result
-		return resultChan, nil
+		if result.msg.MsgType() != pendingHtlcMsgType {
+			log.Debugf("Obtained full result for attemptID=%v",
+				attemptID)
+
+			resultChan <- result
+
+			return resultChan, nil
+		}
+
+		log.Debugf("Awaiting result (settle/fail) for attemptID=%v",
+			attemptID)
 	}
 
 	// Otherwise we store the result channel for when the result is
@@ -212,16 +359,32 @@
 	return resultChan, nil
 }
 
-// getResult attempts to immediately fetch the result for the given pid from
-// the store. If no result is available, ErrPaymentIDNotFound is returned.
-func (store *networkResultStore) getResult(pid uint64) (
+// GetResult attempts to immediately fetch the *final* network result for the
+// given attempt ID from the store.
+//
+// NOTE: This method will return ErrAttemptResultPending for attempts
+// that have been initialized via InitAttempt but for which a final result
+// (settle/fail) has not yet been stored. ErrPaymentIDNotFound is returned
+// for attempts that are unknown.
+func (store *networkResultStore) GetResult(pid uint64) (
 	*networkResult, error) {
 
 	var result *networkResult
 	err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
 		var err error
 		result, err = fetchResult(tx, pid)
-		return err
+		if err != nil {
+			return err
+		}
+
+		// If the attempt is still in-flight, treat it as not yet
+		// available to preserve the existing expectations for the
+		// behavior of this method.
+		if result.msg.MsgType() == pendingHtlcMsgType {
+			return ErrAttemptResultPending
+		}
+
+		return nil
 	}, func() {
 		result = nil
 	})
@@ -253,12 +416,12 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
 	return deserializeNetworkResult(r)
 }
 
-// cleanStore removes all entries from the store, except the payment IDs given.
+// CleanStore removes all entries from the store, except the payment IDs given.
 // NOTE: Since every result not listed in the keep map will be deleted, care
 // should be taken to ensure no new payment attempts are being made
 // concurrently while this process is ongoing, as its result might end up being
 // deleted.
-func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
+func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error {
 	return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
 		networkResults, err := tx.CreateTopLevelBucket(
 			networkResultStoreBucketKey,
 		)
@@ -297,3 +460,153 @@ func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
 		return nil
 	}, func() {})
 }
+
+// FetchPendingAttempts returns a list of all attempt IDs that are currently in
+// the pending state.
+//
+// NOTE: This function is NOT safe for concurrent access.
+func (store *networkResultStore) FetchPendingAttempts() ([]uint64, error) {
+	var pending []uint64
+	err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
+		bucket := tx.ReadBucket(networkResultStoreBucketKey)
+		if bucket == nil {
+			return nil
+		}
+
+		return bucket.ForEach(func(k, v []byte) error {
+			// If the key is not 8 bytes, it's not a valid attempt
+			// ID.
+			if len(k) != 8 {
+				log.Warnf("Found invalid key of length %d in "+
+					"network result store", len(k))
+
+				return nil
+			}
+
+			// Deserialize the result to check its type.
+			r := bytes.NewReader(v)
+			result, err := deserializeNetworkResult(r)
+			if err != nil {
+				// If we can't deserialize, we'll log it and
+				// continue. The result will be removed by a
+				// call to CleanStore.
+ log.Warnf("Unable to deserialize result for "+ + "key %x: %v", k, err) + + return nil + } + + // If the result is a pending result, add the attempt + // ID to our list. + if result.msg.MsgType() == pendingHtlcMsgType { + attemptID := binary.BigEndian.Uint64(k) + pending = append(pending, attemptID) + } + + return nil + }) + }, func() { + pending = nil + }) + if err != nil { + return nil, err + } + + return pending, nil +} + +// FailPendingAttempt transitions an initialized attempt from a `pending` to a +// `failed` state, recording the provided reason. This ensures that attempts +// which fail before being committed to the forwarding engine are properly +// finalized. This transition unblocks any subscribers that might be waiting on +// a final outcome for an initialized but un-dispatched attempt. +// +// NOTE: This method is specifically for attempts that have been initialized +// via InitAttempt but fail *before* being dispatched to the network. Normal +// failures (e.g., from an HTLC being failed on-chain or by a peer) are +// recorded via the StoreResult method and should not use FailPendingAttempt. +func (store *networkResultStore) FailPendingAttempt(attemptID uint64, + linkErr *LinkError) error { + + // We get a mutex for this attempt ID to ensure consistency between the + // database state and the subscribers in case of concurrent calls. + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) + + // First, create the failure result. + failureResult, err := newInternalFailureResult(linkErr) + if err != nil { + return fmt.Errorf("failed to create failure message for "+ + "attempt %d: %w", attemptID, err) + } + + var b bytes.Buffer + if err := serializeNetworkResult(&b, failureResult); err != nil { + return fmt.Errorf("failed to serialize failure result: %w", err) + } + serializedFailureResult := b.Bytes() + + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) + + err = kvdb.Update(store.backend, func(tx kvdb.RwTx) error { + // Verify that the attempt exists and is in the pending + // state, otherwise we should not fail it. + existingResult, err := fetchResult(tx, attemptID) + if err != nil { + return err + } + + if existingResult.msg.MsgType() != pendingHtlcMsgType { + return fmt.Errorf("attempt %d not in pending state", + attemptID) + } + + // Write the failure result to the store to unblock any + // subscribers awaiting a final result. + bucket, err := tx.CreateTopLevelBucket( + networkResultStoreBucketKey, + ) + if err != nil { + return err + } + + return bucket.Put(attemptIDBytes[:], serializedFailureResult) + }, func() { + }) + + if err != nil { + return fmt.Errorf("failed to fail pending attempt %d: %w", + attemptID, err) + } + + // Lastly, update any subscribers which may be waiting on the result + // of this attempt. + store.notifySubscribers(attemptID, failureResult) + + return nil +} + +// newInternalFailureResult creates a networkResult representing a terminal, +// internally generated failure. +func newInternalFailureResult(linkErr *LinkError) (*networkResult, error) { + // First, we need to serialize the wire message from our link error + // into a byte slice. This is what the downstream parsers expect. + var reasonBytes bytes.Buffer + wireMsg := linkErr.WireMessage() + if err := lnwire.EncodeFailure(&reasonBytes, wireMsg, 0); err != nil { + return nil, err + } + + // We'll create a synthetic UpdateFailHTLC to represent this internal + // failure, following the pattern used by the contract resolver. 
+ failMsg := &lnwire.UpdateFailHTLC{ + Reason: lnwire.OpaqueReason(reasonBytes.Bytes()), + } + + return &networkResult{ + msg: failMsg, + // This is a local failure. + unencrypted: true, + }, nil +} diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index f6def146528..6ea42f0d141 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -4,6 +4,8 @@ import ( "bytes" "math/rand" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -118,7 +120,7 @@ func TestNetworkResultStore(t *testing.T) { // Subscribe to 2 of them. var subs []<-chan *networkResult for i := uint64(0); i < 2; i++ { - sub, err := store.subscribeResult(i) + sub, err := store.SubscribeResult(i) if err != nil { t.Fatalf("unable to subscribe: %v", err) } @@ -127,7 +129,7 @@ func TestNetworkResultStore(t *testing.T) { // Store three of them. for i := uint64(0); i < 3; i++ { - err := store.storeResult(i, results[i]) + err := store.StoreResult(i, results[i]) if err != nil { t.Fatalf("unable to store result: %v", err) } @@ -144,7 +146,7 @@ func TestNetworkResultStore(t *testing.T) { // Let the third one subscribe now. THe result should be received // immediately. - sub, err := store.subscribeResult(2) + sub, err := store.SubscribeResult(2) require.NoError(t, err, "unable to subscribe") select { case <-sub: @@ -154,22 +156,22 @@ func TestNetworkResultStore(t *testing.T) { // Try fetching the result directly for the non-stored one. This should // fail. - _, err = store.getResult(3) + _, err = store.GetResult(3) if err != ErrPaymentIDNotFound { t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) } // Add the result and try again. - err = store.storeResult(3, results[3]) + err = store.StoreResult(3, results[3]) require.NoError(t, err, "unable to store result") - _, err = store.getResult(3) + _, err = store.GetResult(3) require.NoError(t, err, "unable to get result") // Since we don't delete results from the store (yet), make sure we // will get subscriptions for all of them. for i := uint64(0); i < numResults; i++ { - sub, err := store.subscribeResult(i) + sub, err := store.SubscribeResult(i) if err != nil { t.Fatalf("unable to subscribe: %v", err) } @@ -187,12 +189,12 @@ func TestNetworkResultStore(t *testing.T) { 1: {}, } // Finally, delete the result. - err = store.cleanStore(toKeep) + err = store.CleanStore(toKeep) require.NoError(t, err) // Payment IDs 0 and 1 should be found, 2 and 3 should be deleted. for i := uint64(0); i < numResults; i++ { - _, err = store.getResult(i) + _, err = store.GetResult(i) if i <= 1 { require.NoError(t, err, "unable to get result") } @@ -200,4 +202,206 @@ func TestNetworkResultStore(t *testing.T) { t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) } } + + t.Run("InitAttempt duplicate prevention", func(t *testing.T) { + var id uint64 = 100 + + // Fetch the result directly. We expect to observe + // ErrPaymentIDNotFound since we've not yet initialized this + // payment attempt. + _, err = store.GetResult(id) + require.ErrorIs(t, err, ErrPaymentIDNotFound, + "expected not found error for uninitialized attempt") + + // First initialization should succeed. + err := store.InitAttempt(id) + require.NoError(t, err, "unexpected InitAttempt failure") + + // Now, try fetching the result directly. We expect to observe + // ErrAttemptResultPending since it's initialized but not + // yet finalized. 
+		_, err = store.GetResult(id)
+		require.ErrorIs(t, err, ErrAttemptResultPending,
+			"expected result unavailable error for pending attempt")
+
+		// Subscribe for the result following the initialization. No
+		// result should be received immediately as StoreResult has not
+		// yet updated the initialized attempt into a finalized attempt.
+		sub, err := store.SubscribeResult(id)
+		require.NoError(t, err, "unable to subscribe")
+		select {
+		case <-sub:
+			t.Fatalf("unexpected non-final result notification " +
+				"received")
+		case <-time.After(1 * time.Second):
+		}
+
+		// Second initialization should fail (already initialized).
+		// Try initializing an attempt with the same ID before a full
+		// settle or fail result is back from the network (simulated by
+		// StoreResult).
+		err = store.InitAttempt(id)
+		require.ErrorIs(t, err, ErrPaymentIDAlreadyExists,
+			"expected duplicate InitAttempt to fail")
+
+		// Store a result to simulate a full settle or fail HTLC result
+		// coming back from the network.
+		netResult := &networkResult{
+			msg:          &lnwire.UpdateFulfillHTLC{},
+			unencrypted:  true,
+			isResolution: true,
+		}
+		err = store.StoreResult(id, netResult)
+		require.NoError(t, err, "unable to store result after init")
+
+		// Try InitAttempt again: it should still fail even after a
+		// full result is back from the network.
+		err = store.InitAttempt(id)
+		require.ErrorIs(t, err, ErrPaymentIDAlreadyExists,
+			"expected InitAttempt to fail after result stored")
+
+		// Now we confirm that the subscriber is notified of the final
+		// attempt result.
+		select {
+		case <-sub:
+		case <-time.After(1 * time.Second):
+			t.Fatalf("failed to receive final (settle/fail) result")
+		}
+
+		// Verify that the ID can be re-used only after explicit
+		// deletion of attempts via CleanStore or a
+		// DeleteAttempts-style method.
+		err = store.CleanStore(map[uint64]struct{}{})
+		require.NoError(t, err)
+
+		// Now InitAttempt should succeed again.
+		err = store.InitAttempt(id)
+		require.NoError(t, err, "InitAttempt should succeed after "+
+			"cleanup")
+	})
+
+	t.Run("Concurrent InitAttempt", func(t *testing.T) {
+		var (
+			id      uint64 = 999
+			wg      sync.WaitGroup
+			success atomic.Int32
+		)
+
+		// Launch 10 concurrent routines trying to init the same ID.
+		for range 10 {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				if err := store.InitAttempt(id); err == nil {
+					success.Add(1)
+				}
+			}()
+		}
+		wg.Wait()
+
+		// Exactly one call should succeed.
+		require.EqualValues(t, 1, success.Load())
+	})
+}
+
+// TestNetworkResultStoreFailAndFetch tests the FailPendingAttempt and
+// FetchPendingAttempts methods of the networkResultStore.
+func TestNetworkResultStoreFailAndFetch(t *testing.T) {
+	t.Parallel()
+
+	db := channeldb.OpenForTesting(t, t.TempDir())
+	store := newNetworkResultStore(db)
+
+	// Test FetchPendingAttempts on an empty store.
+	pending, err := store.FetchPendingAttempts()
+	require.NoError(t, err, "fetch on empty store failed")
+	require.Empty(t, pending, "expected no pending attempts on empty store")
+
+	// Initialize some attempts.
+	require.NoError(t, store.InitAttempt(1))
+	require.NoError(t, store.InitAttempt(2))
+	require.NoError(t, store.InitAttempt(3))
+
+	// Test FetchPendingAttempts with active pending attempts.
+	pending, err = store.FetchPendingAttempts()
+	require.NoError(t, err, "fetch with pending failed")
+	require.ElementsMatch(t, []uint64{1, 2, 3}, pending,
+		"unexpected pending attempts")
+
+	// Test FailPendingAttempt.
+	failReason := NewLinkError(&lnwire.FailTemporaryNodeFailure{})
+	err = store.FailPendingAttempt(2, failReason)
+	require.NoError(t, err, "FailPendingAttempt failed")
+
+	// Verify that the failed attempt is no longer pending.
+	pending, err = store.FetchPendingAttempts()
+	require.NoError(t, err, "fetch after fail failed")
+	require.ElementsMatch(t, []uint64{1, 3}, pending,
+		"failed attempt should not be pending")
+
+	// Verify that GetResult now returns the correct failure.
+	result, err := store.GetResult(2)
+	require.NoError(t, err, "GetResult for failed attempt failed")
+	require.NotNil(t, result, "result should not be nil")
+
+	failMsg, ok := result.msg.(*lnwire.UpdateFailHTLC)
+	require.True(t, ok, "expected an UpdateFailHTLC message")
+
+	// Decode the reason and check that it matches our original failure.
+	reason, err := lnwire.DecodeFailure(
+		bytes.NewReader(failMsg.Reason), 0,
+	)
+	require.NoError(t, err, "unable to decode failure reason")
+
+	_, ok = reason.(*lnwire.FailTemporaryNodeFailure)
+	require.True(t, ok, "expected temporary node failure")
+
+	// Test misuse of FailPendingAttempt.
+	t.Run("FailPendingAttempt misuse", func(t *testing.T) {
+		// Fail a non-existent attempt.
+		err := store.FailPendingAttempt(999,
+			NewLinkError(&lnwire.FailTemporaryNodeFailure{}),
+		)
+		require.Error(t, err,
+			"expected error when failing non-existent attempt")
+		require.Contains(t, err.Error(), "not found",
+			"expected not found error")
+
+		// Initialize and settle an attempt, then confirm that
+		// FailPendingAttempt cannot overwrite the successful result.
+		var id uint64 = 100
+		require.NoError(t, store.InitAttempt(id), "init attempt failed")
+		settleResult := &networkResult{
+			msg:         &lnwire.UpdateFulfillHTLC{},
+			unencrypted: true,
+		}
+		require.NoError(t, store.StoreResult(id, settleResult),
+			"store settle result failed")
+
+		err = store.FailPendingAttempt(id, NewLinkError(
+			&lnwire.FailTemporaryNodeFailure{}),
+		)
+		require.Error(t, err,
+			"expected error when failing settled attempt")
+
+		// Initialize and then store an HTLC failure result from the
+		// network. FailPendingAttempt should not overwrite the real
+		// failure reason.
+		var id2 uint64 = 101
+		require.NoError(t, store.InitAttempt(id2),
+			"init attempt 2 failed")
+
+		failResult := &networkResult{
+			msg:         &lnwire.UpdateFailHTLC{},
+			unencrypted: true,
+		}
+		require.NoError(t, store.StoreResult(id2, failResult),
+			"store fail result failed")
+
+		err = store.FailPendingAttempt(
+			id2,
+			NewLinkError(&lnwire.FailTemporaryNodeFailure{}),
+		)
+		require.Error(t, err,
+			"expected error when failing already failed attempt")
+	})
}
diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go
index a3aae809b93..a32ef38fc36 100644
--- a/htlcswitch/switch.go
+++ b/htlcswitch/switch.go
@@ -255,12 +255,12 @@ type Switch struct {
 	// service was initialized with.
 	cfg *Config
 
-	// networkResults stores the results of payments initiated by the user.
+	// attemptStore stores the results of payments initiated by the user.
 	// The store is used to later look up the payments and notify the
 	// user of the result when they are complete. Each payment attempt
 	// should be given a unique integer ID when it is created, otherwise
 	// results might be overwritten.
-	networkResults *networkResultStore
+	attemptStore AttemptStore
 
 	// circuits is storage for payment circuits which are used to
 	// forward the settle/fail htlc updates back to the add htlc initiator.
@@ -380,7 +380,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
 		interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
 		pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
 		linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
-		networkResults:    newNetworkResultStore(cfg.DB),
+		attemptStore:      newNetworkResultStore(cfg.DB),
 		htlcPlex:          make(chan *plexPacket),
 		chanCloseRequests: make(chan *ChanClose),
 		resolutionMsgs:    make(chan *resolutionMsg),
@@ -438,16 +438,21 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro
 // HasAttemptResult reads the network result store to fetch the specified
 // attempt. Returns true if the attempt result exists.
 func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
-	_, err := s.networkResults.getResult(attemptID)
+	_, err := s.attemptStore.GetResult(attemptID)
 	if err == nil {
 		return true, nil
 	}
 
-	if !errors.Is(err, ErrPaymentIDNotFound) {
-		return false, err
+	// If we have not heard of this attempt ID, or have dispatched the
+	// attempt but have not yet received the final (settle/fail) result,
+	// then we return a nil error.
+	if errors.Is(err, ErrPaymentIDNotFound) ||
+		errors.Is(err, ErrAttemptResultPending) {
+
+		return false, nil
 	}
 
-	return false, nil
+	return false, err
 }
 
 // GetAttemptResult returns the result of the HTLC attempt with the given
@@ -475,17 +480,33 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
 	// is already available.
 	// Assumption: no one will add this attempt ID other than the caller.
 	if s.circuits.LookupCircuit(inKey) == nil {
-		res, err := s.networkResults.getResult(attemptID)
-		if err != nil {
+		res, err := s.attemptStore.GetResult(attemptID)
+		switch {
+		// We have a final result, we can send it immediately.
+		case err == nil:
+			c := make(chan *networkResult, 1)
+			c <- res
+			nChan = c
+
+		// The attempt is known, but the result is not yet available,
+		// so we fall through to subscribe.
+		case errors.Is(err, ErrAttemptResultPending):
+			log.Tracef("Attempt %d known, but result not yet "+
+				"available. Subscribing for result.", attemptID)
+
+		// If the error is anything else, we return it to the caller.
+		default:
 			return nil, err
 		}
-
-		c := make(chan *networkResult, 1)
-		c <- res
-		nChan = c
-	} else {
+	}
+
+	// If nChan is still nil, it means we need to subscribe. This happens
+	// if the circuit was found, or if GetResult told us the result is not
+	// yet available.
+	if nChan == nil {
 		// The HTLC was committed to the circuits, subscribe for a
 		// result.
-		nChan, err = s.networkResults.subscribeResult(attemptID)
+		nChan, err = s.attemptStore.SubscribeResult(attemptID)
 		if err != nil {
 			return nil, err
 		}
@@ -537,7 +558,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
 // preiodically to let the switch clean up payment results that we have
 // handled.
 func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
-	return s.networkResults.cleanStore(keepPids)
+	return s.attemptStore.CleanStore(keepPids)
 }
 
 // SendHTLC is used by other subsystems which aren't belong to htlc switch
@@ -964,7 +985,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
 
 	// Store the result to the db. This will also notify subscribers about
 	// the result.
-	if err := s.networkResults.storeResult(attemptID, n); err != nil {
+	if err := s.attemptStore.StoreResult(attemptID, n); err != nil {
 		log.Errorf("Unable to store attempt result for pid=%v: %v",
 			attemptID, err)
 		return
@@ -1771,6 +1792,13 @@ func (s *Switch) Start() error {
 
 	log.Infof("HTLC Switch starting")
 
+	// Before starting the main event loop, we'll check for any orphaned
+	// HTLC attempts that may have been left behind by a previous crash.
+	if err := s.cleanupOrphanedAttempts(); err != nil {
+		return fmt.Errorf("failed to cleanup orphaned attempts: %w",
+			err)
+	}
+
 	blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
 	if err != nil {
 		return err
@@ -1796,6 +1824,88 @@ func (s *Switch) Start() error {
 	return nil
 }
 
+// cleanupOrphanedAttempts is a helper function that is called on startup to
+// clean up any orphaned HTLC attempts. An orphaned attempt is one that has
+// been initialized in the attempt store but for which no corresponding circuit
+// exists in the circuit map. This can happen if the node crashes after
+// initializing an attempt but before committing the circuit.
+func (s *Switch) cleanupOrphanedAttempts() error {
+	pending, err := s.attemptStore.FetchPendingAttempts()
+	if err != nil {
+		return fmt.Errorf("failed to fetch pending attempts: %w", err)
+	}
+
+	if len(pending) == 0 {
+		return nil
+	}
+
+	log.Infof("Found %d pending HTLC attempts, checking for orphans",
+		len(pending))
+
+	for _, attemptID := range pending {
+		// For each pending attempt, we check if a corresponding
+		// circuit exists.
+		inKey := CircuitKey{
+			ChanID: hop.Source,
+			HtlcID: attemptID,
+		}
+		circuit := s.circuits.LookupCircuit(inKey)
+
+		// If no circuit exists, this is an orphan from a crash
+		// between InitAttempt and CommitCircuits. We'll fail it with
+		// a temporary node failure.
+		if circuit == nil {
+			log.Warnf("Found orphaned HTLC attempt with id %d "+
+				"(no circuit), failing", attemptID)
+
+			err := s.attemptStore.FailPendingAttempt(
+				attemptID,
+				NewLinkError(
+					&lnwire.FailTemporaryNodeFailure{},
+				),
+			)
+			if err != nil {
+				log.Errorf("Unable to fail orphaned attempt "+
+					"%d: %v", attemptID, err)
+			}
+
+			continue
+		}
+
+		// If a circuit *does* exist, we must perform a second check.
+		// If the circuit is still "half-open" (it has not been
+		// assigned a keystone by the outgoing link), then it's an
+		// orphan from a crash between CommitCircuits and the handoff
+		// to the link. We must also fail this to prevent a hang.
+		//
+		// TODO(calvin): cleanup of dangling circuits for locally
+		// initiated payments as described here:
+		// https://github.com/lightningnetwork/lnd/issues/10423.
+		if !circuit.HasKeystone() {
+			log.Warnf("Found orphaned HTLC attempt with id %d "+
+				"(half-open circuit), failing", attemptID)
+
+			err := s.attemptStore.FailPendingAttempt(
+				attemptID,
+				NewLinkError(
+					&lnwire.FailTemporaryNodeFailure{},
+				),
+			)
+			if err != nil {
+				log.Errorf("Unable to fail orphaned attempt "+
+					"%d: %v", attemptID, err)
+			}
+
+			continue
+		}
+
+		// If the circuit exists and is fully open, it's a legitimate
+		// in-flight HTLC that will be resumed by the router.
+	}
+
+	return nil
+}
+
 // reforwardResolutions fetches the set of resolution messages stored on-disk
 // and reforwards them if their circuits are still open. If the circuits have
 // been deleted, then we will delete the resolution message from the database.
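Taken together, the changes above give each HTLC attempt an explicit three-step lifecycle: `InitAttempt` (the durable write of intent), dispatch, then `StoreResult` or `FailPendingAttempt`. The following is a minimal sketch of how a single logical dispatcher might drive that lifecycle, including the retry that the `ErrAmbiguousAttemptInit` contract mandates. It uses a trimmed-down, hypothetical stand-in for the interface: `attemptStore`, `dispatchWithIntent`, and `send` are illustrative names that are not part of this change, and the `*LinkError` reason is simplified to a plain `error`.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors mirroring those added in htlcswitch/payment_result.go,
// redeclared so this sketch is self-contained.
var (
	errAlreadyExists = errors.New("paymentID already exists")
	errAmbiguousInit = errors.New(
		"ambiguous result for payment attempt registration",
	)
)

// attemptStore is a hypothetical stand-in for the new AttemptStore
// interface, trimmed to the two methods this sketch exercises.
type attemptStore interface {
	InitAttempt(attemptID uint64) error
	FailPendingAttempt(attemptID uint64, reason error) error
}

// dispatchWithIntent demonstrates the intended call order: durably
// checkpoint the attempt as pending, only then hand the HTLC to the
// forwarding engine, and synchronously roll back if dispatch never happens.
func dispatchWithIntent(store attemptStore, attemptID uint64,
	send func() error) error {

	// Record the durable write of intent, retrying while the outcome is
	// ambiguous, as the InitAttempt contract requires.
	var sawAmbiguous bool
	for attempt := 0; ; attempt++ {
		err := store.InitAttempt(attemptID)
		switch {
		case err == nil:

		case errors.Is(err, errAlreadyExists) && sawAmbiguous:
			// With a single logical dispatcher (which the
			// interface requires), our own earlier ambiguous
			// write must have landed, so it is safe to proceed.

		case errors.Is(err, errAmbiguousInit) && attempt < 3:
			// The init state is unknown (e.g., the db write may
			// or may not have been applied): retry to resolve.
			sawAmbiguous = true
			continue

		default:
			// Includes a plain errAlreadyExists: the at-most-once
			// guarantee means this ID must not be re-dispatched.
			return fmt.Errorf("init attempt %d: %w", attemptID,
				err)
		}

		break
	}

	// Only a successfully initialized attempt is dispatched.
	if err := send(); err != nil {
		// The HTLC never left the node: finalize the record as
		// failed so result subscribers are not left hanging.
		if fErr := store.FailPendingAttempt(attemptID, err); fErr != nil {
			return errors.Join(err, fErr)
		}

		return err
	}

	return nil
}

func main() {
	fmt.Println("see dispatchWithIntent for the intended call order")
}
```

The crash-safety property follows from the same ordering: a crash between `InitAttempt` and the dispatch leaves only a `PENDING` record, which the new `cleanupOrphanedAttempts` janitor in `switch.go` fails on startup, so no `GetAttemptResult` caller waits forever.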
diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go
index e8176aaeb59..0653d1d9bed 100644
--- a/htlcswitch/switch_test.go
+++ b/htlcswitch/switch_test.go
@@ -1,6 +1,7 @@
 package htlcswitch
 
 import (
+	"bytes"
 	"crypto/rand"
 	"crypto/sha256"
 	"errors"
@@ -3106,7 +3107,7 @@ func TestSwitchGetAttemptResult(t *testing.T) {
 		isResolution: true,
 	}
 
-	err = s.networkResults.storeResult(paymentID, n)
+	err = s.attemptStore.StoreResult(paymentID, n)
 	require.NoError(t, err, "unable to store result")
 
 	// The result should be available.
@@ -5554,3 +5555,183 @@ func testSwitchAliasInterceptFail(t *testing.T, zeroConf bool) {
 
 	require.NoError(t, interceptSwitch.Stop())
 }
+
+// TestSwitchFailOrphanedAttempt demonstrates that a caller to GetAttemptResult
+// will hang if an attempt is orphaned in a state where it has been initialized
+// but not actually dispatched. We then verify that this scenario is resolved
+// via FailPendingAttempt storing a final failed result, thereby unblocking the
+// GetAttemptResult subscriber.
+func TestSwitchFailOrphanedAttempt(t *testing.T) {
+	t.Parallel()
+
+	s, err := initSwitchWithTempDB(t, 0)
+	require.NoError(t, err)
+	require.NoError(t, s.Start())
+	t.Cleanup(func() { require.NoError(t, s.Stop()) })
+
+	// Manually initialize an attempt in the store. This simulates the "bad"
+	// state where an attempt is pending but has no corresponding circuit.
+	attemptID := uint64(1)
+	err = s.attemptStore.InitAttempt(attemptID)
+	require.NoError(t, err, "unable to initialize attempt")
+
+	// Now, call GetAttemptResult. This function returns a channel that will
+	// deliver the result.
+	resChan, err := s.GetAttemptResult(
+		attemptID, lntypes.Hash{}, nil,
+	)
+	require.NoError(t, err)
+
+	// We expect the call to hang. We'll use a timeout to verify that no
+	// result is received from the channel.
+	select {
+	case <-resChan:
+		t.Fatalf("received result unexpectedly, should have hung")
+	case <-time.After(100 * time.Millisecond):
+		// This is the expected path, the call has "hung" for 100ms.
+	}
+
+	// Now, simulate the fix by manually calling FailPendingAttempt.
+	err = s.attemptStore.FailPendingAttempt(attemptID, NewLinkError(
+		&lnwire.FailTemporaryNodeFailure{}),
+	)
+	require.NoError(t, err, "unable to fail attempt")
+
+	// The channel should now un-block and deliver the final failed
+	// result.
+	select {
+	case result := <-resChan:
+		// We expect a result with an error, indicating failure.
+		require.Error(t, result.Error, "expected a failed result")
+	case <-time.After(1 * time.Second):
+		t.Fatalf("did not receive result after manual failure")
+	}
+}
+
+// TestSwitchOrphanCleanup tests that the switch's startup procedure will
+// correctly identify and clean up any orphaned attempts. This includes both
+// simple pending orphans (from a crash after InitAttempt) and "half-open"
+// circuit orphans (from a crash after CommitCircuits).
+func TestSwitchOrphanCleanup(t *testing.T) {
+	t.Parallel()
+
+	const attemptID = uint64(1)
+
+	testCases := []struct {
+		name        string
+		setupOrphan func(t *testing.T, s *Switch, cdb *channeldb.DB)
+	}{
+		{
+			name: "pre-commit orphan",
+			setupOrphan: func(t *testing.T, s *Switch,
+				_ *channeldb.DB) {
+
+				// Manually initialize an attempt to simulate
+				// the state of the database if the node crashed
+				// after InitAttempt but before CommitCircuits.
+				err := s.attemptStore.InitAttempt(attemptID)
+				require.NoError(t, err, "unable to init")
+			},
+		},
+		{
+			name: "half-open circuit orphan",
+			setupOrphan: func(t *testing.T, s *Switch,
+				_ *channeldb.DB) {
+
+				// Manually initialize an attempt and commit a
+				// circuit to simulate the state of the database
+				// if the node crashed after CommitCircuits but
+				// before the packet was handed off to the link.
+				err := s.attemptStore.InitAttempt(attemptID)
+				require.NoError(t, err, "unable to init")
+
+				htlc := &lnwire.UpdateAddHTLC{
+					PaymentHash: lntypes.Hash{0x01},
+				}
+
+				// We'll create a dummy outgoing channel ID to
+				// attach to the circuit. The existence of an
+				// outgoingChanID is what makes this a
+				// "half-open" orphan, as the switch knows it
+				// was intended to be forwarded somewhere.
+				const outgoingChanID = 123
+				packet := &htlcPacket{
+					incomingChanID: hop.Source,
+					incomingHTLCID: attemptID,
+					outgoingChanID: lnwire.
+						NewShortChanIDFromInt(
+							outgoingChanID,
+						),
+					htlc:   htlc,
+					amount: htlc.Amount,
+				}
+
+				circuit := newPaymentCircuit(
+					&htlc.PaymentHash, packet,
+				)
+				_, err = s.circuits.CommitCircuits(circuit)
+				require.NoError(t, err, "commit failed")
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			// Create a temporary database path that will persist
+			// across restarts.
+			tempPath := t.TempDir()
+
+			// First, we'll create a database and a switch
+			// instance.
+			cdb1 := channeldb.OpenForTesting(t, tempPath)
+			s1, err := initSwitchWithDB(0, cdb1)
+			require.NoError(t, err)
+			require.NoError(t, s1.Start())
+
+			// Call the specific setup function for this test case
+			// to create the desired orphan state.
+			tc.setupOrphan(t, s1, cdb1)
+
+			// We must stop the switch and close its database to
+			// ensure the state is flushed to disk at tempPath.
+			require.NoError(t, s1.Stop())
+			require.NoError(t, cdb1.Close())
+
+			// Now, we'll create a new database instance from the
+			// same path and a new switch. This simulates a node
+			// restart.
+			cdb2 := channeldb.OpenForTesting(t, tempPath)
+			t.Cleanup(func() { cdb2.Close() })
+
+			s2, err := initSwitchWithDB(0, cdb2)
+			require.NoError(t, err)
+			require.NoError(t, s2.Start())
+			t.Cleanup(func() { require.NoError(t, s2.Stop()) })
+
+			// After startup, we query the store for our orphaned
+			// attempt ID. We expect to find a final FAILED
+			// result, as the janitor should have cleaned it up.
+			result, err := s2.attemptStore.GetResult(attemptID)
+			require.NoError(t, err, "expected final result")
+			require.NotNil(t, result, "result should not be nil")
+
+			// The result should be a failure message.
+			failMsg, ok := result.msg.(*lnwire.UpdateFailHTLC)
+			require.True(t, ok, "expected fail message")
+
+			// The cleanup routine uses a generic failure reason.
+			require.True(t, result.unencrypted, "unencrypted")
+			reason, err := lnwire.DecodeFailure(
+				bytes.NewReader(failMsg.Reason), 0,
+			)
+			require.NoError(t, err, "unable to decode failure")
+
+			// We expect a FailTemporaryNodeFailure.
+			_, ok = reason.(*lnwire.FailTemporaryNodeFailure)
+			require.True(t, ok, "expected temp node failure")
+		})
+	}
+}
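One further note on the storage encoding exercised by these tests: a pending attempt is represented by serializing an `lnwire.Custom` message with type 32768 (which matches `lnwire.CustomTypeStart`, the first value in the custom message-type range), so pending records are distinguished from final results purely by message type. The `TODO(calvin)` in `InitAttempt` flags this as a stopgap until a native-SQL store can model attempt status directly. Below is a sketch of one possible shape for that future status enum; all names (`attemptState`, `statePending`, `canTransition`) are hypothetical, not part of this change.

```go
package main

import "fmt"

// attemptState sketches the status enum that the in-code TODO suggests for a
// future native-SQL attempt store, replacing the custom-message placeholder.
type attemptState uint8

const (
	// statePending mirrors the record written by InitAttempt: a durable
	// intent to dispatch with no network outcome yet.
	statePending attemptState = iota

	// stateSettled and stateFailed mirror final results written via
	// StoreResult or FailPendingAttempt.
	stateSettled
	stateFailed
)

// canTransition encodes the lifecycle rules implied by the store's contracts:
// only a pending attempt may be finalized, and final states are terminal
// until the record is deleted via CleanStore.
func canTransition(from, to attemptState) bool {
	return from == statePending &&
		(to == stateSettled || to == stateFailed)
}

func main() {
	fmt.Println(canTransition(statePending, stateFailed)) // true
	fmt.Println(canTransition(stateSettled, stateFailed)) // false
}
```

Under such an enum, the guard in `FailPendingAttempt` (`existingResult.msg.MsgType() != pendingHtlcMsgType`) becomes a simple `canTransition(statePending, stateFailed)` check, which is exactly the invariant the "FailPendingAttempt misuse" subtests above assert.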