
Commit 10bd4c3

deltalake loader: Add reorg aware streaming support
1 parent: f164add

File tree

2 files changed: +365 -0 lines changed

src/amp/loaders/implementations/deltalake_loader.py

Lines changed: 105 additions & 0 deletions
@@ -1,5 +1,6 @@
 # src/amp/loaders/implementations/deltalake_loader.py
 
+import json
 import os
 import time
 from dataclasses import dataclass, field
@@ -19,6 +20,7 @@
 except ImportError:
     DELTALAKE_AVAILABLE = False
 
+from ...streaming.types import BlockRange
 from ..base import DataLoader, LoadMode
 
 
@@ -649,3 +651,106 @@ def query_table(self, columns: Optional[List[str]] = None, limit: Optional[int]
         except Exception as e:
             self.logger.error(f'Query failed: {e}')
             raise
+
+    def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
+        """
+        Handle blockchain reorganization by deleting affected rows from Delta Lake.
+
+        Delta Lake's versioning and transaction capabilities make this operation
+        particularly powerful - we can precisely delete affected data and even
+        roll back if needed using time travel features.
+
+        Args:
+            invalidation_ranges: List of block ranges to invalidate (reorg points)
+            table_name: The table containing the data to invalidate (not used but kept for API consistency)
+        """
+        if not invalidation_ranges:
+            return
+
+        try:
+            # First, ensure we have a connected table
+            if not self._delta_table:
+                self.logger.warning('No Delta table connected, skipping reorg handling')
+                return
+
+            # Load the current table data
+            current_table = self._delta_table.to_pyarrow_table()
+
+            # Check if the table has the metadata column
+            if '_meta_block_ranges' not in current_table.schema.names:
+                self.logger.warning("Delta table doesn't have '_meta_block_ranges' column, skipping reorg handling")
+                return
+
+            # Build a mask to identify rows to keep
+            keep_mask = pa.array([True] * current_table.num_rows)
+
+            # Process each row to check if it should be invalidated
+            meta_column = current_table['_meta_block_ranges']
+
+            for i in range(current_table.num_rows):
+                meta_json = meta_column[i].as_py()
+
+                if meta_json:
+                    try:
+                        ranges_data = json.loads(meta_json)
+
+                        # Ensure ranges_data is a list
+                        if not isinstance(ranges_data, list):
+                            continue
+
+                        # Check each invalidation range
+                        for range_obj in invalidation_ranges:
+                            network = range_obj.network
+                            reorg_start = range_obj.start
+
+                            # Check if any range for this network should be invalidated
+                            for range_info in ranges_data:
+                                if (
+                                    isinstance(range_info, dict)
+                                    and range_info.get('network') == network
+                                    and range_info.get('end', 0) >= reorg_start
+                                ):
+                                    # Mark this row for deletion:
+                                    # create a mask for this specific row
+                                    row_mask = pa.array([j == i for j in range(current_table.num_rows)])
+                                    keep_mask = pa.compute.and_(keep_mask, pa.compute.invert(row_mask))
+                                    break
+
+                    except (json.JSONDecodeError, KeyError):
+                        pass
+
+            # Filter the table to keep only valid rows
+            filtered_table = current_table.filter(keep_mask)
+            deleted_count = current_table.num_rows - filtered_table.num_rows
+
+            if deleted_count > 0:
+                # Overwrite the table with the filtered data.
+                # This creates a new version in Delta Lake, preserving history.
+                self.logger.info(
+                    f'Executing blockchain reorg deletion for {len(invalidation_ranges)} networks '
+                    f'in Delta Lake table. Deleting {deleted_count} rows.'
+                )
+
+                # Use overwrite mode to replace the table contents
+                write_deltalake(
+                    table_or_uri=self.config.table_path,
+                    data=filtered_table,
+                    mode='overwrite',
+                    partition_by=self.config.partition_by,
+                    schema_mode='overwrite' if self.config.schema_evolution else None,
+                    storage_options=self.config.storage_options,
+                )
+
+                # Refresh the table reference
+                self._refresh_table_reference()
+
+                self.logger.info(
+                    f'Blockchain reorg completed. Deleted {deleted_count} rows from Delta Lake. '
+                    f'New version: {self._delta_table.version() if self._delta_table else "unknown"}'
+                )
+            else:
+                self.logger.info('No rows to delete for reorg in Delta Lake table')
+
+        except Exception as e:
+            self.logger.error(f'Failed to handle blockchain reorg in Delta Lake: {str(e)}')
+            raise
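
The row-invalidation predicate at the heart of this change is easiest to check in isolation: a row is deleted when any of its stored `_meta_block_ranges` entries belongs to the reorged network and ends at or after the reorg start. Below is a minimal standalone sketch of that predicate; the `row_is_invalidated` helper is illustrative only and not part of the commit:

import json

def row_is_invalidated(meta_json: str, network: str, reorg_start: int) -> bool:
    """Hypothetical helper mirroring _handle_reorg's per-row check."""
    try:
        ranges = json.loads(meta_json)
    except json.JSONDecodeError:
        return False
    if not isinstance(ranges, list):
        return False
    # A row is invalidated if any of its ranges for the reorged
    # network ends at or after the reorg point.
    return any(
        isinstance(r, dict)
        and r.get('network') == network
        and r.get('end', 0) >= reorg_start
        for r in ranges
    )

# A row whose range ends at or after the reorg start is dropped...
assert row_is_invalidated('[{"network": "ethereum", "start": 150, "end": 160}]', 'ethereum', 155)
# ...while one that ends before it is kept.
assert not row_is_invalidated('[{"network": "ethereum", "start": 100, "end": 110}]', 'ethereum', 155)

One design note: the committed loop materializes a fresh full-length boolean array for every matching row, which is quadratic in table size; accumulating per-row booleans in a plain Python list and calling pa.array once at the end would produce the same keep mask at lower cost on large tables.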

tests/integration/test_deltalake_loader.py

Lines changed: 260 additions & 0 deletions
@@ -543,3 +543,263 @@ def test_concurrent_operations_safety(self, delta_basic_config, small_test_data)
         # Verify final data integrity
         final_data = loader.query_table()
         assert final_data.num_rows == 8  # 5 + 3 * 1
+
+    def test_handle_reorg_no_table(self, delta_basic_config):
+        """Test reorg handling when the table doesn't exist"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Call handle reorg on a non-existent table
+            invalidation_ranges = [BlockRange(network='ethereum', start=100, end=200)]
+
+            # Should not raise any errors
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_empty')
+
+    def test_handle_reorg_no_metadata_column(self, delta_basic_config):
+        """Test reorg handling when the table lacks the metadata column"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Create a table without the metadata column
+            data = pa.table(
+                {
+                    'id': [1, 2, 3],
+                    'block_num': [100, 150, 200],
+                    'value': [10.0, 20.0, 30.0],
+                    'year': [2024, 2024, 2024],
+                    'month': [1, 1, 1],
+                }
+            )
+            loader.load_table(data, 'test_reorg_no_meta', mode=LoadMode.OVERWRITE)
+
+            # Call handle reorg
+            invalidation_ranges = [BlockRange(network='ethereum', start=150, end=250)]
+
+            # Should log a warning and not modify the data
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_no_meta')
+
+            # Verify data unchanged
+            remaining_data = loader.query_table()
+            assert remaining_data.num_rows == 3
+
+    def test_handle_reorg_single_network(self, delta_basic_config):
+        """Test reorg handling for single-network data"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Create a table with metadata
+            block_ranges = [
+                [{'network': 'ethereum', 'start': 100, 'end': 110}],
+                [{'network': 'ethereum', 'start': 150, 'end': 160}],
+                [{'network': 'ethereum', 'start': 200, 'end': 210}],
+            ]
+
+            data = pa.table(
+                {
+                    'id': [1, 2, 3],
+                    'block_num': [105, 155, 205],
+                    '_meta_block_ranges': [json.dumps(ranges) for ranges in block_ranges],
+                    'year': [2024, 2024, 2024],
+                    'month': [1, 1, 1],
+                }
+            )
+
+            # Load initial data
+            result = loader.load_table(data, 'test_reorg_single', mode=LoadMode.OVERWRITE)
+            assert result.success
+            assert result.rows_loaded == 3
+
+            # Verify all data exists
+            initial_data = loader.query_table()
+            assert initial_data.num_rows == 3
+
+            # Reorg from block 155 - should delete rows 2 and 3
+            invalidation_ranges = [BlockRange(network='ethereum', start=155, end=300)]
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_single')
+
+            # Verify only the first row remains
+            remaining_data = loader.query_table()
+            assert remaining_data.num_rows == 1
+            assert remaining_data['id'][0].as_py() == 1
+
+    def test_handle_reorg_multi_network(self, delta_basic_config):
+        """Test that reorg handling preserves data from unaffected networks"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Create data from multiple networks
+            block_ranges = [
+                [{'network': 'ethereum', 'start': 100, 'end': 110}],
+                [{'network': 'polygon', 'start': 100, 'end': 110}],
+                [{'network': 'ethereum', 'start': 150, 'end': 160}],
+                [{'network': 'polygon', 'start': 150, 'end': 160}],
+            ]
+
+            data = pa.table(
+                {
+                    'id': [1, 2, 3, 4],
+                    'network': ['ethereum', 'polygon', 'ethereum', 'polygon'],
+                    '_meta_block_ranges': [json.dumps(r) for r in block_ranges],
+                    'year': [2024, 2024, 2024, 2024],
+                    'month': [1, 1, 1, 1],
+                }
+            )
+
+            # Load initial data
+            result = loader.load_table(data, 'test_reorg_multi', mode=LoadMode.OVERWRITE)
+            assert result.success
+            assert result.rows_loaded == 4
+
+            # Reorg only ethereum from block 150
+            invalidation_ranges = [BlockRange(network='ethereum', start=150, end=200)]
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_multi')
+
+            # Verify ethereum row 3 was deleted but the polygon rows preserved
+            remaining_data = loader.query_table()
+            assert remaining_data.num_rows == 3
+            remaining_ids = sorted([id.as_py() for id in remaining_data['id']])
+            assert remaining_ids == [1, 2, 4]  # Row 3 deleted
+
+    def test_handle_reorg_overlapping_ranges(self, delta_basic_config):
+        """Test reorg with overlapping block ranges"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Create data with overlapping ranges
+            block_ranges = [
+                [{'network': 'ethereum', 'start': 90, 'end': 110}],  # Ends before reorg point
+                [{'network': 'ethereum', 'start': 140, 'end': 160}],  # Overlaps with reorg
+                [{'network': 'ethereum', 'start': 170, 'end': 190}],  # After reorg
+            ]
+
+            data = pa.table(
+                {
+                    'id': [1, 2, 3],
+                    '_meta_block_ranges': [json.dumps(ranges) for ranges in block_ranges],
+                    'year': [2024, 2024, 2024],
+                    'month': [1, 1, 1],
+                }
+            )
+
+            # Load initial data
+            result = loader.load_table(data, 'test_reorg_overlap', mode=LoadMode.OVERWRITE)
+            assert result.success
+            assert result.rows_loaded == 3
+
+            # Reorg from block 150 - should delete rows where end >= 150
+            invalidation_ranges = [BlockRange(network='ethereum', start=150, end=200)]
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_overlap')
+
+            # Only the first row should remain (ends at 110 < 150)
+            remaining_data = loader.query_table()
+            assert remaining_data.num_rows == 1
+            assert remaining_data['id'][0].as_py() == 1
+
+    def test_handle_reorg_version_history(self, delta_basic_config):
+        """Test that reorg creates proper version history in Delta Lake"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = DeltaLakeLoader(delta_basic_config)
+
+        with loader:
+            # Create initial data
+            data = pa.table(
+                {
+                    'id': [1, 2, 3],
+                    '_meta_block_ranges': [
+                        json.dumps([{'network': 'ethereum', 'start': i * 50, 'end': i * 50 + 10}]) for i in range(3)
+                    ],
+                    'year': [2024, 2024, 2024],
+                    'month': [1, 1, 1],
+                }
+            )
+
+            # Load initial data
+            loader.load_table(data, 'test_reorg_history', mode=LoadMode.OVERWRITE)
+            initial_version = loader._delta_table.version()
+
+            # Perform the reorg
+            invalidation_ranges = [BlockRange(network='ethereum', start=50, end=200)]
+            loader._handle_reorg(invalidation_ranges, 'test_reorg_history')
+
+            # Check that the version increased
+            final_version = loader._delta_table.version()
+            assert final_version > initial_version
+
+            # Check the history
+            history = loader.get_table_history(limit=5)
+            assert len(history) >= 2
+            # The latest operation should be an overwrite (from the reorg)
+            assert history[0]['operation'] == 'WRITE'
+
+    def test_streaming_with_reorg(self, delta_temp_config):
+        """Test streaming data with reorg support"""
+        from src.amp.streaming.types import (
+            BatchMetadata,
+            BlockRange,
+            ResponseBatch,
+            ResponseBatchType,
+            ResponseBatchWithReorg,
+        )
+
+        loader = DeltaLakeLoader(delta_temp_config)
+
+        with loader:
+            # Create streaming data with metadata
+            data1 = pa.RecordBatch.from_pydict(
+                {'id': [1, 2], 'value': [100, 200], 'year': [2024, 2024], 'month': [1, 1]}
+            )
+
+            data2 = pa.RecordBatch.from_pydict(
+                {'id': [3, 4], 'value': [300, 400], 'year': [2024, 2024], 'month': [1, 1]}
+            )
+
+            # Create response batches
+            response1 = ResponseBatchWithReorg(
+                batch_type=ResponseBatchType.DATA,
+                data=ResponseBatch(
+                    data=data1, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110)])
+                ),
+            )
+
+            response2 = ResponseBatchWithReorg(
+                batch_type=ResponseBatchType.DATA,
+                data=ResponseBatch(
+                    data=data2, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160)])
+                ),
+            )
+
+            # Simulate a reorg event
+            reorg_response = ResponseBatchWithReorg(
+                batch_type=ResponseBatchType.REORG,
+                invalidation_ranges=[BlockRange(network='ethereum', start=150, end=200)],
+            )
+
+            # Process the streaming data
+            stream = [response1, response2, reorg_response]
+            results = list(loader.load_stream_continuous(iter(stream), 'test_streaming_reorg'))
+
+            # Verify the results
+            assert len(results) == 3
+            assert results[0].success
+            assert results[0].rows_loaded == 2
+            assert results[1].success
+            assert results[1].rows_loaded == 2
+            assert results[2].success
+            assert results[2].is_reorg
+
+            # Verify the reorg deleted the second batch
+            final_data = loader.query_table()
+            assert final_data.num_rows == 2
+            remaining_ids = sorted([id.as_py() for id in final_data['id']])
+            assert remaining_ids == [1, 2]  # 3 and 4 deleted by the reorg
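
Because the reorg deletion is performed as an `overwrite` write, the pre-reorg rows stay reachable through Delta Lake's time travel, which is what `test_handle_reorg_version_history` exercises. A minimal sketch of inspecting the pre-reorg state, assuming the `deltalake` Python package and a placeholder table path (`/tmp/events` is illustrative, not from the commit):

from deltalake import DeltaTable

table_path = '/tmp/events'  # placeholder path, not part of the commit

dt = DeltaTable(table_path)
current_version = dt.version()

# _handle_reorg's overwrite produced the current version, so the
# immediately preceding version still contains the invalidated rows.
pre_reorg = DeltaTable(table_path, version=current_version - 1)
print(pre_reorg.to_pyarrow_table().num_rows)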
