Skip to content

Commit f164add

Browse files
committed
iceberg loader: Add reorg-aware streaming support
1 parent 7620102 commit f164add

File tree

2 files changed

+424
-1
lines changed

2 files changed

+424
-1
lines changed

src/amp/loaders/implementations/iceberg_loader.py

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# src/amp/loaders/implementations/iceberg_loader.py
22

3+
import json
34
from dataclasses import dataclass
4-
from typing import Any, Dict, Optional
5+
from typing import Any, Dict, List, Optional
56

67
import pyarrow as pa
78
import pyarrow.compute as pc
@@ -32,6 +33,7 @@
3233
ICEBERG_AVAILABLE = False
3334

3435
# Import types for better IDE support
36+
from ...streaming.types import BlockRange
3537
from ..base import DataLoader, LoadMode
3638
from .iceberg_types import IcebergCatalog, IcebergTable
3739

@@ -510,3 +512,188 @@ def get_table_info(self, table_name: str) -> Dict[str, Any]:
510512
except Exception as e:
511513
self.logger.error(f'Failed to get table info for {table_name}: {e}')
512514
return {'exists': False, 'error': str(e), 'table_name': table_name}
515+
516+
def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
    """
    Handle blockchain reorganization by deleting affected rows from Iceberg table.

    Iceberg's time-travel capabilities make this particularly powerful:
    - We can precisely delete affected data using predicates
    - Snapshots preserve history if rollback is needed
    - ACID transactions ensure consistency

    Args:
        invalidation_ranges: List of block ranges to invalidate (reorg points)
        table_name: The table containing the data to invalidate

    Raises:
        Exception: any failure during deletion is logged and re-raised.
    """
    if not invalidation_ranges:
        return

    try:
        # Load the Iceberg table. A missing table is not an error here:
        # there is simply nothing to invalidate, so we log and bail out.
        table_identifier = f'{self.config.namespace}.{table_name}'
        try:
            iceberg_table = self._catalog.load_table(table_identifier)
        except NoSuchTableError:
            self.logger.warning(f"Table '{table_identifier}' does not exist, skipping reorg handling")
            return

        self.logger.info(
            f'Executing blockchain reorg deletion for {len(invalidation_ranges)} networks '
            f"in Iceberg table '{table_name}'"
        )

        # PyIceberg doesn't have a direct DELETE API yet, so the actual
        # deletion is done by scan-filter-overwrite in the helper below.
        # Future: issue a SQL DELETE with a predicate over the
        # '_meta_block_ranges' JSON once PyIceberg supports it.
        self._perform_reorg_deletion(iceberg_table, invalidation_ranges, table_name)

    except Exception as e:
        self.logger.error(f"Failed to handle blockchain reorg for table '{table_name}': {str(e)}")
        raise
def _perform_reorg_deletion(
    self, iceberg_table: IcebergTable, invalidation_ranges: List[BlockRange], table_name: str
) -> None:
    """
    Perform the actual deletion for reorg handling using Iceberg's capabilities.

    Since PyIceberg doesn't have a direct DELETE API yet, we scan the table,
    filter out invalidated rows in memory, and overwrite the table with the
    remainder. The overwrite creates a new snapshot, so history is preserved
    and the operation retains Iceberg's ACID guarantees.

    A row is invalidated when its '_meta_block_ranges' JSON column contains a
    range entry for one of the reorged networks whose 'end' block is at or
    beyond that network's reorg start block.

    Args:
        iceberg_table: The loaded Iceberg table to delete from.
        invalidation_ranges: Block ranges (per network) to invalidate.
        table_name: Table name, used for log messages only.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    try:
        # Read all data into memory (for now - could be optimized with
        # streaming for very large tables).
        arrow_table = iceberg_table.scan().to_arrow()

        if arrow_table.num_rows == 0:
            self.logger.info(f"Table '{table_name}' is empty, nothing to delete for reorg")
            return

        # Without the metadata column we cannot map rows to block ranges,
        # so reorg handling has to be skipped entirely.
        if '_meta_block_ranges' not in arrow_table.schema.names:
            self.logger.warning(
                f"Table '{table_name}' doesn't have '_meta_block_ranges' column, skipping reorg handling"
            )
            return

        # Single pass over the rows: collect indices of rows to keep and
        # count the rows being dropped.
        meta_column = arrow_table['_meta_block_ranges']
        keep_indices: List[int] = []
        deleted_count = 0

        for row_idx in range(arrow_table.num_rows):
            meta_json = meta_column[row_idx].as_py()
            if meta_json and self._row_is_invalidated(meta_json, invalidation_ranges):
                deleted_count += 1
            else:
                keep_indices.append(row_idx)

        if deleted_count == 0:
            self.logger.info(f"No rows to delete for reorg in table '{table_name}'")
            return

        if keep_indices:
            filtered_table = arrow_table.take(keep_indices)
        else:
            # All rows deleted - create empty table with same schema
            filtered_table = pa.table({col: [] for col in arrow_table.schema.names}, schema=arrow_table.schema)

        # Overwrite the table with filtered data. This creates a new
        # snapshot in Iceberg, preserving history for rollback.
        iceberg_table.overwrite(filtered_table)

        self.logger.info(
            f"Blockchain reorg deleted {deleted_count} rows from Iceberg table '{table_name}'. "
            f'New snapshot created with {filtered_table.num_rows} remaining rows.'
        )

    except Exception as e:
        self.logger.error(f'Failed to perform reorg deletion: {str(e)}')
        raise

@staticmethod
def _row_is_invalidated(meta_json: str, invalidation_ranges: List[BlockRange]) -> bool:
    """Return True if the row's '_meta_block_ranges' JSON overlaps any reorg.

    Malformed JSON or unexpected structure is treated as "not invalidated"
    (best-effort, matching the original silent-skip behavior).
    """
    try:
        ranges_data = json.loads(meta_json)
    except json.JSONDecodeError:
        return False

    # Ensure ranges_data is a list of range dicts.
    if not isinstance(ranges_data, list):
        return False

    for range_obj in invalidation_ranges:
        for range_info in ranges_data:
            if (
                isinstance(range_info, dict)
                and range_info.get('network') == range_obj.network
                and range_info.get('end', 0) >= range_obj.start
            ):
                return True
    return False

0 commit comments

Comments
 (0)