diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 0d1591b0..f58c2b41 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -147,45 +147,43 @@ func (w *PollingClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H } func (w *PollingClient) pollHeads() { - // To prevent polls from stacking up in case HTTP requests - // are slow, use a similar model to the driver in which - // polls are requested manually after each header is fetched. - reqPollAfter := func() { - if w.pollRate == 0 { - return - } - time.AfterFunc(w.pollRate, w.reqPoll) - } - reqPollAfter() + w.reqPoll() defer close(w.closedCh) for { select { case <-w.pollReqCh: - // We don't need backoff here because we'll just try again - // after the pollRate elapses. + pollStart := time.Now() head, err := w.queryHeader() + queryDur := time.Since(pollStart) + if err != nil { w.lg.Info("Error getting latest header", "err", err) - reqPollAfter() + w.scheduleNextPoll(nil, queryDur, false) continue } if w.currHead != nil && w.currHead.Hash() == head.Hash() { w.lg.Trace("No change in head, skipping notifications") - reqPollAfter() + w.scheduleNextPoll(head, queryDur, false) continue } - w.lg.Trace("Notifying subscribers of new head", "head", head.Hash()) + headTime := time.Unix(int64(head.Time), 0) + w.lg.Trace( + "Notifying subscribers of new head", + "height", head.Number, + "headTime", headTime.Format("15:04:05"), + "head", head.Hash(), + ) w.currHead = head w.mtx.RLock() for _, sub := range w.subs { sub <- head } w.mtx.RUnlock() - reqPollAfter() + w.scheduleNextPoll(head, queryDur, true) case <-w.ctx.Done(): w.Client.Close() return @@ -199,8 +197,48 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) } +// scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time: +// - Goal: align the next request to head.Time + pollRate (close to when the next block appears) +func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration, changed bool) { + if w.pollRate == 0 { + return + } + + const ( + minDelay = 200 * time.Millisecond + minDelayNoChange = 1 * time.Second + ) + + // Retry on failure + if head == nil { + time.AfterFunc(minDelay, w.reqPoll) + return + } + + // Align next poll to headTime + pollRate with a lead time. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(-queryDur + minDelay) + delay := time.Until(target) + + if delay < minDelay { + if changed { + delay = minDelay + } else { + delay = minDelayNoChange + } + } + if delay > w.pollRate { + delay = w.pollRate + } + w.lg.Trace("Scheduled next poll", "delay", delay) + time.AfterFunc(delay, w.reqPoll) +} + func (w *PollingClient) reqPoll() { - w.pollReqCh <- struct{}{} + // non-blocking send + select { + case w.pollReqCh <- struct{}{}: + default: + } } func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eventSig string) ([]types.Log, error) { diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index 180ba628..e7aa2dd3 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -191,26 +191,26 @@ func (n *EsNode) startL1(cfg *Config) { } n.lg.Error("L1 heads subscription error", "err", err) }() - - // Keep subscribed to the randao heads, which helps miner to get proper random seeds - n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { - if err != nil { - n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) - } - if n.randaoSource != nil { - return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) - } else { - return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) - } - }) - go func() { - err, ok := <-n.randaoHeadsSub.Err() - if !ok { - return - } - n.lg.Error("Randao heads subscription error", "err", err) - }() - + if n.miner != nil { + // Keep subscribed to the randao heads, which helps miner to get proper random seeds + n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) + } + if n.randaoSource != nil { + return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) + } else { + return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) + } + }) + go func() { + err, ok := <-n.randaoHeadsSub.Err() + if !ok { + return + } + n.lg.Error("Randao heads subscription error", "err", err) + }() + } // Poll for the safe L1 block and finalized block, // which only change once per epoch at most and may be delayed. n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.lg, n.l1Source, n.OnNewL1Safe, ethRPC.SafeBlockNumber,