Skip to content

Commit 1b28b6c

Browse files
authored
Merge pull request #13 from optimism-java/add-rpc-manage
add rpc manager
2 parents 9e8acb0 + 07926fb commit 1b28b6c

File tree

11 files changed

+738
-30
lines changed

11 files changed

+738
-30
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ mysql1
2727
meili_data
2828
postgres
2929
mysql
30+
MIGRATION_STATUS.md
31+
RPC_MANAGER_GUIDE.md
3032

build/dispute-explorer

45.7 MB
Binary file not shown.

internal/handler/handler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
package handler
22

33
import (
4+
"time"
5+
46
"github.com/optimism-java/dispute-explorer/internal/svc"
7+
"github.com/optimism-java/dispute-explorer/pkg/rpc"
58
)
69

710
func Run(ctx *svc.ServiceContext) {
11+
// Start RPC monitoring
12+
go startRPCMonitoring(ctx)
13+
814
// query last block number
915
go LatestBlackNumber(ctx)
1016
// sync blocks
@@ -20,3 +26,12 @@ func Run(ctx *svc.ServiceContext) {
2026
// sync claim len
2127
go SyncClaimDataLen(ctx)
2228
}
29+
30+
// startRPCMonitoring starts RPC monitoring (internal function)
31+
func startRPCMonitoring(ctx *svc.ServiceContext) {
32+
// Create monitor, output statistics every 30 seconds
33+
monitor := rpc.NewMonitor(ctx.RPCManager, 30*time.Second)
34+
35+
// Start monitoring
36+
monitor.Start(ctx.Context)
37+
}

internal/handler/latestBlockNumber.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ import (
1212

1313
func LatestBlackNumber(ctx *svc.ServiceContext) {
1414
for {
15-
latest, err := ctx.L1RPC.BlockNumber(context.Background())
15+
// use unified RPC manager to get latest block number (automatically applies rate limiting)
16+
latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true) // true indicates L1
1617
if err != nil {
17-
log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err))
18+
log.Errorf("[Handler.LatestBlackNumber] Get latest block number error (with rate limit): %s\n", errors.WithStack(err))
1819
time.Sleep(12 * time.Second)
1920
continue
2021
}
2122

2223
ctx.LatestBlockNumber = cast.ToInt64(latest)
23-
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest)
24+
log.Infof("[Handler.LatestBlackNumber] Latest block number: %d (via RPC Manager)\n", latest)
2425
time.Sleep(12 * time.Second)
2526
}
2627
}

internal/handler/logFilter.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ func LogFilter(ctx *svc.ServiceContext, block schema.SyncBlock, addresses []comm
2424
Topics: topics,
2525
Addresses: addresses,
2626
}
27-
logs, err := ctx.L1RPC.FilterLogs(context.Background(), query)
27+
// use unified RPC manager to filter logs (automatically applies rate limiting)
28+
logs, err := ctx.RPCManager.FilterLogs(context.Background(), query, true) // true indicates L1
2829
if err != nil {
2930
return nil, errors.WithStack(err)
3031
}
31-
log.Infof("[CancelOrder.Handle] Cancel Pending List Length is %d ,block number is %d \n", len(logs), block.BlockNumber)
32+
log.Infof("[LogFilter] Event logs length is %d, block number is %d (via RPC Manager)\n", len(logs), block.BlockNumber)
3233
return LogsToEvents(ctx, logs, block.ID)
3334
}
3435

@@ -49,22 +50,22 @@ func LogsToEvents(ctx *svc.ServiceContext, logs []types.Log, syncBlockID int64)
4950
blockNumber := cast.ToInt64(vlog.BlockNumber)
5051
log.Infof("[LogsToEvents] Fetching block info for block number: %d, txHash: %s", blockNumber, vlog.TxHash.Hex())
5152

52-
// Try to get block using L1RPC client first
53-
block, err := ctx.L1RPC.BlockByNumber(context.Background(), big.NewInt(blockNumber))
53+
// Use unified RPC manager to get block (automatically applies rate limiting)
54+
block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1
5455
if err != nil {
55-
log.Errorf("[LogsToEvents] BlockByNumber failed for block %d, txHash: %s, error: %s", blockNumber, vlog.TxHash.Hex(), err.Error())
56+
log.Errorf("[LogsToEvents] GetBlockByNumber failed for block %d, txHash: %s, error: %s (via RPC Manager)", blockNumber, vlog.TxHash.Hex(), err.Error())
5657

5758
// If error contains "transaction type not supported", try alternative approach
5859
if strings.Contains(err.Error(), "transaction type not supported") {
5960
log.Infof("[LogsToEvents] Attempting to get block timestamp using header only for block %d", blockNumber)
60-
header, headerErr := ctx.L1RPC.HeaderByNumber(context.Background(), big.NewInt(blockNumber))
61+
header, headerErr := ctx.RPCManager.HeaderByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1
6162
if headerErr != nil {
62-
log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s", blockNumber, headerErr.Error())
63+
log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s (via RPC Manager)", blockNumber, headerErr.Error())
6364
return nil, errors.WithStack(err)
6465
}
6566
blockTime = cast.ToInt64(header.Time)
6667
blockTimes[blockNumber] = blockTime
67-
log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header", blockTime, blockNumber)
68+
log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header (via RPC Manager)", blockTime, blockNumber)
6869
} else {
6970
return nil, errors.WithStack(err)
7071
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"time"
7+
8+
"github.com/optimism-java/dispute-explorer/internal/svc"
9+
"github.com/optimism-java/dispute-explorer/pkg/log"
10+
"github.com/optimism-java/dispute-explorer/pkg/rpc"
11+
"github.com/pkg/errors"
12+
"github.com/spf13/cast"
13+
)
14+
15+
// LatestBlockNumberWithRateLimit uses unified RPC manager for latest block number retrieval
16+
func LatestBlockNumberWithRateLimit(ctx *svc.ServiceContext) {
17+
for {
18+
// use unified RPC manager to get L1 latest block number (with rate limiting)
19+
latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true)
20+
if err != nil {
21+
log.Errorf("[Handler.LatestBlockNumberWithRateLimit] Get latest block number error: %s\n", errors.WithStack(err))
22+
time.Sleep(12 * time.Second)
23+
continue
24+
}
25+
26+
ctx.LatestBlockNumber = cast.ToInt64(latest)
27+
log.Infof("[Handler.LatestBlockNumberWithRateLimit] Latest block number: %d (using RPC Manager)\n", latest)
28+
time.Sleep(12 * time.Second)
29+
}
30+
}
31+
32+
// SyncBlockWithRateLimit uses unified RPC manager for block synchronization
33+
func SyncBlockWithRateLimit(ctx *svc.ServiceContext) {
34+
for {
35+
// Check pending block count
36+
// ... existing check logic ...
37+
38+
syncingBlockNumber := ctx.SyncedBlockNumber + 1
39+
log.Infof("[Handler.SyncBlockWithRateLimit] Try to sync block number: %d using RPC Manager\n", syncingBlockNumber)
40+
41+
if syncingBlockNumber > ctx.LatestBlockNumber {
42+
time.Sleep(3 * time.Second)
43+
continue
44+
}
45+
46+
// Use unified RPC manager to get block (automatically handles rate limiting)
47+
block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(syncingBlockNumber), true)
48+
if err != nil {
49+
log.Errorf("[Handler.SyncBlockWithRateLimit] Get block by number error: %s\n", errors.WithStack(err))
50+
time.Sleep(3 * time.Second)
51+
continue
52+
}
53+
54+
log.Infof("[Handler.SyncBlockWithRateLimit] Got block number: %d, hash: %v, parent hash: %v\n",
55+
block.Number().Int64(), block.Hash().Hex(), block.ParentHash().Hex())
56+
57+
// Verify parent hash
58+
if block.ParentHash() != ctx.SyncedBlockHash {
59+
log.Errorf("[Handler.SyncBlockWithRateLimit] ParentHash mismatch: expected %s, got %s\n",
60+
ctx.SyncedBlockHash.Hex(), block.ParentHash().Hex())
61+
// Can call rollback logic here
62+
time.Sleep(3 * time.Second)
63+
continue
64+
}
65+
66+
// Save block to database
67+
// ... existing database save logic ...
68+
69+
// Update sync status
70+
ctx.SyncedBlockNumber = block.Number().Int64()
71+
ctx.SyncedBlockHash = block.Hash()
72+
73+
log.Infof("[Handler.SyncBlockWithRateLimit] Successfully synced block %d\n", block.Number().Int64())
74+
}
75+
}
76+
77+
// GetBlockByNumberHTTP gets block using HTTP method (with rate limiting)
78+
func GetBlockByNumberHTTP(ctx *svc.ServiceContext, blockNumber int64) ([]byte, error) {
79+
requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" +
80+
cast.ToString(blockNumber) + "\", true],\"id\":1}"
81+
82+
// Use unified RPC manager for HTTP calls (automatically handles rate limiting)
83+
return ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true)
84+
}
85+
86+
// MigrateExistingHandlers example of migrating existing handlers
87+
func MigrateExistingHandlers(ctx *svc.ServiceContext) {
88+
log.Infof("[Handler.Migration] Starting migration to RPC Manager")
89+
90+
// Start handlers with rate limiting
91+
go LatestBlockNumberWithRateLimit(ctx)
92+
go SyncBlockWithRateLimit(ctx)
93+
94+
// Start RPC monitoring
95+
go StartRPCMonitoring(ctx)
96+
97+
log.Infof("[Handler.Migration] All handlers migrated to use RPC Manager")
98+
}
99+
100+
// StartRPCMonitoring starts RPC monitoring
101+
func StartRPCMonitoring(ctx *svc.ServiceContext) {
102+
// Create monitor
103+
monitor := rpc.NewMonitor(ctx.RPCManager, 30*time.Second)
104+
105+
// Start monitoring
106+
monitor.Start(ctx.Context)
107+
}
108+
109+
// Compatibility functions: provide smooth migration for existing code
110+
func GetL1Client(ctx *svc.ServiceContext) interface{} {
111+
// Return rate-limited client wrapper
112+
return ctx.RPCManager.GetRawClient(true)
113+
}
114+
115+
func GetL2Client(ctx *svc.ServiceContext) interface{} {
116+
// Return rate-limited client wrapper
117+
return ctx.RPCManager.GetRawClient(false)
118+
}

internal/handler/syncBlock.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handler
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -58,15 +59,16 @@ func SyncBlock(ctx *svc.ServiceContext) {
5859
continue
5960
}
6061

61-
// block, err := ctx.RPC.BlockByNumber(context.Background(), big.NewInt(syncingBlockNumber))
62-
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", syncingBlockNumber)+"\", true],\"id\":1}")
62+
// use unified RPC manager to get block (automatically applies rate limiting)
63+
requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", syncingBlockNumber) + "\", true],\"id\":1}"
64+
blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
6365
if err != nil {
64-
log.Errorf("[Handler.SyncBlock] Syncing block by number error: %s\n", errors.WithStack(err))
66+
log.Errorf("[Handler.SyncBlock] Syncing block by number error (with rate limit): %s\n", errors.WithStack(err))
6567
time.Sleep(3 * time.Second)
6668
continue
6769
}
6870
block := rpc.ParseJSONBlock(string(blockJSON))
69-
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v \n", block.Number(), block.Hash(), block.ParentHash())
71+
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v (via RPC Manager)\n", block.Number(), block.Hash(), block.ParentHash())
7072

7173
if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
7274
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
@@ -102,16 +104,18 @@ func rollbackBlock(ctx *svc.ServiceContext) {
102104
for {
103105
rollbackBlockNumber := ctx.SyncedBlockNumber
104106

105-
log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
107+
log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
106108

107-
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
109+
// use unified RPC manager for rollback operation (automatically applies rate limiting)
110+
requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", rollbackBlockNumber) + "\", true],\"id\":1}"
111+
blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
108112
if err != nil {
109-
log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
113+
log.Errorf("[Handler.SyncBlock.RollBackBlock] Rollback block by number error (with rate limit): %s\n", errors.WithStack(err))
110114
continue
111115
}
112116

113117
rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
114-
log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
118+
log.Errorf("[Handler.SyncBlock.RollBackBlock] rollbackBlock: %s, syncedBlockHash: %s (via RPC Manager)\n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
115119

116120
if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
117121
err = ctx.DB.Transaction(func(tx *gorm.DB) error {

internal/svc/svc.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ethereum/go-ethereum/common"
1111
"github.com/ethereum/go-ethereum/ethclient"
1212
"github.com/optimism-java/dispute-explorer/internal/types"
13+
"github.com/optimism-java/dispute-explorer/pkg/rpc"
1314
"gorm.io/gorm"
1415
"gorm.io/gorm/logger"
1516
)
@@ -18,8 +19,9 @@ var svc *ServiceContext
1819

1920
type ServiceContext struct {
2021
Config *types.Config
21-
L1RPC *ethclient.Client
22-
L2RPC *ethclient.Client
22+
L1RPC *ethclient.Client // 保留向后兼容
23+
L2RPC *ethclient.Client // 保留向后兼容
24+
RPCManager *rpc.Manager // 新增统一RPC管理器
2325
DB *gorm.DB
2426
LatestBlockNumber int64
2527
SyncedBlockNumber int64
@@ -45,22 +47,30 @@ func NewServiceContext(ctx context.Context, cfg *types.Config) *ServiceContext {
4547
// SetConnMaxLifetime
4648
sqlDB.SetConnMaxLifetime(time.Duration(cfg.MySQLConnMaxLifetime) * time.Second)
4749

48-
rpc, err := ethclient.Dial(cfg.L1RPCUrl)
50+
// 创建原有的以太坊客户端(保持向后兼容)
51+
l1Client, err := ethclient.Dial(cfg.L1RPCUrl)
4952
if err != nil {
50-
log.Panicf("[svc] get eth client panic: %s\n", err)
53+
log.Panicf("[svc] get L1 eth client panic: %s\n", err)
5154
}
5255

53-
rpc2, err := ethclient.Dial(cfg.L2RPCUrl)
56+
l2Client, err := ethclient.Dial(cfg.L2RPCUrl)
5457
if err != nil {
55-
log.Panicf("[svc] get eth client panic: %s\n", err)
58+
log.Panicf("[svc] get L2 eth client panic: %s\n", err)
59+
}
60+
61+
// 创建统一的RPC管理器
62+
rpcManager, err := rpc.CreateManagerFromConfig(cfg)
63+
if err != nil {
64+
log.Panicf("[svc] create RPC manager panic: %s\n", err)
5665
}
5766

5867
svc = &ServiceContext{
59-
Config: cfg,
60-
L1RPC: rpc,
61-
L2RPC: rpc2,
62-
DB: storage,
63-
Context: ctx,
68+
Config: cfg,
69+
L1RPC: l1Client, // 保留向后兼容
70+
L2RPC: l2Client, // 保留向后兼容
71+
RPCManager: rpcManager, // 新的统一管理器
72+
DB: storage,
73+
Context: ctx,
6474
}
6575
return svc
6676
}

pkg/rpc/factory.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package rpc
2+
3+
import (
4+
"time"
5+
6+
"github.com/optimism-java/dispute-explorer/internal/types"
7+
)
8+
9+
// CreateManagerFromConfig creates RPC manager from configuration
10+
func CreateManagerFromConfig(config *types.Config) (*Manager, error) {
11+
return NewManager(Config{
12+
L1RPCUrl: config.L1RPCUrl,
13+
L2RPCUrl: config.L2RPCUrl,
14+
ProxyURL: "", // if proxy is needed, can be added from configuration
15+
RateLimit: config.RPCRateLimit,
16+
RateBurst: config.RPCRateBurst,
17+
HTTPTimeout: 10 * time.Second,
18+
})
19+
}
20+
21+
// CreateManagerWithSeparateLimits creates manager with different L1/L2 limits
22+
func CreateManagerWithSeparateLimits(
23+
l1URL, l2URL string,
24+
l1Rate, l1Burst, _, _ int, // l2Rate, l2Burst unused for now
25+
) (*Manager, error) {
26+
// Note: current implementation uses same limits for L1 and L2
27+
// if different limits are needed, Manager structure needs to be modified
28+
return NewManager(Config{
29+
L1RPCUrl: l1URL,
30+
L2RPCUrl: l2URL,
31+
RateLimit: l1Rate, // 使用L1的限制作为默认
32+
RateBurst: l1Burst,
33+
HTTPTimeout: 10 * time.Second,
34+
})
35+
}
36+
37+
// WrapExistingClient wraps existing ethclient.Client (for backward compatibility)
38+
func WrapExistingClient(config *types.Config, _, _ interface{}) (*Manager, error) {
39+
// create new manager but maintain backward compatibility
40+
manager, err := CreateManagerFromConfig(config)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
// logic can be added here to integrate existing clients
46+
// for now, return newly created manager
47+
return manager, nil
48+
}

0 commit comments

Comments
 (0)