From 0761c4fa8b41a5957cf3aca070aca78ed792d99e Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 17:30:39 +0545 Subject: [PATCH 01/21] redo committer --- configs/config.go | 14 ++ internal/committer/README.md | 180 ++++++++++++++ internal/committer/committer.go | 426 ++++++++++++++++++++++++++++++++ 3 files changed, 620 insertions(+) create mode 100644 internal/committer/README.md create mode 100644 internal/committer/committer.go diff --git a/configs/config.go b/configs/config.go index 6b5c10e..4ec3b40 100644 --- a/configs/config.go +++ b/configs/config.go @@ -270,6 +270,20 @@ type Config struct { Publisher PublisherConfig `mapstructure:"publisher"` Validation ValidationConfig `mapstructure:"validation"` Migrator MigratorConfig `mapstructure:"migrator"` + + CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` + CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` + CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` + CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` + CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` + CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` + CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` + CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` + + StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` + StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` + StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` + StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` } var Cfg Config diff --git a/internal/committer/README.md b/internal/committer/README.md new file mode 100644 index 0000000..683633f --- /dev/null +++ b/internal/committer/README.md @@ -0,0 +1,180 @@ +# Committer Package + +This package implements a committer that processes block data from S3 parquet files and publishes them to Kafka. It follows the requirements specified in the original comments. 
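+
+As a quick orientation, the sketch below shows how a block range can be derived from an S3 object key. It is a minimal, standalone example that only assumes the `blocks_<start>_<end>.parquet` naming convention described under "S3 File Structure" later in this README; the `parseRange` helper and the `main` wrapper are illustrative and are not part of the package API.
+
+```go
+package main
+
+import (
+	"fmt"
+	"math/big"
+	"regexp"
+)
+
+// Same filename convention the committer expects: blocks_<start>_<end>.parquet
+var blockRangeRe = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`)
+
+// parseRange is an illustrative helper (not part of the package API): it pulls
+// the start and end block numbers out of an S3 key such as
+// "chain_1/year=2024/blocks_1000_2000.parquet".
+func parseRange(s3Key string) (start, end *big.Int, err error) {
+	m := blockRangeRe.FindStringSubmatch(s3Key)
+	if len(m) != 3 {
+		return nil, nil, fmt.Errorf("could not parse block range from %q", s3Key)
+	}
+	// The capture groups are guaranteed to be decimal digits by the regex,
+	// so the SetString ok flags can be ignored here.
+	start, _ = new(big.Int).SetString(m[1], 10)
+	end, _ = new(big.Int).SetString(m[2], 10)
+	return start, end, nil
+}
+
+func main() {
+	start, end, err := parseRange("chain_1/year=2024/blocks_1000_2000.parquet")
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("blocks %s through %s\n", start, end) // blocks 1000 through 2000
+}
+```
+
+Big integers are used for block numbers here, mirroring the `*big.Int` fields on the committer's `BlockRange` type, so the example stays consistent with chains whose block heights may exceed an int64.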
+ +## Features + +- **ClickHouse Integration**: Gets the maximum block number from ClickHouse for the chain +- **S3 File Discovery**: Lists parquet files from S3 with chain-specific prefixes +- **Block Range Parsing**: Extracts start and end block numbers from S3 filenames +- **File Filtering**: Skips files where end block is less than max block number from ClickHouse +- **Sequential Processing**: Processes files in ascending order by start block number +- **Memory-Efficient Streaming**: Streams parquet files row-by-row to minimize memory usage +- **Kafka Publishing**: Publishes processed block data to Kafka +- **Error Handling**: Comprehensive error handling with detailed logging + +## Usage + +### Basic Usage + +```go +package main + +import ( + "context" + "math/big" + "log" + + "github.com/thirdweb-dev/indexer/internal/committer" + "github.com/thirdweb-dev/indexer/configs" +) + +func main() { + // Load configuration + if err := configs.LoadConfig("config.yml"); err != nil { + log.Fatal("Failed to load config:", err) + } + + // Create committer for chain ID 1 (Ethereum mainnet) + chainId := big.NewInt(1) + committer, err := committer.NewCommitterFromConfig(chainId) + if err != nil { + log.Fatal("Failed to create committer:", err) + } + defer committer.Close() + + // Process blocks + ctx := context.Background() + if err := committer.ProcessBlocks(ctx); err != nil { + log.Fatal("Failed to process blocks:", err) + } +} +``` + +### Advanced Usage with Custom Configuration + +```go +package main + +import ( + "context" + "math/big" + "log" + + "github.com/thirdweb-dev/indexer/internal/committer" + "github.com/thirdweb-dev/indexer/configs" +) + +func main() { + // Custom configuration + chainId := big.NewInt(137) // Polygon + + clickhouseConfig := &configs.ClickhouseConfig{ + Host: "localhost", + Port: 9000, + Username: "default", + Password: "", + Database: "insight", + } + + s3Config := &configs.S3Config{ + Bucket: "thirdweb-insight-production", + Region: "us-east-1", + AccessKeyID: "your-access-key", + SecretAccessKey: "your-secret-key", + } + + kafkaConfig := &configs.KafkaConfig{ + Brokers: "localhost:9092", + } + + // Create committer + committer, err := committer.NewCommitter(chainId, clickhouseConfig, s3Config, kafkaConfig) + if err != nil { + log.Fatal("Failed to create committer:", err) + } + defer committer.Close() + + // Process blocks + ctx := context.Background() + if err := committer.ProcessBlocks(ctx); err != nil { + log.Fatal("Failed to process blocks:", err) + } +} +``` + +## Configuration Requirements + +The committer requires the following configuration: + +### ClickHouse Configuration +- Host, Port, Username, Password, Database +- Used to query the maximum block number for the chain + +### S3 Configuration +- Bucket name (e.g., "thirdweb-insight-production") +- Region, Access Key ID, Secret Access Key +- Used to list and download parquet files + +### Kafka Configuration +- Brokers list +- Used to publish processed block data + +## S3 File Structure + +The committer expects S3 files to follow this naming pattern: +``` +chain_${chainId}/year=2024/blocks_1000_2000.parquet +``` + +Where: +- `chain_${chainId}` is the prefix for the chain +- `year=2024` is the partitioning by year +- `blocks_1000_2000.parquet` contains blocks from 1000 to 2000 + +## Parquet File Structure + +The parquet files should contain the following columns: +- `chain_id` (uint64): Chain identifier +- `block_number` (uint64): Block number +- `block_hash` (string): Block hash +- `block_timestamp` (int64): 
Block timestamp +- `block_json` (bytes): Serialized block data +- `transactions_json` (bytes): Serialized transactions data +- `logs_json` (bytes): Serialized logs data +- `traces_json` (bytes): Serialized traces data + +## Processing Flow + +1. **Query ClickHouse**: Get the maximum block number for the chain +2. **List S3 Files**: Find all parquet files with the chain prefix +3. **Filter Files**: Skip files where end block ≤ max block number +4. **Sort Files**: Order by start block number (ascending) +5. **Process Sequentially**: For each file: + - Download from S3 to local storage + - Stream parquet file row-by-row + - Skip blocks < next commit block number + - Error if block > next commit block number (missing data) + - Publish found blocks to Kafka + - Increment commit block number + - Clean up local file + +## Error Handling + +The committer includes comprehensive error handling: +- Missing configuration validation +- S3 connection and download errors +- Parquet file parsing errors +- Kafka publishing errors +- Block sequence validation errors + +All errors are logged with detailed context for debugging. + +## Memory Management + +The committer is designed to be memory-efficient: +- Downloads files directly to disk (no in-memory buffering) +- Streams parquet files row-by-row +- Processes one file at a time +- Cleans up local files after processing +- Uses semaphores to limit concurrent operations diff --git a/internal/committer/committer.go b/internal/committer/committer.go new file mode 100644 index 0000000..2b442ff --- /dev/null +++ b/internal/committer/committer.go @@ -0,0 +1,426 @@ +package committer + +import ( + "context" + "encoding/json" + "fmt" + "io" + "math/big" + "os" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/parquet-go/parquet-go" + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" + "github.com/thirdweb-dev/indexer/internal/storage" +) + +// BlockRange represents a range of blocks in an S3 parquet file +type BlockRange struct { + StartBlock *big.Int `json:"start_block"` + EndBlock *big.Int `json:"end_block"` + S3Key string `json:"s3_key"` + IsDownloaded bool `json:"is_downloaded"` + LocalPath string `json:"local_path,omitempty"` +} + +var clickhouseConn, _ = storage.NewClickHouseConnector(&config.ClickhouseConfig{ + Host: config.Cfg.CommitterClickhouseHost, + Port: config.Cfg.CommitterClickhousePort, + Username: config.Cfg.CommitterClickhouseUsername, + Password: config.Cfg.CommitterClickhousePassword, + Database: config.Cfg.CommitterClickhouseDatabase, +}) + +var awsCfg, _ = awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithCredentialsProvider(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: config.Cfg.StagingS3AccessKeyID, + SecretAccessKey: config.Cfg.StagingS3SecretAccessKey, + }, nil + })), + awsconfig.WithRegion(config.Cfg.StagingS3Region), +) + +var s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com") +}) + +var kafkaPublisher, _ = storage.NewKafkaPublisher(&config.KafkaConfig{ + Brokers: config.Cfg.CommitterKafkaBrokers, + Username: config.Cfg.CommitterKafkaUsername, + Password: config.Cfg.CommitterKafkaPassword, + EnableTLS: true, +}) + +var downloadSemaphore 
= make(chan struct{}, 3) +var tempDir = filepath.Join(os.TempDir(), "committer") +var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) + +// NewCommitter creates a new committer instance +func Commit(chainId *big.Int, s3Config *config.S3Config, kafkaConfig *config.KafkaConfig) error { + maxBlockNumber, err := clickhouseConn.GetMaxBlockNumber(chainId) + if err != nil { + log.Error().Err(err).Msg("Failed to get max block number from ClickHouse") + return err + } + + files, err := listS3ParquetFiles(chainId) + if err != nil { + log.Error().Err(err).Msg("Failed to list S3 parquet files") + return err + } + + blockRanges, err := filterAndSortBlockRanges(files, maxBlockNumber) + if err != nil { + log.Error().Err(err).Msg("Failed to filter and sort block ranges") + return err + } + + // need to make this into goroutine + for _, blockRange := range blockRanges { + downloadFile(&blockRange) + } + + nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) + for _, blockRange := range blockRanges { + err := streamParquetFile(chainId, blockRange.LocalPath, nextCommitBlockNumber) + if err != nil { + log.Panic().Err(err).Msg("Failed to stream parquet file") + } + // Clean up local file + if err := os.Remove(blockRange.LocalPath); err != nil { + log.Warn(). + Err(err). + Str("file", blockRange.LocalPath). + Msg("Failed to clean up local file") + } + } + + return nil +} + +// Close cleans up resources +func Close() error { + if clickhouseConn != nil { + clickhouseConn.Close() + } + if kafkaPublisher != nil { + kafkaPublisher.Close() + } + // Clean up temp directory + return os.RemoveAll(tempDir) +} + +// getMaxBlockNumberFromClickHouse gets the maximum block number for the chain from ClickHouse +func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { + return clickhouseConn.GetMaxBlockNumber(chainId) +} + +// listS3ParquetFiles lists all parquet files in S3 with the chain prefix +func listS3ParquetFiles(chainId *big.Int) ([]string, error) { + prefix := fmt.Sprintf("chain_%d/", chainId.Uint64()) + var files []string + + paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{ + Bucket: aws.String(config.Cfg.StagingS3Bucket), + Prefix: aws.String(prefix), + }) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to list S3 objects: %w", err) + } + + for _, obj := range page.Contents { + if obj.Key != nil && strings.HasSuffix(*obj.Key, ".parquet") { + files = append(files, *obj.Key) + } + } + } + + return files, nil +} + +// parseBlockRangeFromFilename extracts start and end block numbers from S3 filename +// Expected format: chain_${chainId}/year=2024/blocks_1000_2000.parquet +func parseBlockRangeFromFilename(filename string) (*big.Int, *big.Int, error) { + // Extract the filename part after the last slash + parts := strings.Split(filename, "/") + if len(parts) == 0 { + return nil, nil, fmt.Errorf("invalid filename format: %s", filename) + } + + filePart := parts[len(parts)-1] + + // Use regex to extract block numbers from filename like "blocks_1000_2000.parquet" + matches := parquetFilenameRegex.FindStringSubmatch(filePart) + if len(matches) != 3 { + return nil, nil, fmt.Errorf("could not parse block range from filename: %s", filename) + } + + startBlock, err := strconv.ParseInt(matches[1], 10, 64) + if err != nil { + return nil, nil, fmt.Errorf("invalid start block number: %s", matches[1]) + } + + endBlock, err := 
strconv.ParseInt(matches[2], 10, 64) + if err != nil { + return nil, nil, fmt.Errorf("invalid end block number: %s", matches[2]) + } + + return big.NewInt(startBlock), big.NewInt(endBlock), nil +} + +// filterAndSortBlockRanges filters block ranges by max block number and sorts them +func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockRange, error) { + var blockRanges []BlockRange + + for _, file := range files { + startBlock, endBlock, err := parseBlockRangeFromFilename(file) + if err != nil { + log.Warn().Err(err).Str("file", file).Msg("Skipping file with invalid format") + continue + } + + // Skip files where end block is less than max block number from ClickHouse + if endBlock.Cmp(maxBlockNumber) <= 0 { + log.Debug(). + Str("file", file). + Str("end_block", endBlock.String()). + Str("max_block", maxBlockNumber.String()). + Msg("Skipping file - end block is less than or equal to max block") + continue + } + + blockRanges = append(blockRanges, BlockRange{ + StartBlock: startBlock, + EndBlock: endBlock, + S3Key: file, + IsDownloaded: false, + }) + } + + // Sort by start block number in ascending order + sort.Slice(blockRanges, func(i, j int) bool { + return blockRanges[i].StartBlock.Cmp(blockRanges[j].StartBlock) < 0 + }) + + return blockRanges, nil +} + +// downloadFile downloads a file from S3 and saves it to local storage +func downloadFile(blockRange *BlockRange) error { + // Acquire semaphore to limit concurrent downloads + downloadSemaphore <- struct{}{} + defer func() { <-downloadSemaphore }() + + // Generate local file path + localPath := filepath.Join(tempDir, filepath.Base(blockRange.S3Key)) + + // Download from S3 + result, err := s3Client.GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String(config.Cfg.StagingS3Bucket), + Key: aws.String(blockRange.S3Key), + }) + if err != nil { + return fmt.Errorf("failed to download file from S3: %w", err) + } + defer result.Body.Close() + + // Create local file + file, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create local file: %w", err) + } + defer file.Close() + + // Stream download directly to file without keeping in memory + _, err = file.ReadFrom(result.Body) + if err != nil { + os.Remove(localPath) // Clean up on error + return fmt.Errorf("failed to write file: %w", err) + } + + // Update block range with local path and downloaded status + mu.Lock() + blockRange.LocalPath = localPath + blockRange.IsDownloaded = true + mu.Unlock() + + log.Info(). + Str("s3_key", blockRange.S3Key). + Str("local_path", localPath). 
+ Msg("Successfully downloaded file from S3") + + return nil +} + +// ParquetBlockData represents the block data structure in parquet files +type ParquetBlockData struct { + ChainId uint64 `parquet:"chain_id"` + BlockNumber uint64 `parquet:"block_number"` + BlockHash string `parquet:"block_hash"` + BlockTimestamp int64 `parquet:"block_timestamp"` + Block []byte `parquet:"block_json"` + Transactions []byte `parquet:"transactions_json"` + Logs []byte `parquet:"logs_json"` + Traces []byte `parquet:"traces_json"` +} + +// streamParquetFile streams a parquet file row by row and processes blocks +func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber *big.Int) error { + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open parquet file: %w", err) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to get file stats: %w", err) + } + + pFile, err := parquet.OpenFile(file, stat.Size()) + if err != nil { + return fmt.Errorf("failed to open parquet file: %w", err) + } + + for _, rg := range pFile.RowGroups() { + // Use row-by-row reading to avoid loading entire row group into memory + // read the row group row by row and get each row. for each row do the following + // if block number is less than next commit block number, continue + // if block number is greater than next commit block number, return error + // if block number is equal to next commit block number, parse the block data and publish to kafka + // increment next commit block number by 1 + + for _, row := range rg.Rows() { + blockNum := row[1].Uint64() + if blockNum.Cmp(nextCommitBlockNumber) < 0 { + continue + } + + if blockNum.Cmp(nextCommitBlockNumber) > 0 { + return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) + } + + blockData, err := parseBlockData(row) + if err != nil { + return fmt.Errorf("failed to parse block data: %w", err) + } + + kafkaPublisher.PublishBlockData([]common.BlockData{blockData}) + nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) + } + } + + return nil +} + +// readRowGroupStreamingly reads a row group row-by-row to minimize memory usage +func readRowGroupStreamingly(rg parquet.RowGroup, currentCommitBlock *big.Int, blockData *[]common.BlockData) error { + reader := parquet.NewRowGroupReader(rg) + + // Process rows one at a time instead of loading all into memory + for { + // Read single row + row := make([]parquet.Row, 1) + n, err := reader.ReadRows(row) + if err == io.EOF || n == 0 { + break + } + if err != nil { + return fmt.Errorf("failed to read row: %w", err) + } + + if len(row[0]) < 8 { + continue // Not enough columns + } + + // Extract block number first to check if we need this row + blockNum := row[0][1].Uint64() // block_number is second column + blockNumber := big.NewInt(int64(blockNum)) + + // Skip if block number is less than next commit block number + if blockNumber.Cmp(currentCommitBlock) < 0 { + continue + } + + // If block number is greater than next commit block number, exit with error + if blockNumber.Cmp(currentCommitBlock) > 0 { + return fmt.Errorf("block data not found for block number %s in S3", currentCommitBlock.String()) + } + + // Build ParquetBlockData from row + pd := ParquetBlockData{ + ChainId: row[0][0].Uint64(), + BlockNumber: blockNum, + BlockHash: row[0][2].String(), + BlockTimestamp: row[0][3].Int64(), + Block: row[0][4].ByteArray(), + Transactions: row[0][5].ByteArray(), + Logs: row[0][6].ByteArray(), + Traces: 
row[0][7].ByteArray(), + } + + // Parse block data + parsedBlockData, err := parseBlockData(pd) + if err != nil { + log.Warn().Err(err).Uint64("block", pd.BlockNumber).Msg("Failed to parse block data") + continue + } + + *blockData = append(*blockData, parsedBlockData) + + // Increment next commit block number by 1 + currentCommitBlock.Add(currentCommitBlock, big.NewInt(1)) + } + + return nil +} + +// parseBlockData converts ParquetBlockData to common.BlockData +func parseBlockData(pd ParquetBlockData) (common.BlockData, error) { + // Unmarshal JSON data + var block common.Block + if err := json.Unmarshal(pd.Block, &block); err != nil { + return common.BlockData{}, fmt.Errorf("failed to unmarshal block: %w", err) + } + + var transactions []common.Transaction + if len(pd.Transactions) > 0 { + if err := json.Unmarshal(pd.Transactions, &transactions); err != nil { + log.Warn().Err(err).Uint64("block", pd.BlockNumber).Msg("Failed to unmarshal transactions") + } + } + + var logs []common.Log + if len(pd.Logs) > 0 { + if err := json.Unmarshal(pd.Logs, &logs); err != nil { + log.Warn().Err(err).Uint64("block", pd.BlockNumber).Msg("Failed to unmarshal logs") + } + } + + var traces []common.Trace + if len(pd.Traces) > 0 { + if err := json.Unmarshal(pd.Traces, &traces); err != nil { + log.Warn().Err(err).Uint64("block", pd.BlockNumber).Msg("Failed to unmarshal traces") + } + } + + return common.BlockData{ + Block: block, + Transactions: transactions, + Logs: logs, + Traces: traces, + }, nil +} From 5e0891d78b0997cc025252ac65aecf10c6509416 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 17:37:46 +0545 Subject: [PATCH 02/21] a --- internal/committer/committer.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 2b442ff..9399161 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -85,18 +85,18 @@ func Commit(chainId *big.Int, s3Config *config.S3Config, kafkaConfig *config.Kaf return err } - // need to make this into goroutine - for _, blockRange := range blockRanges { - downloadFile(&blockRange) - } + go downloadFilesInBackground(blockRanges) nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) for _, blockRange := range blockRanges { + // use isDownloaded channel to wait. check blockRange.IsDownloaded == true else wait. + err := streamParquetFile(chainId, blockRange.LocalPath, nextCommitBlockNumber) if err != nil { log.Panic().Err(err).Msg("Failed to stream parquet file") } // Clean up local file + // maybe publish to fileDeleted channel after file is deleted for downloadFilesInBackground to continue if err := os.Remove(blockRange.LocalPath); err != nil { log.Warn(). Err(err). @@ -108,6 +108,15 @@ func Commit(chainId *big.Int, s3Config *config.S3Config, kafkaConfig *config.Kaf return nil } +func downloadFilesInBackground(blockRanges []BlockRange) { + // dont download all files, if there are too many files, wait for some of them to be deleted. i.e max file could downloaded should be 10. + // if there are already 10 files, just wait for file count to decrease and download more. + // use fileDeleted channel to wait. + for _, blockRange := range blockRanges { + downloadFile(&blockRange) + } +} + // Close cleans up resources func Close() error { if clickhouseConn != nil { @@ -261,6 +270,7 @@ func downloadFile(blockRange *BlockRange) error { Str("s3_key", blockRange.S3Key). Str("local_path", localPath). 
Msg("Successfully downloaded file from S3") + // publish to isDownloaded channel after file is downloaded return nil } From 5c67d5029fc786e55b6db3bc7229733bd76090af Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 18:12:43 +0545 Subject: [PATCH 03/21] committer redo --- cmd/committer.go | 29 +++++ cmd/root.go | 1 + internal/committer/committer.go | 191 +++++++++++++++++--------------- 3 files changed, 131 insertions(+), 90 deletions(-) create mode 100644 cmd/committer.go diff --git a/cmd/committer.go b/cmd/committer.go new file mode 100644 index 0000000..8f8225a --- /dev/null +++ b/cmd/committer.go @@ -0,0 +1,29 @@ +package cmd + +import ( + "fmt" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "github.com/thirdweb-dev/indexer/internal/committer" + "github.com/thirdweb-dev/indexer/internal/rpc" +) + +var committerCmd = &cobra.Command{ + Use: "committer", + Short: "run committer", + Long: "published data from s3 to kafka. if block is not found in s3, it will panic", + Run: RunCommitter, +} + +func RunCommitter(cmd *cobra.Command, args []string) { + fmt.Println("running committer") + + rpc, err := rpc.Initialize() + if err != nil { + log.Fatal().Err(err).Msg("Failed to initialize RPC") + } + chainId := rpc.GetChainID() + + committer.Commit(chainId) +} diff --git a/cmd/root.go b/cmd/root.go index 0140b4c..c20201f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -429,6 +429,7 @@ func init() { rootCmd.AddCommand(orchestratorCmd) rootCmd.AddCommand(apiCmd) + rootCmd.AddCommand(committerCmd) rootCmd.AddCommand(validateAndFixCmd) rootCmd.AddCommand(validateCmd) rootCmd.AddCommand(migrateValidationCmd) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 9399161..9c56f59 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" @@ -64,9 +65,16 @@ var kafkaPublisher, _ = storage.NewKafkaPublisher(&config.KafkaConfig{ var downloadSemaphore = make(chan struct{}, 3) var tempDir = filepath.Join(os.TempDir(), "committer") var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) +var mu sync.RWMutex -// NewCommitter creates a new committer instance -func Commit(chainId *big.Int, s3Config *config.S3Config, kafkaConfig *config.KafkaConfig) error { +const max_concurrent_files = 10 + +var fileDeleted = make(chan string, max_concurrent_files) +var downloadComplete = make(chan *BlockRange, max_concurrent_files) + +// Reads data from s3 and writes to Kafka +// if block is not found in s3, it will panic +func Commit(chainId *big.Int) error { maxBlockNumber, err := clickhouseConn.GetMaxBlockNumber(chainId) if err != nil { log.Error().Err(err).Msg("Failed to get max block number from ClickHouse") @@ -85,35 +93,78 @@ func Commit(chainId *big.Int, s3Config *config.S3Config, kafkaConfig *config.Kaf return err } + // Start downloading files in background go downloadFilesInBackground(blockRanges) nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) - for _, blockRange := range blockRanges { - // use isDownloaded channel to wait. check blockRange.IsDownloaded == true else wait. 
+ for i, blockRange := range blockRanges { + // Wait for this specific file to be downloaded + for { + mu.RLock() + if blockRange.IsDownloaded { + mu.RUnlock() + break + } + mu.RUnlock() + + // Wait for a download to complete + downloadedRange := <-downloadComplete + + // Check if this is the file we're waiting for + if downloadedRange.StartBlock.Cmp(blockRange.StartBlock) == 0 { + break + } + + // If not the right file, put it back and continue waiting + downloadComplete <- downloadedRange + } err := streamParquetFile(chainId, blockRange.LocalPath, nextCommitBlockNumber) if err != nil { log.Panic().Err(err).Msg("Failed to stream parquet file") } - // Clean up local file - // maybe publish to fileDeleted channel after file is deleted for downloadFilesInBackground to continue + + // Clean up local file and notify download goroutine if err := os.Remove(blockRange.LocalPath); err != nil { log.Warn(). Err(err). Str("file", blockRange.LocalPath). Msg("Failed to clean up local file") } + + // Notify that file was deleted + fileDeleted <- blockRange.LocalPath + + log.Info(). + Int("processed", i+1). + Int("total", len(blockRanges)). + Str("file", blockRange.S3Key). + Msg("Completed processing file") } return nil } func downloadFilesInBackground(blockRanges []BlockRange) { - // dont download all files, if there are too many files, wait for some of them to be deleted. i.e max file could downloaded should be 10. - // if there are already 10 files, just wait for file count to decrease and download more. - // use fileDeleted channel to wait. - for _, blockRange := range blockRanges { - downloadFile(&blockRange) + downloadedCount := 0 + + for i := range blockRanges { + // Wait if we've reached the maximum concurrent files + if downloadedCount >= max_concurrent_files { + <-fileDeleted // Wait for a file to be deleted + downloadedCount-- + } + + go func(index int) { + err := downloadFile(&blockRanges[index]) + if err != nil { + log.Error().Err(err).Str("file", blockRanges[index].S3Key).Msg("Failed to download file") + return + } + downloadComplete <- &blockRanges[index] + }(i) + + downloadedCount++ } } @@ -129,11 +180,6 @@ func Close() error { return os.RemoveAll(tempDir) } -// getMaxBlockNumberFromClickHouse gets the maximum block number for the chain from ClickHouse -func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { - return clickhouseConn.GetMaxBlockNumber(chainId) -} - // listS3ParquetFiles lists all parquet files in S3 with the chain prefix func listS3ParquetFiles(chainId *big.Int) ([]string, error) { prefix := fmt.Sprintf("chain_%d/", chainId.Uint64()) @@ -270,7 +316,6 @@ func downloadFile(blockRange *BlockRange) error { Str("s3_key", blockRange.S3Key). Str("local_path", localPath). Msg("Successfully downloaded file from S3") - // publish to isDownloaded channel after file is downloaded return nil } @@ -307,23 +352,51 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber for _, rg := range pFile.RowGroups() { // Use row-by-row reading to avoid loading entire row group into memory - // read the row group row by row and get each row. 
for each row do the following - // if block number is less than next commit block number, continue - // if block number is greater than next commit block number, return error - // if block number is equal to next commit block number, parse the block data and publish to kafka - // increment next commit block number by 1 - - for _, row := range rg.Rows() { - blockNum := row[1].Uint64() - if blockNum.Cmp(nextCommitBlockNumber) < 0 { + reader := parquet.NewRowGroupReader(rg) + + for { + // Read single row + row := make([]parquet.Row, 1) + n, err := reader.ReadRows(row) + if err == io.EOF || n == 0 { + break + } + if err != nil { + return fmt.Errorf("failed to read row: %w", err) + } + + if len(row[0]) < 8 { + continue // Not enough columns + } + + // Extract block number first to check if we need this row + blockNum := row[0][1].Uint64() // block_number is second column + blockNumber := big.NewInt(int64(blockNum)) + + // Skip if block number is less than next commit block number + if blockNumber.Cmp(nextCommitBlockNumber) < 0 { continue } - if blockNum.Cmp(nextCommitBlockNumber) > 0 { + // If block number is greater than next commit block number, exit with error + if blockNumber.Cmp(nextCommitBlockNumber) > 0 { return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) } - blockData, err := parseBlockData(row) + // Build ParquetBlockData from row + pd := ParquetBlockData{ + ChainId: row[0][0].Uint64(), + BlockNumber: blockNum, + BlockHash: row[0][2].String(), + BlockTimestamp: row[0][3].Int64(), + Block: row[0][4].ByteArray(), + Transactions: row[0][5].ByteArray(), + Logs: row[0][6].ByteArray(), + Traces: row[0][7].ByteArray(), + } + + // Parse block data + blockData, err := parseBlockData(pd) if err != nil { return fmt.Errorf("failed to parse block data: %w", err) } @@ -336,68 +409,6 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber return nil } -// readRowGroupStreamingly reads a row group row-by-row to minimize memory usage -func readRowGroupStreamingly(rg parquet.RowGroup, currentCommitBlock *big.Int, blockData *[]common.BlockData) error { - reader := parquet.NewRowGroupReader(rg) - - // Process rows one at a time instead of loading all into memory - for { - // Read single row - row := make([]parquet.Row, 1) - n, err := reader.ReadRows(row) - if err == io.EOF || n == 0 { - break - } - if err != nil { - return fmt.Errorf("failed to read row: %w", err) - } - - if len(row[0]) < 8 { - continue // Not enough columns - } - - // Extract block number first to check if we need this row - blockNum := row[0][1].Uint64() // block_number is second column - blockNumber := big.NewInt(int64(blockNum)) - - // Skip if block number is less than next commit block number - if blockNumber.Cmp(currentCommitBlock) < 0 { - continue - } - - // If block number is greater than next commit block number, exit with error - if blockNumber.Cmp(currentCommitBlock) > 0 { - return fmt.Errorf("block data not found for block number %s in S3", currentCommitBlock.String()) - } - - // Build ParquetBlockData from row - pd := ParquetBlockData{ - ChainId: row[0][0].Uint64(), - BlockNumber: blockNum, - BlockHash: row[0][2].String(), - BlockTimestamp: row[0][3].Int64(), - Block: row[0][4].ByteArray(), - Transactions: row[0][5].ByteArray(), - Logs: row[0][6].ByteArray(), - Traces: row[0][7].ByteArray(), - } - - // Parse block data - parsedBlockData, err := parseBlockData(pd) - if err != nil { - log.Warn().Err(err).Uint64("block", pd.BlockNumber).Msg("Failed to 
parse block data") - continue - } - - *blockData = append(*blockData, parsedBlockData) - - // Increment next commit block number by 1 - currentCommitBlock.Add(currentCommitBlock, big.NewInt(1)) - } - - return nil -} - // parseBlockData converts ParquetBlockData to common.BlockData func parseBlockData(pd ParquetBlockData) (common.BlockData, error) { // Unmarshal JSON data From 72a38246fe2057ed664282b6273e374b7fc05050 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 19:28:14 +0545 Subject: [PATCH 04/21] commit blocks in stream --- cmd/committer.go | 2 +- configs/config.go | 40 ++-- go.mod | 2 + go.sum | 4 + internal/committer/committer.go | 313 ++++++++++++++++++++++++++------ 5 files changed, 287 insertions(+), 74 deletions(-) diff --git a/cmd/committer.go b/cmd/committer.go index 8f8225a..170e1db 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -18,12 +18,12 @@ var committerCmd = &cobra.Command{ func RunCommitter(cmd *cobra.Command, args []string) { fmt.Println("running committer") - rpc, err := rpc.Initialize() if err != nil { log.Fatal().Err(err).Msg("Failed to initialize RPC") } chainId := rpc.GetChainID() + committer.Init(chainId) committer.Commit(chainId) } diff --git a/configs/config.go b/configs/config.go index 4ec3b40..4f84043 100644 --- a/configs/config.go +++ b/configs/config.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/caarlos0/env" + "github.com/joho/godotenv" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) @@ -271,24 +273,36 @@ type Config struct { Validation ValidationConfig `mapstructure:"validation"` Migrator MigratorConfig `mapstructure:"migrator"` - CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` - CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` - CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` - CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` - CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` - CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` - CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` - CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` + CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` + CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` + CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` + CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` + CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` + CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"` + CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` + CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` + CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` + CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"` - StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` - StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` - StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` - StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` + StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` + StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` + StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` + StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` + S3MaxParallelFileDownload int 
`env:"S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` } var Cfg Config func LoadConfig(cfgFile string) error { + err := godotenv.Load() + if err != nil { + log.Info().Msg("No .env file found") + } + err = env.Parse(&Cfg) + if err != nil { + panic(err) + } + if cfgFile != "" { viper.SetConfigFile(cfgFile) if err := viper.ReadInConfig(); err != nil { @@ -315,7 +329,7 @@ func LoadConfig(cfgFile string) error { viper.AutomaticEnv() - err := viper.Unmarshal(&Cfg) + err = viper.Unmarshal(&Cfg) if err != nil { return fmt.Errorf("error unmarshalling config: %v", err) } diff --git a/go.mod b/go.mod index 9effbdb..974b741 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/bits-and-blooms/bitset v1.20.0 // indirect github.com/bytedance/sonic v1.12.6 // indirect github.com/bytedance/sonic/loader v0.2.1 // indirect + github.com/caarlos0/env v3.5.0+incompatible // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect @@ -97,6 +98,7 @@ require ( github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/go.sum b/go.sum index ea02463..923b42c 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/bytedance/sonic v1.12.6/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKz github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iCjjJv3+E= github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/caarlos0/env v3.5.0+incompatible h1:Yy0UN8o9Wtr/jGHZDpCBLpNrzcFLLM2yixi/rBrKyJs= +github.com/caarlos0/env v3.5.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= @@ -212,6 +214,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 9c56f59..d8de399 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -2,6 +2,7 @@ package committer import ( "context" + "crypto/tls" "encoding/json" "fmt" "io" @@ -14,6 +15,7 @@ import ( "strings" "sync" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/aws/aws-sdk-go-v2/aws" awsconfig 
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -33,103 +35,190 @@ type BlockRange struct { LocalPath string `json:"local_path,omitempty"` } -var clickhouseConn, _ = storage.NewClickHouseConnector(&config.ClickhouseConfig{ - Host: config.Cfg.CommitterClickhouseHost, - Port: config.Cfg.CommitterClickhousePort, - Username: config.Cfg.CommitterClickhouseUsername, - Password: config.Cfg.CommitterClickhousePassword, - Database: config.Cfg.CommitterClickhouseDatabase, -}) - -var awsCfg, _ = awsconfig.LoadDefaultConfig(context.Background(), - awsconfig.WithCredentialsProvider(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { - return aws.Credentials{ - AccessKeyID: config.Cfg.StagingS3AccessKeyID, - SecretAccessKey: config.Cfg.StagingS3SecretAccessKey, - }, nil - })), - awsconfig.WithRegion(config.Cfg.StagingS3Region), -) - -var s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) { - o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com") -}) - -var kafkaPublisher, _ = storage.NewKafkaPublisher(&config.KafkaConfig{ - Brokers: config.Cfg.CommitterKafkaBrokers, - Username: config.Cfg.CommitterKafkaUsername, - Password: config.Cfg.CommitterKafkaPassword, - EnableTLS: true, -}) +// ParquetBlockData represents the block data structure in parquet files +type ParquetBlockData struct { + ChainId uint64 `parquet:"chain_id"` + BlockNumber uint64 `parquet:"block_number"` + BlockHash string `parquet:"block_hash"` + BlockTimestamp int64 `parquet:"block_timestamp"` + Block []byte `parquet:"block_json"` + Transactions []byte `parquet:"transactions_json"` + Logs []byte `parquet:"logs_json"` + Traces []byte `parquet:"traces_json"` +} +var clickhouseConn clickhouse.Conn +var s3Client *s3.Client +var kafkaPublisher *storage.KafkaPublisher var downloadSemaphore = make(chan struct{}, 3) var tempDir = filepath.Join(os.TempDir(), "committer") var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) var mu sync.RWMutex +var fileDeleted chan string +var downloadComplete chan *BlockRange -const max_concurrent_files = 10 +func Init(chainId *big.Int) { + tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64())) + fileDeleted = make(chan string, config.Cfg.S3MaxParallelFileDownload) + downloadComplete = make(chan *BlockRange, config.Cfg.S3MaxParallelFileDownload) -var fileDeleted = make(chan string, max_concurrent_files) -var downloadComplete = make(chan *BlockRange, max_concurrent_files) + initClickHouse() + initS3() + initKafka() +} + +func initClickHouse() { + var err error + clickhouseConn, err = clickhouse.Open(&clickhouse.Options{ + Addr: []string{fmt.Sprintf("%s:%d", config.Cfg.CommitterClickhouseHost, config.Cfg.CommitterClickhousePort)}, + Protocol: clickhouse.Native, + TLS: func() *tls.Config { + if config.Cfg.CommitterClickhouseEnableTLS { + return &tls.Config{} + } + return nil + }(), + Auth: clickhouse.Auth{ + Username: config.Cfg.CommitterClickhouseUsername, + Password: config.Cfg.CommitterClickhousePassword, + Database: config.Cfg.CommitterClickhouseDatabase, + }, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + }) + if err != nil { + log.Fatal().Err(err).Msg("Failed to connect to ClickHouse") + } +} + +func initS3() { + awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithCredentialsProvider(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: 
config.Cfg.StagingS3AccessKeyID, + SecretAccessKey: config.Cfg.StagingS3SecretAccessKey, + }, nil + })), + awsconfig.WithRegion(config.Cfg.StagingS3Region), + ) + if err != nil { + log.Fatal().Err(err).Msg("Failed to initialize AWS config") + } + + s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com") + }) +} + +func initKafka() { + var err error + kafkaPublisher, err = storage.NewKafkaPublisher(&config.KafkaConfig{ + Brokers: config.Cfg.CommitterKafkaBrokers, + Username: config.Cfg.CommitterKafkaUsername, + Password: config.Cfg.CommitterKafkaPassword, + EnableTLS: config.Cfg.CommitterKafkaEnableTLS, + }) + if err != nil { + log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher") + } +} // Reads data from s3 and writes to Kafka // if block is not found in s3, it will panic func Commit(chainId *big.Int) error { - maxBlockNumber, err := clickhouseConn.GetMaxBlockNumber(chainId) + log.Info().Str("chain_id", chainId.String()).Msg("Starting commit process") + + maxBlockNumber, err := getMaxBlockNumberFromClickHouse(chainId) if err != nil { log.Error().Err(err).Msg("Failed to get max block number from ClickHouse") return err } + log.Info().Str("max_block_number", maxBlockNumber.String()).Msg("Retrieved max block number from ClickHouse") files, err := listS3ParquetFiles(chainId) if err != nil { log.Error().Err(err).Msg("Failed to list S3 parquet files") return err } + log.Info().Int("total_files", len(files)).Msg("Listed S3 parquet files") blockRanges, err := filterAndSortBlockRanges(files, maxBlockNumber) if err != nil { log.Error().Err(err).Msg("Failed to filter and sort block ranges") return err } + log.Info().Int("filtered_ranges", len(blockRanges)).Msg("Filtered and sorted block ranges") // Start downloading files in background + log.Info().Msg("Starting background file downloads") go downloadFilesInBackground(blockRanges) nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) + log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting sequential processing") + for i, blockRange := range blockRanges { + log.Info(). + Int("processing", i+1). + Int("total", len(blockRanges)). + Str("file", blockRange.S3Key). + Str("start_block", blockRange.StartBlock.String()). + Str("end_block", blockRange.EndBlock.String()). + Msg("Processing file") + // Wait for this specific file to be downloaded for { mu.RLock() if blockRange.IsDownloaded { mu.RUnlock() + log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, proceeding") break } mu.RUnlock() + log.Debug().Str("file", blockRange.S3Key).Msg("Waiting for file download to complete") // Wait for a download to complete downloadedRange := <-downloadComplete // Check if this is the file we're waiting for if downloadedRange.StartBlock.Cmp(blockRange.StartBlock) == 0 { + log.Debug().Str("file", downloadedRange.S3Key).Msg("Received correct file, updating blockRange") + // Update the blockRange with the downloaded file's information + mu.Lock() + blockRange.LocalPath = downloadedRange.LocalPath + blockRange.IsDownloaded = downloadedRange.IsDownloaded + mu.Unlock() break } + log.Debug(). + Str("expected_file", blockRange.S3Key). + Str("received_file", downloadedRange.S3Key). + Msg("Received different file, putting back and waiting") // If not the right file, put it back and continue waiting downloadComplete <- downloadedRange } - err := streamParquetFile(chainId, blockRange.LocalPath, nextCommitBlockNumber) + log.Info(). 
+ Str("file", blockRange.LocalPath). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Starting to stream parquet file") + + err := streamParquetFile(blockRange.LocalPath, nextCommitBlockNumber) if err != nil { log.Panic().Err(err).Msg("Failed to stream parquet file") } + log.Info().Str("file", blockRange.LocalPath).Msg("Successfully streamed parquet file") + // Clean up local file and notify download goroutine if err := os.Remove(blockRange.LocalPath); err != nil { log.Warn(). Err(err). Str("file", blockRange.LocalPath). Msg("Failed to clean up local file") + } else { + log.Debug().Str("file", blockRange.LocalPath).Msg("Cleaned up local file") } // Notify that file was deleted @@ -145,22 +234,64 @@ func Commit(chainId *big.Int) error { return nil } +func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { + // Use toString() to force ClickHouse to return a string instead of UInt256 + query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64()) + rows, err := clickhouseConn.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + if !rows.Next() { + return big.NewInt(0), nil + } + + var maxBlockNumberStr string + if err := rows.Scan(&maxBlockNumberStr); err != nil { + return nil, err + } + + // Convert string to big.Int to handle UInt256 values + maxBlockNumber, ok := new(big.Int).SetString(maxBlockNumberStr, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", maxBlockNumberStr) + } + + return maxBlockNumber, nil +} + func downloadFilesInBackground(blockRanges []BlockRange) { + maxConcurrentFiles := config.Cfg.S3MaxParallelFileDownload + log.Info(). + Int("total_files", len(blockRanges)). + Int("max_concurrent", maxConcurrentFiles). + Msg("Starting background downloads") downloadedCount := 0 for i := range blockRanges { // Wait if we've reached the maximum concurrent files - if downloadedCount >= max_concurrent_files { + if downloadedCount >= maxConcurrentFiles { + log.Debug().Int("downloaded_count", downloadedCount).Msg("Reached max concurrent files, waiting for deletion") <-fileDeleted // Wait for a file to be deleted downloadedCount-- + log.Debug().Int("downloaded_count", downloadedCount).Msg("File deleted, continuing downloads") } + log.Debug(). + Int("index", i). + Str("file", blockRanges[i].S3Key). + Int("downloaded_count", downloadedCount). 
+ Msg("Starting download goroutine") + go func(index int) { + log.Debug().Str("file", blockRanges[index].S3Key).Msg("Download goroutine started") err := downloadFile(&blockRanges[index]) if err != nil { log.Error().Err(err).Str("file", blockRanges[index].S3Key).Msg("Failed to download file") return } + log.Debug().Str("file", blockRanges[index].S3Key).Msg("Download completed, sending to channel") downloadComplete <- &blockRanges[index] }(i) @@ -168,18 +299,6 @@ func downloadFilesInBackground(blockRanges []BlockRange) { } } -// Close cleans up resources -func Close() error { - if clickhouseConn != nil { - clickhouseConn.Close() - } - if kafkaPublisher != nil { - kafkaPublisher.Close() - } - // Clean up temp directory - return os.RemoveAll(tempDir) -} - // listS3ParquetFiles lists all parquet files in S3 with the chain prefix func listS3ParquetFiles(chainId *big.Int) ([]string, error) { prefix := fmt.Sprintf("chain_%d/", chainId.Uint64()) @@ -275,14 +394,33 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR // downloadFile downloads a file from S3 and saves it to local storage func downloadFile(blockRange *BlockRange) error { + log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") + // Acquire semaphore to limit concurrent downloads downloadSemaphore <- struct{}{} defer func() { <-downloadSemaphore }() + log.Debug().Str("file", blockRange.S3Key).Msg("Acquired download semaphore") + + // Ensure temp directory exists + if err := os.MkdirAll(tempDir, 0755); err != nil { + return fmt.Errorf("failed to create temp directory: %w", err) + } + log.Debug().Str("temp_dir", tempDir).Msg("Ensured temp directory exists") + // Generate local file path localPath := filepath.Join(tempDir, filepath.Base(blockRange.S3Key)) + log.Debug(). + Str("s3_key", blockRange.S3Key). + Str("local_path", localPath). + Msg("Generated local file path") // Download from S3 + log.Debug(). + Str("bucket", config.Cfg.StagingS3Bucket). + Str("key", blockRange.S3Key). 
+ Msg("Starting S3 download") + result, err := s3Client.GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String(config.Cfg.StagingS3Bucket), Key: aws.String(blockRange.S3Key), @@ -291,6 +429,7 @@ func downloadFile(blockRange *BlockRange) error { return fmt.Errorf("failed to download file from S3: %w", err) } defer result.Body.Close() + log.Debug().Str("file", blockRange.S3Key).Msg("S3 download initiated successfully") // Create local file file, err := os.Create(localPath) @@ -298,13 +437,16 @@ func downloadFile(blockRange *BlockRange) error { return fmt.Errorf("failed to create local file: %w", err) } defer file.Close() + log.Debug().Str("local_path", localPath).Msg("Created local file") // Stream download directly to file without keeping in memory + log.Debug().Str("file", blockRange.S3Key).Msg("Starting file stream to disk") _, err = file.ReadFrom(result.Body) if err != nil { os.Remove(localPath) // Clean up on error return fmt.Errorf("failed to write file: %w", err) } + log.Debug().Str("file", blockRange.S3Key).Msg("File stream completed successfully") // Update block range with local path and downloaded status mu.Lock() @@ -320,20 +462,13 @@ func downloadFile(blockRange *BlockRange) error { return nil } -// ParquetBlockData represents the block data structure in parquet files -type ParquetBlockData struct { - ChainId uint64 `parquet:"chain_id"` - BlockNumber uint64 `parquet:"block_number"` - BlockHash string `parquet:"block_hash"` - BlockTimestamp int64 `parquet:"block_timestamp"` - Block []byte `parquet:"block_json"` - Transactions []byte `parquet:"transactions_json"` - Logs []byte `parquet:"logs_json"` - Traces []byte `parquet:"traces_json"` -} - // streamParquetFile streams a parquet file row by row and processes blocks -func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber *big.Int) error { +func streamParquetFile(filePath string, nextCommitBlockNumber *big.Int) error { + log.Debug(). + Str("file", filePath). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Opening parquet file for streaming") + file, err := os.Open(filePath) if err != nil { return fmt.Errorf("failed to open parquet file: %w", err) @@ -344,15 +479,31 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber if err != nil { return fmt.Errorf("failed to get file stats: %w", err) } + log.Debug(). + Str("file", filePath). + Int64("size_bytes", stat.Size()). + Msg("File stats retrieved") pFile, err := parquet.OpenFile(file, stat.Size()) if err != nil { return fmt.Errorf("failed to open parquet file: %w", err) } + log.Debug(). + Str("file", filePath). + Int("row_groups", len(pFile.RowGroups())). + Msg("Parquet file opened successfully") + + processedBlocks := 0 + for rgIndex, rg := range pFile.RowGroups() { + log.Debug(). + Str("file", filePath). + Int("row_group", rgIndex). + Int64("num_rows", rg.NumRows()). + Msg("Processing row group") - for _, rg := range pFile.RowGroups() { // Use row-by-row reading to avoid loading entire row group into memory reader := parquet.NewRowGroupReader(rg) + rowGroupBlocks := 0 for { // Read single row @@ -375,6 +526,11 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber // Skip if block number is less than next commit block number if blockNumber.Cmp(nextCommitBlockNumber) < 0 { + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). + Str("next_commit_block", nextCommitBlockNumber.String()). 
+ Msg("Skipping block - already processed") continue } @@ -383,6 +539,12 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) } + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Processing block") + // Build ParquetBlockData from row pd := ParquetBlockData{ ChainId: row[0][0].Uint64(), @@ -401,11 +563,30 @@ func streamParquetFile(chainId *big.Int, filePath string, nextCommitBlockNumber return fmt.Errorf("failed to parse block data: %w", err) } + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). + Msg("Publishing block data to Kafka") + kafkaPublisher.PublishBlockData([]common.BlockData{blockData}) nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) + processedBlocks++ + rowGroupBlocks++ } + + log.Debug(). + Str("file", filePath). + Int("row_group", rgIndex). + Int("blocks_processed", rowGroupBlocks). + Msg("Completed row group") } + log.Info(). + Str("file", filePath). + Int("total_blocks_processed", processedBlocks). + Str("final_commit_block", nextCommitBlockNumber.String()). + Msg("Completed parquet file processing") + return nil } @@ -445,3 +626,15 @@ func parseBlockData(pd ParquetBlockData) (common.BlockData, error) { Traces: traces, }, nil } + +// Close cleans up resources +func Close() error { + if clickhouseConn != nil { + clickhouseConn.Close() + } + if kafkaPublisher != nil { + kafkaPublisher.Close() + } + // Clean up temp directory + return os.RemoveAll(tempDir) +} From cc4c7be523d44cb98022609afe541106bd35235b Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 20:10:05 +0545 Subject: [PATCH 05/21] minor change --- internal/committer/committer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index d8de399..39b181c 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -206,7 +206,7 @@ func Commit(chainId *big.Int) error { err := streamParquetFile(blockRange.LocalPath, nextCommitBlockNumber) if err != nil { - log.Panic().Err(err).Msg("Failed to stream parquet file") + log.Panic().Err(err).Str("S3Key", blockRange.S3Key).Msg("Failed to stream parquet file") } log.Info().Str("file", blockRange.LocalPath).Msg("Successfully streamed parquet file") @@ -509,12 +509,15 @@ func streamParquetFile(filePath string, nextCommitBlockNumber *big.Int) error { // Read single row row := make([]parquet.Row, 1) n, err := reader.ReadRows(row) - if err == io.EOF || n == 0 { + if err == io.EOF { break } if err != nil { return fmt.Errorf("failed to read row: %w", err) } + if n == 0 { + continue // No rows read in this call, try again + } if len(row[0]) < 8 { continue // Not enough columns From af465ef36644da281b73332bc894f47265f81cbd Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 20:21:13 +0545 Subject: [PATCH 06/21] minor change --- internal/committer/committer.go | 128 +++++++++++++++++++------------- 1 file changed, 75 insertions(+), 53 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 39b181c..20c11e8 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -509,72 +509,94 @@ func streamParquetFile(filePath string, nextCommitBlockNumber *big.Int) error { // Read single row row := make([]parquet.Row, 1) n, err := 
reader.ReadRows(row) - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to read row: %w", err) - } - if n == 0 { - continue // No rows read in this call, try again - } - if len(row[0]) < 8 { - continue // Not enough columns - } + // Process the row if we successfully read it, even if EOF occurred + if n > 0 { + if len(row[0]) < 8 { + if err == io.EOF { + break // EOF and no valid row, we're done + } + continue // Not enough columns, try again + } - // Extract block number first to check if we need this row - blockNum := row[0][1].Uint64() // block_number is second column - blockNumber := big.NewInt(int64(blockNum)) + // Extract block number first to check if we need this row + blockNum := row[0][1].Uint64() // block_number is second column + blockNumber := big.NewInt(int64(blockNum)) - // Skip if block number is less than next commit block number - if blockNumber.Cmp(nextCommitBlockNumber) < 0 { log.Debug(). Str("file", filePath). Uint64("block_number", blockNum). Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Skipping block - already processed") - continue - } + Msg("Read block from parquet file") + + // Skip if block number is less than next commit block number + if blockNumber.Cmp(nextCommitBlockNumber) < 0 { + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Skipping block - already processed") + if err == io.EOF { + break // EOF after processing, we're done + } + continue + } + + // If block number is greater than next commit block number, exit with error + if blockNumber.Cmp(nextCommitBlockNumber) > 0 { + log.Error(). + Str("file", filePath). + Uint64("block_number", blockNum). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Found block number greater than expected - missing block in sequence") + return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) + } - // If block number is greater than next commit block number, exit with error - if blockNumber.Cmp(nextCommitBlockNumber) > 0 { - return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) - } + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Processing block") + + // Build ParquetBlockData from row + pd := ParquetBlockData{ + ChainId: row[0][0].Uint64(), + BlockNumber: blockNum, + BlockHash: row[0][2].String(), + BlockTimestamp: row[0][3].Int64(), + Block: row[0][4].ByteArray(), + Transactions: row[0][5].ByteArray(), + Logs: row[0][6].ByteArray(), + Traces: row[0][7].ByteArray(), + } + + // Parse block data + blockData, err := parseBlockData(pd) + if err != nil { + return fmt.Errorf("failed to parse block data: %w", err) + } - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Processing block") - - // Build ParquetBlockData from row - pd := ParquetBlockData{ - ChainId: row[0][0].Uint64(), - BlockNumber: blockNum, - BlockHash: row[0][2].String(), - BlockTimestamp: row[0][3].Int64(), - Block: row[0][4].ByteArray(), - Transactions: row[0][5].ByteArray(), - Logs: row[0][6].ByteArray(), - Traces: row[0][7].ByteArray(), + log.Debug(). + Str("file", filePath). + Uint64("block_number", blockNum). 
+ Msg("Publishing block data to Kafka") + + kafkaPublisher.PublishBlockData([]common.BlockData{blockData}) + nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) + processedBlocks++ + rowGroupBlocks++ } - // Parse block data - blockData, err := parseBlockData(pd) + // Handle EOF and other errors + if err == io.EOF { + break + } if err != nil { - return fmt.Errorf("failed to parse block data: %w", err) + return fmt.Errorf("failed to read row: %w", err) + } + if n == 0 { + continue // No rows read in this call, try again } - - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Msg("Publishing block data to Kafka") - - kafkaPublisher.PublishBlockData([]common.BlockData{blockData}) - nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) - processedBlocks++ - rowGroupBlocks++ } log.Debug(). From ba8846e7d3b72532739ec51811d16bb5b63a4100 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 20:41:52 +0545 Subject: [PATCH 07/21] publish block range to kafka --- internal/committer/committer.go | 155 +++++++++++++++++++++++++++++--- 1 file changed, 143 insertions(+), 12 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 20c11e8..d103adf 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -28,11 +28,12 @@ import ( // BlockRange represents a range of blocks in an S3 parquet file type BlockRange struct { - StartBlock *big.Int `json:"start_block"` - EndBlock *big.Int `json:"end_block"` - S3Key string `json:"s3_key"` - IsDownloaded bool `json:"is_downloaded"` - LocalPath string `json:"local_path,omitempty"` + StartBlock *big.Int `json:"start_block"` + EndBlock *big.Int `json:"end_block"` + S3Key string `json:"s3_key"` + IsDownloaded bool `json:"is_downloaded"` + LocalPath string `json:"local_path,omitempty"` + BlockData []common.BlockData `json:"block_data,omitempty"` } // ParquetBlockData represents the block data structure in parquet files @@ -200,16 +201,56 @@ func Commit(chainId *big.Int) error { } log.Info(). - Str("file", blockRange.LocalPath). + Str("file", blockRange.S3Key). Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Starting to stream parquet file") + Int("total_blocks", len(blockRange.BlockData)). + Msg("Starting to process block data") - err := streamParquetFile(blockRange.LocalPath, nextCommitBlockNumber) - if err != nil { - log.Panic().Err(err).Str("S3Key", blockRange.S3Key).Msg("Failed to stream parquet file") + // Process block data sequentially + startIndex := 0 + for i, blockData := range blockRange.BlockData { + blockNumber := blockData.Block.Number + + // Skip if block number is less than next commit block number + if blockNumber.Cmp(nextCommitBlockNumber) < 0 { + log.Debug(). + Str("file", blockRange.S3Key). + Uint64("block_number", blockData.Block.Number.Uint64()). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("Skipping block - already processed") + startIndex = i + 1 + continue + } + + // If block number is greater than next commit block number, exit with error + if blockNumber.Cmp(nextCommitBlockNumber) > 0 { + log.Error(). + Str("file", blockRange.S3Key). + Uint64("block_number", blockData.Block.Number.Uint64()). + Str("next_commit_block", nextCommitBlockNumber.String()). 
+ Msg("Found block number greater than expected - missing block in sequence") + log.Panic().Msg("Block sequence mismatch") + } + nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) } - log.Info().Str("file", blockRange.LocalPath).Msg("Successfully streamed parquet file") + log.Info(). + Str("file", blockRange.S3Key). + Int("blocks_processed", len(blockRange.BlockData[startIndex:])). + Int("start_index", startIndex). + Uint64("start_block", blockRange.BlockData[startIndex].Block.Number.Uint64()). + Uint64("end_block", blockRange.BlockData[len(blockRange.BlockData)-1].Block.Number.Uint64()). + Str("final_commit_block", nextCommitBlockNumber.String()). + Msg("Publishing block range data to Kafka") + + // publish the entire slice to kafka + kafkaPublisher.PublishBlockData(blockRange.BlockData[startIndex:]) + + log.Info(). + Str("file", blockRange.S3Key). + Int("blocks_processed", len(blockRange.BlockData)). + Str("final_commit_block", nextCommitBlockNumber.String()). + Msg("Successfully processed all block data") // Clean up local file and notify download goroutine if err := os.Remove(blockRange.LocalPath); err != nil { @@ -448,10 +489,23 @@ func downloadFile(blockRange *BlockRange) error { } log.Debug().Str("file", blockRange.S3Key).Msg("File stream completed successfully") - // Update block range with local path and downloaded status + // Parse parquet file and extract block data + log.Debug().Str("file", blockRange.S3Key).Msg("Starting parquet parsing") + blockData, err := parseParquetFile(localPath) + if err != nil { + os.Remove(localPath) // Clean up on error + return fmt.Errorf("failed to parse parquet file: %w", err) + } + log.Debug(). + Str("file", blockRange.S3Key). + Int("blocks_parsed", len(blockData)). + Msg("Successfully parsed parquet file") + + // Update block range with local path, downloaded status, and block data mu.Lock() blockRange.LocalPath = localPath blockRange.IsDownloaded = true + blockRange.BlockData = blockData mu.Unlock() log.Info(). 
@@ -462,6 +516,83 @@ func downloadFile(blockRange *BlockRange) error { return nil } +// parseParquetFile parses a parquet file and returns all block data +func parseParquetFile(filePath string) ([]common.BlockData, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open parquet file: %w", err) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return nil, fmt.Errorf("failed to get file stats: %w", err) + } + + pFile, err := parquet.OpenFile(file, stat.Size()) + if err != nil { + return nil, fmt.Errorf("failed to open parquet file: %w", err) + } + + var allBlockData []common.BlockData + + for _, rg := range pFile.RowGroups() { + reader := parquet.NewRowGroupReader(rg) + + for { + row := make([]parquet.Row, 1) + n, err := reader.ReadRows(row) + + // Process the row if we successfully read it, even if EOF occurred + if n > 0 { + if len(row[0]) < 8 { + if err == io.EOF { + break // EOF and no valid row, we're done + } + continue // Not enough columns, try again + } + + // Extract block number + blockNum := row[0][1].Uint64() + + // Build ParquetBlockData from row + pd := ParquetBlockData{ + ChainId: row[0][0].Uint64(), + BlockNumber: blockNum, + BlockHash: row[0][2].String(), + BlockTimestamp: row[0][3].Int64(), + Block: row[0][4].ByteArray(), + Transactions: row[0][5].ByteArray(), + Logs: row[0][6].ByteArray(), + Traces: row[0][7].ByteArray(), + } + + // Parse block data + blockData, err := parseBlockData(pd) + if err != nil { + log.Warn().Err(err).Uint64("block", blockNum).Msg("Failed to parse block data, skipping") + continue + } + + allBlockData = append(allBlockData, blockData) + } + + // Handle EOF and other errors + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("failed to read row: %w", err) + } + if n == 0 { + continue // No rows read in this call, try again + } + } + } + + return allBlockData, nil +} + // streamParquetFile streams a parquet file row by row and processes blocks func streamParquetFile(filePath string, nextCommitBlockNumber *big.Int) error { log.Debug(). From 5dd4004cf690bb0bf322d9dda56becc2655b088f Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 20:48:19 +0545 Subject: [PATCH 08/21] clear memory array --- internal/committer/committer.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index d103adf..deb158f 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -248,10 +248,21 @@ func Commit(chainId *big.Int) error { log.Info(). Str("file", blockRange.S3Key). - Int("blocks_processed", len(blockRange.BlockData)). + Int("blocks_processed", len(blockRange.BlockData[startIndex:])). + Uint64("start_block", blockRange.BlockData[startIndex].Block.Number.Uint64()). + Uint64("end_block", blockRange.BlockData[len(blockRange.BlockData)-1].Block.Number.Uint64()). Str("final_commit_block", nextCommitBlockNumber.String()). Msg("Successfully processed all block data") + // Clear block data from memory to free up space + mu.Lock() + blockRange.BlockData = nil + mu.Unlock() + + log.Debug(). + Str("file", blockRange.S3Key). + Msg("Cleared block data from memory") + // Clean up local file and notify download goroutine if err := os.Remove(blockRange.LocalPath); err != nil { log.Warn(). 
From 37198248f2a1d82224be7499aac4e3164e860c6e Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 20:59:25 +0545 Subject: [PATCH 09/21] debug logs --- configs/config.go | 31 ++++++------ internal/committer/committer.go | 83 ++++++++++++++++++++++++++++----- 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/configs/config.go b/configs/config.go index 4f84043..0d75f7a 100644 --- a/configs/config.go +++ b/configs/config.go @@ -273,22 +273,21 @@ type Config struct { Validation ValidationConfig `mapstructure:"validation"` Migrator MigratorConfig `mapstructure:"migrator"` - CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` - CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` - CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` - CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` - CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` - CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"` - CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` - CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` - CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` - CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"` - - StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` - StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` - StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` - StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` - S3MaxParallelFileDownload int `env:"S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` + CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` + CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` + CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` + CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` + CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` + CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"` + CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` + CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` + CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` + CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"` + StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` + StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` + StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` + StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` + StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` } var Cfg Config diff --git a/internal/committer/committer.go b/internal/committer/committer.go index deb158f..90a0f1b 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -60,8 +60,8 @@ var downloadComplete chan *BlockRange func Init(chainId *big.Int) { tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64())) - fileDeleted = make(chan string, config.Cfg.S3MaxParallelFileDownload) - downloadComplete = make(chan *BlockRange, config.Cfg.S3MaxParallelFileDownload) + fileDeleted = make(chan string, config.Cfg.StagingS3MaxParallelFileDownload) + downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload) 
initClickHouse() initS3() @@ -206,6 +206,14 @@ func Commit(chainId *big.Int) error { Int("total_blocks", len(blockRange.BlockData)). Msg("Starting to process block data") + // Check if block data is empty + if len(blockRange.BlockData) == 0 { + log.Warn(). + Str("file", blockRange.S3Key). + Msg("No block data found in parquet file, skipping") + return nil + } + // Process block data sequentially startIndex := 0 for i, blockData := range blockRange.BlockData { @@ -234,23 +242,32 @@ func Commit(chainId *big.Int) error { nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) } + // Check if we have any blocks to process after filtering + if startIndex >= len(blockRange.BlockData) { + log.Info(). + Str("file", blockRange.S3Key). + Msg("All blocks already processed, skipping Kafka publish") + return nil + } + + blocksToProcess := blockRange.BlockData[startIndex:] log.Info(). Str("file", blockRange.S3Key). - Int("blocks_processed", len(blockRange.BlockData[startIndex:])). + Int("blocks_processed", len(blocksToProcess)). Int("start_index", startIndex). - Uint64("start_block", blockRange.BlockData[startIndex].Block.Number.Uint64()). - Uint64("end_block", blockRange.BlockData[len(blockRange.BlockData)-1].Block.Number.Uint64()). + Uint64("start_block", blocksToProcess[0].Block.Number.Uint64()). + Uint64("end_block", blocksToProcess[len(blocksToProcess)-1].Block.Number.Uint64()). Str("final_commit_block", nextCommitBlockNumber.String()). Msg("Publishing block range data to Kafka") // publish the entire slice to kafka - kafkaPublisher.PublishBlockData(blockRange.BlockData[startIndex:]) + kafkaPublisher.PublishBlockData(blocksToProcess) log.Info(). Str("file", blockRange.S3Key). - Int("blocks_processed", len(blockRange.BlockData[startIndex:])). - Uint64("start_block", blockRange.BlockData[startIndex].Block.Number.Uint64()). - Uint64("end_block", blockRange.BlockData[len(blockRange.BlockData)-1].Block.Number.Uint64()). + Int("blocks_processed", len(blocksToProcess)). + Uint64("start_block", blocksToProcess[0].Block.Number.Uint64()). + Uint64("end_block", blocksToProcess[len(blocksToProcess)-1].Block.Number.Uint64()). Str("final_commit_block", nextCommitBlockNumber.String()). Msg("Successfully processed all block data") @@ -314,7 +331,7 @@ func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { } func downloadFilesInBackground(blockRanges []BlockRange) { - maxConcurrentFiles := config.Cfg.S3MaxParallelFileDownload + maxConcurrentFiles := config.Cfg.StagingS3MaxParallelFileDownload log.Info(). Int("total_files", len(blockRanges)). Int("max_concurrent", maxConcurrentFiles). @@ -546,6 +563,14 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { } var allBlockData []common.BlockData + totalRowsRead := 0 + validRowsRead := 0 + parseErrors := 0 + + log.Debug(). + Str("file", filePath). + Int("row_groups", len(pFile.RowGroups())). + Msg("Starting parquet file parsing") for _, rg := range pFile.RowGroups() { reader := parquet.NewRowGroupReader(rg) @@ -556,16 +581,28 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { // Process the row if we successfully read it, even if EOF occurred if n > 0 { + totalRowsRead++ if len(row[0]) < 8 { + log.Debug(). + Str("file", filePath). + Int("columns", len(row[0])). + Msg("Row has insufficient columns, skipping") if err == io.EOF { break // EOF and no valid row, we're done } continue // Not enough columns, try again } + validRowsRead++ + // Extract block number blockNum := row[0][1].Uint64() + log.Debug(). 
+ Str("file", filePath). + Uint64("block_number", blockNum). + Msg("Processing parquet row") + // Build ParquetBlockData from row pd := ParquetBlockData{ ChainId: row[0][0].Uint64(), @@ -581,7 +618,12 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { // Parse block data blockData, err := parseBlockData(pd) if err != nil { - log.Warn().Err(err).Uint64("block", blockNum).Msg("Failed to parse block data, skipping") + parseErrors++ + log.Warn(). + Err(err). + Str("file", filePath). + Uint64("block", blockNum). + Msg("Failed to parse block data, skipping") continue } @@ -601,6 +643,25 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { } } + log.Debug(). + Str("file", filePath). + Int("total_rows_read", totalRowsRead). + Int("valid_rows_read", validRowsRead). + Int("parse_errors", parseErrors). + Int("successful_blocks", len(allBlockData)). + Msg("Completed parquet file parsing") + + // Check if we have any successful blocks + if len(allBlockData) == 0 && validRowsRead > 0 { + return nil, fmt.Errorf("parsed %d valid rows but all failed to convert to BlockData - check parseBlockData function", validRowsRead) + } + + if len(allBlockData) == 0 && totalRowsRead == 0 { + log.Warn(). + Str("file", filePath). + Msg("No rows found in parquet file") + } + return allBlockData, nil } From 00ce56561af89930322de8c4f7aa3d33320493cb Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 21:08:34 +0545 Subject: [PATCH 10/21] minor change --- internal/committer/committer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 90a0f1b..005a8f9 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -188,6 +188,7 @@ func Commit(chainId *big.Int) error { mu.Lock() blockRange.LocalPath = downloadedRange.LocalPath blockRange.IsDownloaded = downloadedRange.IsDownloaded + blockRange.BlockData = downloadedRange.BlockData mu.Unlock() break } From 12e2005807637d3a4d8d6131991e46e014523c70 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 21:31:59 +0545 Subject: [PATCH 11/21] clear blockrange --- internal/committer/committer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 005a8f9..277eadf 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -189,6 +189,8 @@ func Commit(chainId *big.Int) error { blockRange.LocalPath = downloadedRange.LocalPath blockRange.IsDownloaded = downloadedRange.IsDownloaded blockRange.BlockData = downloadedRange.BlockData + // Clear the downloadedRange's BlockData to free memory immediately + downloadedRange.BlockData = nil mu.Unlock() break } @@ -274,11 +276,13 @@ func Commit(chainId *big.Int) error { // Clear block data from memory to free up space mu.Lock() + blockDataCount := len(blockRange.BlockData) blockRange.BlockData = nil mu.Unlock() log.Debug(). Str("file", blockRange.S3Key). + Int("blocks_cleared", blockDataCount). 
Msg("Cleared block data from memory") // Clean up local file and notify download goroutine From 229f4d03c82f227f254e81f2922bc9315250c36c Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 21:48:29 +0545 Subject: [PATCH 12/21] dedupe kafka message --- internal/storage/kafka_publisher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 9a05366..1d8aa37 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -35,7 +35,7 @@ type PublishableMessagePayload struct { } type PublishableMessageBlockData struct { - common.BlockData + *common.BlockData ChainId uint64 `json:"chain_id"` IsDeleted int8 `json:"is_deleted"` InsertTimestamp time.Time `json:"insert_timestamp"` @@ -258,7 +258,7 @@ func (p *KafkaPublisher) createBlockDataMessage(block common.BlockData, isDelete timestamp := time.Now() data := PublishableMessageBlockData{ - BlockData: block, + BlockData: &block, ChainId: block.Block.ChainId.Uint64(), IsDeleted: 0, InsertTimestamp: timestamp, From 1be7b1f37f88594fe3d6fbb6020c4fd32acb7912 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 23:11:45 +0545 Subject: [PATCH 13/21] log kafka push error --- internal/committer/committer.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 277eadf..e3a9775 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -264,7 +264,15 @@ func Commit(chainId *big.Int) error { Msg("Publishing block range data to Kafka") // publish the entire slice to kafka - kafkaPublisher.PublishBlockData(blocksToProcess) + if err := kafkaPublisher.PublishBlockData(blocksToProcess); err != nil { + log.Panic(). + Err(err). + Str("file", blockRange.S3Key). + Uint64("start_block", blocksToProcess[0].Block.Number.Uint64()). + Uint64("end_block", blocksToProcess[len(blocksToProcess)-1].Block.Number.Uint64()). + Int("blocks_count", len(blocksToProcess)). + Msg("Failed to publish block data to Kafka") + } log.Info(). Str("file", blockRange.S3Key). From 41d78c246c053bb935dbf42117b844152c613b00 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 23:23:25 +0545 Subject: [PATCH 14/21] no more file log --- internal/committer/committer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index e3a9775..35a65ae 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -152,6 +152,14 @@ func Commit(chainId *big.Int) error { log.Info().Int("filtered_ranges", len(blockRanges)).Msg("Filtered and sorted block ranges") // Start downloading files in background + // Check if there are any files to process + if len(blockRanges) == 0 { + log.Info(). + Str("next_commit_block", new(big.Int).Add(maxBlockNumber, big.NewInt(1)).String()). 
+ Msg("No files to process - all blocks are up to date") + return nil + } + log.Info().Msg("Starting background file downloads") go downloadFilesInBackground(blockRanges) From 7e1f0ac5d46206e21276b3550eab784b669b2c9a Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 10 Sep 2025 23:55:40 +0545 Subject: [PATCH 15/21] sync downloads --- internal/committer/committer.go | 315 ++++++-------------------------- 1 file changed, 57 insertions(+), 258 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 35a65ae..e41e11f 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -14,6 +14,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/aws/aws-sdk-go-v2/aws" @@ -51,17 +52,12 @@ type ParquetBlockData struct { var clickhouseConn clickhouse.Conn var s3Client *s3.Client var kafkaPublisher *storage.KafkaPublisher -var downloadSemaphore = make(chan struct{}, 3) var tempDir = filepath.Join(os.TempDir(), "committer") var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) var mu sync.RWMutex -var fileDeleted chan string -var downloadComplete chan *BlockRange func Init(chainId *big.Int) { tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64())) - fileDeleted = make(chan string, config.Cfg.StagingS3MaxParallelFileDownload) - downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload) initClickHouse() initS3() @@ -151,7 +147,6 @@ func Commit(chainId *big.Int) error { } log.Info().Int("filtered_ranges", len(blockRanges)).Msg("Filtered and sorted block ranges") - // Start downloading files in background // Check if there are any files to process if len(blockRanges) == 0 { log.Info(). @@ -160,9 +155,6 @@ func Commit(chainId *big.Int) error { return nil } - log.Info().Msg("Starting background file downloads") - go downloadFilesInBackground(blockRanges) - nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting sequential processing") @@ -175,41 +167,12 @@ func Commit(chainId *big.Int) error { Str("end_block", blockRange.EndBlock.String()). Msg("Processing file") - // Wait for this specific file to be downloaded - for { - mu.RLock() - if blockRange.IsDownloaded { - mu.RUnlock() - log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, proceeding") - break - } - mu.RUnlock() - - log.Debug().Str("file", blockRange.S3Key).Msg("Waiting for file download to complete") - // Wait for a download to complete - downloadedRange := <-downloadComplete - - // Check if this is the file we're waiting for - if downloadedRange.StartBlock.Cmp(blockRange.StartBlock) == 0 { - log.Debug().Str("file", downloadedRange.S3Key).Msg("Received correct file, updating blockRange") - // Update the blockRange with the downloaded file's information - mu.Lock() - blockRange.LocalPath = downloadedRange.LocalPath - blockRange.IsDownloaded = downloadedRange.IsDownloaded - blockRange.BlockData = downloadedRange.BlockData - // Clear the downloadedRange's BlockData to free memory immediately - downloadedRange.BlockData = nil - mu.Unlock() - break - } - - log.Debug(). - Str("expected_file", blockRange.S3Key). - Str("received_file", downloadedRange.S3Key). 
- Msg("Received different file, putting back and waiting") - // If not the right file, put it back and continue waiting - downloadComplete <- downloadedRange + // Download and parse the file synchronously + log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") + if err := downloadFile(&blockRange); err != nil { + log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file") } + log.Debug().Str("file", blockRange.S3Key).Msg("File download completed") log.Info(). Str("file", blockRange.S3Key). @@ -255,7 +218,7 @@ func Commit(chainId *big.Int) error { // Check if we have any blocks to process after filtering if startIndex >= len(blockRange.BlockData) { - log.Info(). + log.Panic(). Str("file", blockRange.S3Key). Msg("All blocks already processed, skipping Kafka publish") return nil @@ -271,17 +234,58 @@ func Commit(chainId *big.Int) error { Str("final_commit_block", nextCommitBlockNumber.String()). Msg("Publishing block range data to Kafka") - // publish the entire slice to kafka - if err := kafkaPublisher.PublishBlockData(blocksToProcess); err != nil { - log.Panic(). - Err(err). + // publish blocks in batches to prevent timeouts + batchSize := 500 // Publish 500 blocks at a time + totalBlocks := len(blocksToProcess) + publishStart := time.Now() + + log.Debug(). + Str("file", blockRange.S3Key). + Int("total_blocks", totalBlocks). + Int("batch_size", batchSize). + Msg("Starting Kafka publish in batches") + + for i := 0; i < totalBlocks; i += batchSize { + end := min(i+batchSize, totalBlocks) + + batch := blocksToProcess[i:end] + batchStart := time.Now() + + log.Debug(). Str("file", blockRange.S3Key). - Uint64("start_block", blocksToProcess[0].Block.Number.Uint64()). - Uint64("end_block", blocksToProcess[len(blocksToProcess)-1].Block.Number.Uint64()). - Int("blocks_count", len(blocksToProcess)). - Msg("Failed to publish block data to Kafka") + Int("batch", i/batchSize+1). + Int("batch_blocks", len(batch)). + Uint64("start_block", batch[0].Block.Number.Uint64()). + Uint64("end_block", batch[len(batch)-1].Block.Number.Uint64()). + Msg("Publishing batch to Kafka") + + if err := kafkaPublisher.PublishBlockData(batch); err != nil { + log.Panic(). + Err(err). + Str("file", blockRange.S3Key). + Int("batch", i/batchSize+1). + Uint64("start_block", batch[0].Block.Number.Uint64()). + Uint64("end_block", batch[len(batch)-1].Block.Number.Uint64()). + Int("batch_blocks", len(batch)). + Msg("Failed to publish batch to Kafka") + } + + batchDuration := time.Since(batchStart) + log.Debug(). + Str("file", blockRange.S3Key). + Int("batch", i/batchSize+1). + Int("batch_blocks", len(batch)). + Str("batch_duration", batchDuration.String()). + Msg("Completed batch publish") } + totalDuration := time.Since(publishStart) + log.Debug(). + Str("file", blockRange.S3Key). + Int("total_blocks", totalBlocks). + Str("total_duration", totalDuration.String()). + Msg("Completed all Kafka publishes") + log.Info(). Str("file", blockRange.S3Key). Int("blocks_processed", len(blocksToProcess)). @@ -301,7 +305,7 @@ func Commit(chainId *big.Int) error { Int("blocks_cleared", blockDataCount). Msg("Cleared block data from memory") - // Clean up local file and notify download goroutine + // Clean up local file if err := os.Remove(blockRange.LocalPath); err != nil { log.Warn(). Err(err). 
@@ -311,9 +315,6 @@ func Commit(chainId *big.Int) error { log.Debug().Str("file", blockRange.LocalPath).Msg("Cleaned up local file") } - // Notify that file was deleted - fileDeleted <- blockRange.LocalPath - log.Info(). Int("processed", i+1). Int("total", len(blockRanges)). @@ -351,44 +352,6 @@ func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { return maxBlockNumber, nil } -func downloadFilesInBackground(blockRanges []BlockRange) { - maxConcurrentFiles := config.Cfg.StagingS3MaxParallelFileDownload - log.Info(). - Int("total_files", len(blockRanges)). - Int("max_concurrent", maxConcurrentFiles). - Msg("Starting background downloads") - downloadedCount := 0 - - for i := range blockRanges { - // Wait if we've reached the maximum concurrent files - if downloadedCount >= maxConcurrentFiles { - log.Debug().Int("downloaded_count", downloadedCount).Msg("Reached max concurrent files, waiting for deletion") - <-fileDeleted // Wait for a file to be deleted - downloadedCount-- - log.Debug().Int("downloaded_count", downloadedCount).Msg("File deleted, continuing downloads") - } - - log.Debug(). - Int("index", i). - Str("file", blockRanges[i].S3Key). - Int("downloaded_count", downloadedCount). - Msg("Starting download goroutine") - - go func(index int) { - log.Debug().Str("file", blockRanges[index].S3Key).Msg("Download goroutine started") - err := downloadFile(&blockRanges[index]) - if err != nil { - log.Error().Err(err).Str("file", blockRanges[index].S3Key).Msg("Failed to download file") - return - } - log.Debug().Str("file", blockRanges[index].S3Key).Msg("Download completed, sending to channel") - downloadComplete <- &blockRanges[index] - }(i) - - downloadedCount++ - } -} - // listS3ParquetFiles lists all parquet files in S3 with the chain prefix func listS3ParquetFiles(chainId *big.Int) ([]string, error) { prefix := fmt.Sprintf("chain_%d/", chainId.Uint64()) @@ -486,12 +449,6 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR func downloadFile(blockRange *BlockRange) error { log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") - // Acquire semaphore to limit concurrent downloads - downloadSemaphore <- struct{}{} - defer func() { <-downloadSemaphore }() - - log.Debug().Str("file", blockRange.S3Key).Msg("Acquired download semaphore") - // Ensure temp directory exists if err := os.MkdirAll(tempDir, 0755); err != nil { return fmt.Errorf("failed to create temp directory: %w", err) @@ -619,11 +576,6 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { // Extract block number blockNum := row[0][1].Uint64() - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Msg("Processing parquet row") - // Build ParquetBlockData from row pd := ParquetBlockData{ ChainId: row[0][0].Uint64(), @@ -686,159 +638,6 @@ func parseParquetFile(filePath string) ([]common.BlockData, error) { return allBlockData, nil } -// streamParquetFile streams a parquet file row by row and processes blocks -func streamParquetFile(filePath string, nextCommitBlockNumber *big.Int) error { - log.Debug(). - Str("file", filePath). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Opening parquet file for streaming") - - file, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open parquet file: %w", err) - } - defer file.Close() - - stat, err := file.Stat() - if err != nil { - return fmt.Errorf("failed to get file stats: %w", err) - } - log.Debug(). - Str("file", filePath). 
- Int64("size_bytes", stat.Size()). - Msg("File stats retrieved") - - pFile, err := parquet.OpenFile(file, stat.Size()) - if err != nil { - return fmt.Errorf("failed to open parquet file: %w", err) - } - log.Debug(). - Str("file", filePath). - Int("row_groups", len(pFile.RowGroups())). - Msg("Parquet file opened successfully") - - processedBlocks := 0 - for rgIndex, rg := range pFile.RowGroups() { - log.Debug(). - Str("file", filePath). - Int("row_group", rgIndex). - Int64("num_rows", rg.NumRows()). - Msg("Processing row group") - - // Use row-by-row reading to avoid loading entire row group into memory - reader := parquet.NewRowGroupReader(rg) - rowGroupBlocks := 0 - - for { - // Read single row - row := make([]parquet.Row, 1) - n, err := reader.ReadRows(row) - - // Process the row if we successfully read it, even if EOF occurred - if n > 0 { - if len(row[0]) < 8 { - if err == io.EOF { - break // EOF and no valid row, we're done - } - continue // Not enough columns, try again - } - - // Extract block number first to check if we need this row - blockNum := row[0][1].Uint64() // block_number is second column - blockNumber := big.NewInt(int64(blockNum)) - - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Read block from parquet file") - - // Skip if block number is less than next commit block number - if blockNumber.Cmp(nextCommitBlockNumber) < 0 { - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Skipping block - already processed") - if err == io.EOF { - break // EOF after processing, we're done - } - continue - } - - // If block number is greater than next commit block number, exit with error - if blockNumber.Cmp(nextCommitBlockNumber) > 0 { - log.Error(). - Str("file", filePath). - Uint64("block_number", blockNum). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Found block number greater than expected - missing block in sequence") - return fmt.Errorf("block data not found for block number %s in S3", nextCommitBlockNumber.String()) - } - - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Str("next_commit_block", nextCommitBlockNumber.String()). - Msg("Processing block") - - // Build ParquetBlockData from row - pd := ParquetBlockData{ - ChainId: row[0][0].Uint64(), - BlockNumber: blockNum, - BlockHash: row[0][2].String(), - BlockTimestamp: row[0][3].Int64(), - Block: row[0][4].ByteArray(), - Transactions: row[0][5].ByteArray(), - Logs: row[0][6].ByteArray(), - Traces: row[0][7].ByteArray(), - } - - // Parse block data - blockData, err := parseBlockData(pd) - if err != nil { - return fmt.Errorf("failed to parse block data: %w", err) - } - - log.Debug(). - Str("file", filePath). - Uint64("block_number", blockNum). - Msg("Publishing block data to Kafka") - - kafkaPublisher.PublishBlockData([]common.BlockData{blockData}) - nextCommitBlockNumber.Add(nextCommitBlockNumber, big.NewInt(1)) - processedBlocks++ - rowGroupBlocks++ - } - - // Handle EOF and other errors - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to read row: %w", err) - } - if n == 0 { - continue // No rows read in this call, try again - } - } - - log.Debug(). - Str("file", filePath). - Int("row_group", rgIndex). - Int("blocks_processed", rowGroupBlocks). - Msg("Completed row group") - } - - log.Info(). - Str("file", filePath). 
- Int("total_blocks_processed", processedBlocks). - Str("final_commit_block", nextCommitBlockNumber.String()). - Msg("Completed parquet file processing") - - return nil -} - // parseBlockData converts ParquetBlockData to common.BlockData func parseBlockData(pd ParquetBlockData) (common.BlockData, error) { // Unmarshal JSON data From 1c7900cfe7b3985809674bb101d44749f1cd6206 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 00:17:42 +0545 Subject: [PATCH 16/21] lookahead download --- internal/committer/committer.go | 67 +++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index e41e11f..753e07e 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -29,12 +29,13 @@ import ( // BlockRange represents a range of blocks in an S3 parquet file type BlockRange struct { - StartBlock *big.Int `json:"start_block"` - EndBlock *big.Int `json:"end_block"` - S3Key string `json:"s3_key"` - IsDownloaded bool `json:"is_downloaded"` - LocalPath string `json:"local_path,omitempty"` - BlockData []common.BlockData `json:"block_data,omitempty"` + StartBlock *big.Int `json:"start_block"` + EndBlock *big.Int `json:"end_block"` + S3Key string `json:"s3_key"` + IsDownloaded bool `json:"is_downloaded"` + IsDownloading bool `json:"is_downloading"` + LocalPath string `json:"local_path,omitempty"` + BlockData []common.BlockData `json:"block_data,omitempty"` } // ParquetBlockData represents the block data structure in parquet files @@ -167,7 +168,20 @@ func Commit(chainId *big.Int) error { Str("end_block", blockRange.EndBlock.String()). Msg("Processing file") - // Download and parse the file synchronously + // Start downloading the next file in background (lookahead) + if i+1 < len(blockRanges) { + nextBlockRange := &blockRanges[i+1] + log.Debug(). + Str("next_file", nextBlockRange.S3Key). 
+ Msg("Starting lookahead download") + go func(br *BlockRange) { + if err := downloadFile(br); err != nil { + log.Error().Err(err).Str("file", br.S3Key).Msg("Failed to download file in background") + } + }(nextBlockRange) + } + + // Download and parse the current file log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") if err := downloadFile(&blockRange); err != nil { log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file") @@ -449,6 +463,45 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR func downloadFile(blockRange *BlockRange) error { log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") + // Check if file is already downloaded + mu.RLock() + if blockRange.IsDownloaded { + mu.RUnlock() + log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, skipping") + return nil + } + mu.RUnlock() + + // Check if file is already being downloaded by another goroutine + mu.Lock() + if blockRange.IsDownloading { + mu.Unlock() + log.Debug().Str("file", blockRange.S3Key).Msg("File is already being downloaded, waiting...") + + // Poll every 250ms until download is complete + for { + time.Sleep(250 * time.Millisecond) + mu.RLock() + if blockRange.IsDownloaded { + mu.RUnlock() + log.Debug().Str("file", blockRange.S3Key).Msg("Download completed by another goroutine") + return nil + } + mu.RUnlock() + } + } + + // Mark as downloading + blockRange.IsDownloading = true + mu.Unlock() + + // Ensure downloading flag is cleared on error + defer func() { + mu.Lock() + blockRange.IsDownloading = false + mu.Unlock() + }() + // Ensure temp directory exists if err := os.MkdirAll(tempDir, 0755); err != nil { return fmt.Errorf("failed to create temp directory: %w", err) From 700befa672ebddcdae7ddef514b6e8b778977706 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 00:44:01 +0545 Subject: [PATCH 17/21] channel based download --- internal/committer/committer.go | 149 +++++++++++++------------------- 1 file changed, 60 insertions(+), 89 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 753e07e..2dbe6b1 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -56,9 +56,11 @@ var kafkaPublisher *storage.KafkaPublisher var tempDir = filepath.Join(os.TempDir(), "committer") var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) var mu sync.RWMutex +var downloadComplete chan *BlockRange func Init(chainId *big.Int) { tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64())) + downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload) initClickHouse() initS3() @@ -157,8 +159,16 @@ func Commit(chainId *big.Int) error { } nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) - log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting sequential processing") + log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting producer-consumer processing") + // Start the block range processor goroutine + processorDone := make(chan struct{}) + go func() { + blockRangeProcessor(nextCommitBlockNumber) + close(processorDone) + }() + + // Download files synchronously and send to channel for i, blockRange := range blockRanges { log.Info(). Int("processing", i+1). @@ -166,28 +176,57 @@ func Commit(chainId *big.Int) error { Str("file", blockRange.S3Key). 
Str("start_block", blockRange.StartBlock.String()). Str("end_block", blockRange.EndBlock.String()). - Msg("Processing file") - - // Start downloading the next file in background (lookahead) - if i+1 < len(blockRanges) { - nextBlockRange := &blockRanges[i+1] - log.Debug(). - Str("next_file", nextBlockRange.S3Key). - Msg("Starting lookahead download") - go func(br *BlockRange) { - if err := downloadFile(br); err != nil { - log.Error().Err(err).Str("file", br.S3Key).Msg("Failed to download file in background") - } - }(nextBlockRange) - } + Msg("Starting download") - // Download and parse the current file - log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") if err := downloadFile(&blockRange); err != nil { log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file") } - log.Debug().Str("file", blockRange.S3Key).Msg("File download completed") + log.Debug().Str("file", blockRange.S3Key).Msg("Download completed, sending to channel") + downloadComplete <- &blockRange + } + + // Close channel to signal processor that all downloads are done + log.Info().Msg("All downloads completed, waiting for processing to finish") + close(downloadComplete) + <-processorDone + log.Info().Msg("All processing completed successfully") + + return nil +} + +func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { + // Use toString() to force ClickHouse to return a string instead of UInt256 + query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64()) + rows, err := clickhouseConn.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + if !rows.Next() { + return big.NewInt(0), nil + } + + var maxBlockNumberStr string + if err := rows.Scan(&maxBlockNumberStr); err != nil { + return nil, err + } + + // Convert string to big.Int to handle UInt256 values + maxBlockNumber, ok := new(big.Int).SetString(maxBlockNumberStr, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", maxBlockNumberStr) + } + + return maxBlockNumber, nil +} + +// blockRangeProcessor processes BlockRanges from the download channel and publishes to Kafka +func blockRangeProcessor(nextCommitBlockNumber *big.Int) { + log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting block range processor") + + for blockRange := range downloadComplete { log.Info(). Str("file", blockRange.S3Key). Str("next_commit_block", nextCommitBlockNumber.String()). @@ -199,7 +238,7 @@ func Commit(chainId *big.Int) error { log.Warn(). Str("file", blockRange.S3Key). Msg("No block data found in parquet file, skipping") - return nil + continue } // Process block data sequentially @@ -235,7 +274,7 @@ func Commit(chainId *big.Int) error { log.Panic(). Str("file", blockRange.S3Key). Msg("All blocks already processed, skipping Kafka publish") - return nil + continue } blocksToProcess := blockRange.BlockData[startIndex:] @@ -330,40 +369,11 @@ func Commit(chainId *big.Int) error { } log.Info(). - Int("processed", i+1). - Int("total", len(blockRanges)). Str("file", blockRange.S3Key). 
Msg("Completed processing file") } - return nil -} - -func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) { - // Use toString() to force ClickHouse to return a string instead of UInt256 - query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64()) - rows, err := clickhouseConn.Query(context.Background(), query) - if err != nil { - return nil, err - } - defer rows.Close() - - if !rows.Next() { - return big.NewInt(0), nil - } - - var maxBlockNumberStr string - if err := rows.Scan(&maxBlockNumberStr); err != nil { - return nil, err - } - - // Convert string to big.Int to handle UInt256 values - maxBlockNumber, ok := new(big.Int).SetString(maxBlockNumberStr, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", maxBlockNumberStr) - } - - return maxBlockNumber, nil + log.Info().Msg("Block range processor finished") } // listS3ParquetFiles lists all parquet files in S3 with the chain prefix @@ -463,45 +473,6 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR func downloadFile(blockRange *BlockRange) error { log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download") - // Check if file is already downloaded - mu.RLock() - if blockRange.IsDownloaded { - mu.RUnlock() - log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, skipping") - return nil - } - mu.RUnlock() - - // Check if file is already being downloaded by another goroutine - mu.Lock() - if blockRange.IsDownloading { - mu.Unlock() - log.Debug().Str("file", blockRange.S3Key).Msg("File is already being downloaded, waiting...") - - // Poll every 250ms until download is complete - for { - time.Sleep(250 * time.Millisecond) - mu.RLock() - if blockRange.IsDownloaded { - mu.RUnlock() - log.Debug().Str("file", blockRange.S3Key).Msg("Download completed by another goroutine") - return nil - } - mu.RUnlock() - } - } - - // Mark as downloading - blockRange.IsDownloading = true - mu.Unlock() - - // Ensure downloading flag is cleared on error - defer func() { - mu.Lock() - blockRange.IsDownloading = false - mu.Unlock() - }() - // Ensure temp directory exists if err := os.MkdirAll(tempDir, 0755); err != nil { return fmt.Errorf("failed to create temp directory: %w", err) From 27ced99ef74ed2ebdd58850e124c9a0861b32fcd Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 01:50:14 +0545 Subject: [PATCH 18/21] fetch latest block after s3 is completed --- cmd/committer.go | 2 +- internal/committer/committer.go | 193 +++++++++++++++++++++++++++++- internal/committer/fetchLatest.go | 21 ++++ 3 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 internal/committer/fetchLatest.go diff --git a/cmd/committer.go b/cmd/committer.go index 170e1db..a966aea 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -24,6 +24,6 @@ func RunCommitter(cmd *cobra.Command, args []string) { } chainId := rpc.GetChainID() - committer.Init(chainId) + committer.Init(chainId, rpc) committer.Commit(chainId) } diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 2dbe6b1..0292ae3 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -24,6 +24,7 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" ) @@ -50,6 +51,7 @@ type ParquetBlockData struct { Traces []byte 
`parquet:"traces_json"` } +var rpcClient rpc.IRPCClient var clickhouseConn clickhouse.Conn var s3Client *s3.Client var kafkaPublisher *storage.KafkaPublisher @@ -58,7 +60,8 @@ var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`) var mu sync.RWMutex var downloadComplete chan *BlockRange -func Init(chainId *big.Int) { +func Init(chainId *big.Int, rpc rpc.IRPCClient) { + rpcClient = rpc tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64())) downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload) @@ -192,6 +195,9 @@ func Commit(chainId *big.Int) error { <-processorDone log.Info().Msg("All processing completed successfully") + log.Info().Msg("Fetching latest blocks") + fetchLatest(nextCommitBlockNumber) + return nil } @@ -710,3 +716,188 @@ func Close() error { // Clean up temp directory return os.RemoveAll(tempDir) } + +func fetchLatest(nextCommitBlockNumber *big.Int) error { + for { + latestBlock, err := rpcClient.GetLatestBlockNumber(context.Background()) + if err != nil { + log.Warn().Err(err).Msg("Failed to get latest block number, retrying...") + time.Sleep(250 * time.Millisecond) + continue + } + if nextCommitBlockNumber.Cmp(latestBlock) >= 0 { + time.Sleep(250 * time.Millisecond) + continue + } + + // Configuration variables + rpcBatchSize := int64(50) // Number of blocks per batch + rpcNumParallelCalls := int64(10) // Maximum number of parallel RPC calls + maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls // Total blocks per fetch cycle + + // Calculate the range of blocks to fetch + blocksToFetch := new(big.Int).Sub(latestBlock, nextCommitBlockNumber) + if blocksToFetch.Cmp(big.NewInt(maxBlocksPerFetch)) > 0 { + blocksToFetch = big.NewInt(maxBlocksPerFetch) // Limit to maxBlocksPerFetch blocks per batch + } + + log.Info(). + Str("next_commit_block", nextCommitBlockNumber.String()). + Str("latest_block", latestBlock.String()). + Str("blocks_to_fetch", blocksToFetch.String()). + Int64("batch_size", rpcBatchSize). + Int64("max_parallel_calls", rpcNumParallelCalls). + Msg("Starting to fetch latest blocks") + + // Precreate array of block data + blockDataArray := make([]common.BlockData, blocksToFetch.Int64()) + + // Create batches and calculate number of parallel calls needed + numBatches := min((blocksToFetch.Int64()+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls) + + var wg sync.WaitGroup + var mu sync.Mutex + var fetchErrors []error + + for batchIndex := int64(0); batchIndex < numBatches; batchIndex++ { + wg.Add(1) + go func(batchIdx int64) { + defer wg.Done() + + startBlock := new(big.Int).Add(nextCommitBlockNumber, big.NewInt(batchIdx*rpcBatchSize)) + endBlock := new(big.Int).Add(startBlock, big.NewInt(rpcBatchSize-1)) + + // Don't exceed the latest block + if endBlock.Cmp(latestBlock) > 0 { + endBlock = latestBlock + } + + log.Debug(). + Int64("batch", batchIdx). + Str("start_block", startBlock.String()). + Str("end_block", endBlock.String()). 
+ Msg("Starting batch fetch") + + // Create block numbers array for this batch + var blockNumbers []*big.Int + for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) <= 0; i.Add(i, big.NewInt(1)) { + blockNumbers = append(blockNumbers, new(big.Int).Set(i)) + } + + // Make RPC call with retry mechanism (3 retries) + var batchResults []rpc.GetFullBlockResult + var fetchErr error + + for retry := 0; retry < 3; retry++ { + batchResults = rpcClient.GetFullBlocks(context.Background(), blockNumbers) + + // Check if all blocks were fetched successfully + allSuccess := true + for _, result := range batchResults { + if result.Error != nil { + allSuccess = false + break + } + } + + if allSuccess { + break + } + + if retry < 2 { + log.Warn(). + Int64("batch", batchIdx). + Int("retry", retry+1). + Msg("Batch fetch failed, retrying...") + time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) + } else { + fetchErr = fmt.Errorf("batch %d failed after 3 retries", batchIdx) + } + } + + if fetchErr != nil { + mu.Lock() + fetchErrors = append(fetchErrors, fetchErr) + mu.Unlock() + return + } + + // Set values to the array + mu.Lock() + for i, result := range batchResults { + arrayIndex := batchIdx*rpcBatchSize + int64(i) + if arrayIndex < int64(len(blockDataArray)) { + blockDataArray[arrayIndex] = result.Data + } + } + mu.Unlock() + + log.Debug(). + Int64("batch", batchIdx). + Int("blocks_fetched", len(batchResults)). + Msg("Completed batch fetch") + }(batchIndex) + } + + // Wait for all goroutines to complete + wg.Wait() + + // Check for fetch errors + if len(fetchErrors) > 0 { + log.Error(). + Int("error_count", len(fetchErrors)). + Msg("Some batches failed to fetch") + for _, err := range fetchErrors { + log.Error().Err(err).Msg("Batch fetch error") + } + log.Panic().Msg("Failed to fetch all required blocks") + } + + // Validate that all blocks are sequential and nothing is missing + expectedBlockNumber := new(big.Int).Set(nextCommitBlockNumber) + for i, blockData := range blockDataArray { + if blockData.Block.Number == nil { + log.Panic(). + Int("index", i). + Str("expected_block", expectedBlockNumber.String()). + Msg("Found nil block number in array") + } + + if blockData.Block.Number.Cmp(expectedBlockNumber) != 0 { + log.Panic(). + Int("index", i). + Str("expected_block", expectedBlockNumber.String()). + Str("actual_block", blockData.Block.Number.String()). + Msg("Block sequence mismatch - missing or out of order block") + } + + expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1)) + } + + log.Info(). + Int("total_blocks", len(blockDataArray)). + Str("start_block", nextCommitBlockNumber.String()). + Str("end_block", new(big.Int).Sub(expectedBlockNumber, big.NewInt(1)).String()). + Msg("All blocks validated successfully") + + // Publish to Kafka + log.Info(). + Int("blocks_to_publish", len(blockDataArray)). + Msg("Publishing blocks to Kafka") + + if err := kafkaPublisher.PublishBlockData(blockDataArray); err != nil { + log.Panic(). + Err(err). + Int("blocks_count", len(blockDataArray)). + Msg("Failed to publish blocks to Kafka") + } + + log.Info(). + Int("blocks_published", len(blockDataArray)). + Str("next_commit_block", expectedBlockNumber.String()). 
+ Msg("Successfully published blocks to Kafka") + + // Update nextCommitBlockNumber for next iteration + nextCommitBlockNumber.Set(expectedBlockNumber) + } +} diff --git a/internal/committer/fetchLatest.go b/internal/committer/fetchLatest.go new file mode 100644 index 0000000..3b1daac --- /dev/null +++ b/internal/committer/fetchLatest.go @@ -0,0 +1,21 @@ +package committer + +import ( + "context" + "math/big" + + "github.com/thirdweb-dev/indexer/internal/rpc" +) + +func FetchLatest(chainId *big.Int, rpc rpc.IRPCClient) error { + for { + latestBlock, err := rpc.GetLatestBlockNumber(context.Background()) + if err != nil { + return err + } + if latestBlock.Cmp(chainId) > 0 { + return nil + } + } + return nil +} From 0d0fa7588ef54cd38297fba330b50cb93ad776da Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 01:50:30 +0545 Subject: [PATCH 19/21] minor change --- internal/committer/committer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 0292ae3..62aaf9b 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -190,10 +190,10 @@ func Commit(chainId *big.Int) error { } // Close channel to signal processor that all downloads are done - log.Info().Msg("All downloads completed, waiting for processing to finish") + log.Info().Msg("All downloads completed, waiting for processing to finish from S3") close(downloadComplete) <-processorDone - log.Info().Msg("All processing completed successfully") + log.Info().Msg("All processing completed successfully from S3") log.Info().Msg("Fetching latest blocks") fetchLatest(nextCommitBlockNumber) From 7466bc9c227d54275a73454196f42ee00153acf3 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 02:21:46 +0545 Subject: [PATCH 20/21] minor change --- internal/committer/committer.go | 84 ++++++++++++++++----------------- 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 62aaf9b..7648400 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -139,6 +139,9 @@ func Commit(chainId *big.Int) error { } log.Info().Str("max_block_number", maxBlockNumber.String()).Msg("Retrieved max block number from ClickHouse") + nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) + log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting producer-consumer processing") + files, err := listS3ParquetFiles(chainId) if err != nil { log.Error().Err(err).Msg("Failed to list S3 parquet files") @@ -154,48 +157,44 @@ func Commit(chainId *big.Int) error { log.Info().Int("filtered_ranges", len(blockRanges)).Msg("Filtered and sorted block ranges") // Check if there are any files to process - if len(blockRanges) == 0 { - log.Info(). - Str("next_commit_block", new(big.Int).Add(maxBlockNumber, big.NewInt(1)).String()). 
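The new `fetchLatest.go` above adds a `FetchLatest` helper that polls `GetLatestBlockNumber` in a tight loop and compares the head against `chainId`. For comparison, a minimal sketch of a head-polling helper, assuming the intent is to block until the chain head passes a target block number; the function name, parameters, context handling, and 250ms poll interval are assumptions and not part of the patch:

```go
package committer

import (
	"context"
	"math/big"
	"time"

	"github.com/thirdweb-dev/indexer/internal/rpc"
)

// waitForHeadPast is a hypothetical variant: it polls the RPC head until it
// moves past target, sleeping between polls to avoid a busy loop.
func waitForHeadPast(ctx context.Context, target *big.Int, client rpc.IRPCClient) (*big.Int, error) {
	for {
		latest, err := client.GetLatestBlockNumber(ctx)
		if err != nil {
			return nil, err
		}
		if latest.Cmp(target) > 0 {
			return latest, nil // head has moved past the target
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(250 * time.Millisecond):
		}
	}
}
```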
- Msg("No files to process - all blocks are up to date") - return nil - } - - nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1)) - log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting producer-consumer processing") - - // Start the block range processor goroutine - processorDone := make(chan struct{}) - go func() { - blockRangeProcessor(nextCommitBlockNumber) - close(processorDone) - }() + if len(blockRanges) != 0 { + // Start the block range processor goroutine + processorDone := make(chan struct{}) + go func() { + blockRangeProcessor(nextCommitBlockNumber) + close(processorDone) + }() + + // Download files synchronously and send to channel + for i, blockRange := range blockRanges { + log.Info(). + Int("processing", i+1). + Int("total", len(blockRanges)). + Str("file", blockRange.S3Key). + Str("start_block", blockRange.StartBlock.String()). + Str("end_block", blockRange.EndBlock.String()). + Msg("Starting download") - // Download files synchronously and send to channel - for i, blockRange := range blockRanges { - log.Info(). - Int("processing", i+1). - Int("total", len(blockRanges)). - Str("file", blockRange.S3Key). - Str("start_block", blockRange.StartBlock.String()). - Str("end_block", blockRange.EndBlock.String()). - Msg("Starting download") + if err := downloadFile(&blockRange); err != nil { + log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file") + } - if err := downloadFile(&blockRange); err != nil { - log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file") + log.Debug().Str("file", blockRange.S3Key).Msg("Download completed, sending to channel") + downloadComplete <- &blockRange } - log.Debug().Str("file", blockRange.S3Key).Msg("Download completed, sending to channel") - downloadComplete <- &blockRange + // Close channel to signal processor that all downloads are done + log.Info().Msg("All downloads completed, waiting for processing to finish from S3") + close(downloadComplete) + <-processorDone + log.Info().Msg("All processing completed successfully from S3") + } else { + log.Info(). + Str("next_commit_block", nextCommitBlockNumber.String()). + Msg("No files to process - all blocks are up to date from S3") } - // Close channel to signal processor that all downloads are done - log.Info().Msg("All downloads completed, waiting for processing to finish from S3") - close(downloadComplete) - <-processorDone - log.Info().Msg("All processing completed successfully from S3") - - log.Info().Msg("Fetching latest blocks") + log.Info().Msg("Consuming latest blocks from RPC") fetchLatest(nextCommitBlockNumber) return nil @@ -451,11 +450,6 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR // Skip files where end block is less than max block number from ClickHouse if endBlock.Cmp(maxBlockNumber) <= 0 { - log.Debug(). - Str("file", file). - Str("end_block", endBlock.String()). - Str("max_block", maxBlockNumber.String()). 
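The restructured `Commit` above keeps the producer/consumer split: downloads feed the buffered `downloadComplete` channel (sized by `STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD`), a goroutine drains it in order, and closing the channel followed by waiting on a done channel signals completion. A self-contained sketch of that shape; the `blockRange` struct, block numbers, and sleep are stand-ins for the real S3 download and processing:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	type blockRange struct{ start, end int }

	downloads := make(chan blockRange, 2) // buffer ~= max parallel downloads
	done := make(chan struct{})

	// Consumer: drains the channel until it is closed.
	go func() {
		defer close(done)
		for r := range downloads {
			fmt.Printf("processing blocks %d..%d\n", r.start, r.end)
		}
	}()

	// Producer: "downloads" ranges and hands them off in order.
	for _, r := range []blockRange{{1000, 1999}, {2000, 2999}, {3000, 3999}} {
		time.Sleep(10 * time.Millisecond) // stand-in for the S3 download
		downloads <- r
	}
	close(downloads) // no more work; lets the consumer's range loop end
	<-done           // wait for the consumer to finish
}
```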
- Msg("Skipping file - end block is less than or equal to max block") continue } @@ -468,9 +462,11 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR } // Sort by start block number in ascending order - sort.Slice(blockRanges, func(i, j int) bool { - return blockRanges[i].StartBlock.Cmp(blockRanges[j].StartBlock) < 0 - }) + if len(blockRanges) > 0 { + sort.Slice(blockRanges, func(i, j int) bool { + return blockRanges[i].StartBlock.Cmp(blockRanges[j].StartBlock) < 0 + }) + } return blockRanges, nil } From b8be8e463ef23fcb9eb1f0e147bd4e5315ae19ef Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Sep 2025 02:40:54 +0545 Subject: [PATCH 21/21] committer rpc parallel calls env --- configs/config.go | 1 + internal/committer/committer.go | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/configs/config.go b/configs/config.go index 0d75f7a..abeef9c 100644 --- a/configs/config.go +++ b/configs/config.go @@ -288,6 +288,7 @@ type Config struct { StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` + CommitterRPCNumParallelCalls int64 `env:"COMMITTER_RPC_NUM_PARALLEL_CALLS" envDefault:"10"` } var Cfg Config diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 7648400..922dc1f 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -726,15 +726,14 @@ func fetchLatest(nextCommitBlockNumber *big.Int) error { continue } - // Configuration variables - rpcBatchSize := int64(50) // Number of blocks per batch - rpcNumParallelCalls := int64(10) // Maximum number of parallel RPC calls - maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls // Total blocks per fetch cycle + rpcNumParallelCalls := config.Cfg.CommitterRPCNumParallelCalls + rpcBatchSize := int64(50) + maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls // Calculate the range of blocks to fetch blocksToFetch := new(big.Int).Sub(latestBlock, nextCommitBlockNumber) if blocksToFetch.Cmp(big.NewInt(maxBlocksPerFetch)) > 0 { - blocksToFetch = big.NewInt(maxBlocksPerFetch) // Limit to maxBlocksPerFetch blocks per batch + blocksToFetch = big.NewInt(maxBlocksPerFetch) } log.Info().
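For sizing: with the default `COMMITTER_RPC_NUM_PARALLEL_CALLS` of 10 and the fixed batch size of 50, `maxBlocksPerFetch` works out to 10 × 50 = 500 blocks per fetch cycle; raising the variable to 20 lifts that ceiling to 1,000.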