Skip to content

Commit e5f2d3a

Browse files
committed
Self Healing
1 parent 111b9fe commit e5f2d3a

File tree

8 files changed

+590
-8
lines changed

8 files changed

+590
-8
lines changed

p2p/kademlia/dht.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,13 @@ func (s *DHT) eligibleForRouting(n *Node) bool {
207207
if integrationTestEnabled() {
208208
return true
209209
}
210-
// Strict gating: only explicitly allowlisted peers can participate in read/routing.
211-
if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 {
210+
// Bootstrap fail-open: until the first successful allowlist sync, do not block routing.
211+
// This avoids startup isolation during transient chain/API failures.
212+
if !s.routingAllowReady.Load() {
213+
return true
214+
}
215+
// Once initialized, an empty routing set means no routing-eligible peers.
216+
if s.routingAllowCount.Load() == 0 {
212217
return false
213218
}
214219
if n == nil || len(n.ID) == 0 {
@@ -269,8 +274,12 @@ func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node {
269274
if integrationTestEnabled() {
270275
return nodes
271276
}
272-
// Strict gating: without a routing allowlist there are no eligible routing peers.
273-
if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 {
277+
// Bootstrap fail-open: before first allowlist sync, keep routing candidates.
278+
if !s.routingAllowReady.Load() {
279+
return nodes
280+
}
281+
// Once initialized, an empty allowlist means no routing-eligible peers.
282+
if s.routingAllowCount.Load() == 0 {
274283
return nil
275284
}
276285

@@ -497,6 +506,7 @@ func (s *DHT) Start(ctx context.Context) error {
497506
}
498507

499508
go s.StartReplicationWorker(ctx)
509+
go s.startRebalanceWorker(ctx)
500510
go s.startDisabledKeysCleanupWorker(ctx)
501511
// TEMPORARY: disabled to pause redundant-key classification into del_keys.
502512
// Re-enable once deletion-safety behavior is finalized.
@@ -2081,8 +2091,8 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
20812091
}
20822092
node.SetHashedID()
20832093

2084-
// Chain-state gating: only allow Active supernodes into the routing table.
2085-
// This prevents postponed/disabled/stopped nodes from being admitted via inbound traffic.
2094+
// Chain-state gating: only routing-eligible supernodes (Active + Postponed)
2095+
// can be admitted into the routing table.
20862096
if !s.eligibleForRouting(node) {
20872097
logtrace.Debug(ctx, "Rejecting node: not eligible for routing", logtrace.Fields{
20882098
logtrace.FieldModule: "p2p",

p2p/kademlia/message.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
BatchFindNode
3131
// BatchGetValues finds values in kademlia network
3232
BatchGetValues
33+
// BatchProbeKeys checks local key presence/status without side effects
34+
BatchProbeKeys
3335
)
3436

3537
func init() {
@@ -49,6 +51,8 @@ func init() {
4951
gob.Register(&BatchFindNodeResponse{})
5052
gob.Register(&BatchGetValuesRequest{})
5153
gob.Register(&BatchGetValuesResponse{})
54+
gob.Register(&BatchProbeKeysRequest{})
55+
gob.Register(&BatchProbeKeysResponse{})
5256
}
5357

5458
type MessageWithError struct {
@@ -166,6 +170,17 @@ type BatchGetValuesResponse struct {
166170
Status ResponseStatus
167171
}
168172

173+
// BatchProbeKeysRequest defines the request data for side-effect-free local key probes.
174+
type BatchProbeKeysRequest struct {
175+
Keys []string // hex-encoded keys
176+
}
177+
178+
// BatchProbeKeysResponse defines the response data for local key probes.
179+
type BatchProbeKeysResponse struct {
180+
Status ResponseStatus
181+
Data map[string]LocalKeyStatus // key -> local status
182+
}
183+
169184
// encode the message
170185
func encode(message *Message) ([]byte, error) {
171186
var buf bytes.Buffer

p2p/kademlia/network.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func init() {
4747
FindValue: 5 * time.Second,
4848
BatchFindValues: 60 * time.Second, // responder compresses
4949
BatchGetValues: 75 * time.Second, // large, sometimes cloud fetch then send back
50+
BatchProbeKeys: 10 * time.Second,
5051
StoreData: 10 * time.Second,
5152
BatchStoreData: 75 * time.Second, // large uncompressed payloads
5253
Replicate: 90 * time.Second,
@@ -235,6 +236,37 @@ func (s *Network) handleFindValue(ctx context.Context, message *Message) (res []
235236
return s.encodeMesage(resMsg)
236237
}
237238

239+
func (s *Network) handleBatchProbeKeys(ctx context.Context, message *Message) (res []byte, err error) {
240+
defer func() {
241+
if response, err := s.handlePanic(ctx, message.Sender, BatchProbeKeys); response != nil || err != nil {
242+
res = response
243+
}
244+
}()
245+
246+
request, ok := message.Data.(*BatchProbeKeysRequest)
247+
if !ok {
248+
err := errors.New("invalid BatchProbeKeysRequest")
249+
return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error())
250+
}
251+
252+
// Keep routing table fresh while handling probe traffic.
253+
s.dht.addNode(ctx, message.Sender)
254+
255+
statuses, err := s.dht.store.RetrieveBatchLocalStatus(ctx, request.Keys)
256+
if err != nil {
257+
err = errors.Errorf("batch probe keys: %w", err)
258+
return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error())
259+
}
260+
261+
response := &BatchProbeKeysResponse{
262+
Status: ResponseStatus{Result: ResultOk},
263+
Data: statuses,
264+
}
265+
266+
resMsg := s.dht.newMessage(BatchProbeKeys, message.Sender, response)
267+
return s.encodeMesage(resMsg)
268+
}
269+
238270
func (s *Network) handleStoreData(ctx context.Context, message *Message) (res []byte, err error) {
239271
defer func() {
240272
if response, err := s.handlePanic(ctx, message.Sender, StoreData); response != nil || err != nil {
@@ -480,6 +512,10 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
480512
response, hErr = s.withMetrics(BatchGetValues, func() ([]byte, error) {
481513
return s.handleGetValuesRequest(ctx, request, reqID)
482514
})
515+
case BatchProbeKeys:
516+
response, hErr = s.withMetrics(BatchProbeKeys, func() ([]byte, error) {
517+
return s.handleBatchProbeKeys(ctx, request)
518+
})
483519
default:
484520
// count unknown types as failure and return
485521
m := s.metricsFor(mt)
@@ -1321,6 +1357,8 @@ func (s *Network) generateResponseMessage(ctx context.Context, messageType int,
13211357
response = &ReplicateDataResponse{Status: responseStatus}
13221358
case BatchGetValues:
13231359
response = &BatchGetValuesResponse{Status: responseStatus}
1360+
case BatchProbeKeys:
1361+
response = &BatchProbeKeysResponse{Status: responseStatus}
13241362
default:
13251363
return nil, fmt.Errorf("unsupported message type %d", messageType)
13261364
}
@@ -1461,6 +1499,8 @@ func msgName(t int) string {
14611499
return "BatchStoreData"
14621500
case Replicate:
14631501
return "Replicate"
1502+
case BatchProbeKeys:
1503+
return "BatchProbeKeys"
14641504
default:
14651505
return fmt.Sprintf("Type_%d", t)
14661506
}
@@ -1501,7 +1541,7 @@ func (s *Network) HandleMetricsSnapshot() map[string]HandleCounters {
15011541
func readDeadlineFor(msgType int, overall time.Duration) time.Duration {
15021542
small := 10 * time.Second
15031543
switch msgType {
1504-
case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData:
1544+
case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData, BatchProbeKeys:
15051545
if overall > small+1*time.Second {
15061546
return small
15071547
}

0 commit comments

Comments
 (0)