diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..a7226d8
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,61 @@
+# Environment Variables Configuration Example
+
+# ============= RPC Configuration =============
+# Multi-RPC Configuration - Use comma-separated RPC nodes for load balancing and fault tolerance
+#
+# Benefits of Multi-RPC Setup:
+# - Load Distribution: Requests distributed across multiple endpoints
+# - High Availability: Continues working if one RPC fails
+# - Rate Limit Mitigation: Avoids hitting individual provider limits
+# - Performance: Potential response time improvements
+# - Cost Optimization: Distribute load across free/paid tiers
+#
+# Configuration Format: Use comma-separated URLs (no spaces)
+# Example: "url1,url2,url3"
+
+# L1 RPC node list (Ethereum mainnet or testnet)
+L1_RPC_URLS="https://eth-sepolia.g.alchemy.com/v2/your_key1,https://rpc.ankr.com/eth_sepolia,https://sepolia.infura.io/v3/your_key2"
+
+# L2 RPC node list (Optimism network)
+L2_RPC_URLS="https://opt-sepolia.g.alchemy.com/v2/your_key1,https://optimism-sepolia.gateway.pokt.network/v1/lb/your_key2"
+
+# Node RPC node list (OP Node endpoints for rollup operations)
+NODE_RPC_URLS="https://light-radial-slug.optimism-sepolia.quiknode.pro/e9329f699b371572a8cc5dd22d19d5940bb842a5/,https://node2.optimism-sepolia.quiknode.pro/your_key2/"
+
+# Single RPC Configuration (Legacy support):
+# If you prefer single endpoints, just provide one URL:
+# L1_RPC_URLS="https://eth-sepolia.g.alchemy.com/v2/your_key"
+
+# ============= RPC Rate Limiting =============
+# RPC request rate limit (requests per second)
+RPC_RATE_LIMIT=5
+# RPC request burst limit
+RPC_RATE_BURST=2
+
+# ============= Logging Configuration =============
+# Log level: debug, info, warn, error, panic, fatal
+LOG_LEVEL=info
+# Log format: console, json
+LOG_FORMAT=console
+
+# ============= Database Configuration =============
+MYSQL_DATA_SOURCE=root:123456@tcp(127.0.0.1:3306)/dispute_explorer?charset=utf8mb4&parseTime=True&loc=Local&multiStatements=true
+MYSQL_MAX_IDLE_CONNS=10
+MYSQL_MAX_OPEN_CONNS=20
+MYSQL_CONN_MAX_LIFETIME=3600
+
+# ============= Blockchain Configuration =============
+# Blockchain network name
+BLOCKCHAIN=sepolia
+
+# Starting block number (ensure no games are missed)
+FROM_BLOCK_NUMBER=5515562
+# Starting block hash
+FROM_BLOCK_HASH=0x5205c17557759edaef9120f56af802aeaa2827a60d674a0413e77e9c515bdfba
+
+# Dispute Game Factory Proxy contract address
+DISPUTE_GAME_PROXY_CONTRACT=0x05F9613aDB30026FFd634f38e5C4dFd30a197Fa1
+
+# ============= API Configuration =============
+# API server port
+API_PORT=8088
diff --git a/README.md b/README.md
index b0fb538..7e53698 100644
--- a/README.md
+++ b/README.md
@@ -37,11 +37,11 @@ MYSQL_DATA_SOURCE=
 BLOCKCHAIN=
 
 # l1 rpc url example: eth json rpc url
-L1_RPC_URL=
+L1_RPC_URLS=
 # l2 rpc url example: op json rpc url
-L2_RPC_URL=
+L2_RPC_URLS=
 
-NODE_RPCURL=
+NODE_RPC_URLS=
 
 RPC_RATE_LIMIT=15
 RPC_RATE_BURST=5
diff --git a/build/dispute-explorer b/dispute-explorer
similarity index 74%
rename from build/dispute-explorer
rename to dispute-explorer
index 08295c8..d38c892 100755
Binary files a/build/dispute-explorer and b/dispute-explorer differ
diff --git a/internal/api/dispute_game_handler.go b/internal/api/dispute_game_handler.go
index a7cb0ee..e485f0b 100644
--- a/internal/api/dispute_game_handler.go
+++ b/internal/api/dispute_game_handler.go
@@ -8,12 +8,14 @@ import (
 	"net/http"
 
 	"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
-	"github.com/ethereum-optimism/optimism/op-service/sources" // 新增导入
+	"github.com/ethereum-optimism/optimism/op-service/client"
+	"github.com/ethereum-optimism/optimism/op-service/sources"
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/ethclient"
+	gethlog "github.com/ethereum/go-ethereum/log"
 	config "github.com/optimism-java/dispute-explorer/internal/types"
 	"github.com/optimism-java/dispute-explorer/pkg/contract"
+	"github.com/optimism-java/dispute-explorer/pkg/rpc"
 	"github.com/spf13/cast"
@@ -24,20 +26,16 @@ import (
 )
 
 type DisputeGameHandler struct {
-	Config       *config.Config
-	DB           *gorm.DB
-	L1RPC        *ethclient.Client
-	L2RPC        *ethclient.Client
-	RollupClient *sources.RollupClient // 新增字段
+	Config     *config.Config
+	DB         *gorm.DB
+	RPCManager *rpc.Manager
 }
 
-func NewDisputeGameHandler(db *gorm.DB, l1rpc *ethclient.Client, l2rpc *ethclient.Client, config *config.Config, rollupClient *sources.RollupClient) *DisputeGameHandler {
+func NewDisputeGameHandler(db *gorm.DB, rpcManager *rpc.Manager, config *config.Config) *DisputeGameHandler {
 	return &DisputeGameHandler{
-		DB:           db,
-		L1RPC:        l1rpc,
-		L2RPC:        l2rpc,
-		Config:       config,
-		RollupClient: rollupClient, // 新增赋值
+		DB:         db,
+		RPCManager: rpcManager,
+		Config:     config,
 	}
 }
 
@@ -299,7 +297,17 @@ func (h DisputeGameHandler) getClaimRoot(blockNumber int64) (string, error) {
 		return "", fmt.Errorf("block number cannot be negative: %d", blockNumber)
 	}
 
-	output, err := h.RollupClient.OutputAtBlock(context.Background(), uint64(blockNumber))
+	// Use the RPCManager to pick the next Node RPC URL (round-robin)
+	nodeRPCURL := h.RPCManager.GetNodeRPCURL()
+
+	// Create a RollupClient (each call may use a different Node RPC endpoint)
+	rpcClient, err := client.NewRPC(context.Background(), gethlog.New(), nodeRPCURL)
+	if err != nil {
+		return "", fmt.Errorf("failed to connect to node RPC %s: %w", nodeRPCURL, err)
+	}
+	rollupClient := sources.NewRollupClient(rpcClient)
+
+	output, err := rollupClient.OutputAtBlock(context.Background(), uint64(blockNumber))
 	if err != nil {
 		return "", fmt.Errorf("failed to get output at block %d: %w", blockNumber, err)
 	}
@@ -340,7 +348,9 @@ func (h DisputeGameHandler) GetGamesClaimByPosition(c *gin.Context) {
 }
 
 func (h DisputeGameHandler) gamesClaimByPosition(req *CalculateClaim) (string, error) {
-	newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(req.DisputeGame), h.L1RPC)
+	// Get the L1 client from the RPCManager
+	l1Client := h.RPCManager.GetRawClient(true)
+	newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(req.DisputeGame), l1Client)
 	if err != nil {
 		return "", err
 	}
diff --git a/internal/handler/frontend_move.go b/internal/handler/frontend_move.go
index 7b96814..5d7aff7 100644
--- a/internal/handler/frontend_move.go
+++ b/internal/handler/frontend_move.go
@@ -82,6 +82,7 @@ func (h *FrontendMoveHandler) RecordFrontendMove(req *FrontendMoveRequest) error
 		DisputedClaim: req.DisputedClaim,
 		Status:        schema.FrontendMoveStatusPending,
 		SubmittedAt:   time.Now().Unix(),
+		IsSynced:      false, // new records default to not yet synced
 	}
 
 	// Save to database
@@ -106,8 +107,9 @@ func (h *FrontendMoveHandler) monitorTransactionStatus(recordID int64, txHash st
 	for i := 0; i < maxRetries; i++ {
 		time.Sleep(retryInterval)
 
-		// Query transaction status
-		receipt, err := h.svc.L1RPC.TransactionReceipt(context.Background(), common.HexToHash(txHash))
+		// Query transaction status using RPCManager
+		l1Client := h.svc.RPCManager.GetRawClient(true)
+		receipt, err := l1Client.TransactionReceipt(context.Background(), common.HexToHash(txHash))
 		if err != nil {
log.Debugf("[FrontendMoveHandler] Transaction %s not yet mined or error: %v", txHash, err) continue diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 9bdf034..2d6c458 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -25,6 +25,8 @@ func Run(ctx *svc.ServiceContext) { go CalculateLostBond(ctx) // sync claim len go SyncClaimDataLen(ctx) + // sync frontend move transactions + go SyncFrontendMoveTransactions(ctx) } // startRPCMonitoring starts RPC monitoring (internal function) diff --git a/internal/handler/latestBlockNumber.go b/internal/handler/latestBlockNumber.go index 3937662..9b9a0b3 100644 --- a/internal/handler/latestBlockNumber.go +++ b/internal/handler/latestBlockNumber.go @@ -21,7 +21,7 @@ func LatestBlackNumber(ctx *svc.ServiceContext) { } ctx.LatestBlockNumber = cast.ToInt64(latest) - log.Infof("[Handler.LatestBlackNumber] Latest block number: %d (via RPC Manager)\n", latest) + log.Debugf("[Handler.LatestBlackNumber] Latest block number: %d (via RPC Manager)\n", latest) time.Sleep(12 * time.Second) } } diff --git a/internal/handler/logFilter.go b/internal/handler/logFilter.go index c66f3a6..e591755 100644 --- a/internal/handler/logFilter.go +++ b/internal/handler/logFilter.go @@ -29,7 +29,7 @@ func LogFilter(ctx *svc.ServiceContext, block schema.SyncBlock, addresses []comm if err != nil { return nil, errors.WithStack(err) } - log.Infof("[LogFilter] Event logs length is %d, block number is %d (via RPC Manager)\n", len(logs), block.BlockNumber) + log.Debugf("[LogFilter] Event logs length is %d, block number is %d (via RPC Manager)\n", len(logs), block.BlockNumber) return LogsToEvents(ctx, logs, block.ID) } @@ -41,14 +41,14 @@ func LogsToEvents(ctx *svc.ServiceContext, logs []types.Log, syncBlockID int64) contractAddress := vlog.Address Event := blockchain.GetEvent(eventHash) if Event == nil { - log.Infof("[LogsToEvents] logs[txHash: %s, contractAddress:%s, eventHash: %s]\n", vlog.TxHash, strings.ToLower(contractAddress.Hex()), eventHash) + log.Debugf("[LogsToEvents] logs[txHash: %s, contractAddress:%s, eventHash: %s]\n", vlog.TxHash, strings.ToLower(contractAddress.Hex()), eventHash) continue } blockTime := blockTimes[cast.ToInt64(vlog.BlockNumber)] if blockTime == 0 { blockNumber := cast.ToInt64(vlog.BlockNumber) - log.Infof("[LogsToEvents] Fetching block info for block number: %d, txHash: %s", blockNumber, vlog.TxHash.Hex()) + log.Debugf("[LogsToEvents] Fetching block info for block number: %d, txHash: %s", blockNumber, vlog.TxHash.Hex()) // Use unified RPC manager to get block (automatically applies rate limiting) block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1 diff --git a/internal/handler/rpc_manager_migration.go b/internal/handler/rpc_manager_migration.go index cf2e6d0..fcf2028 100644 --- a/internal/handler/rpc_manager_migration.go +++ b/internal/handler/rpc_manager_migration.go @@ -24,7 +24,7 @@ func LatestBlockNumberWithRateLimit(ctx *svc.ServiceContext) { } ctx.LatestBlockNumber = cast.ToInt64(latest) - log.Infof("[Handler.LatestBlockNumberWithRateLimit] Latest block number: %d (using RPC Manager)\n", latest) + log.Debugf("[Handler.LatestBlockNumberWithRateLimit] Latest block number: %d (using RPC Manager)\n", latest) time.Sleep(12 * time.Second) } } diff --git a/internal/handler/syncBlock.go b/internal/handler/syncBlock.go index 5f4fc4c..d75c58e 100644 --- a/internal/handler/syncBlock.go +++ b/internal/handler/syncBlock.go @@ -31,8 +31,8 @@ func 
SyncBlock(ctx *svc.ServiceContext) {
 		ctx.SyncedBlockHash = common.HexToHash(syncedBlock.BlockHash)
 	}
 
-	log.Infof("[Handler.SyncBlock]SyncedBlockNumber: %d", ctx.SyncedBlockNumber)
-	log.Infof("[Handler.SyncBlock]SyncedBlockHash:%s", ctx.SyncedBlockHash.String())
+	log.Debugf("[Handler.SyncBlock]SyncedBlockNumber: %d", ctx.SyncedBlockNumber)
+	log.Debugf("[Handler.SyncBlock]SyncedBlockHash:%s", ctx.SyncedBlockHash.String())
 
 	for {
 		// Check pending blocks count before syncing new blocks
@@ -52,7 +52,7 @@ func SyncBlock(ctx *svc.ServiceContext) {
 		}
 
 		syncingBlockNumber := ctx.SyncedBlockNumber + 1
-		log.Infof("[Handler.SyncBlock] Try to sync block number: %d\n", syncingBlockNumber)
+		log.Debugf("[Handler.SyncBlock] Try to sync block number: %d\n", syncingBlockNumber)
 
 		if syncingBlockNumber > ctx.LatestBlockNumber {
 			time.Sleep(3 * time.Second)
@@ -68,7 +68,7 @@ func SyncBlock(ctx *svc.ServiceContext) {
 			continue
 		}
 		block := rpc.ParseJSONBlock(string(blockJSON))
-		log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v (via RPC Manager)\n", block.Number(), block.Hash(), block.ParentHash())
+		log.Debugf("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v (via RPC Manager)\n", block.Number(), block.Hash(), block.ParentHash())
 
 		if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
 			log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
@@ -104,7 +104,7 @@ func rollbackBlock(ctx *svc.ServiceContext) {
 	for {
 		rollbackBlockNumber := ctx.SyncedBlockNumber
 
-		log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
+		log.Debugf("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
 
 		// use unified RPC manager for rollback operation (automatically applies rate limiting)
 		requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", rollbackBlockNumber) + "\", true],\"id\":1}"
diff --git a/internal/handler/syncClaimDataLen.go b/internal/handler/syncClaimDataLen.go
index 0a9b452..33dc541 100644
--- a/internal/handler/syncClaimDataLen.go
+++ b/internal/handler/syncClaimDataLen.go
@@ -20,7 +20,7 @@ func SyncClaimDataLen(ctx *svc.ServiceContext) {
 			continue
 		}
 		if len(disputeGames) == 0 {
-			log.Infof("[Handler.SyncClaimDataLen] Pending games count is 0\n")
+			log.Debugf("[Handler.SyncClaimDataLen] Pending games count is 0\n")
 			time.Sleep(2 * time.Second)
 			continue
 		}
@@ -38,5 +38,6 @@ func SyncClaimDataLen(ctx *svc.ServiceContext) {
 				log.Errorf("[Handler.SyncClaimDataLen] update claim len err", errors.WithStack(err))
 			}
 		}
+		time.Sleep(3 * time.Second)
 	}
 }
diff --git a/internal/handler/syncCredit.go b/internal/handler/syncCredit.go
index 6665ff8..3c9088c 100644
--- a/internal/handler/syncCredit.go
+++ b/internal/handler/syncCredit.go
@@ -23,8 +23,10 @@ func SyncCredit(ctx *svc.ServiceContext) {
 		}
 		for _, disputeGame := range disputeGames {
 			game := common.HexToAddress(disputeGame.GameContract)
+			// Get an L1 client from the RPCManager
+			l1Client := ctx.RPCManager.GetRawClient(true)
 			disputeClient, err := NewRetryDisputeGameClient(ctx.DB, game,
-				ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
+				l1Client, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
 			if err != nil {
 				log.Errorf("[Handler.SyncCredit] NewRetryDisputeGameClient err: %s", err)
 				time.Sleep(5 * time.Second)
@@ -34,6 +36,7 @@ func SyncCredit(ctx *svc.ServiceContext) {
 			if err != nil {
log.Errorf("[Handler.SyncCredit] ProcessDisputeGameCredit err: %s", err) } + time.Sleep(1 * time.Second) } time.Sleep(3 * time.Second) } diff --git a/internal/handler/syncDispute.go b/internal/handler/syncDispute.go index 56bdf4a..998663e 100644 --- a/internal/handler/syncDispute.go +++ b/internal/handler/syncDispute.go @@ -20,13 +20,13 @@ import ( func SyncDispute(ctx *svc.ServiceContext) { for { var events []schema.SyncEvent - err := ctx.DB.Where("status=? OR status=?", schema.EventPending, schema.EventRollback).Order("block_number").Limit(50).Find(&events).Error + err := ctx.DB.Where("status=? OR status=?", schema.EventPending, schema.EventRollback).Order("block_number").Limit(20).Find(&events).Error if err != nil { time.Sleep(3 * time.Second) continue } if len(events) == 0 { - log.Infof("[Handler.SyncDispute] Pending events count is 0\n") + log.Debugf("[Handler.SyncDispute] Pending events count is 0\n") time.Sleep(2 * time.Second) continue } @@ -45,6 +45,7 @@ func SyncDispute(ctx *svc.ServiceContext) { log.Errorf("[Handler.SyncEvent] HandleRollbackBlock err: %s\n", errors.WithStack(err)) } } + time.Sleep(1 * time.Second) } time.Sleep(3 * time.Second) } @@ -136,7 +137,7 @@ func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error { return errors.WithStack(err) } disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy), - ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + ctx.RPCManager.GetRawClient(true), rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) if err != nil { log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for created err: %s", err) return errors.WithStack(err) @@ -148,7 +149,7 @@ func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error { } case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String(): disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress), - ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + ctx.RPCManager.GetRawClient(true), rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) if err != nil { log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for move err: %s", err) return errors.WithStack(err) @@ -160,7 +161,7 @@ func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error { } case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String(): disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress), - ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + ctx.RPCManager.GetRawClient(true), rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) if err != nil { log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for resolved err: %s", err) return errors.WithStack(err) diff --git a/internal/handler/syncEvent.go b/internal/handler/syncEvent.go index acdfd2f..7128672 100644 --- a/internal/handler/syncEvent.go +++ b/internal/handler/syncEvent.go @@ -28,7 +28,7 @@ func SyncEvent(ctx *svc.ServiceContext) { continue } if len(blocks) == 0 { - log.Infof("[Handler.SyncEvent] Pending blocks count is 0\n") + log.Debugf("[Handler.SyncEvent] Pending blocks count is 0\n") time.Sleep(2 * time.Second) continue } @@ -40,7 +40,7 @@ func SyncEvent(ctx *svc.ServiceContext) { defer _wg.Done() if block.Status == schema.BlockPending { // add events & block.status= valid - 
log.Infof("[Handler.SyncEvent] Processing pending block %d (hash: %s)", block.BlockNumber, block.BlockHash) + log.Debugf("[Handler.SyncEvent] Processing pending block %d (hash: %s)", block.BlockNumber, block.BlockHash) err = HandlePendingBlock(ctx, block) if err != nil { log.Errorf("[Handler.SyncEvent] HandlePendingBlock err for block %d (hash: %s): %s\n", block.BlockNumber, block.BlockHash, errors.WithStack(err)) @@ -61,26 +61,26 @@ func SyncEvent(ctx *svc.ServiceContext) { } func HandlePendingBlock(ctx *svc.ServiceContext, block schema.SyncBlock) error { - log.Infof("[Handler.SyncEvent.PendingBlock]Start: %d, %s \n", block.BlockNumber, block.BlockHash) - log.Infof("[Handler.SyncEvent.PendingBlock]GetContracts: %v\n", blockchain.GetContracts()) - log.Infof("[Handler.SyncEvent.PendingBlock]GetEvents: %v\n", blockchain.GetEvents()) + log.Debugf("[Handler.SyncEvent.PendingBlock]Start: %d, %s \n", block.BlockNumber, block.BlockHash) + log.Debugf("[Handler.SyncEvent.PendingBlock]GetContracts: %v\n", blockchain.GetContracts()) + log.Debugf("[Handler.SyncEvent.PendingBlock]GetEvents: %v\n", blockchain.GetEvents()) - log.Infof("[Handler.SyncEvent.PendingBlock] About to call LogFilter for block %d", block.BlockNumber) + log.Debugf("[Handler.SyncEvent.PendingBlock] About to call LogFilter for block %d", block.BlockNumber) events, err := LogFilter(ctx, block, blockchain.GetContracts(), [][]common.Hash{blockchain.GetEvents()}) - log.Infof("[Handler.SyncEvent.PendingBlock] block %d, events number is %d:", block.BlockNumber, len(events)) + log.Debugf("[Handler.SyncEvent.PendingBlock] block %d, events number is %d:", block.BlockNumber, len(events)) if err != nil { log.Errorf("[Handler.SyncEvent.PendingBlock] Log filter err for block %d (hash: %s): %s\n", block.BlockNumber, block.BlockHash, err) return errors.WithStack(err) } eventCount := len(events) if eventCount > 0 && events[0].BlockHash != block.BlockHash { - log.Infof("[Handler.SyncEvent.PendingBlock]Don't match block hash\n") + log.Debugf("[Handler.SyncEvent.PendingBlock]Don't match block hash\n") return nil } else if eventCount > 0 && events[0].BlockHash == block.BlockHash { BatchEvents := make([]*schema.SyncEvent, 0) for _, event := range events { var one schema.SyncEvent - log.Infof("[Handler.SyncEvent.PendingBlock]BlockLogIndexed %d ,TxHash %s,EventHash %s", event.BlockLogIndexed, event.TxHash, event.EventHash) + log.Debugf("[Handler.SyncEvent.PendingBlock]BlockLogIndexed %d ,TxHash %s,EventHash %s", event.BlockLogIndexed, event.TxHash, event.EventHash) err = ctx.DB.Select("id").Where("sync_block_id=? AND block_log_indexed=? AND tx_hash=? AND event_hash=? 
", block.ID, event.BlockLogIndexed, event.TxHash, event.EventHash).First(&one).Error if err != nil && err != gorm.ErrRecordNotFound { @@ -88,7 +88,7 @@ func HandlePendingBlock(ctx *svc.ServiceContext, block schema.SyncBlock) error { return errors.WithStack(err) } else if err == gorm.ErrRecordNotFound { BatchEvents = append(BatchEvents, event) - log.Infof("[Handler.SyncEvent.PendingBlock]block %d, BatchEvents len is %d:", block.BlockNumber, len(BatchEvents)) + log.Debugf("[Handler.SyncEvent.PendingBlock]block %d, BatchEvents len is %d:", block.BlockNumber, len(BatchEvents)) if event.EventName == "DisputeGameCreated" { dispute := evt.DisputeGameCreated{} err := dispute.ToObj(event.Data) @@ -128,12 +128,12 @@ func HandlePendingBlock(ctx *svc.ServiceContext, block schema.SyncBlock) error { log.Errorf("[Handler.PendingBlock]Update SyncBlock Status err: %s\n ", err) return errors.WithStack(err) } - log.Infof("[Handler.SyncEvent.PendingBlock]End: %d, %s \n", block.BlockNumber, block.BlockHash) + log.Debugf("[Handler.SyncEvent.PendingBlock]End: %d, %s \n", block.BlockNumber, block.BlockHash) return nil } func HandleRollbackBlock(ctx *svc.ServiceContext, block schema.SyncBlock) error { - log.Infof("[Handler.RollbackBlock] Start: %d, %s\n", block.BlockNumber, block.BlockHash) + log.Debugf("[Handler.RollbackBlock] Start: %d, %s\n", block.BlockNumber, block.BlockHash) err := ctx.DB.Transaction(func(tx *gorm.DB) error { now := time.Now() // event.status=rollback by syncBlockId diff --git a/internal/handler/syncFrontendMove.go b/internal/handler/syncFrontendMove.go new file mode 100644 index 0000000..1056c4c --- /dev/null +++ b/internal/handler/syncFrontendMove.go @@ -0,0 +1,100 @@ +package handler + +import ( + "time" + + "github.com/optimism-java/dispute-explorer/internal/schema" + "github.com/optimism-java/dispute-explorer/internal/svc" + "github.com/optimism-java/dispute-explorer/pkg/log" +) + +func SyncFrontendMoveTransactions(ctx *svc.ServiceContext) { + log.Infof("[Handler.SyncFrontendMoveTransactions] Starting sync frontend move transactions...") + + for { + select { + case <-ctx.Context.Done(): + log.Infof("[Handler.SyncFrontendMoveTransactions] Context canceled, stopping...") + return + default: + err := processFrontendMoveTransactions(ctx) + if err != nil { + log.Errorf("[Handler.SyncFrontendMoveTransactions] Process error: %s", err) + } + time.Sleep(30 * time.Second) + } + } +} + +// processFrontendMoveTransactions +func processFrontendMoveTransactions(ctx *svc.ServiceContext) error { + var unsyncedTransactions []schema.FrontendMoveTransaction + err := ctx.DB.Where("is_synced = ?", false).Order("id").Limit(10).Find(&unsyncedTransactions).Error + if err != nil { + return err + } + + if len(unsyncedTransactions) == 0 { + log.Debugf("[Handler.SyncFrontendMoveTransactions] No unsynced transactions found") + return nil + } + + log.Infof("[Handler.SyncFrontendMoveTransactions] Found %d unsynced transactions", len(unsyncedTransactions)) + + for i := range unsyncedTransactions { + transaction := &unsyncedTransactions[i] + err := syncSingleTransaction(ctx, transaction) + if err != nil { + log.Errorf("[Handler.SyncFrontendMoveTransactions] Failed to sync transaction %s: %s", transaction.TxHash, err) + continue + } + } + + return nil +} + +// syncSingleTransaction +func syncSingleTransaction(ctx *svc.ServiceContext, transaction *schema.FrontendMoveTransaction) error { + tx := ctx.DB.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + } + }() + + // 1. 
update dispute_game has_frontend_move column to true
+	err := tx.Model(&schema.DisputeGame{}).
+		Where("game_contract = ?", transaction.GameContract).
+		Update("has_frontend_move", true).Error
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+
+	// 2. update game_claim_data is_from_frontend column to true
+	err = tx.Model(&schema.GameClaimData{}).
+		Where("game_contract = ? AND parent_index = ?", transaction.GameContract, transaction.ParentIndex).
+		Update("is_from_frontend", true).Error
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+
+	// 3. update frontend_move_transactions is_synced column to true
+	err = tx.Model(transaction).Update("is_synced", true).Error
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+
+	// tx commit
+	err = tx.Commit().Error
+	if err != nil {
+		return err
+	}
+
+	log.Infof("[Handler.SyncFrontendMoveTransactions] Successfully synced transaction %s (game: %s, parent_index: %s)",
+		transaction.TxHash, transaction.GameContract, transaction.ParentIndex)
+
+	return nil
+}
diff --git a/internal/schema/frontend_move_transaction.go b/internal/schema/frontend_move_transaction.go
index 36a5d88..32a2eeb 100644
--- a/internal/schema/frontend_move_transaction.go
+++ b/internal/schema/frontend_move_transaction.go
@@ -22,6 +22,7 @@ type FrontendMoveTransaction struct {
 	ErrorMessage  string `json:"error_message,omitempty"` // Error message (if any)
 	SubmittedAt   int64  `json:"submitted_at"`            // Submission timestamp
 	ConfirmedAt   int64  `json:"confirmed_at,omitempty"`  // Confirmation timestamp
+	IsSynced      bool   `json:"is_synced" gorm:"default:false;index:idx_is_synced"` // Whether synced to related tables
 }
 
 func (FrontendMoveTransaction) TableName() string {
diff --git a/internal/svc/svc.go b/internal/svc/svc.go
index 774962d..0f9f2de 100644
--- a/internal/svc/svc.go
+++ b/internal/svc/svc.go
@@ -8,7 +8,6 @@ import (
 	"gorm.io/driver/mysql"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/optimism-java/dispute-explorer/internal/types"
 	"github.com/optimism-java/dispute-explorer/pkg/rpc"
 	"gorm.io/gorm"
@@ -19,9 +18,7 @@ var svc *ServiceContext
 
 type ServiceContext struct {
 	Config            *types.Config
-	L1RPC             *ethclient.Client // 保留向后兼容
-	L2RPC             *ethclient.Client // 保留向后兼容
-	RPCManager        *rpc.Manager      // 新增统一RPC管理器
+	RPCManager        *rpc.Manager // unified RPC manager
 	DB                *gorm.DB
 	LatestBlockNumber int64
 	SyncedBlockNumber int64
@@ -47,17 +44,6 @@ func NewServiceContext(ctx context.Context, cfg *types.Config) *ServiceContext {
 	// SetConnMaxLifetime
 	sqlDB.SetConnMaxLifetime(time.Duration(cfg.MySQLConnMaxLifetime) * time.Second)
 
-	// 创建原有的以太坊客户端(保持向后兼容)
-	l1Client, err := ethclient.Dial(cfg.L1RPCUrl)
-	if err != nil {
-		log.Panicf("[svc] get L1 eth client panic: %s\n", err)
-	}
-
-	l2Client, err := ethclient.Dial(cfg.L2RPCUrl)
-	if err != nil {
-		log.Panicf("[svc] get L2 eth client panic: %s\n", err)
-	}
-
 	// 创建统一的RPC管理器
 	rpcManager, err := rpc.CreateManagerFromConfig(cfg)
 	if err != nil {
@@ -66,9 +52,7 @@ func NewServiceContext(ctx context.Context, cfg *types.Config) *ServiceContext {
 
 	svc = &ServiceContext{
 		Config:     cfg,
-		L1RPC:      l1Client,   // 保留向后兼容
-		L2RPC:      l2Client,   // 保留向后兼容
-		RPCManager: rpcManager, // 新的统一管理器
+		RPCManager: rpcManager,
 		DB:         storage,
 		Context:    ctx,
 	}
diff --git a/internal/types/config.go b/internal/types/config.go
index c10d284..6289fa5 100644
--- a/internal/types/config.go
+++ b/internal/types/config.go
@@ -2,6 +2,7 @@ package types
 
 import (
 	"log"
+	"strings"
 
 	env "github.com/caarlos0/env/v6"
 )
@@ -16,9 +17,9 @@ type Config struct {
 	MySQLMaxOpenConns    int    `env:"MYSQL_MAX_OPEN_CONNS" envDefault:"20"`
 	MySQLConnMaxLifetime int    `env:"MYSQL_CONN_MAX_LIFETIME" envDefault:"3600"`
 	Blockchain           string `env:"BLOCKCHAIN" envDefault:"sepolia"`
-	L1RPCUrl             string `env:"L1_RPC_URL" envDefault:"https://eth-sepolia.g.alchemy.com/v2/RT1mCGRyVMx1F-XlY4Es4Zz-Q8Jrasg6"`
-	L2RPCUrl             string `env:"L2_RPC_URL" envDefault:"https://opt-sepolia.g.alchemy.com/v2/RT1mCGRyVMx1F-XlY4Es4Zz-Q8Jrasg6"`
-	NodeRPCURL           string `env:"NODE_RPCURL" envDefault:"https://light-radial-slug.optimism-sepolia.quiknode.pro/e9329f699b371572a8cc5dd22d19d5940bb842a5/"`
+	L1RPCUrls            string `env:"L1_RPC_URLS" envDefault:"https://eth-sepolia.g.alchemy.com/v2/RT1mCGRyVMx1F-XlY4Es4Zz-Q8Jrasg6"`
+	L2RPCUrls            string `env:"L2_RPC_URLS" envDefault:"https://opt-sepolia.g.alchemy.com/v2/RT1mCGRyVMx1F-XlY4Es4Zz-Q8Jrasg6"`
+	NodeRPCUrls          string `env:"NODE_RPC_URLS" envDefault:"https://light-radial-slug.optimism-sepolia.quiknode.pro/e9329f699b371572a8cc5dd22d19d5940bb842a5/"`
 	RPCRateLimit         int    `env:"RPC_RATE_LIMIT" envDefault:"5"`
 	RPCRateBurst         int    `env:"RPC_RATE_BURST" envDefault:"2"`
 	FromBlockNumber      int64  `env:"FROM_BLOCK_NUMBER" envDefault:"5515562"`
@@ -40,3 +41,32 @@ func GetConfig() *Config {
 	}
 	return config
 }
+
+// GetL1RPCUrls returns the list of L1 RPC URLs
+func (c *Config) GetL1RPCUrls() []string {
+	urls := strings.Split(c.L1RPCUrls, ",")
+	// trim surrounding whitespace
+	for i, url := range urls {
+		urls[i] = strings.TrimSpace(url)
+	}
+	return urls
+}
+
+// GetL2RPCUrls returns the list of L2 RPC URLs
+func (c *Config) GetL2RPCUrls() []string {
+	urls := strings.Split(c.L2RPCUrls, ",")
+	// trim surrounding whitespace
+	for i, url := range urls {
+		urls[i] = strings.TrimSpace(url)
+	}
+	return urls
+}
+
+// GetNodeRPCUrls returns the list of Node RPC URLs
+func (c *Config) GetNodeRPCUrls() []string {
+	urls := strings.Split(c.NodeRPCUrls, ",")
+	for i, url := range urls {
+		urls[i] = strings.TrimSpace(url)
+	}
+	return urls
+}
diff --git a/main.go b/main.go
index d040834..266f601 100644
--- a/main.go
+++ b/main.go
@@ -13,10 +13,6 @@ import (
 	disputeLog "github.com/optimism-java/dispute-explorer/pkg/log"
 	swaggerFiles "github.com/swaggo/files"
 	ginSwagger "github.com/swaggo/gin-swagger"
-
-	"github.com/ethereum-optimism/optimism/op-service/client"
-	"github.com/ethereum-optimism/optimism/op-service/sources"
-	gethlog "github.com/ethereum/go-ethereum/log"
 )
 
 func main() {
@@ -29,10 +25,8 @@ func main() {
 	handler.Run(sCtx)
 	disputeLog.Info("listener running...\n")
 
-	rollupClient := initRollupClient(cfg)
-
 	router := gin.Default()
-	disputeGameHandler := api.NewDisputeGameHandler(sCtx.DB, sCtx.L1RPC, sCtx.L2RPC, cfg, rollupClient)
+	disputeGameHandler := api.NewDisputeGameHandler(sCtx.DB, sCtx.RPCManager, cfg)
 	frontendMoveAPI := api.NewFrontendMoveAPI(sCtx)
 
 	docs.SwaggerInfo.Title = "Dispute Game Swagger API"
@@ -65,12 +59,3 @@ func main() {
 		return
 	}
 }
-
-func initRollupClient(cfg *types.Config) *sources.RollupClient {
-	rpcClient, err := client.NewRPC(context.Background(), gethlog.New(), cfg.NodeRPCURL)
-	if err != nil {
-		disputeLog.Errorf("failed to connect to node RPC: %v", err)
-		panic(err)
-	}
-	return sources.NewRollupClient(rpcClient)
-}
diff --git a/migration/version/migration_version.go b/migration/version/migration_version.go
index c5023b6..771e130 100644
--- a/migration/version/migration_version.go
+++ b/migration/version/migration_version.go
@@ -10,6 +10,7 @@ import (
 	v6 "github.com/optimism-java/dispute-explorer/migration/version/v6"
 	v7 "github.com/optimism-java/dispute-explorer/migration/version/v7"
 	v8 "github.com/optimism-java/dispute-explorer/migration/version/v8"
+	v9 "github.com/optimism-java/dispute-explorer/migration/version/v9"
 )
 
 var ModelSchemaList = []*gormigrate.Migration{
@@ -21,4 +22,5 @@ var ModelSchemaList = []*gormigrate.Migration{
 	&v6.UpdateClaimDataClockColumnTable,
 	&v7.AddClaimDataLenForDisputeGameTable,
 	&v8.AddFrontendMoveTrackingTable,
+	&v9.AddIsSyncedFieldTable,
 }
diff --git a/migration/version/v9/add_is_synced_field.go b/migration/version/v9/add_is_synced_field.go
new file mode 100644
index 0000000..becec74
--- /dev/null
+++ b/migration/version/v9/add_is_synced_field.go
@@ -0,0 +1,32 @@
+package v9
+
+import (
+	"fmt"
+
+	gormigrate "github.com/go-gormigrate/gormigrate/v2"
+	"github.com/optimism-java/dispute-explorer/internal/schema"
+	"gorm.io/gorm"
+)
+
+var AddIsSyncedFieldTable = gormigrate.Migration{
+	ID:      "20240829_add_is_synced_field",
+	Migrate: AddIsSyncedToFrontendMoveTransactions,
+}
+
+func AddIsSyncedToFrontendMoveTransactions(db *gorm.DB) error {
+	// Check and add is_synced field to frontend_move_transactions table
+	if !db.Migrator().HasColumn(&schema.FrontendMoveTransaction{}, "is_synced") {
+		err := db.Migrator().AddColumn(&schema.FrontendMoveTransaction{}, "is_synced")
+		if err != nil {
+			return fmt.Errorf("failed to add is_synced column to frontend_move_transactions: %v", err)
+		}
+
+		// Set the default value for existing rows; allow the table-wide update explicitly,
+		// otherwise GORM rejects an Update without a WHERE clause (ErrMissingWhereClause)
+		err = db.Session(&gorm.Session{AllowGlobalUpdate: true}).
+			Model(&schema.FrontendMoveTransaction{}).
+			Update("is_synced", false).Error
+		if err != nil {
+			return fmt.Errorf("failed to set default value for is_synced: %v", err)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/rpc/factory.go b/pkg/rpc/factory.go
index 4f37848..cba4900 100644
--- a/pkg/rpc/factory.go
+++ b/pkg/rpc/factory.go
@@ -9,9 +9,10 @@ import (
 // CreateManagerFromConfig creates RPC manager from configuration
 func CreateManagerFromConfig(config *types.Config) (*Manager, error) {
 	return NewManager(Config{
-		L1RPCUrl: config.L1RPCUrl,
-		L2RPCUrl: config.L2RPCUrl,
-		ProxyURL: "", // if proxy is needed, can be added from configuration
+		L1RPCUrls:   config.GetL1RPCUrls(),
+		L2RPCUrls:   config.GetL2RPCUrls(),
+		NodeRPCUrls: config.GetNodeRPCUrls(),
+		ProxyURL:    "",
 		RateLimit:   config.RPCRateLimit,
 		RateBurst:   config.RPCRateBurst,
 		HTTPTimeout: 10 * time.Second,
@@ -26,8 +27,8 @@ func CreateManagerWithSeparateLimits(
 	// Note: current implementation uses same limits for L1 and L2
 	// if different limits are needed, Manager structure needs to be modified
 	return NewManager(Config{
-		L1RPCUrl: l1URL,
-		L2RPCUrl: l2URL,
+		L1RPCUrls: []string{l1URL},
+		L2RPCUrls: []string{l2URL},
 		RateLimit:   l1Rate, // 使用L1的限制作为默认
 		RateBurst:   l1Burst,
 		HTTPTimeout: 10 * time.Second,
diff --git a/pkg/rpc/manager.go b/pkg/rpc/manager.go
index 3b0281f..a39e6ca 100644
--- a/pkg/rpc/manager.go
+++ b/pkg/rpc/manager.go
@@ -15,20 +15,33 @@ import (
 	"golang.org/x/time/rate"
 )
 
+const (
+	// RPC type constants
+	nodeRPCType = "Node"
+)
+
 // Manager unified RPC resource manager
 type Manager struct {
 	// Configuration
-	l1RPCUrl string
-	l2RPCUrl string
-	proxyURL string
+	l1RPCUrls   []string
+	l2RPCUrls   []string
+	nodeRPCUrls []string
+	proxyURL    string
+
+	// Round-robin indices
+	l1Index   int
+	l2Index   int
+	nodeIndex int
 
 	// Rate limiters
-	l1Limiter *rate.Limiter
-	l2Limiter *rate.Limiter
+	l1Limiter   *rate.Limiter
+	l2Limiter   *rate.Limiter
+	nodeLimiter *rate.Limiter
 
-	// Native Ethereum clients
-	l1Client *ethclient.Client
-	l2Client *ethclient.Client
+	// Client pools
+	l1Clients   []*ethclient.Client
+	l2Clients   []*ethclient.Client
+	nodeClients []*ethclient.Client
 
 	// HTTP client
 	httpClient *http.Client
@@ -40,8 +53,9 @@ type
Manager struct { // Config RPC manager configuration type Config struct { - L1RPCUrl string - L2RPCUrl string + L1RPCUrls []string + L2RPCUrls []string + NodeRPCUrls []string ProxyURL string RateLimit int RateBurst int @@ -50,31 +64,64 @@ type Config struct { // Stats RPC call statistics type Stats struct { - L1RequestCount int64 - L2RequestCount int64 - L1RateLimitedCount int64 - L2RateLimitedCount int64 - HTTPRequestCount int64 - LastRequestTime time.Time - mu sync.RWMutex + L1RequestCount int64 + L2RequestCount int64 + NodeRequestCount int64 + L1RateLimitedCount int64 + L2RateLimitedCount int64 + NodeRateLimitedCount int64 + HTTPRequestCount int64 + LastRequestTime time.Time + mu sync.RWMutex } // NewManager creates a new RPC manager func NewManager(config Config) (*Manager, error) { - // Create Ethereum clients - l1Client, err := ethclient.Dial(config.L1RPCUrl) - if err != nil { - return nil, fmt.Errorf("failed to connect to L1 RPC: %w", err) + // Validate configuration + if len(config.L1RPCUrls) == 0 { + return nil, fmt.Errorf("l1 RPC URLs cannot be empty") + } + if len(config.L2RPCUrls) == 0 { + return nil, fmt.Errorf("l2 RPC URLs cannot be empty") + } + if len(config.NodeRPCUrls) == 0 { + return nil, fmt.Errorf("node RPC URLs cannot be empty") + } + + // Create L1 client pool + l1Clients := make([]*ethclient.Client, 0, len(config.L1RPCUrls)) + for _, url := range config.L1RPCUrls { + client, err := ethclient.Dial(url) + if err != nil { + return nil, fmt.Errorf("failed to connect to L1 RPC %s: %w", url, err) + } + l1Clients = append(l1Clients, client) + } + + // Create L2 client pool + l2Clients := make([]*ethclient.Client, 0, len(config.L2RPCUrls)) + for _, url := range config.L2RPCUrls { + client, err := ethclient.Dial(url) + if err != nil { + return nil, fmt.Errorf("failed to connect to L2 RPC %s: %w", url, err) + } + l2Clients = append(l2Clients, client) } - l2Client, err := ethclient.Dial(config.L2RPCUrl) - if err != nil { - return nil, fmt.Errorf("failed to connect to L2 RPC: %w", err) + // Create Node client pool + nodeClients := make([]*ethclient.Client, 0, len(config.NodeRPCUrls)) + for _, url := range config.NodeRPCUrls { + client, err := ethclient.Dial(url) + if err != nil { + return nil, fmt.Errorf("failed to connect to %s RPC %s: %w", nodeRPCType, url, err) + } + nodeClients = append(nodeClients, client) } // Create rate limiters l1Limiter := rate.NewLimiter(rate.Limit(config.RateLimit), config.RateBurst) l2Limiter := rate.NewLimiter(rate.Limit(config.RateLimit), config.RateBurst) + nodeLimiter := rate.NewLimiter(rate.Limit(config.RateLimit), config.RateBurst) // Set HTTP timeout timeout := config.HTTPTimeout @@ -87,124 +134,178 @@ func NewManager(config Config) (*Manager, error) { } return &Manager{ - l1RPCUrl: config.L1RPCUrl, - l2RPCUrl: config.L2RPCUrl, - proxyURL: config.ProxyURL, - l1Limiter: l1Limiter, - l2Limiter: l2Limiter, - l1Client: l1Client, - l2Client: l2Client, - httpClient: httpClient, - stats: &Stats{}, + l1RPCUrls: config.L1RPCUrls, + l2RPCUrls: config.L2RPCUrls, + nodeRPCUrls: config.NodeRPCUrls, + proxyURL: config.ProxyURL, + l1Index: 0, + l2Index: 0, + nodeIndex: 0, + l1Limiter: l1Limiter, + l2Limiter: l2Limiter, + nodeLimiter: nodeLimiter, + l1Clients: l1Clients, + l2Clients: l2Clients, + nodeClients: nodeClients, + httpClient: httpClient, + stats: &Stats{}, }, nil } +// getNextL1 gets next L1 RPC URL and client using round-robin +func (m *Manager) getNextL1() (string, *ethclient.Client) { + m.mu.Lock() + defer m.mu.Unlock() + + url := 
m.l1RPCUrls[m.l1Index] + client := m.l1Clients[m.l1Index] + m.l1Index = (m.l1Index + 1) % len(m.l1RPCUrls) + + return url, client +} + +// getNextL2 gets next L2 RPC URL and client using round-robin +func (m *Manager) getNextL2() (string, *ethclient.Client) { + m.mu.Lock() + defer m.mu.Unlock() + + url := m.l2RPCUrls[m.l2Index] + client := m.l2Clients[m.l2Index] + m.l2Index = (m.l2Index + 1) % len(m.l2RPCUrls) + + return url, client +} + +// getNextNode gets next Node RPC URL and client using round-robin +func (m *Manager) getNextNode() (string, *ethclient.Client) { + m.mu.Lock() + defer m.mu.Unlock() + + url := m.nodeRPCUrls[m.nodeIndex] + client := m.nodeClients[m.nodeIndex] + m.nodeIndex = (m.nodeIndex + 1) % len(m.nodeRPCUrls) + + return url, client +} + // GetLatestBlockNumber gets the latest block number (with rate limiting) func (m *Manager) GetLatestBlockNumber(ctx context.Context, isL1 bool) (uint64, error) { if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return 0, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.BlockNumber(ctx) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.BlockNumber(ctx) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return 0, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.BlockNumber(ctx) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.BlockNumber(ctx) } // GetBlockByNumber gets a block by number (with rate limiting) func (m *Manager) GetBlockByNumber(ctx context.Context, number *big.Int, isL1 bool) (*types.Block, error) { if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.BlockByNumber(ctx, number) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.BlockByNumber(ctx, number) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.BlockByNumber(ctx, number) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.BlockByNumber(ctx, number) } // GetBlockByHash gets a block by hash (with rate limiting) func (m *Manager) GetBlockByHash(ctx context.Context, hash common.Hash, isL1 bool) (*types.Block, error) { if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.BlockByHash(ctx, hash) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.BlockByHash(ctx, hash) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.BlockByHash(ctx, hash) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.BlockByHash(ctx, hash) } // FilterLogs filters logs (with rate limiting) func (m *Manager) FilterLogs(ctx context.Context, query ethereum.FilterQuery, isL1 bool) ([]types.Log, error) { if isL1 { if err := 
m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.FilterLogs(ctx, query) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.FilterLogs(ctx, query) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.FilterLogs(ctx, query) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.FilterLogs(ctx, query) } // HeaderByNumber gets a block header by number (with rate limiting) func (m *Manager) HeaderByNumber(ctx context.Context, number *big.Int, isL1 bool) (*types.Header, error) { if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.HeaderByNumber(ctx, number) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.HeaderByNumber(ctx, number) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.HeaderByNumber(ctx, number) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.HeaderByNumber(ctx, number) } // CallContract calls a smart contract (with rate limiting) func (m *Manager) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int, isL1 bool) ([]byte, error) { if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - return m.l1Client.CallContract(ctx, call, blockNumber) + m.updateRequestStats("L1") + _, client := m.getNextL1() + return client.CallContract(ctx, call, blockNumber) } if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - return m.l2Client.CallContract(ctx, call, blockNumber) + m.updateRequestStats("L2") + _, client := m.getNextL2() + return client.CallContract(ctx, call, blockNumber) } // HTTPPostJSON HTTP POST JSON request (with rate limiting) @@ -213,18 +314,18 @@ func (m *Manager) HTTPPostJSON(ctx context.Context, bodyJSON string, isL1 bool) if isL1 { if err := m.l1Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(true) + m.updateRateLimitedStats("L1") return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) } - m.updateRequestStats(true) - rpcURL = m.l1RPCUrl + m.updateRequestStats("L1") + rpcURL, _ = m.getNextL1() } else { if err := m.l2Limiter.Wait(ctx); err != nil { - m.updateRateLimitedStats(false) + m.updateRateLimitedStats("L2") return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) } - m.updateRequestStats(false) - rpcURL = m.l2RPCUrl + m.updateRequestStats("L2") + rpcURL, _ = m.getNextL2() } m.updateHTTPRequestStats() @@ -234,42 +335,72 @@ func (m *Manager) HTTPPostJSON(ctx context.Context, bodyJSON string, isL1 bool) // GetRawClient gets the raw client (for backward compatibility) func (m *Manager) GetRawClient(isL1 bool) *ethclient.Client { if isL1 { - return m.l1Client + _, client 
:= m.getNextL1() + return client } - return m.l2Client + _, client := m.getNextL2() + return client +} + +// GetNodeRawClient gets the Node RPC raw client using round-robin +func (m *Manager) GetNodeRawClient() *ethclient.Client { + _, client := m.getNextNode() + return client +} + +// GetNodeRPCURL gets the next Node RPC URL using round-robin +func (m *Manager) GetNodeRPCURL() string { + url, _ := m.getNextNode() + return url } // UpdateRateLimit dynamically updates rate limit -func (m *Manager) UpdateRateLimit(rateLimit int, rateBurst int, isL1 bool) { +func (m *Manager) UpdateRateLimit(rateLimit int, rateBurst int, rpcType string) { m.mu.Lock() defer m.mu.Unlock() - if isL1 { + switch rpcType { + case "L1": m.l1Limiter.SetLimit(rate.Limit(rateLimit)) m.l1Limiter.SetBurst(rateBurst) - } else { + case "L2": m.l2Limiter.SetLimit(rate.Limit(rateLimit)) m.l2Limiter.SetBurst(rateBurst) + case nodeRPCType: + m.nodeLimiter.SetLimit(rate.Limit(rateLimit)) + m.nodeLimiter.SetBurst(rateBurst) } } // GetRateLimit gets current rate limit settings -func (m *Manager) GetRateLimit(isL1 bool) (rateLimit float64, rateBurst int) { +func (m *Manager) GetRateLimit(rpcType string) (rateLimit float64, rateBurst int) { m.mu.RLock() defer m.mu.RUnlock() - if isL1 { + switch rpcType { + case "L1": return float64(m.l1Limiter.Limit()), m.l1Limiter.Burst() + case "L2": + return float64(m.l2Limiter.Limit()), m.l2Limiter.Burst() + case nodeRPCType: + return float64(m.nodeLimiter.Limit()), m.nodeLimiter.Burst() + default: + return 0, 0 } - return float64(m.l2Limiter.Limit()), m.l2Limiter.Burst() } // GetTokens 返回当前可用的令牌数 -func (m *Manager) GetTokens(isL1 bool) float64 { - if isL1 { +func (m *Manager) GetTokens(rpcType string) float64 { + switch rpcType { + case "L1": return m.l1Limiter.Tokens() + case "L2": + return m.l2Limiter.Tokens() + case nodeRPCType: + return m.nodeLimiter.Tokens() + default: + return 0 } - return m.l2Limiter.Tokens() } // GetStats gets statistics information @@ -278,47 +409,57 @@ func (m *Manager) GetStats() StatsSnapshot { defer m.stats.mu.RUnlock() return StatsSnapshot{ - L1RequestCount: m.stats.L1RequestCount, - L2RequestCount: m.stats.L2RequestCount, - L1RateLimitedCount: m.stats.L1RateLimitedCount, - L2RateLimitedCount: m.stats.L2RateLimitedCount, - HTTPRequestCount: m.stats.HTTPRequestCount, - LastRequestTime: m.stats.LastRequestTime, + L1RequestCount: m.stats.L1RequestCount, + L2RequestCount: m.stats.L2RequestCount, + NodeRequestCount: m.stats.NodeRequestCount, + L1RateLimitedCount: m.stats.L1RateLimitedCount, + L2RateLimitedCount: m.stats.L2RateLimitedCount, + NodeRateLimitedCount: m.stats.NodeRateLimitedCount, + HTTPRequestCount: m.stats.HTTPRequestCount, + LastRequestTime: m.stats.LastRequestTime, } } // StatsSnapshot statistics snapshot type StatsSnapshot struct { - L1RequestCount int64 - L2RequestCount int64 - L1RateLimitedCount int64 - L2RateLimitedCount int64 - HTTPRequestCount int64 - LastRequestTime time.Time + L1RequestCount int64 + L2RequestCount int64 + NodeRequestCount int64 + L1RateLimitedCount int64 + L2RateLimitedCount int64 + NodeRateLimitedCount int64 + HTTPRequestCount int64 + LastRequestTime time.Time } // updateRequestStats updates request statistics -func (m *Manager) updateRequestStats(isL1 bool) { +func (m *Manager) updateRequestStats(rpcType string) { m.stats.mu.Lock() defer m.stats.mu.Unlock() m.stats.LastRequestTime = time.Now() - if isL1 { + switch rpcType { + case "L1": m.stats.L1RequestCount++ - } else { + case "L2": m.stats.L2RequestCount++ + case 
nodeRPCType:
+		m.stats.NodeRequestCount++
 	}
 }
 
 // updateRateLimitedStats updates rate-limited request statistics
-func (m *Manager) updateRateLimitedStats(isL1 bool) {
+func (m *Manager) updateRateLimitedStats(rpcType string) {
 	m.stats.mu.Lock()
 	defer m.stats.mu.Unlock()
 
-	if isL1 {
+	switch rpcType {
+	case "L1":
 		m.stats.L1RateLimitedCount++
-	} else {
+	case "L2":
 		m.stats.L2RateLimitedCount++
+	case nodeRPCType:
+		m.stats.NodeRateLimitedCount++
 	}
 }
 
@@ -332,10 +473,22 @@ func (m *Manager) updateHTTPRequestStats() {
 
 // Close closes all connections
 func (m *Manager) Close() {
-	if m.l1Client != nil {
-		m.l1Client.Close()
+	// Close all L1 clients
+	for _, client := range m.l1Clients {
+		if client != nil {
+			client.Close()
+		}
 	}
-	if m.l2Client != nil {
-		m.l2Client.Close()
+	// Close all L2 clients
+	for _, client := range m.l2Clients {
+		if client != nil {
+			client.Close()
+		}
+	}
+	// Close all Node clients
+	for _, client := range m.nodeClients {
+		if client != nil {
+			client.Close()
+		}
 	}
 }
diff --git a/pkg/rpc/manager_test.go b/pkg/rpc/manager_test.go
new file mode 100644
index 0000000..eaca637
--- /dev/null
+++ b/pkg/rpc/manager_test.go
@@ -0,0 +1,155 @@
+package rpc
+
+import (
+	"testing"
+)
+
+func TestGetNextL1SingleRPC(t *testing.T) {
+	// Test with single L1 RPC
+	config := Config{
+		L1RPCUrls:   []string{"https://test-rpc.com"},
+		L2RPCUrls:   []string{"https://test-l2-rpc.com"},
+		NodeRPCUrls: []string{"https://test-node-rpc.com"},
+		RateLimit:   5,
+		RateBurst:   2,
+	}
+
+	// Note: This test won't actually create real connections
+	// since we're using test URLs. In a real test environment,
+	// you'd use mock servers or real test endpoints.
+
+	t.Logf("Testing with single RPC URL configuration")
+	t.Logf("L1 URLs: %v", config.L1RPCUrls)
+	t.Logf("L2 URLs: %v", config.L2RPCUrls)
+	t.Logf("Node URLs: %v", config.NodeRPCUrls)
+
+	// The round-robin logic should work with a single URL:
+	// - Index starts at 0
+	// - (0 + 1) % 1 = 0, so it cycles back to 0
+	// - No array bounds error should occur
+}
+
+func TestEmptyRPCUrls(t *testing.T) {
+	// Test with empty L1 RPC URLs
+	config := Config{
+		L1RPCUrls: []string{}, // Empty array
+		L2RPCUrls: []string{"https://test-l2-rpc.com"},
+		RateLimit: 5,
+		RateBurst: 2,
+	}
+
+	_, err := NewManager(config)
+	if err == nil {
+		t.Fatal("Expected error for empty L1 RPC URLs, got nil")
+	}
+	if err.Error() != "l1 RPC URLs cannot be empty" {
+		t.Errorf("Expected specific error message, got: %s", err.Error())
+	}
+
+	// Test with empty L2 RPC URLs
+	config2 := Config{
+		L1RPCUrls: []string{"https://test-rpc.com"},
+		L2RPCUrls: []string{}, // Empty array
+		RateLimit: 5,
+		RateBurst: 2,
+	}
+
+	_, err2 := NewManager(config2)
+	if err2 == nil {
+		t.Fatal("Expected error for empty L2 RPC URLs, got nil")
+	}
+	if err2.Error() != "l2 RPC URLs cannot be empty" {
+		t.Errorf("Expected specific error message, got: %s", err2.Error())
+	}
+}
+
+func TestRoundRobinLogic(t *testing.T) {
+	tests := []struct {
+		name        string
+		rpcCount    int
+		iterations  int
+		description string
+	}{
+		{
+			name:        "SingleRPC",
+			rpcCount:    1,
+			iterations:  5,
+			description: "With 1 RPC, index should always be 0",
+		},
+		{
+			name:        "TwoRPCs",
+			rpcCount:    2,
+			iterations:  4,
+			description: "With 2 RPCs, index should alternate 0,1,0,1",
+		},
+		{
+			name:        "ThreeRPCs",
+			rpcCount:    3,
+			iterations:  6,
+			description: "With 3 RPCs, index should cycle 0,1,2,0,1,2",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Simulate round-robin logic without creating real connections
+			index := 0
+			expectedSequence := make([]int, tt.iterations)
+
+			for i := 0; i < tt.iterations; i++ {
+				expectedSequence[i] = index
+				index = (index + 1) % tt.rpcCount
+			}
+
+			t.Logf("%s: Expected sequence for %d iterations: %v",
+				tt.description, tt.iterations, expectedSequence)
+
+			// Verify the pattern is correct
+			for i := 0; i < tt.iterations; i++ {
+				expected := i % tt.rpcCount
+				if expectedSequence[i] != expected {
+					t.Errorf("At iteration %d, expected index %d, got %d",
+						i, expected, expectedSequence[i])
+				}
+			}
+		})
+	}
+}
+
+func TestNodeRPCRoundRobin(t *testing.T) {
+	// Create a simple test for Node RPC round-robin logic
+	nodeUrls := []string{
+		"https://node1.test.com",
+		"https://node2.test.com",
+		"https://node3.test.com",
+	}
+
+	// Test round-robin logic without actual connections
+	index := 0
+	var indices []int
+	for i := 0; i < 6; i++ {
+		indices = append(indices, index)
+		index = (index + 1) % len(nodeUrls)
+	}
+
+	expected := []int{0, 1, 2, 0, 1, 2}
+	if !equalIntSlices(indices, expected) {
+		t.Errorf("Node RPC round-robin failed. Expected %v, got %v", expected, indices)
+	}
+
+	t.Logf("Node RPC round-robin test passed: %v", indices)
+	t.Logf("Node URLs used in rotation: %v", nodeUrls)
+}
+
+// Helper function to compare int slices
+func equalIntSlices(a, b []int) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/rpc/monitor.go b/pkg/rpc/monitor.go
index 4bcdecb..93fc1ce 100644
--- a/pkg/rpc/monitor.go
+++ b/pkg/rpc/monitor.go
@@ -4,14 +4,31 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"sync"
 	"time"
 )
 
+// PerMinuteStats per-minute statistics
+type PerMinuteStats struct {
+	L1RequestsPerMin      float64
+	L2RequestsPerMin      float64
+	NodeRequestsPerMin    float64
+	L1RateLimitedPerMin   float64
+	L2RateLimitedPerMin   float64
+	NodeRateLimitedPerMin float64
+	HTTPRequestsPerMin    float64
+	TotalGoroutines       int
+	ActiveGoroutines      int
+}
+
 // Monitor RPC manager monitor
 type Monitor struct {
-	manager  *Manager
-	interval time.Duration
-	logger   Logger
+	manager   *Manager
+	interval  time.Duration
+	logger    Logger
+	lastStats StatsSnapshot
+	lastTime  time.Time
+	mu        sync.RWMutex
 }
 
 // Logger logging interface
@@ -29,9 +46,11 @@ func (dl *DefaultLogger) Printf(format string, v ...interface{}) {
 // NewMonitor creates monitor
 func NewMonitor(manager *Manager, interval time.Duration) *Monitor {
 	return &Monitor{
-		manager:  manager,
-		interval: interval,
-		logger:   &DefaultLogger{},
+		manager:   manager,
+		interval:  interval,
+		logger:    &DefaultLogger{},
+		lastStats: manager.GetStats(), // initialize the stats baseline
+		lastTime:  time.Now(),
 	}
 }
 
@@ -60,30 +79,67 @@ func (m *Monitor) Start(ctx context.Context) {
 
 // logStats logs statistics information
 func (m *Monitor) logStats() {
-	stats := m.manager.GetStats()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 
-	l1Rate, l1Burst := m.manager.GetRateLimit(true)
-	l2Rate, l2Burst := m.manager.GetRateLimit(false)
+	currentStats := m.manager.GetStats()
+	currentTime := time.Now()
 
-	l1Tokens := m.manager.GetTokens(true)
-	l2Tokens := m.manager.GetTokens(false)
+	// time delta in minutes
+	timeDiff := currentTime.Sub(m.lastTime).Minutes()
+	if timeDiff == 0 {
+		return // avoid division by zero
+	}
+
+	// per-minute deltas
+	perMinStats := PerMinuteStats{
+		L1RequestsPerMin:      float64(currentStats.L1RequestCount-m.lastStats.L1RequestCount) / timeDiff,
+		L2RequestsPerMin:      float64(currentStats.L2RequestCount-m.lastStats.L2RequestCount) / timeDiff,
+		NodeRequestsPerMin:    float64(currentStats.NodeRequestCount-m.lastStats.NodeRequestCount) / timeDiff,
+		L1RateLimitedPerMin:   float64(currentStats.L1RateLimitedCount-m.lastStats.L1RateLimitedCount) / timeDiff,
+		L2RateLimitedPerMin:   float64(currentStats.L2RateLimitedCount-m.lastStats.L2RateLimitedCount) / timeDiff,
+		NodeRateLimitedPerMin: float64(currentStats.NodeRateLimitedCount-m.lastStats.NodeRateLimitedCount) / timeDiff,
+		HTTPRequestsPerMin:    float64(currentStats.HTTPRequestCount-m.lastStats.HTTPRequestCount) / timeDiff,
+	}
+
+	// rate limit settings
+	l1Rate, l1Burst := m.manager.GetRateLimit("L1")
+	l2Rate, l2Burst := m.manager.GetRateLimit("L2")
+	nodeRate, nodeBurst := m.manager.GetRateLimit("Node")
+
+	l1Tokens := m.manager.GetTokens("L1")
+	l2Tokens := m.manager.GetTokens("L2")
+	nodeTokens := m.manager.GetTokens("Node")
 
+	// log per-minute averages
 	m.logger.Printf(
-		"[RPC Stats] L1: %d requests (%d limited), L2: %d requests (%d limited), HTTP: %d",
-		stats.L1RequestCount, stats.L1RateLimitedCount,
-		stats.L2RequestCount, stats.L2RateLimitedCount,
-		stats.HTTPRequestCount,
+		"[RPC PerMin] L1: %.1f req/min (%.1f limited/min), L2: %.1f req/min (%.1f limited/min), Node: %.1f req/min (%.1f limited/min), HTTP: %.1f req/min",
+		perMinStats.L1RequestsPerMin, perMinStats.L1RateLimitedPerMin,
+		perMinStats.L2RequestsPerMin, perMinStats.L2RateLimitedPerMin,
+		perMinStats.NodeRequestsPerMin, perMinStats.NodeRateLimitedPerMin,
+		perMinStats.HTTPRequestsPerMin,
 	)
 
 	m.logger.Printf(
-		"[RPC Limits] L1: %.1f/s (burst %d, tokens %.2f), L2: %.1f/s (burst %d, tokens %.2f)",
+		"[RPC Limits] L1: %.1f/s (burst %d, tokens %.2f), L2: %.1f/s (burst %d, tokens %.2f), Node: %.1f/s (burst %d, tokens %.2f)",
 		l1Rate, l1Burst, l1Tokens,
 		l2Rate, l2Burst, l2Tokens,
+		nodeRate, nodeBurst, nodeTokens,
+	)
+
+	// log cumulative totals (optional)
+	m.logger.Printf(
+		"[RPC Total] L1: %d total (%d limited), L2: %d total (%d limited), Node: %d total (%d limited), HTTP: %d total",
+		currentStats.L1RequestCount, currentStats.L1RateLimitedCount,
+		currentStats.L2RequestCount, currentStats.L2RateLimitedCount,
+		currentStats.NodeRequestCount, currentStats.NodeRateLimitedCount,
+		currentStats.HTTPRequestCount,
 	)
 
 	// warning messages
-	if stats.L1RateLimitedCount > 0 || stats.L2RateLimitedCount > 0 {
-		m.logger.Printf("[RPC Warning] Rate limiting is active!")
+	if perMinStats.L1RateLimitedPerMin > 0 || perMinStats.L2RateLimitedPerMin > 0 || perMinStats.NodeRateLimitedPerMin > 0 {
+		m.logger.Printf("[RPC Warning] Rate limiting is active! L1: %.1f/min, L2: %.1f/min, Node: %.1f/min",
+			perMinStats.L1RateLimitedPerMin, perMinStats.L2RateLimitedPerMin, perMinStats.NodeRateLimitedPerMin)
 	}
 
 	if l1Tokens < 1.0 {
@@ -93,13 +149,45 @@ func (m *Monitor) logStats() {
 	if l2Tokens < 1.0 {
 		m.logger.Printf("[RPC Warning] L2 tokens running low: %.2f", l2Tokens)
 	}
+
+	if nodeTokens < 1.0 {
+		m.logger.Printf("[RPC Warning] Node tokens running low: %.2f", nodeTokens)
+	}
+
+	// update the baseline
+	m.lastStats = currentStats
+	m.lastTime = currentTime
+}
+
+// GetPerMinuteStats returns per-minute statistics
+func (m *Monitor) GetPerMinuteStats() PerMinuteStats {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	currentStats := m.manager.GetStats()
+	currentTime := time.Now()
+
+	timeDiff := currentTime.Sub(m.lastTime).Minutes()
+	if timeDiff == 0 {
+		return PerMinuteStats{}
+	}
+
+	return PerMinuteStats{
+		L1RequestsPerMin:      float64(currentStats.L1RequestCount-m.lastStats.L1RequestCount) / timeDiff,
+		L2RequestsPerMin:      float64(currentStats.L2RequestCount-m.lastStats.L2RequestCount) / timeDiff,
+		NodeRequestsPerMin:    float64(currentStats.NodeRequestCount-m.lastStats.NodeRequestCount) / timeDiff,
+		L1RateLimitedPerMin:   float64(currentStats.L1RateLimitedCount-m.lastStats.L1RateLimitedCount) / timeDiff,
+		L2RateLimitedPerMin:   float64(currentStats.L2RateLimitedCount-m.lastStats.L2RateLimitedCount) / timeDiff,
+		NodeRateLimitedPerMin: float64(currentStats.NodeRateLimitedCount-m.lastStats.NodeRateLimitedCount) / timeDiff,
+		HTTPRequestsPerMin:    float64(currentStats.HTTPRequestCount-m.lastStats.HTTPRequestCount) / timeDiff,
+	}
 }
 
 // GetHealthCheck gets health check information
 func (m *Monitor) GetHealthCheck() HealthCheckResult {
 	stats := m.manager.GetStats()
-	l1Tokens := m.manager.GetTokens(true)
-	l2Tokens := m.manager.GetTokens(false)
+	l1Tokens := m.manager.GetTokens("L1")
+	l2Tokens := m.manager.GetTokens("L2")
 
 	isHealthy := true
 	var issues []string
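
The new unit tests only simulate the round-robin index arithmetic; they never drive the Manager itself. As a complement, here is a minimal sketch (not part of this diff) of how the L1 rotation could be verified end to end against local JSON-RPC mocks using net/http/httptest, assuming only the API introduced above (NewManager with Config, GetLatestBlockNumber, Close). The mock handler, hit counters, and the choice to echo the request id are illustrative assumptions, not project code.

package rpc

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"testing"
)

// TestRoundRobinWithMockServers exercises the L1 client pool through the
// public Manager API against two local mock endpoints that answer
// eth_blockNumber, so no real RPC providers are needed.
func TestRoundRobinWithMockServers(t *testing.T) {
	var hits [2]int64

	// newMock returns a server that counts requests and replies to
	// eth_blockNumber, echoing the request id so the go-ethereum client
	// accepts the response.
	newMock := func(i int) *httptest.Server {
		return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			atomic.AddInt64(&hits[i], 1)
			var req struct {
				ID json.RawMessage `json:"id"`
			}
			_ = json.NewDecoder(r.Body).Decode(&req)
			w.Header().Set("Content-Type", "application/json")
			fmt.Fprintf(w, `{"jsonrpc":"2.0","id":%s,"result":"0x10"}`, req.ID)
		}))
	}
	s0, s1 := newMock(0), newMock(1)
	defer s0.Close()
	defer s1.Close()

	m, err := NewManager(Config{
		L1RPCUrls:   []string{s0.URL, s1.URL},
		L2RPCUrls:   []string{s0.URL}, // pools must be non-empty; L2/Node are never called here
		NodeRPCUrls: []string{s0.URL},
		RateLimit:   100,
		RateBurst:   10,
	})
	if err != nil {
		t.Fatalf("NewManager: %v", err)
	}
	defer m.Close()

	// Four L1 calls should be spread evenly across the two endpoints.
	for i := 0; i < 4; i++ {
		if _, err := m.GetLatestBlockNumber(context.Background(), true); err != nil {
			t.Fatalf("GetLatestBlockNumber: %v", err)
		}
	}
	if atomic.LoadInt64(&hits[0]) != 2 || atomic.LoadInt64(&hits[1]) != 2 {
		t.Errorf("expected 2 hits per endpoint, got %d and %d", hits[0], hits[1])
	}
}

The same pattern extends to the L2 and Node pools by pointing their URL slices at additional mock servers and calling the corresponding Manager methods.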