-
Notifications
You must be signed in to change notification settings - Fork 27
migration boundry fix #288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -348,12 +348,19 @@ func (m *Migrator) DetermineMigrationBoundaries(targetStartBlock, targetEndBlock | |||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||
| log.Fatal().Err(err).Msg("Failed to get latest block from main storage") | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background()) | ||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||
| log.Fatal().Err(err).Msg("Failed to get latest block from RPC") | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| log.Info().Msgf("Latest block in main storage: %d", latestBlockStored) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| endBlock := latestBlockStored | ||||||||||||||||||||||||||||||||||||||||||||
| if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockStored) < 0 { | ||||||||||||||||||||||||||||||||||||||||||||
| if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 { | ||||||||||||||||||||||||||||||||||||||||||||
| endBlock = targetEndBlock | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| if targetEndBlock.Uint64() == 0 { | ||||||||||||||||||||||||||||||||||||||||||||
| endBlock = latestBlockRPC | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+358
to
364
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. End-boundary can exceed RPC latest when targetEndBlock > RPC; also missing start<=end guard. Ensure we never migrate beyond RPC capability and validate the range. - endBlock := latestBlockStored
- if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
- endBlock = targetEndBlock
- }
- if targetEndBlock.Uint64() == 0 {
- endBlock = latestBlockRPC
- }
+ // Cap by RPC latest, optionally by user-specified target
+ endBlock := new(big.Int).Set(latestBlockRPC)
+ if targetEndBlock.Sign() > 0 {
+ if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
+ endBlock = targetEndBlock
+ } else {
+ // Requested end is beyond RPC; cap at RPC latest
+ endBlock = latestBlockRPC
+ }
+ }
+
+ // Validate final range
+ if endBlock.Cmp(targetStartBlock) < 0 {
+ log.Fatal().Msgf("Invalid migration range: end block %s is less than start block %s", endBlock.String(), targetStartBlock.String())
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||
| startBlock := targetStartBlock | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -103,6 +103,9 @@ func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cfg.BufferTimeout == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg.BufferTimeout = 1 * 60 * 60 // 1 hour in seconds default | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cfg.FlushTimeout == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg.FlushTimeout = 300 // 5 mins default | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create formatter based on format | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var formatter DataFormatter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -309,17 +312,17 @@ func (s *S3Connector) Flush() error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-s.flushDoneCh: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(60 * time.Second): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("flush timeout after 60 seconds") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Flush channel is full, likely a flush is already in progress | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Wait for it to complete | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-s.flushDoneCh: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(60 * time.Second): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("flush timeout after 60 seconds") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
312
to
327
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Flush completion uses a shared unbuffered channel; concurrent Flush callers can time out or hang. Only one waiter will receive from flushDoneCh per flush; others will block until another flush or hit timeout. Guard Flush with a mutex to serialize calls (minimal change), or switch to a broadcast/WaitGroup pattern. Minimal mutex approach: type S3Connector struct {
@@
flushDoneCh chan struct{} // Signals when flush is complete
flushTimer *time.Timer
timerMu sync.Mutex
+ flushMu sync.Mutex
lastAddTime time.TimeAnd at the beginning of Flush(): func (s *S3Connector) Flush() error {
+ s.flushMu.Lock()
+ defer s.flushMu.Unlock()Longer-term: replace flushDoneCh with a per-flush ack (e.g., send a chan struct{} via flushCh) or use a sync.Cond to broadcast completion to all waiters. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix log formatting for big.Int.
Using %d with *big.Int produces malformed output. Use %s or call .String().
📝 Committable suggestion
🤖 Prompt for AI Agents