diff --git a/cmd/migrate_valid.go b/cmd/migrate_valid.go index 23a81c3..3469f33 100644 --- a/cmd/migrate_valid.go +++ b/cmd/migrate_valid.go @@ -348,12 +348,19 @@ func (m *Migrator) DetermineMigrationBoundaries(targetStartBlock, targetEndBlock if err != nil { log.Fatal().Err(err).Msg("Failed to get latest block from main storage") } + latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background()) + if err != nil { + log.Fatal().Err(err).Msg("Failed to get latest block from RPC") + } log.Info().Msgf("Latest block in main storage: %d", latestBlockStored) endBlock := latestBlockStored - if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockStored) < 0 { + if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 { endBlock = targetEndBlock } + if targetEndBlock.Uint64() == 0 { + endBlock = latestBlockRPC + } startBlock := targetStartBlock diff --git a/configs/config.go b/configs/config.go index 6b5c10e..74f71cd 100644 --- a/configs/config.go +++ b/configs/config.go @@ -93,6 +93,7 @@ type S3StorageConfig struct { BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush MaxBlocksPerFile int `mapstructure:"maxBlocksPerFile"` // Max blocks per parquet file (0 = no limit, only size/timeout triggers) + FlushTimeout int `mapstructure:"flushTimeoutSeconds"` // Timeout in seconds for flush operations (default: 60) } type ParquetConfig struct { diff --git a/internal/storage/s3.go b/internal/storage/s3.go index a0b51ae..e69b796 100644 --- a/internal/storage/s3.go +++ b/internal/storage/s3.go @@ -103,6 +103,9 @@ func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error) { if cfg.BufferTimeout == 0 { cfg.BufferTimeout = 1 * 60 * 60 // 1 hour in seconds default } + if cfg.FlushTimeout == 0 { + cfg.FlushTimeout = 300 // 5 mins default + } // Create formatter based on format var formatter DataFormatter @@ -309,8 +312,8 @@ func (s *S3Connector) Flush() error { select { case <-s.flushDoneCh: return nil - case <-time.After(60 * time.Second): - return fmt.Errorf("flush timeout after 60 seconds") + case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second): + return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout) } default: // Flush channel is full, likely a flush is already in progress @@ -318,8 +321,8 @@ func (s *S3Connector) Flush() error { select { case <-s.flushDoneCh: return nil - case <-time.After(60 * time.Second): - return fmt.Errorf("flush timeout after 60 seconds") + case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second): + return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout) } } }