9 changes: 8 additions & 1 deletion cmd/migrate_valid.go
@@ -348,12 +348,19 @@ func (m *Migrator) DetermineMigrationBoundaries(targetStartBlock, targetEndBlock
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
	}
	latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
	}
	log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)

Comment on lines +351 to 356

⚠️ Potential issue

Fix log formatting for big.Int.

Using %d with *big.Int produces malformed output. Use %s or call .String().

- log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
+ log.Info().Msgf("Latest block in main storage: %s", latestBlockStored.String())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
	latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
	}
	log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
	latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
	}
	log.Info().Msgf("Latest block in main storage: %s", latestBlockStored.String())
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 351 to 356, the log uses fmt %d with a
*big.Int which yields malformed output; change the log to use %s with
latestBlockStored.String() (or call .String() and pass that) so the big.Int is
formatted correctly in the log message.
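
For reference, a minimal standalone sketch of the suggested formatting, using the standard library's fmt and math/big rather than the project's zerolog logger; the variable name and block number here are illustrative only:

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Hypothetical latest block number held as a *big.Int, mirroring latestBlockStored above.
	latestBlockStored := big.NewInt(21_000_000)

	// Suggested form: convert explicitly via String() and format with %s.
	fmt.Printf("Latest block in main storage: %s\n", latestBlockStored.String())
}
```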

	endBlock := latestBlockStored
	if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockStored) < 0 {
	if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
		endBlock = targetEndBlock
	}
	if targetEndBlock.Uint64() == 0 {
		endBlock = latestBlockRPC
	}

Comment on lines +358 to 364

⚠️ Potential issue

The end boundary can exceed the RPC latest block when targetEndBlock > RPC latest; the code also lacks a start <= end guard.

Ensure we never migrate beyond RPC capability and validate the range.

- endBlock := latestBlockStored
- if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
-   endBlock = targetEndBlock
- }
- if targetEndBlock.Uint64() == 0 {
-   endBlock = latestBlockRPC
- }
+ // Cap by RPC latest, optionally by user-specified target
+ endBlock := new(big.Int).Set(latestBlockRPC)
+ if targetEndBlock.Sign() > 0 {
+   if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
+     endBlock = targetEndBlock
+   } else {
+     // Requested end is beyond RPC; cap at RPC latest
+     endBlock = latestBlockRPC
+   }
+ }
+
+ // Validate final range
+ if endBlock.Cmp(targetStartBlock) < 0 {
+   log.Fatal().Msgf("Invalid migration range: end block %s is less than start block %s", endBlock.String(), targetStartBlock.String())
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
	if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
		endBlock = targetEndBlock
	}
	if targetEndBlock.Uint64() == 0 {
		endBlock = latestBlockRPC
	}
	// Cap by RPC latest, optionally by user-specified target
	endBlock := new(big.Int).Set(latestBlockRPC)
	if targetEndBlock.Sign() > 0 {
		if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
			endBlock = targetEndBlock
		} else {
			// Requested end is beyond RPC; cap at RPC latest
			endBlock = latestBlockRPC
		}
	}
	// Validate final range
	if endBlock.Cmp(targetStartBlock) < 0 {
		log.Fatal().Msgf("Invalid migration range: end block %s is less than start block %s", endBlock.String(), targetStartBlock.String())
	}
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 358 to 364, the current logic can set
endBlock past the RPC latest and lacks a start<=end guard; change it to: if
targetEndBlock is zero use latestBlockRPC, else if targetEndBlock >
latestBlockRPC set endBlock = latestBlockRPC (cap to RPC), otherwise set
endBlock = targetEndBlock; after computing endBlock validate that
startBlock.Cmp(endBlock) <= 0 and return an error (or exit) if startBlock >
endBlock to prevent an invalid range.
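
To make the capping-and-validation rule concrete, here is a small self-contained sketch; the helper name determineEndBlock and the error return are illustrative only (the PR's DetermineMigrationBoundaries logs fatally instead of returning an error):

```go
package main

import (
	"fmt"
	"math/big"
)

// determineEndBlock mirrors the suggested logic: default to the RPC latest block,
// honor an explicit target end only when it does not exceed the RPC latest,
// and reject ranges where the end is below the start.
func determineEndBlock(targetStartBlock, targetEndBlock, latestBlockRPC *big.Int) (*big.Int, error) {
	endBlock := new(big.Int).Set(latestBlockRPC)
	if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
		endBlock = targetEndBlock
	}
	if endBlock.Cmp(targetStartBlock) < 0 {
		return nil, fmt.Errorf("invalid migration range: end block %s is less than start block %s",
			endBlock.String(), targetStartBlock.String())
	}
	return endBlock, nil
}

func main() {
	start := big.NewInt(100)
	target := big.NewInt(0) // 0 means "no explicit end": fall back to the RPC latest
	rpcLatest := big.NewInt(5000)

	end, err := determineEndBlock(start, target, rpcLatest)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("migrating blocks %s..%s\n", start.String(), end.String())
}
```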

startBlock := targetStartBlock

1 change: 1 addition & 0 deletions configs/config.go
@@ -93,6 +93,7 @@ type S3StorageConfig struct {
	BufferSize       int64 `mapstructure:"bufferSizeMB"`         // Target buffer size in MB before flush
	BufferTimeout    int   `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush
	MaxBlocksPerFile int   `mapstructure:"maxBlocksPerFile"`     // Max blocks per parquet file (0 = no limit, only size/timeout triggers)
	FlushTimeout     int   `mapstructure:"flushTimeoutSeconds"`  // Timeout in seconds for flush operations (default: 60)
}

type ParquetConfig struct {
11 changes: 7 additions & 4 deletions internal/storage/s3.go
@@ -103,6 +103,9 @@ func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error) {
	if cfg.BufferTimeout == 0 {
		cfg.BufferTimeout = 1 * 60 * 60 // 1 hour in seconds default
	}
	if cfg.FlushTimeout == 0 {
		cfg.FlushTimeout = 300 // 5 mins default
	}

	// Create formatter based on format
	var formatter DataFormatter
@@ -309,17 +312,17 @@ func (s *S3Connector) Flush() error {
		select {
		case <-s.flushDoneCh:
			return nil
		case <-time.After(60 * time.Second):
			return fmt.Errorf("flush timeout after 60 seconds")
		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
		}
	default:
		// Flush channel is full, likely a flush is already in progress
		// Wait for it to complete
		select {
		case <-s.flushDoneCh:
			return nil
		case <-time.After(60 * time.Second):
			return fmt.Errorf("flush timeout after 60 seconds")
		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
		}
	}
Comment on lines 312 to 327

🛠️ Refactor suggestion

Flush completion uses a shared unbuffered channel; concurrent Flush callers can time out or hang.

Only one waiter receives from flushDoneCh per flush; the others block until another flush completes or they hit the timeout. Guard Flush with a mutex to serialize calls (minimal change), or switch to a broadcast/WaitGroup pattern.

Minimal mutex approach:

 type S3Connector struct {
@@
   flushDoneCh chan struct{} // Signals when flush is complete
   flushTimer  *time.Timer
   timerMu     sync.Mutex
+  flushMu     sync.Mutex
   lastAddTime time.Time

And at the beginning of Flush():

 func (s *S3Connector) Flush() error {
+  s.flushMu.Lock()
+  defer s.flushMu.Unlock()

Longer-term: replace flushDoneCh with a per-flush ack (e.g., send a chan struct{} via flushCh) or use a sync.Cond to broadcast completion to all waiters.
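
A sketch of the per-flush-ack alternative mentioned above, assuming the connector's background worker drains flushCh; the types and the stubbed worker body are illustrative, not the PR's actual S3Connector:

```go
package main

import (
	"fmt"
	"time"
)

// flushRequest carries its own completion channel, so every caller is acked for
// the flush it requested instead of competing for a single shared flushDoneCh.
type flushRequest struct {
	done chan error
}

type connector struct {
	flushCh      chan flushRequest
	flushTimeout time.Duration
}

// Flush enqueues a request and waits for its private ack or the configured timeout.
func (c *connector) Flush() error {
	req := flushRequest{done: make(chan error, 1)}
	select {
	case c.flushCh <- req:
	case <-time.After(c.flushTimeout):
		return fmt.Errorf("flush timeout after %s: worker busy", c.flushTimeout)
	}
	select {
	case err := <-req.done:
		return err
	case <-time.After(c.flushTimeout):
		return fmt.Errorf("flush timeout after %s", c.flushTimeout)
	}
}

// worker processes flush requests one at a time and acks each requester.
func (c *connector) worker() {
	for req := range c.flushCh {
		// ... encode the buffer and upload to S3 here ...
		req.done <- nil
	}
}

func main() {
	c := &connector{flushCh: make(chan flushRequest), flushTimeout: 2 * time.Second}
	go c.worker()
	fmt.Println("flush:", c.Flush())
}
```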

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
select {
case <-s.flushDoneCh:
return nil
case <-time.After(60 * time.Second):
return fmt.Errorf("flush timeout after 60 seconds")
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
}
default:
// Flush channel is full, likely a flush is already in progress
// Wait for it to complete
select {
case <-s.flushDoneCh:
return nil
case <-time.After(60 * time.Second):
return fmt.Errorf("flush timeout after 60 seconds")
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
}
}
type S3Connector struct {
// ...
flushDoneCh chan struct{} // Signals when flush is complete
flushTimer *time.Timer
timerMu sync.Mutex
flushMu sync.Mutex
lastAddTime time.Time
}
func (s *S3Connector) Flush() error {
s.flushMu.Lock()
defer s.flushMu.Unlock()
select {
case <-s.flushDoneCh:
return nil
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
}
default:
// Flush channel is full, likely a flush is already in progress
// Wait for it to complete
select {
case <-s.flushDoneCh:
return nil
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
}
}
}

}