diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index 4ecf53aa..1bda9bd9 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -16,8 +16,10 @@ import ( ) const ( - bootstrapRefreshInterval = 10 * time.Minute - defaultSuperNodeP2PPort int = 4445 + bootstrapRefreshInterval = 10 * time.Minute + defaultSuperNodeP2PPort int = 4445 + superNodeStateActive int32 = 1 + superNodeStatePostponed int32 = 5 ) // seed a couple of obviously bad addrs (unless in integration tests) @@ -108,15 +110,20 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes return nil } -// loadBootstrapCandidatesFromChain returns active supernodes (by latest state) -// mapped by "ip:port". No pings here. -func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) { +// loadBootstrapCandidatesFromChain returns routing candidates (by latest state) mapped by "ip:port", +// plus two allowlists: +// - routingIDs: Active + Postponed +// - storeIDs: Active only +// +// No pings here. +func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, map[[32]byte]struct{}, error) { resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx) if err != nil { - return nil, nil, fmt.Errorf("failed to list supernodes: %w", err) + return nil, nil, nil, fmt.Errorf("failed to list supernodes: %w", err) } - activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + routingIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + storeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) mapNodes := make(map[string]*Node, len(resp.Supernodes)) for _, sn := range resp.Supernodes { if len(sn.States) == 0 { @@ -130,7 +137,9 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress latestState = int32(st.State) } } - if latestState != 1 { // SuperNodeStateActive = 1 + // Routing/read eligibility includes Active + Postponed. + // Store/write eligibility remains Active-only. + if latestState != superNodeStateActive && latestState != superNodeStatePostponed { continue } @@ -148,7 +157,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress } else if len(h) == 32 { var key [32]byte copy(key[:], h) - activeIDs[key] = struct{}{} + routingIDs[key] = struct{}{} + if latestState == superNodeStateActive { + storeIDs[key] = struct{}{} + } } // latest IP by height @@ -190,7 +202,7 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress node.ID = []byte(id) mapNodes[full] = node } - return mapNodes, activeIDs, nil + return mapNodes, routingIDs, storeIDs, nil } // upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false). @@ -245,6 +257,24 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil { return err } + allow := make(map[[32]byte]struct{}, len(s.options.BootstrapNodes)) + for _, n := range s.options.BootstrapNodes { + if n == nil || len(n.ID) == 0 { + continue + } + h, err := utils.Blake3Hash(n.ID) + if err != nil || len(h) != 32 { + continue + } + var key [32]byte + copy(key[:], h) + allow[key] = struct{}{} + } + // Config bootstrap has no chain states; treat provided peers as both routing/store-eligible. + s.setRoutingAllowlist(ctx, allow) + s.setStoreAllowlist(ctx, allow) + s.pruneIneligibleRoutingPeers(ctx) + for _, n := range s.options.BootstrapNodes { if err := s.upsertBootstrapNode(ctx, n); err != nil { logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{ @@ -265,14 +295,16 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro } selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port) - cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) + cands, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) if err != nil { return err } - // Update eligibility gate from chain Active state and prune any peers that slipped in via + // Update routing/read gate from chain state and prune any peers that slipped in via // inbound traffic before the last bootstrap refresh. - s.setRoutingAllowlist(ctx, activeIDs) + s.setRoutingAllowlist(ctx, routingIDs) + // Write/replication targets are Active-only. + s.setStoreAllowlist(ctx, storeIDs) s.pruneIneligibleRoutingPeers(ctx) // Upsert candidates to replication_info @@ -303,13 +335,6 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro // This keeps replication_info and routing table current as the validator set changes. func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) { go func() { - // Initial sync - if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - } t := time.NewTicker(bootstrapRefreshInterval) defer t.Stop() @@ -331,11 +356,15 @@ func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string // ConfigureBootstrapNodes wires to the new sync/refresher (no pings here). func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { - // One-time sync; start refresher in the background + // One-time sync attempt; keep service running if it fails and rely on refresher retries. if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - return err + logtrace.Warn(ctx, "initial bootstrap sync failed; continuing with periodic refresher", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) } + // Always start periodic retries so transient chain/API outages can recover. s.StartBootstrapRefresher(ctx, bootstrapNodes) return nil diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 41cba389..75a3bcc0 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -69,7 +69,8 @@ type DHT struct { metrics DHTMetrics // routingAllowlist is a fast in-memory gate of which peers are eligible to - // participate in the routing table (based on chain state: Active only). + // participate in routing/read lookup paths (based on chain state). + // Current policy: Active + Postponed are routing-eligible. // // Hot paths do only an atomic check + map lookup; updates happen on the // bootstrap refresh cadence. @@ -77,6 +78,14 @@ type DHT struct { routingAllow map[[32]byte]struct{} // blake3(peerID) -> exists routingAllowReady atomic.Bool routingAllowCount atomic.Int64 + + // storeAllowlist is a fast in-memory gate of which peers are eligible for + // write/replication targets. + // Current policy: Active only. + storeAllowMu sync.RWMutex + storeAllow map[[32]byte]struct{} // blake3(peerID) -> exists + storeAllowReady atomic.Bool + storeAllowCount atomic.Int64 } // bootstrapIgnoreList seeds the in-memory ignore list with nodes that are @@ -144,11 +153,11 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct // Avoid accidentally locking ourselves out due to transient chain issues. if len(allow) == 0 { if !s.routingAllowReady.Load() { - logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving gating disabled (bootstrap)", logtrace.Fields{ + logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving routing gating disabled (bootstrap)", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } else { - logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero active supernodes; retaining previous allowlist", logtrace.Fields{ + logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero routing-eligible supernodes; retaining previous allowlist", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } @@ -164,7 +173,29 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct logtrace.Debug(ctx, "routing allowlist updated", logtrace.Fields{ logtrace.FieldModule: "p2p", - "active_peers": len(allow), + "routing_peers": len(allow), + }) +} + +func (s *DHT) setStoreAllowlist(ctx context.Context, allow map[[32]byte]struct{}) { + if s == nil { + return + } + // Integration tests may use synthetic bootstrap sets; do not enforce chain-state gating. + if integrationTestEnabled() { + return + } + + s.storeAllowMu.Lock() + s.storeAllow = allow + s.storeAllowMu.Unlock() + + s.storeAllowCount.Store(int64(len(allow))) + s.storeAllowReady.Store(true) + + logtrace.Debug(ctx, "store allowlist updated", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "store_peers": len(allow), }) } @@ -176,9 +207,9 @@ func (s *DHT) eligibleForRouting(n *Node) bool { if integrationTestEnabled() { return true } - // If allowlist isn't ready (or was never populated), do not gate to avoid blocking bootstrap. + // Strict gating: only explicitly allowlisted peers can participate in read/routing. if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { - return true + return false } if n == nil || len(n.ID) == 0 { return false @@ -197,14 +228,51 @@ func (s *DHT) eligibleForRouting(n *Node) bool { return ok } +func (s *DHT) eligibleForStore(n *Node) bool { + if s == nil { + return false + } + // In integration tests allow everything; chain state gating is not stable/available there. + if integrationTestEnabled() { + return true + } + // If the store allowlist isn't ready yet, avoid blocking writes during bootstrap. + if !s.storeAllowReady.Load() { + return true + } + // Once initialized, an empty active set means no write-eligible peers. + if s.storeAllowCount.Load() == 0 { + return false + } + if n == nil || len(n.ID) == 0 { + return false + } + + n.SetHashedID() + if len(n.HashedID) != 32 { + return false + } + var key [32]byte + copy(key[:], n.HashedID) + + s.storeAllowMu.RLock() + _, ok := s.storeAllow[key] + s.storeAllowMu.RUnlock() + return ok +} + func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node { if s == nil || len(nodes) == 0 { return nodes } - // Fast path: not enforcing (integration tests / not ready / empty list) - if integrationTestEnabled() || !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Fast path for integration tests only. + if integrationTestEnabled() { return nodes } + // Strict gating: without a routing allowlist there are no eligible routing peers. + if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + return nil + } out := nodes[:0] for _, n := range nodes { @@ -430,8 +498,12 @@ func (s *DHT) Start(ctx context.Context) error { go s.StartReplicationWorker(ctx) go s.startDisabledKeysCleanupWorker(ctx) - go s.startCleanupRedundantDataWorker(ctx) - go s.startDeleteDataWorker(ctx) + // TEMPORARY: disabled to pause redundant-key classification into del_keys. + // Re-enable once deletion-safety behavior is finalized. + // go s.startCleanupRedundantDataWorker(ctx) + // TEMPORARY: disabled to prevent processing existing del_keys backlog. + // Re-enable together with redundant-data classification once safe. + // go s.startDeleteDataWorker(ctx) go s.startStoreSymbolsWorker(ctx) return nil @@ -2101,6 +2173,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, launched := 0 for i := 0; i < Alpha && i < nl.Len(); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { continue } @@ -2142,6 +2217,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, finalStoreCount := atomic.LoadInt32(&storeCount) for i := Alpha; i < nl.Len() && finalStoreCount < int32(Alpha); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { logtrace.Debug(ctx, "Ignore banned node during sequential store", logtrace.Fields{ logtrace.FieldModule: "p2p", @@ -2278,11 +2356,17 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i globalClosestContacts := make(map[string]*NodeList) knownNodes := make(map[string]*Node) hashes := make([][]byte, len(values)) + routingNodeCount := len(s.ht.nodes()) + candidateLimit := routingNodeCount + if candidateLimit < Alpha { + candidateLimit = Alpha + } ignoreList := s.ignorelist.ToNodeList() ignoredSet := hashedIDSetFromNodes(ignoreList) + keysWithoutCandidates := 0 { - f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), logtrace.FieldRole: "client"} + f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": routingNodeCount, logtrace.FieldRole: "client"} if o := logtrace.OriginFromContext(ctx); o != "" { f[logtrace.FieldOrigin] = o } @@ -2291,11 +2375,39 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i for i := 0; i < len(values); i++ { target, _ := utils.Blake3Hash(values[i]) hashes[i] = target - top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil) + candidates := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(candidateLimit, target, ignoredSet, nil) - globalClosestContacts[base58.Encode(target)] = top6 - // log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin") - s.addKnownNodes(ctx, top6.Nodes, knownNodes) + writeEligible := make([]*Node, 0, Alpha) + for _, n := range candidates.Nodes { + if s.eligibleForStore(n) { + writeEligible = append(writeEligible, n) + if len(writeEligible) >= Alpha { + break + } + } + } + if len(writeEligible) == 0 { + keysWithoutCandidates++ + } + globalClosestContacts[base58.Encode(target)] = &NodeList{Nodes: writeEligible} + // log.WithContext(ctx).WithField("top 6", candidates).Info("iterate batch store begin") + s.addKnownNodes(ctx, writeEligible, knownNodes) + } + + if keysWithoutCandidates > 0 { + logtrace.Error(ctx, "dht: batch store skipped (keys without eligible store nodes)", logtrace.Fields{ + logtrace.FieldModule: "dht", + "task_id": id, + "keys": len(values), + "keys_without_nodes": keysWithoutCandidates, + "len_nodes": routingNodeCount, + "banned_nodes": len(ignoreList), + "routing_allow_ready": s.routingAllowReady.Load(), + "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), + }) + return fmt.Errorf("no eligible store peers for %d/%d keys", keysWithoutCandidates, len(values)) } storageMap := make(map[string][]int) // This will store the index of the data in the values array that needs to be stored to the node @@ -2321,10 +2433,12 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), - "len_nodes": len(s.ht.nodes()), + "len_nodes": routingNodeCount, "banned_nodes": len(ignoreList), "routing_allow_ready": s.routingAllowReady.Load(), "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), }) return fmt.Errorf("no candidate nodes for batch store") } @@ -2410,6 +2524,9 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[ for key, node := range nodes { logtrace.Debug(ctx, "Preparing batch store to node", logtrace.Fields{logtrace.FieldModule: "dht", "node": node.String()}) + if !s.eligibleForStore(node) { + continue + } if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{ logtrace.FieldModule: "dht", diff --git a/p2p/kademlia/dht_batch_store_test.go b/p2p/kademlia/dht_batch_store_test.go index 25965b3c..7dbf69e1 100644 --- a/p2p/kademlia/dht_batch_store_test.go +++ b/p2p/kademlia/dht_batch_store_test.go @@ -28,7 +28,7 @@ func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) { if err == nil { t.Fatalf("expected error, got nil") } - if !strings.Contains(err.Error(), "no candidate nodes") { + if !strings.Contains(err.Error(), "no eligible store peers") { t.Fatalf("unexpected error: %v", err) } } diff --git a/p2p/kademlia/node_activity.go b/p2p/kademlia/node_activity.go index 969321b9..e8b1329f 100644 --- a/p2p/kademlia/node_activity.go +++ b/p2p/kademlia/node_activity.go @@ -51,10 +51,10 @@ func (s *DHT) checkNodeActivity(ctx context.Context) { node := s.makeNode([]byte(info.ID), info.IP, info.Port) - // Chain-state gating: do not spend cycles pinging or promoting peers that - // are not eligible to participate in routing (e.g., postponed). + // Chain-state routing gate: do not spend cycles on peers that are not + // eligible to participate in routing/read paths (e.g., disabled/stopped). // Note: eligibility changes asynchronously on the bootstrap refresh cadence; - // replication_info.Active is therefore eventually consistent with chain state. + // replication_info.Active is eventually consistent with store eligibility. if !s.eligibleForRouting(node) { if info.Active { s.removeNode(ctx, node) @@ -137,7 +137,7 @@ func (s *DHT) handlePingFailure(ctx context.Context, wasActive bool, n *Node, er } func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { - // Never promote an ineligible node into routing/active replication set. + // Never keep a non-routing-eligible node in routing/replication sets. if !s.eligibleForRouting(n) { if wasActive { s.removeNode(ctx, n) @@ -155,14 +155,26 @@ func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { // clear from ignorelist and ensure presence in routing s.ignorelist.Delete(n) + s.addNode(ctx, n) - if !wasActive { + // Store/replication eligibility is stricter than routing/read eligibility. + if !s.eligibleForStore(n) { + if wasActive { + if uerr := s.store.UpdateIsActive(ctx, string(n.ID), false, false); uerr != nil { + logtrace.Error(ctx, "failed to update replication info, node is inactive (store-ineligible)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } + } + } else if !wasActive { logtrace.Debug(ctx, "node found to be active again", logtrace.Fields{ logtrace.FieldModule: "p2p", "ip": n.IP, "node_id": string(n.ID), }) - s.addNode(ctx, n) if uerr := s.store.UpdateIsActive(ctx, string(n.ID), true, false); uerr != nil { logtrace.Error(ctx, "failed to update replication info, node is active", logtrace.Fields{ logtrace.FieldModule: "p2p",