Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 55 additions & 17 deletions ethstorage/eth/polling_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,45 +147,43 @@ func (w *PollingClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H
}

func (w *PollingClient) pollHeads() {
// To prevent polls from stacking up in case HTTP requests
// are slow, use a similar model to the driver in which
// polls are requested manually after each header is fetched.
reqPollAfter := func() {
if w.pollRate == 0 {
return
}
time.AfterFunc(w.pollRate, w.reqPoll)
}

reqPollAfter()
w.reqPoll()

defer close(w.closedCh)

for {
select {
case <-w.pollReqCh:
// We don't need backoff here because we'll just try again
// after the pollRate elapses.
pollStart := time.Now()
head, err := w.queryHeader()
queryDur := time.Since(pollStart)

if err != nil {
w.lg.Info("Error getting latest header", "err", err)
reqPollAfter()
w.scheduleNextPoll(nil, queryDur, false)
continue
}
if w.currHead != nil && w.currHead.Hash() == head.Hash() {
w.lg.Trace("No change in head, skipping notifications")
reqPollAfter()
w.scheduleNextPoll(head, queryDur, false)
continue
}

w.lg.Trace("Notifying subscribers of new head", "head", head.Hash())
headTime := time.Unix(int64(head.Time), 0)
w.lg.Trace(
"Notifying subscribers of new head",
"height", head.Number,
"headTime", headTime.Format("15:04:05"),
"head", head.Hash(),
)
w.currHead = head
w.mtx.RLock()
for _, sub := range w.subs {
sub <- head
}
w.mtx.RUnlock()
reqPollAfter()
w.scheduleNextPoll(head, queryDur, true)
case <-w.ctx.Done():
w.Client.Close()
return
Expand All @@ -199,8 +197,48 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) {
return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64()))
}

// scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time:
// - Goal: align the next request to head.Time + pollRate (close to when the next block appears)
func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration, changed bool) {
if w.pollRate == 0 {
return
}

const (
minDelay = 200 * time.Millisecond
minDelayNoChange = 1 * time.Second
)

// Retry on failure
if head == nil {
time.AfterFunc(minDelay, w.reqPoll)
return
}

// Align next poll to headTime + pollRate with a lead time.
target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(-queryDur + minDelay)
delay := time.Until(target)

if delay < minDelay {
if changed {
delay = minDelay
} else {
delay = minDelayNoChange
}
}
if delay > w.pollRate {
delay = w.pollRate
}
w.lg.Trace("Scheduled next poll", "delay", delay)
time.AfterFunc(delay, w.reqPoll)
}

func (w *PollingClient) reqPoll() {
w.pollReqCh <- struct{}{}
// non-blocking send
select {
case w.pollReqCh <- struct{}{}:
default:
}
}

func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eventSig string) ([]types.Log, error) {
Expand Down
40 changes: 20 additions & 20 deletions ethstorage/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,26 +191,26 @@ func (n *EsNode) startL1(cfg *Config) {
}
n.lg.Error("L1 heads subscription error", "err", err)
}()

// Keep subscribed to the randao heads, which helps miner to get proper random seeds
n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
n.lg.Warn("Resubscribing after failed randao head subscription", "err", err)
}
if n.randaoSource != nil {
return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead)
} else {
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead)
}
})
go func() {
err, ok := <-n.randaoHeadsSub.Err()
if !ok {
return
}
n.lg.Error("Randao heads subscription error", "err", err)
}()

if n.miner != nil {
// Keep subscribed to the randao heads, which helps miner to get proper random seeds
n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
n.lg.Warn("Resubscribing after failed randao head subscription", "err", err)
}
if n.randaoSource != nil {
return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead)
} else {
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead)
}
})
go func() {
err, ok := <-n.randaoHeadsSub.Err()
if !ok {
return
}
n.lg.Error("Randao heads subscription error", "err", err)
}()
}
// Poll for the safe L1 block and finalized block,
// which only change once per epoch at most and may be delayed.
n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.lg, n.l1Source, n.OnNewL1Safe, ethRPC.SafeBlockNumber,
Expand Down
Loading