Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions internal/committer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
return nil
}

// finding the reorg start and end block
// 1) Block verification: find reorg range from header continuity (existing behavior)
reorgStartBlock := int64(-1)
reorgEndBlock := int64(-1)
for i := 1; i < len(blockHeaders); i++ {
Expand All @@ -131,18 +131,69 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
}

// set end to the last block if not set
lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
if reorgEndBlock == -1 {
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
// No header-based end detected; default to the last header for last-valid-block tracking.
reorgEndBlock = lastHeaderBlock
}

// 2) Transaction verification: check for mismatches between block.transaction_count
// and the number of transactions stored per block in ClickHouse.
txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
}

// 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse.
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
}

// 4) Combine all ranges:
// - If all three ranges (blocks, tx, logs) are empty, then there is no reorg.
// - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range.
finalStart := int64(-1)
finalEnd := int64(-1)

// block headers range
if reorgStartBlock > -1 {
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
return err
finalStart = reorgStartBlock
finalEnd = reorgEndBlock
}

// transactions range
if txStart > -1 {
if finalStart == -1 || txStart < finalStart {
finalStart = txStart
}
if finalEnd == -1 || txEnd > finalEnd {
finalEnd = txEnd
}
}

// logs range
if logsStart > -1 {
if finalStart == -1 || logsStart < finalStart {
finalStart = logsStart
}
if finalEnd == -1 || logsEnd > finalEnd {
finalEnd = logsEnd
}
}

// update last valid block. if there was no reorg, this will update to the last block
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
lastValidBlock := lastHeaderBlock
if finalStart > -1 {
// We found at least one inconsistent range; reorg from min(start) to max(end).
if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil {
return err
}
lastValidBlock = finalEnd
}
err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock)
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err)
}

return nil
}
Expand Down
167 changes: 167 additions & 0 deletions internal/libs/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ var defaultTraceFields = []string{
"reward_type", "refund_address",
}

type blockTxAggregate struct {
BlockNumber *big.Int `ch:"block_number"`
TxCount uint64 `ch:"tx_count"`
}

type blockLogAggregate struct {
BlockNumber *big.Int `ch:"block_number"`
LogCount uint64 `ch:"log_count"`
MaxLogIndex uint64 `ch:"max_log_index"`
}

// only use this for backfill or getting old data.
var ClickhouseConnV1 clickhouse.Conn

Expand Down Expand Up @@ -253,6 +264,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
return blockData, nil
}

// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where the stored transaction_count in the blocks table does not match the number
// of transactions in the transactions table. It returns the minimum and maximum
// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent.
func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
if endBlockNumber < startBlockNumber {
return -1, -1, nil
}

blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
if err != nil {
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
}

// Aggregate transaction counts per block from the transactions table.
query := fmt.Sprintf(
"SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)

txAggRows, err := execQueryV2[blockTxAggregate](query)
if err != nil {
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err)
}

txCounts := make(map[uint64]uint64, len(txAggRows))
for _, row := range txAggRows {
if row.BlockNumber == nil {
continue
}
txCounts[row.BlockNumber.Uint64()] = row.TxCount
}

var mismatchStart int64 = -1
var mismatchEnd int64 = -1

for _, block := range blocksRaw {
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
continue
}

bn := block.Number.Uint64()
expectedTxCount := block.TransactionCount
actualTxCount, hasTx := txCounts[bn]

mismatch := false
if expectedTxCount == 0 {
// Header says no transactions; ensure there are none in the table.
if hasTx && actualTxCount > 0 {
mismatch = true
}
} else {
// Header says there should be transactions.
if !hasTx || actualTxCount != expectedTxCount {
mismatch = true
}
}

if mismatch {
if mismatchStart == -1 || int64(bn) < mismatchStart {
mismatchStart = int64(bn)
}
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
mismatchEnd = int64(bn)
}
}
}

return mismatchStart, mismatchEnd, nil
}

// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where logs in the logs table are inconsistent with the block's logs_bloom:
// - logsBloom is non-empty but there are no logs for that block
// - logsBloom is empty/zero but logs exist
// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist)
// It returns the minimum and maximum block numbers that have a mismatch, or
// (-1, -1) if all blocks are consistent.
func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
if endBlockNumber < startBlockNumber {
return -1, -1, nil
}

blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
if err != nil {
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
}

// Aggregate log counts and max log_index per block from the logs table.
query := fmt.Sprintf(
"SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)

logAggRows, err := execQueryV2[blockLogAggregate](query)
if err != nil {
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err)
}

logAggs := make(map[uint64]blockLogAggregate, len(logAggRows))
for _, row := range logAggRows {
if row.BlockNumber == nil {
continue
}
bn := row.BlockNumber.Uint64()
logAggs[bn] = row
}

var mismatchStart int64 = -1
var mismatchEnd int64 = -1

for _, block := range blocksRaw {
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
continue
}

bn := block.Number.Uint64()
hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM
logAgg, hasLogAgg := logAggs[bn]

mismatch := false

if hasLogsBloom {
// logsBloom indicates logs should exist
if !hasLogAgg || logAgg.LogCount == 0 {
mismatch = true
} else if logAgg.MaxLogIndex+1 != logAgg.LogCount {
// log_index should be contiguous from 0..log_count-1
mismatch = true
}
} else {
// logsBloom is empty/zero; there should be no logs
if hasLogAgg && logAgg.LogCount > 0 {
mismatch = true
}
}

if mismatch {
if mismatchStart == -1 || int64(bn) < mismatchStart {
mismatchStart = int64(bn)
}
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
mismatchEnd = int64(bn)
}
}
}

return mismatchStart, mismatchEnd, nil
}

func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
Expand Down