From 8c2666db48e95eb7bea8f8f6a14ff70ea9a89227 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Fri, 20 Feb 2026 18:47:22 +0500 Subject: [PATCH 1/4] Temporarily disable redundant data cleanup worker --- p2p/kademlia/dht.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 41cba389..7466693f 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -430,8 +430,12 @@ func (s *DHT) Start(ctx context.Context) error { go s.StartReplicationWorker(ctx) go s.startDisabledKeysCleanupWorker(ctx) - go s.startCleanupRedundantDataWorker(ctx) - go s.startDeleteDataWorker(ctx) + // TEMPORARY: disabled to pause redundant-key classification into del_keys. + // Re-enable once deletion-safety behavior is finalized. + // go s.startCleanupRedundantDataWorker(ctx) + // TEMPORARY: disabled to prevent processing existing del_keys backlog. + // Re-enable together with redundant-data classification once safe. + // go s.startDeleteDataWorker(ctx) go s.startStoreSymbolsWorker(ctx) return nil From 2230e38fb78baec2b4d24b25e36f0f02ffbe57da Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Mon, 23 Feb 2026 17:27:30 +0500 Subject: [PATCH 2/4] DHT: Store only on ACTIVE supernodes --- p2p/kademlia/bootstrap.go | 73 ++++++++++++------ p2p/kademlia/dht.go | 141 ++++++++++++++++++++++++++++++---- p2p/kademlia/node_activity.go | 24 ++++-- 3 files changed, 196 insertions(+), 42 deletions(-) diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index 4ecf53aa..1bda9bd9 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -16,8 +16,10 @@ import ( ) const ( - bootstrapRefreshInterval = 10 * time.Minute - defaultSuperNodeP2PPort int = 4445 + bootstrapRefreshInterval = 10 * time.Minute + defaultSuperNodeP2PPort int = 4445 + superNodeStateActive int32 = 1 + superNodeStatePostponed int32 = 5 ) // seed a couple of obviously bad addrs (unless in integration tests) @@ -108,15 +110,20 @@ func (s 
*DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes return nil } -// loadBootstrapCandidatesFromChain returns active supernodes (by latest state) -// mapped by "ip:port". No pings here. -func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) { +// loadBootstrapCandidatesFromChain returns routing candidates (by latest state) mapped by "ip:port", +// plus two allowlists: +// - routingIDs: Active + Postponed +// - storeIDs: Active only +// +// No pings here. +func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, map[[32]byte]struct{}, error) { resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx) if err != nil { - return nil, nil, fmt.Errorf("failed to list supernodes: %w", err) + return nil, nil, nil, fmt.Errorf("failed to list supernodes: %w", err) } - activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + routingIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + storeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) mapNodes := make(map[string]*Node, len(resp.Supernodes)) for _, sn := range resp.Supernodes { if len(sn.States) == 0 { @@ -130,7 +137,9 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress latestState = int32(st.State) } } - if latestState != 1 { // SuperNodeStateActive = 1 + // Routing/read eligibility includes Active + Postponed. + // Store/write eligibility remains Active-only. 
+ if latestState != superNodeStateActive && latestState != superNodeStatePostponed { continue } @@ -148,7 +157,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress } else if len(h) == 32 { var key [32]byte copy(key[:], h) - activeIDs[key] = struct{}{} + routingIDs[key] = struct{}{} + if latestState == superNodeStateActive { + storeIDs[key] = struct{}{} + } } // latest IP by height @@ -190,7 +202,7 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress node.ID = []byte(id) mapNodes[full] = node } - return mapNodes, activeIDs, nil + return mapNodes, routingIDs, storeIDs, nil } // upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false). @@ -245,6 +257,24 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil { return err } + allow := make(map[[32]byte]struct{}, len(s.options.BootstrapNodes)) + for _, n := range s.options.BootstrapNodes { + if n == nil || len(n.ID) == 0 { + continue + } + h, err := utils.Blake3Hash(n.ID) + if err != nil || len(h) != 32 { + continue + } + var key [32]byte + copy(key[:], h) + allow[key] = struct{}{} + } + // Config bootstrap has no chain states; treat provided peers as both routing/store-eligible. 
+ s.setRoutingAllowlist(ctx, allow) + s.setStoreAllowlist(ctx, allow) + s.pruneIneligibleRoutingPeers(ctx) + for _, n := range s.options.BootstrapNodes { if err := s.upsertBootstrapNode(ctx, n); err != nil { logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{ @@ -265,14 +295,16 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro } selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port) - cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) + cands, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) if err != nil { return err } - // Update eligibility gate from chain Active state and prune any peers that slipped in via + // Update routing/read gate from chain state and prune any peers that slipped in via // inbound traffic before the last bootstrap refresh. - s.setRoutingAllowlist(ctx, activeIDs) + s.setRoutingAllowlist(ctx, routingIDs) + // Write/replication targets are Active-only. + s.setStoreAllowlist(ctx, storeIDs) s.pruneIneligibleRoutingPeers(ctx) // Upsert candidates to replication_info @@ -303,13 +335,6 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro // This keeps replication_info and routing table current as the validator set changes. func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) { go func() { - // Initial sync - if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - } t := time.NewTicker(bootstrapRefreshInterval) defer t.Stop() @@ -331,11 +356,15 @@ func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string // ConfigureBootstrapNodes wires to the new sync/refresher (no pings here). 
func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { - // One-time sync; start refresher in the background + // One-time sync attempt; keep service running if it fails and rely on refresher retries. if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - return err + logtrace.Warn(ctx, "initial bootstrap sync failed; continuing with periodic refresher", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) } + // Always start periodic retries so transient chain/API outages can recover. s.StartBootstrapRefresher(ctx, bootstrapNodes) return nil diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 7466693f..75a3bcc0 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -69,7 +69,8 @@ type DHT struct { metrics DHTMetrics // routingAllowlist is a fast in-memory gate of which peers are eligible to - // participate in the routing table (based on chain state: Active only). + // participate in routing/read lookup paths (based on chain state). + // Current policy: Active + Postponed are routing-eligible. // // Hot paths do only an atomic check + map lookup; updates happen on the // bootstrap refresh cadence. @@ -77,6 +78,14 @@ type DHT struct { routingAllow map[[32]byte]struct{} // blake3(peerID) -> exists routingAllowReady atomic.Bool routingAllowCount atomic.Int64 + + // storeAllowlist is a fast in-memory gate of which peers are eligible for + // write/replication targets. + // Current policy: Active only. + storeAllowMu sync.RWMutex + storeAllow map[[32]byte]struct{} // blake3(peerID) -> exists + storeAllowReady atomic.Bool + storeAllowCount atomic.Int64 } // bootstrapIgnoreList seeds the in-memory ignore list with nodes that are @@ -144,11 +153,11 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct // Avoid accidentally locking ourselves out due to transient chain issues. 
if len(allow) == 0 { if !s.routingAllowReady.Load() { - logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving gating disabled (bootstrap)", logtrace.Fields{ + logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving routing gating disabled (bootstrap)", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } else { - logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero active supernodes; retaining previous allowlist", logtrace.Fields{ + logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero routing-eligible supernodes; retaining previous allowlist", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } @@ -164,7 +173,29 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct logtrace.Debug(ctx, "routing allowlist updated", logtrace.Fields{ logtrace.FieldModule: "p2p", - "active_peers": len(allow), + "routing_peers": len(allow), + }) +} + +func (s *DHT) setStoreAllowlist(ctx context.Context, allow map[[32]byte]struct{}) { + if s == nil { + return + } + // Integration tests may use synthetic bootstrap sets; do not enforce chain-state gating. + if integrationTestEnabled() { + return + } + + s.storeAllowMu.Lock() + s.storeAllow = allow + s.storeAllowMu.Unlock() + + s.storeAllowCount.Store(int64(len(allow))) + s.storeAllowReady.Store(true) + + logtrace.Debug(ctx, "store allowlist updated", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "store_peers": len(allow), }) } @@ -176,9 +207,9 @@ func (s *DHT) eligibleForRouting(n *Node) bool { if integrationTestEnabled() { return true } - // If allowlist isn't ready (or was never populated), do not gate to avoid blocking bootstrap. + // Strict gating: only explicitly allowlisted peers can participate in read/routing. 
if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { - return true + return false } if n == nil || len(n.ID) == 0 { return false @@ -197,14 +228,51 @@ func (s *DHT) eligibleForRouting(n *Node) bool { return ok } +func (s *DHT) eligibleForStore(n *Node) bool { + if s == nil { + return false + } + // In integration tests allow everything; chain state gating is not stable/available there. + if integrationTestEnabled() { + return true + } + // If the store allowlist isn't ready yet, avoid blocking writes during bootstrap. + if !s.storeAllowReady.Load() { + return true + } + // Once initialized, an empty active set means no write-eligible peers. + if s.storeAllowCount.Load() == 0 { + return false + } + if n == nil || len(n.ID) == 0 { + return false + } + + n.SetHashedID() + if len(n.HashedID) != 32 { + return false + } + var key [32]byte + copy(key[:], n.HashedID) + + s.storeAllowMu.RLock() + _, ok := s.storeAllow[key] + s.storeAllowMu.RUnlock() + return ok +} + func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node { if s == nil || len(nodes) == 0 { return nodes } - // Fast path: not enforcing (integration tests / not ready / empty list) - if integrationTestEnabled() || !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Fast path for integration tests only. + if integrationTestEnabled() { return nodes } + // Strict gating: without a routing allowlist there are no eligible routing peers. 
+ if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + return nil + } out := nodes[:0] for _, n := range nodes { @@ -2105,6 +2173,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, launched := 0 for i := 0; i < Alpha && i < nl.Len(); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { continue } @@ -2146,6 +2217,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, finalStoreCount := atomic.LoadInt32(&storeCount) for i := Alpha; i < nl.Len() && finalStoreCount < int32(Alpha); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { logtrace.Debug(ctx, "Ignore banned node during sequential store", logtrace.Fields{ logtrace.FieldModule: "p2p", @@ -2282,11 +2356,17 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i globalClosestContacts := make(map[string]*NodeList) knownNodes := make(map[string]*Node) hashes := make([][]byte, len(values)) + routingNodeCount := len(s.ht.nodes()) + candidateLimit := routingNodeCount + if candidateLimit < Alpha { + candidateLimit = Alpha + } ignoreList := s.ignorelist.ToNodeList() ignoredSet := hashedIDSetFromNodes(ignoreList) + keysWithoutCandidates := 0 { - f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), logtrace.FieldRole: "client"} + f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": routingNodeCount, logtrace.FieldRole: "client"} if o := logtrace.OriginFromContext(ctx); o != "" { f[logtrace.FieldOrigin] = o } @@ -2295,11 +2375,39 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i for i := 0; i < len(values); i++ { target, _ := utils.Blake3Hash(values[i]) hashes[i] = target - top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil) + candidates := 
s.ht.closestContactsWithIncludingNodeWithIgnoredSet(candidateLimit, target, ignoredSet, nil) - globalClosestContacts[base58.Encode(target)] = top6 - // log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin") - s.addKnownNodes(ctx, top6.Nodes, knownNodes) + writeEligible := make([]*Node, 0, Alpha) + for _, n := range candidates.Nodes { + if s.eligibleForStore(n) { + writeEligible = append(writeEligible, n) + if len(writeEligible) >= Alpha { + break + } + } + } + if len(writeEligible) == 0 { + keysWithoutCandidates++ + } + globalClosestContacts[base58.Encode(target)] = &NodeList{Nodes: writeEligible} + // log.WithContext(ctx).WithField("top 6", candidates).Info("iterate batch store begin") + s.addKnownNodes(ctx, writeEligible, knownNodes) + } + + if keysWithoutCandidates > 0 { + logtrace.Error(ctx, "dht: batch store skipped (keys without eligible store nodes)", logtrace.Fields{ + logtrace.FieldModule: "dht", + "task_id": id, + "keys": len(values), + "keys_without_nodes": keysWithoutCandidates, + "len_nodes": routingNodeCount, + "banned_nodes": len(ignoreList), + "routing_allow_ready": s.routingAllowReady.Load(), + "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), + }) + return fmt.Errorf("no eligible store peers for %d/%d keys", keysWithoutCandidates, len(values)) } storageMap := make(map[string][]int) // This will store the index of the data in the values array that needs to be stored to the node @@ -2325,10 +2433,12 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), - "len_nodes": len(s.ht.nodes()), + "len_nodes": routingNodeCount, "banned_nodes": len(ignoreList), "routing_allow_ready": s.routingAllowReady.Load(), "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": 
s.storeAllowCount.Load(), }) return fmt.Errorf("no candidate nodes for batch store") } @@ -2414,6 +2524,9 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[ for key, node := range nodes { logtrace.Debug(ctx, "Preparing batch store to node", logtrace.Fields{logtrace.FieldModule: "dht", "node": node.String()}) + if !s.eligibleForStore(node) { + continue + } if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{ logtrace.FieldModule: "dht", diff --git a/p2p/kademlia/node_activity.go b/p2p/kademlia/node_activity.go index 969321b9..e8b1329f 100644 --- a/p2p/kademlia/node_activity.go +++ b/p2p/kademlia/node_activity.go @@ -51,10 +51,10 @@ func (s *DHT) checkNodeActivity(ctx context.Context) { node := s.makeNode([]byte(info.ID), info.IP, info.Port) - // Chain-state gating: do not spend cycles pinging or promoting peers that - // are not eligible to participate in routing (e.g., postponed). + // Chain-state routing gate: do not spend cycles on peers that are not + // eligible to participate in routing/read paths (e.g., disabled/stopped). // Note: eligibility changes asynchronously on the bootstrap refresh cadence; - // replication_info.Active is therefore eventually consistent with chain state. + // replication_info.Active is eventually consistent with store eligibility. if !s.eligibleForRouting(node) { if info.Active { s.removeNode(ctx, node) @@ -137,7 +137,7 @@ func (s *DHT) handlePingFailure(ctx context.Context, wasActive bool, n *Node, er } func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { - // Never promote an ineligible node into routing/active replication set. + // Never keep a non-routing-eligible node in routing/replication sets. 
if !s.eligibleForRouting(n) { if wasActive { s.removeNode(ctx, n) @@ -155,14 +155,26 @@ func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { // clear from ignorelist and ensure presence in routing s.ignorelist.Delete(n) + s.addNode(ctx, n) - if !wasActive { + // Store/replication eligibility is stricter than routing/read eligibility. + if !s.eligibleForStore(n) { + if wasActive { + if uerr := s.store.UpdateIsActive(ctx, string(n.ID), false, false); uerr != nil { + logtrace.Error(ctx, "failed to update replication info, node is inactive (store-ineligible)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } + } + } else if !wasActive { logtrace.Debug(ctx, "node found to be active again", logtrace.Fields{ logtrace.FieldModule: "p2p", "ip": n.IP, "node_id": string(n.ID), }) - s.addNode(ctx, n) if uerr := s.store.UpdateIsActive(ctx, string(n.ID), true, false); uerr != nil { logtrace.Error(ctx, "failed to update replication info, node is active", logtrace.Fields{ logtrace.FieldModule: "p2p", From 111b9fe25d97cf29c64ec834bf61fd9bf33eeead Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Mon, 23 Feb 2026 19:26:37 +0500 Subject: [PATCH 3/4] Fix test --- p2p/kademlia/dht_batch_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/kademlia/dht_batch_store_test.go b/p2p/kademlia/dht_batch_store_test.go index 25965b3c..7dbf69e1 100644 --- a/p2p/kademlia/dht_batch_store_test.go +++ b/p2p/kademlia/dht_batch_store_test.go @@ -28,7 +28,7 @@ func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) { if err == nil { t.Fatalf("expected error, got nil") } - if !strings.Contains(err.Error(), "no candidate nodes") { + if !strings.Contains(err.Error(), "no eligible store peers") { t.Fatalf("unexpected error: %v", err) } } From e5f2d3a45f0c44f53fe97af40e02a3f2e5bebc29 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 23 Feb 2026 
19:41:47 +0500 Subject: [PATCH 4/4] Self Healing --- p2p/kademlia/dht.go | 22 +- p2p/kademlia/message.go | 15 + p2p/kademlia/network.go | 42 ++- p2p/kademlia/rebalance_worker.go | 382 +++++++++++++++++++++++ p2p/kademlia/store.go | 16 + p2p/kademlia/store/mem/mem.go | 43 ++- p2p/kademlia/store/sqlite/replication.go | 74 +++++ supernode/adaptors/lumera.go | 4 + 8 files changed, 590 insertions(+), 8 deletions(-) create mode 100644 p2p/kademlia/rebalance_worker.go diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 75a3bcc0..1324bf44 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -207,8 +207,13 @@ func (s *DHT) eligibleForRouting(n *Node) bool { if integrationTestEnabled() { return true } - // Strict gating: only explicitly allowlisted peers can participate in read/routing. - if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Bootstrap fail-open: until the first successful allowlist sync, do not block routing. + // This avoids startup isolation during transient chain/API failures. + if !s.routingAllowReady.Load() { + return true + } + // Once initialized, an empty routing set means no routing-eligible peers. + if s.routingAllowCount.Load() == 0 { return false } if n == nil || len(n.ID) == 0 { @@ -269,8 +274,12 @@ func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node { if integrationTestEnabled() { return nodes } - // Strict gating: without a routing allowlist there are no eligible routing peers. - if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Bootstrap fail-open: before first allowlist sync, keep routing candidates. + if !s.routingAllowReady.Load() { + return nodes + } + // Once initialized, an empty allowlist means no routing-eligible peers. 
+ if s.routingAllowCount.Load() == 0 { return nil } @@ -497,6 +506,7 @@ func (s *DHT) Start(ctx context.Context) error { } go s.StartReplicationWorker(ctx) + go s.startRebalanceWorker(ctx) go s.startDisabledKeysCleanupWorker(ctx) // TEMPORARY: disabled to pause redundant-key classification into del_keys. // Re-enable once deletion-safety behavior is finalized. @@ -2081,8 +2091,8 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { } node.SetHashedID() - // Chain-state gating: only allow Active supernodes into the routing table. - // This prevents postponed/disabled/stopped nodes from being admitted via inbound traffic. + // Chain-state gating: only routing-eligible supernodes (Active + Postponed) + // can be admitted into the routing table. if !s.eligibleForRouting(node) { logtrace.Debug(ctx, "Rejecting node: not eligible for routing", logtrace.Fields{ logtrace.FieldModule: "p2p", diff --git a/p2p/kademlia/message.go b/p2p/kademlia/message.go index 4f778d1f..5b20818c 100644 --- a/p2p/kademlia/message.go +++ b/p2p/kademlia/message.go @@ -30,6 +30,8 @@ const ( BatchFindNode // BatchGetValues finds values in kademlia network BatchGetValues + // BatchProbeKeys checks local key presence/status without side effects + BatchProbeKeys ) func init() { @@ -49,6 +51,8 @@ func init() { gob.Register(&BatchFindNodeResponse{}) gob.Register(&BatchGetValuesRequest{}) gob.Register(&BatchGetValuesResponse{}) + gob.Register(&BatchProbeKeysRequest{}) + gob.Register(&BatchProbeKeysResponse{}) } type MessageWithError struct { @@ -166,6 +170,17 @@ type BatchGetValuesResponse struct { Status ResponseStatus } +// BatchProbeKeysRequest defines the request data for side-effect-free local key probes. +type BatchProbeKeysRequest struct { + Keys []string // hex-encoded keys +} + +// BatchProbeKeysResponse defines the response data for local key probes. 
+type BatchProbeKeysResponse struct { + Status ResponseStatus + Data map[string]LocalKeyStatus // key -> local status +} + // encode the message func encode(message *Message) ([]byte, error) { var buf bytes.Buffer diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go index 48f8ffc9..eccbaedc 100644 --- a/p2p/kademlia/network.go +++ b/p2p/kademlia/network.go @@ -47,6 +47,7 @@ func init() { FindValue: 5 * time.Second, BatchFindValues: 60 * time.Second, // responder compresses BatchGetValues: 75 * time.Second, // large, sometimes cloud fetch then send back + BatchProbeKeys: 10 * time.Second, StoreData: 10 * time.Second, BatchStoreData: 75 * time.Second, // large uncompressed payloads Replicate: 90 * time.Second, @@ -235,6 +236,37 @@ func (s *Network) handleFindValue(ctx context.Context, message *Message) (res [] return s.encodeMesage(resMsg) } +func (s *Network) handleBatchProbeKeys(ctx context.Context, message *Message) (res []byte, err error) { + defer func() { + if response, err := s.handlePanic(ctx, message.Sender, BatchProbeKeys); response != nil || err != nil { + res = response + } + }() + + request, ok := message.Data.(*BatchProbeKeysRequest) + if !ok { + err := errors.New("invalid BatchProbeKeysRequest") + return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error()) + } + + // Keep routing table fresh while handling probe traffic. 
+ s.dht.addNode(ctx, message.Sender) + + statuses, err := s.dht.store.RetrieveBatchLocalStatus(ctx, request.Keys) + if err != nil { + err = errors.Errorf("batch probe keys: %w", err) + return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error()) + } + + response := &BatchProbeKeysResponse{ + Status: ResponseStatus{Result: ResultOk}, + Data: statuses, + } + + resMsg := s.dht.newMessage(BatchProbeKeys, message.Sender, response) + return s.encodeMesage(resMsg) +} + func (s *Network) handleStoreData(ctx context.Context, message *Message) (res []byte, err error) { defer func() { if response, err := s.handlePanic(ctx, message.Sender, StoreData); response != nil || err != nil { @@ -480,6 +512,10 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) { response, hErr = s.withMetrics(BatchGetValues, func() ([]byte, error) { return s.handleGetValuesRequest(ctx, request, reqID) }) + case BatchProbeKeys: + response, hErr = s.withMetrics(BatchProbeKeys, func() ([]byte, error) { + return s.handleBatchProbeKeys(ctx, request) + }) default: // count unknown types as failure and return m := s.metricsFor(mt) @@ -1321,6 +1357,8 @@ func (s *Network) generateResponseMessage(ctx context.Context, messageType int, response = &ReplicateDataResponse{Status: responseStatus} case BatchGetValues: response = &BatchGetValuesResponse{Status: responseStatus} + case BatchProbeKeys: + response = &BatchProbeKeysResponse{Status: responseStatus} default: return nil, fmt.Errorf("unsupported message type %d", messageType) } @@ -1461,6 +1499,8 @@ func msgName(t int) string { return "BatchStoreData" case Replicate: return "Replicate" + case BatchProbeKeys: + return "BatchProbeKeys" default: return fmt.Sprintf("Type_%d", t) } @@ -1501,7 +1541,7 @@ func (s *Network) HandleMetricsSnapshot() map[string]HandleCounters { func readDeadlineFor(msgType int, overall time.Duration) time.Duration { small := 10 * time.Second switch msgType { - case Ping, FindNode, 
BatchFindNode, FindValue, StoreData, BatchStoreData: + case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData, BatchProbeKeys: if overall > small+1*time.Second { return small } diff --git a/p2p/kademlia/rebalance_worker.go b/p2p/kademlia/rebalance_worker.go new file mode 100644 index 00000000..7eab7c88 --- /dev/null +++ b/p2p/kademlia/rebalance_worker.go @@ -0,0 +1,382 @@ +package kademlia + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "time" + + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" +) + +const ( + defaultRebalanceInterval = 10 * time.Minute + rebalanceScanPageSize = 500 + rebalanceMaxKeysPerCycle = 2000 + rebalanceProbeFanout = Alpha + rebalanceDeleteConfirmCycles = 2 + rebalanceMaxDeletesPerCycle = 100 + rebalanceMaxHealsPerCycle = 200 + rebalanceMaxConfirmEntries = 200000 + rebalanceProbeRequestTimeout = 8 * time.Second + rebalanceStoreAllowSkipLogStep = 25 +) + +func (s *DHT) startRebalanceWorker(ctx context.Context) { + if s == nil { + return + } + logtrace.Debug(ctx, "rebalance worker started", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "interval": defaultRebalanceInterval.String(), + "page_size": rebalanceScanPageSize, + "max_keys_cycle": rebalanceMaxKeysPerCycle, + "probe_fanout": rebalanceProbeFanout, + }) + + ticker := time.NewTicker(defaultRebalanceInterval) + defer ticker.Stop() + + deleteConfirm := make(map[string]int, rebalanceScanPageSize) + cursor := "" + storeAllowNotReadySkips := 0 + + for { + select { + case <-ctx.Done(): + logtrace.Info(ctx, "closing rebalance worker", logtrace.Fields{ + logtrace.FieldModule: "p2p", + }) + return + case <-ticker.C: + if !s.storeAllowReady.Load() || s.storeAllowCount.Load() == 0 { + storeAllowNotReadySkips++ + // Avoid noisy logs while still leaving breadcrumbs during prolonged bootstrap/chain issues. 
+ if storeAllowNotReadySkips == 1 || storeAllowNotReadySkips%rebalanceStoreAllowSkipLogStep == 0 { + logtrace.Debug(ctx, "rebalance skipped: store allowlist not ready", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), + "skips": storeAllowNotReadySkips, + }) + } + continue + } + storeAllowNotReadySkips = 0 + + if len(deleteConfirm) > rebalanceMaxConfirmEntries { + deleteConfirm = make(map[string]int, rebalanceScanPageSize) + logtrace.Warn(ctx, "rebalance confirmation cache reset due to size", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "max_entries": rebalanceMaxConfirmEntries, + }) + } + + cursor = s.rebalanceOnce(ctx, cursor, deleteConfirm) + } + } +} + +func (s *DHT) rebalanceOnce(ctx context.Context, startCursor string, deleteConfirm map[string]int) string { + if s == nil || s.ht == nil || s.store == nil { + return startCursor + } + if integrationTestEnabled() { + return startCursor + } + + cursor := startCursor + processed := 0 + underReplicated := 0 + healed := 0 + deleted := 0 + decodeErrors := 0 + + for processed < rebalanceMaxKeysPerCycle { + keys, err := s.store.ListLocalKeysPage(ctx, cursor, rebalanceScanPageSize) + if err != nil { + logtrace.Error(ctx, "rebalance: list local keys failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + return cursor + } + if len(keys) == 0 { + // Wrap around at end of keyspace. 
+ cursor = "" + break + } + + for _, keyHex := range keys { + cursor = keyHex + if processed >= rebalanceMaxKeysPerCycle { + break + } + processed++ + + keyBytes, err := hex.DecodeString(keyHex) + if err != nil { + decodeErrors++ + delete(deleteConfirm, keyHex) + continue + } + + candidates := s.storeEligibleCandidatesForKey(keyBytes, rebalanceProbeFanout) + if len(candidates) == 0 { + delete(deleteConfirm, keyHex) + continue + } + + ownerN := Alpha + if ownerN > len(candidates) { + ownerN = len(candidates) + } + owners := candidates[:ownerN] + isOwner := containsNodeID(owners, s.ht.self.ID) + + probeStatuses, holders := s.probeKeyAcrossCandidates(ctx, keyHex, candidates) + selfID := string(s.ht.self.ID) + selfStatus, ok := probeStatuses[selfID] + if !ok { + local, lerr := s.store.RetrieveBatchLocalStatus(ctx, []string{keyHex}) + if lerr != nil { + delete(deleteConfirm, keyHex) + continue + } + selfStatus = local[keyHex] + probeStatuses[selfID] = selfStatus + if selfStatus.Exists && selfStatus.HasLocalBlob { + holders++ + } + } + if !selfStatus.Exists || !selfStatus.HasLocalBlob { + // This worker only manages local key copies; skip cloud-only placeholders or stale rows. 
+ delete(deleteConfirm, keyHex) + continue + } + + if holders < Alpha { + underReplicated++ + delete(deleteConfirm, keyHex) + + if isOwner && healed < rebalanceMaxHealsPerCycle { + before := holders + holders = s.healKeyToMinimumReplicas(ctx, keyHex, keyBytes, candidates, probeStatuses, holders, selfStatus.Datatype) + if holders > before { + healed += holders - before + } + } + continue + } + + if !isOwner && holders >= Alpha { + deleteConfirm[keyHex]++ + if deleteConfirm[keyHex] >= rebalanceDeleteConfirmCycles && deleted < rebalanceMaxDeletesPerCycle { + if err := s.store.BatchDeleteRecords([]string{keyHex}); err != nil { + logtrace.Error(ctx, "rebalance: local delete failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + "key": keyHex, + }) + continue + } + deleted++ + delete(deleteConfirm, keyHex) + } + } else { + delete(deleteConfirm, keyHex) + } + } + + if len(keys) < rebalanceScanPageSize { + // Reached end of keyspace in this cycle. + cursor = "" + break + } + } + + logtrace.Info(ctx, "rebalance cycle complete", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "cursor": cursor, + "processed": processed, + "under_replicated": underReplicated, + "healed": healed, + "deleted": deleted, + "decode_errors": decodeErrors, + "confirm_entries": len(deleteConfirm), + }) + + return cursor +} + +func (s *DHT) storeEligibleCandidatesForKey(target []byte, want int) []*Node { + if s == nil || s.ht == nil || len(target) == 0 { + return nil + } + + candidateLimit := len(s.ht.nodes()) + 1 + if candidateLimit < want { + candidateLimit = want + } + ignoredSet := hashedIDSetFromNodes(s.ignorelist.ToNodeList()) + self := &Node{ID: s.ht.self.ID, IP: s.ht.self.IP, Port: s.ht.self.Port} + self.SetHashedID() + nl := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(candidateLimit, target, ignoredSet, self) + + out := make([]*Node, 0, want) + seen := make(map[string]struct{}, len(nl.Nodes)) + for _, n := range nl.Nodes { + if n == nil || 
len(n.ID) == 0 { + continue + } + id := string(n.ID) + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + if s.eligibleForStore(n) { + out = append(out, n) + } + } + return out +} + +func (s *DHT) probeKeyAcrossCandidates(ctx context.Context, keyHex string, candidates []*Node) (map[string]LocalKeyStatus, int) { + statuses := make(map[string]LocalKeyStatus, len(candidates)) + holders := 0 + + for i, n := range candidates { + if i >= rebalanceProbeFanout { + break + } + if n == nil || len(n.ID) == 0 { + continue + } + + var st LocalKeyStatus + var err error + if bytes.Equal(n.ID, s.ht.self.ID) { + local, lerr := s.store.RetrieveBatchLocalStatus(ctx, []string{keyHex}) + if lerr != nil { + err = lerr + } else { + st = local[keyHex] + } + } else { + st, err = s.probeKeyOnNode(ctx, n, keyHex) + } + if err != nil { + continue + } + + statuses[string(n.ID)] = st + if st.Exists && st.HasLocalBlob { + holders++ + } + } + + return statuses, holders +} + +func (s *DHT) probeKeyOnNode(ctx context.Context, n *Node, keyHex string) (LocalKeyStatus, error) { + pctx, cancel := context.WithTimeout(ctx, rebalanceProbeRequestTimeout) + defer cancel() + + req := &BatchProbeKeysRequest{Keys: []string{keyHex}} + resp, err := s.sendBatchProbeKeys(pctx, n, req) + if err != nil { + return LocalKeyStatus{}, err + } + st, ok := resp.Data[keyHex] + if !ok { + return LocalKeyStatus{}, nil + } + return st, nil +} + +func (s *DHT) sendBatchProbeKeys(ctx context.Context, n *Node, request *BatchProbeKeysRequest) (*BatchProbeKeysResponse, error) { + reqMsg := s.newMessage(BatchProbeKeys, n, request) + rspMsg, err := s.network.Call(ctx, reqMsg, false) + if err != nil { + return nil, fmt.Errorf("probe network call: %w", err) + } + + response, ok := rspMsg.Data.(*BatchProbeKeysResponse) + if !ok { + return nil, fmt.Errorf("invalid BatchProbeKeysResponse") + } + if response.Status.Result != ResultOk { + return nil, fmt.Errorf("probe failed: %s", response.Status.ErrMsg) + } + if 
response.Data == nil { + response.Data = map[string]LocalKeyStatus{} + } + return response, nil +} + +func (s *DHT) healKeyToMinimumReplicas( + ctx context.Context, + keyHex string, + key []byte, + candidates []*Node, + probeStatuses map[string]LocalKeyStatus, + holders int, + datatype int, +) int { + if holders >= Alpha { + return holders + } + + value, err := s.store.Retrieve(ctx, key) + if err != nil || len(value) == 0 { + return holders + } + + for _, n := range candidates { + if holders >= Alpha { + break + } + if n == nil || len(n.ID) == 0 || bytes.Equal(n.ID, s.ht.self.ID) { + continue + } + id := string(n.ID) + if st, ok := probeStatuses[id]; ok && st.Exists && st.HasLocalBlob { + continue + } + + response, err := s.sendStoreData(ctx, n, &StoreDataRequest{Data: value, Type: datatype}) + if err != nil || response == nil || response.Status.Result != ResultOk { + continue + } + + probeStatuses[id] = LocalKeyStatus{ + Exists: true, + HasLocalBlob: true, + DataLen: len(value), + Datatype: datatype, + } + holders++ + + logtrace.Debug(ctx, "rebalance healed key replica", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "key": keyHex, + "node": n.String(), + "holders": holders, + }) + } + + return holders +} + +func containsNodeID(nodes []*Node, id []byte) bool { + for _, n := range nodes { + if n == nil { + continue + } + if bytes.Equal(n.ID, id) { + return true + } + } + return false +} diff --git a/p2p/kademlia/store.go b/p2p/kademlia/store.go index ee1f53c6..f7531926 100644 --- a/p2p/kademlia/store.go +++ b/p2p/kademlia/store.go @@ -12,6 +12,16 @@ type DatabaseStats struct { P2PDbRecordsCount int64 } +// LocalKeyStatus is a side-effect-free local presence/status view for a key. 
// LocalKeyStatus describes what the local store knows about one key. It is
// produced by read-only lookups (no cloud fetches, no writes).
type LocalKeyStatus struct {
	// Exists is true when the store has an entry for the key.
	// HasLocalBlob is true when that entry carries a non-empty data blob.
	Exists, HasLocalBlob bool

	// DataLen is the blob length in bytes (0 when there is no blob).
	// Datatype is the stored datatype value (0 when unset).
	DataLen, Datatype int

	// IsOriginal and IsOnCloud mirror the entry's is_original / is_on_cloud
	// flags. The in-memory store leaves both false.
	IsOriginal, IsOnCloud bool
}
-func (s *Store) StoreBatchRepKeys(_ []string, _ string, _ string, _ int) error { +func (s *Store) StoreBatchRepKeys(_ []string, _ string, _ string, _ uint16) error { return nil } @@ -163,6 +164,46 @@ func (s *Store) RetrieveBatchValues(_ context.Context, _ []string, _ bool) ([][] return nil, 0, nil } +func (s *Store) RetrieveBatchLocalStatus(_ context.Context, keys []string) (map[string]kademlia.LocalKeyStatus, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + out := make(map[string]kademlia.LocalKeyStatus, len(keys)) + for _, key := range keys { + v, ok := s.data[key] + if !ok { + out[key] = kademlia.LocalKeyStatus{} + continue + } + out[key] = kademlia.LocalKeyStatus{ + Exists: true, + HasLocalBlob: len(v) > 0, + DataLen: len(v), + } + } + return out, nil +} + +func (s *Store) ListLocalKeysPage(_ context.Context, afterKey string, limit int) ([]string, error) { + if limit <= 0 { + limit = 1000 + } + s.mutex.RLock() + defer s.mutex.RUnlock() + + keys := make([]string, 0, len(s.data)) + for k := range s.data { + if k > afterKey { + keys = append(keys, k) + } + } + sort.Strings(keys) + if len(keys) > limit { + keys = keys[:limit] + } + return keys, nil +} + // BatchDeleteRepKeys deletes a batch of keys from the replication table func (s *Store) BatchDeleteRepKeys(_ []string) error { return nil diff --git a/p2p/kademlia/store/sqlite/replication.go b/p2p/kademlia/store/sqlite/replication.go index 3616553e..cac9a4a2 100644 --- a/p2p/kademlia/store/sqlite/replication.go +++ b/p2p/kademlia/store/sqlite/replication.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/cenkalti/backoff/v4" @@ -485,6 +486,79 @@ func (s Store) RetrieveBatchValues(ctx context.Context, keys []string, getFromCl return retrieveBatchValues(ctx, s.db, keys, getFromCloud, s) } +// RetrieveBatchLocalStatus returns local 
key status without cloud fetches or writes. +func (s *Store) RetrieveBatchLocalStatus(ctx context.Context, keys []string) (map[string]kademlia.LocalKeyStatus, error) { + out := make(map[string]kademlia.LocalKeyStatus, len(keys)) + if len(keys) == 0 { + return out, nil + } + for _, k := range keys { + out[k] = kademlia.LocalKeyStatus{} + } + + for _, batchKeys := range chunkStrings(keys, sqliteMaxVars) { + placeholders := make([]string, len(batchKeys)) + args := make([]interface{}, len(batchKeys)) + for i, key := range batchKeys { + placeholders[i] = "?" + args[i] = key + } + + query := fmt.Sprintf(`SELECT key, length(data) AS data_len, datatype, is_original, is_on_cloud FROM data WHERE key IN (%s)`, strings.Join(placeholders, ",")) + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("retrieve local key status: %w", err) + } + + for rows.Next() { + var key string + var dataLen sql.NullInt64 + var datatype sql.NullInt64 + var isOriginal bool + var isOnCloud bool + if err := rows.Scan(&key, &dataLen, &datatype, &isOriginal, &isOnCloud); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("scan local key status: %w", err) + } + dl := 0 + if dataLen.Valid { + dl = int(dataLen.Int64) + } + dt := 0 + if datatype.Valid { + dt = int(datatype.Int64) + } + out[key] = kademlia.LocalKeyStatus{ + Exists: true, + HasLocalBlob: dl > 0, + DataLen: dl, + Datatype: dt, + IsOriginal: isOriginal, + IsOnCloud: isOnCloud, + } + } + + if err := rows.Err(); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("rows local key status: %w", err) + } + _ = rows.Close() + } + + return out, nil +} + +func (s *Store) ListLocalKeysPage(ctx context.Context, afterKey string, limit int) ([]string, error) { + if limit <= 0 { + limit = 1000 + } + keys := make([]string, 0, limit) + if err := s.db.SelectContext(ctx, &keys, `SELECT key FROM data WHERE key > ? 
ORDER BY key ASC LIMIT ?`, afterKey, limit); err != nil { + return nil, fmt.Errorf("list local keys page: %w", err) + } + return keys, nil +} + // RetrieveBatchValues returns a list of values for the given keys (hex-encoded). func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFromCloud bool, s Store) ([][]byte, int, error) { // Early return diff --git a/supernode/adaptors/lumera.go b/supernode/adaptors/lumera.go index aa093d36..2e55f856 100644 --- a/supernode/adaptors/lumera.go +++ b/supernode/adaptors/lumera.go @@ -9,6 +9,8 @@ import ( sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) +const topSupernodesState = "SUPERNODE_STATE_ACTIVE" + type LumeraClient interface { GetAction(ctx context.Context, actionID string) (*actiontypes.QueryGetActionResponse, error) GetTopSupernodes(ctx context.Context, blockHeight uint64) (*sntypes.QueryGetTopSuperNodesForBlockResponse, error) @@ -30,6 +32,8 @@ func (l *lumeraImpl) GetAction(ctx context.Context, actionID string) (*actiontyp func (l *lumeraImpl) GetTopSupernodes(ctx context.Context, blockHeight uint64) (*sntypes.QueryGetTopSuperNodesForBlockResponse, error) { return l.c.SuperNode().GetTopSuperNodesForBlock(ctx, &sntypes.QueryGetTopSuperNodesForBlockRequest{ BlockHeight: int32(blockHeight), + // Top-set selection is used for write/finalization eligibility, so keep it Active-only. + State: topSupernodesState, }) }