Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions p2p/kademlia/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
)

const (
bootstrapRefreshInterval = 10 * time.Minute
defaultSuperNodeP2PPort int = 4445
bootstrapRefreshInterval = 10 * time.Minute
defaultSuperNodeP2PPort int = 4445
superNodeStateActive int32 = 1
superNodeStatePostponed int32 = 5
)

// seed a couple of obviously bad addrs (unless in integration tests)
Expand Down Expand Up @@ -108,15 +110,20 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes
return nil
}

// loadBootstrapCandidatesFromChain returns active supernodes (by latest state)
// mapped by "ip:port". No pings here.
func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) {
// loadBootstrapCandidatesFromChain returns routing candidates (by latest state) mapped by "ip:port",
// plus two allowlists:
// - routingIDs: Active + Postponed
// - storeIDs: Active only
//
// No pings here.
func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, map[[32]byte]struct{}, error) {
resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to list supernodes: %w", err)
return nil, nil, nil, fmt.Errorf("failed to list supernodes: %w", err)
}

activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
routingIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
storeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
mapNodes := make(map[string]*Node, len(resp.Supernodes))
for _, sn := range resp.Supernodes {
if len(sn.States) == 0 {
Expand All @@ -130,7 +137,9 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
latestState = int32(st.State)
}
}
if latestState != 1 { // SuperNodeStateActive = 1
// Routing/read eligibility includes Active + Postponed.
// Store/write eligibility remains Active-only.
if latestState != superNodeStateActive && latestState != superNodeStatePostponed {
continue
}

Expand All @@ -148,7 +157,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
} else if len(h) == 32 {
var key [32]byte
copy(key[:], h)
activeIDs[key] = struct{}{}
routingIDs[key] = struct{}{}
if latestState == superNodeStateActive {
storeIDs[key] = struct{}{}
}
}

// latest IP by height
Expand Down Expand Up @@ -190,7 +202,7 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
node.ID = []byte(id)
mapNodes[full] = node
}
return mapNodes, activeIDs, nil
return mapNodes, routingIDs, storeIDs, nil
}

// upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false).
Expand Down Expand Up @@ -245,6 +257,24 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil {
return err
}
allow := make(map[[32]byte]struct{}, len(s.options.BootstrapNodes))
for _, n := range s.options.BootstrapNodes {
if n == nil || len(n.ID) == 0 {
continue
}
h, err := utils.Blake3Hash(n.ID)
if err != nil || len(h) != 32 {
continue
}
var key [32]byte
copy(key[:], h)
allow[key] = struct{}{}
}
// Config bootstrap has no chain states; treat provided peers as both routing/store-eligible.
s.setRoutingAllowlist(ctx, allow)
s.setStoreAllowlist(ctx, allow)
s.pruneIneligibleRoutingPeers(ctx)

for _, n := range s.options.BootstrapNodes {
if err := s.upsertBootstrapNode(ctx, n); err != nil {
logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{
Expand All @@ -265,14 +295,16 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
}
selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)

cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
cands, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
if err != nil {
return err
}

// Update eligibility gate from chain Active state and prune any peers that slipped in via
// Update routing/read gate from chain state and prune any peers that slipped in via
// inbound traffic before the last bootstrap refresh.
s.setRoutingAllowlist(ctx, activeIDs)
s.setRoutingAllowlist(ctx, routingIDs)
// Write/replication targets are Active-only.
s.setStoreAllowlist(ctx, storeIDs)
s.pruneIneligibleRoutingPeers(ctx)

// Upsert candidates to replication_info
Expand Down Expand Up @@ -303,13 +335,6 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
// This keeps replication_info and routing table current as the validator set changes.
func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) {
go func() {
// Initial sync
if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil {
logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{
logtrace.FieldModule: "p2p",
logtrace.FieldError: err.Error(),
})
}
t := time.NewTicker(bootstrapRefreshInterval)
defer t.Stop()

Expand All @@ -331,11 +356,15 @@ func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string

// ConfigureBootstrapNodes wires to the new sync/refresher (no pings here).
func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error {
// One-time sync; start refresher in the background
// One-time sync attempt; keep service running if it fails and rely on refresher retries.
if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil {
return err
logtrace.Warn(ctx, "initial bootstrap sync failed; continuing with periodic refresher", logtrace.Fields{
logtrace.FieldModule: "p2p",
logtrace.FieldError: err.Error(),
})
}

// Always start periodic retries so transient chain/API outages can recover.
s.StartBootstrapRefresher(ctx, bootstrapNodes)

return nil
Expand Down
Loading