Commit e6ba334

lmdb loader: Add reorg-aware streaming support
Parent: 10bd4c3

2 files changed: +326 −0 lines

src/amp/loaders/implementations/lmdb_loader.py

Lines changed: 96 additions & 0 deletions
@@ -1,13 +1,15 @@
 # amp/loaders/implementations/lmdb_loader.py
 
 import hashlib
+import json
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
 import lmdb
 import pyarrow as pa
 
+from ...streaming.types import BlockRange
 from ..base import DataLoader, LoadMode
 
 
@@ -347,3 +349,97 @@ def get_table_info(self, table_name: str) -> Optional[Dict[str, Any]]:
         except Exception as e:
             self.logger.error(f'Failed to get table info: {e}')
             return None
+
+    def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
+        """
+        Handle blockchain reorganization by deleting affected entries from LMDB.
+
+        LMDB's key-value architecture requires iterating through entries to find
+        and delete affected data based on the metadata stored in each value.
+
+        Args:
+            invalidation_ranges: List of block ranges to invalidate (reorg points)
+            table_name: The table containing the data to invalidate
+        """
+        if not invalidation_ranges:
+            return
+
+        try:
+            db = self._get_or_create_db(self.config.database_name)
+            deleted_count = 0
+
+            with self.env.begin(write=True, db=db) as txn:
+                cursor = txn.cursor()
+                keys_to_delete = []
+
+                # First pass: identify keys to delete
+                if cursor.first():
+                    while True:
+                        key = cursor.key()
+                        value = cursor.value()
+
+                        # Deserialize the Arrow batch to check its metadata
+                        try:
+                            reader = pa.ipc.open_stream(value)
+                            batch = reader.read_next_batch()
+
+                            # Check whether this batch has the metadata column
+                            if '_meta_block_ranges' in batch.schema.names:
+                                # Get the metadata (a single row per entry)
+                                meta_idx = batch.schema.get_field_index('_meta_block_ranges')
+                                meta_json = batch.column(meta_idx)[0].as_py()
+
+                                if meta_json:
+                                    try:
+                                        ranges_data = json.loads(meta_json)
+
+                                        # Guard rather than `continue`: a bare `continue`
+                                        # here would skip the cursor.next() call below and
+                                        # spin on the same entry forever
+                                        if isinstance(ranges_data, list):
+                                            # Check each invalidation range
+                                            for range_obj in invalidation_ranges:
+                                                network = range_obj.network
+                                                reorg_start = range_obj.start
+
+                                                # Invalidate the entry if any of its ranges for
+                                                # this network ends at or after the reorg start
+                                                for range_info in ranges_data:
+                                                    if (
+                                                        isinstance(range_info, dict)
+                                                        and range_info.get('network') == network
+                                                        and range_info.get('end', 0) >= reorg_start
+                                                    ):
+                                                        keys_to_delete.append(key)
+                                                        deleted_count += 1
+                                                        break
+
+                                                if key in keys_to_delete:
+                                                    break
+
+                                    except (json.JSONDecodeError, KeyError):
+                                        pass
+
+                        except Exception as e:
+                            self.logger.debug(f'Failed to deserialize entry: {e}')
+
+                        if not cursor.next():
+                            break
+
+                # Second pass: delete the identified keys
+                for key in keys_to_delete:
+                    txn.delete(key)
+
+            if deleted_count > 0:
+                self.logger.info(
+                    f'Blockchain reorg deleted {deleted_count} entries from LMDB '
+                    f"(database: '{self.config.database_name or 'main'}')"
+                )
+            else:
+                self.logger.info(
+                    f"No entries to delete for reorg in LMDB (database: '{self.config.database_name or 'main'}')"
+                )
+
+        except Exception as e:
+            self.logger.error(f'Failed to handle blockchain reorg in LMDB: {e}')
+            raise
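
The invalidation rule above is compact but easy to misread: an entry is deleted as soon as *any* of its stored ranges for the reorged network ends at or after the reorg start, regardless of where that range begins. The sketch below isolates just that predicate, with a minimal stand-in for `BlockRange` (an assumption mirroring only the fields this diff uses from `src.amp.streaming.types`; the real class may carry more):

```python
import json
from dataclasses import dataclass


@dataclass
class BlockRange:
    """Illustrative stand-in for src.amp.streaming.types.BlockRange (assumed fields only)."""
    network: str
    start: int
    end: int


def entry_is_invalidated(meta_json: str, reorg: BlockRange) -> bool:
    """Mirror of the check in _handle_reorg: delete an entry when any of its
    stored ranges on the reorged network ends at or after the reorg start."""
    ranges = json.loads(meta_json)
    if not isinstance(ranges, list):
        return False
    return any(
        isinstance(r, dict)
        and r.get('network') == reorg.network
        and r.get('end', 0) >= reorg.start
        for r in ranges
    )


reorg = BlockRange(network='ethereum', start=155, end=300)
# Written at ethereum blocks 150-160: straddles the reorg point, so deleted.
assert entry_is_invalidated(json.dumps([{'network': 'ethereum', 'start': 150, 'end': 160}]), reorg)
# Ends at 110, before the reorg point: kept.
assert not entry_is_invalidated(json.dumps([{'network': 'ethereum', 'start': 100, 'end': 110}]), reorg)
# Same blocks on another network: kept.
assert not entry_is_invalidated(json.dumps([{'network': 'polygon', 'start': 150, 'end': 160}]), reorg)
```

Note that the check ignores the reorg range's end and the stored range's start: anything touching or beyond the reorg point is treated as suspect, which errs on the side of deleting and re-fetching.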

tests/integration/test_lmdb_loader.py

Lines changed: 230 additions & 0 deletions
@@ -354,6 +354,236 @@ def test_data_persistence(self, lmdb_config, sample_test_data, test_table_name):
 
         loader2.disconnect()
 
+    def test_handle_reorg_empty_db(self, lmdb_config):
+        """Test reorg handling on an empty database"""
+        from src.amp.streaming.types import BlockRange
+
+        loader = LMDBLoader(lmdb_config)
+        loader.connect()
+
+        # Call reorg handling on an empty database
+        invalidation_ranges = [BlockRange(network='ethereum', start=100, end=200)]
+
+        # Should not raise any errors
+        loader._handle_reorg(invalidation_ranges, 'test_reorg_empty')
+
+        loader.disconnect()
+
+    def test_handle_reorg_no_metadata(self, lmdb_config):
+        """Test reorg handling when data lacks the metadata column"""
+        from src.amp.streaming.types import BlockRange
+
+        config = {**lmdb_config, 'key_column': 'id'}
+        loader = LMDBLoader(config)
+        loader.connect()
+
+        # Create data without a metadata column
+        data = pa.table({'id': [1, 2, 3], 'block_num': [100, 150, 200], 'value': [10.0, 20.0, 30.0]})
+        loader.load_table(data, 'test_reorg_no_meta', mode=LoadMode.OVERWRITE)
+
+        # Call reorg handling
+        invalidation_ranges = [BlockRange(network='ethereum', start=150, end=250)]
+
+        # Should not delete any data (no metadata to check)
+        loader._handle_reorg(invalidation_ranges, 'test_reorg_no_meta')
+
+        # Verify the data still exists
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None
+            assert txn.get(b'2') is not None
+            assert txn.get(b'3') is not None
+
+        loader.disconnect()
+
+    def test_handle_reorg_single_network(self, lmdb_config):
+        """Test reorg handling for single-network data"""
+        import json
+
+        from src.amp.streaming.types import BlockRange
+
+        config = {**lmdb_config, 'key_column': 'id'}
+        loader = LMDBLoader(config)
+        loader.connect()
+
+        # Create table with metadata
+        block_ranges = [
+            [{'network': 'ethereum', 'start': 100, 'end': 110}],
+            [{'network': 'ethereum', 'start': 150, 'end': 160}],
+            [{'network': 'ethereum', 'start': 200, 'end': 210}],
+        ]
+
+        data = pa.table(
+            {
+                'id': [1, 2, 3],
+                'block_num': [105, 155, 205],
+                '_meta_block_ranges': [json.dumps(ranges) for ranges in block_ranges],
+            }
+        )
+
+        # Load initial data
+        result = loader.load_table(data, 'test_reorg_single', mode=LoadMode.OVERWRITE)
+        assert result.success
+        assert result.rows_loaded == 3
+
+        # Verify all data exists
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None
+            assert txn.get(b'2') is not None
+            assert txn.get(b'3') is not None
+
+        # Reorg from block 155 - should delete rows 2 and 3
+        invalidation_ranges = [BlockRange(network='ethereum', start=155, end=300)]
+        loader._handle_reorg(invalidation_ranges, 'test_reorg_single')
+
+        # Verify only the first row remains
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None
+            assert txn.get(b'2') is None  # Deleted
+            assert txn.get(b'3') is None  # Deleted
+
+        loader.disconnect()
+
+    def test_handle_reorg_multi_network(self, lmdb_config):
+        """Test reorg handling preserves data from unaffected networks"""
+        import json
+
+        from src.amp.streaming.types import BlockRange
+
+        config = {**lmdb_config, 'key_column': 'id'}
+        loader = LMDBLoader(config)
+        loader.connect()
+
+        # Create data from multiple networks
+        block_ranges = [
+            [{'network': 'ethereum', 'start': 100, 'end': 110}],
+            [{'network': 'polygon', 'start': 100, 'end': 110}],
+            [{'network': 'ethereum', 'start': 150, 'end': 160}],
+            [{'network': 'polygon', 'start': 150, 'end': 160}],
+        ]
+
+        data = pa.table(
+            {
+                'id': [1, 2, 3, 4],
+                'network': ['ethereum', 'polygon', 'ethereum', 'polygon'],
+                '_meta_block_ranges': [json.dumps(r) for r in block_ranges],
+            }
+        )
+
+        # Load initial data
+        result = loader.load_table(data, 'test_reorg_multi', mode=LoadMode.OVERWRITE)
+        assert result.success
+        assert result.rows_loaded == 4
+
+        # Reorg only ethereum from block 150
+        invalidation_ranges = [BlockRange(network='ethereum', start=150, end=200)]
+        loader._handle_reorg(invalidation_ranges, 'test_reorg_multi')
+
+        # Verify ethereum row 3 is deleted, but polygon rows are preserved
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None  # ethereum block 100
+            assert txn.get(b'2') is not None  # polygon block 100
+            assert txn.get(b'3') is None  # ethereum block 150 (deleted)
+            assert txn.get(b'4') is not None  # polygon block 150
+
+        loader.disconnect()
+
+    def test_handle_reorg_overlapping_ranges(self, lmdb_config):
+        """Test reorg with overlapping block ranges"""
+        import json
+
+        from src.amp.streaming.types import BlockRange
+
+        config = {**lmdb_config, 'key_column': 'id'}
+        loader = LMDBLoader(config)
+        loader.connect()
+
+        # Create data with overlapping ranges
+        block_ranges = [
+            [{'network': 'ethereum', 'start': 90, 'end': 110}],  # Ends before reorg
+            [{'network': 'ethereum', 'start': 140, 'end': 160}],  # Overlaps with reorg
+            [{'network': 'ethereum', 'start': 170, 'end': 190}],  # After reorg
+        ]
+
+        data = pa.table({'id': [1, 2, 3], '_meta_block_ranges': [json.dumps(ranges) for ranges in block_ranges]})
+
+        # Load initial data
+        result = loader.load_table(data, 'test_reorg_overlap', mode=LoadMode.OVERWRITE)
+        assert result.success
+        assert result.rows_loaded == 3
+
+        # Reorg from block 150 - should delete rows where end >= 150
+        invalidation_ranges = [BlockRange(network='ethereum', start=150, end=200)]
+        loader._handle_reorg(invalidation_ranges, 'test_reorg_overlap')
+
+        # Only the first row should remain (ends at 110 < 150)
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None
+            assert txn.get(b'2') is None  # Deleted (end=160 >= 150)
+            assert txn.get(b'3') is None  # Deleted (end=190 >= 150)
+
+        loader.disconnect()
+
+    def test_streaming_with_reorg(self, lmdb_config):
+        """Test streaming data with reorg support"""
+        from src.amp.streaming.types import (
+            BatchMetadata,
+            BlockRange,
+            ResponseBatch,
+            ResponseBatchType,
+            ResponseBatchWithReorg,
+        )
+
+        config = {**lmdb_config, 'key_column': 'id'}
+        loader = LMDBLoader(config)
+        loader.connect()
+
+        # Create streaming data with metadata
+        data1 = pa.RecordBatch.from_pydict({'id': [1, 2], 'value': [100, 200]})
+
+        data2 = pa.RecordBatch.from_pydict({'id': [3, 4], 'value': [300, 400]})
+
+        # Create response batches
+        response1 = ResponseBatchWithReorg(
+            batch_type=ResponseBatchType.DATA,
+            data=ResponseBatch(
+                data=data1, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110)])
+            ),
+        )
+
+        response2 = ResponseBatchWithReorg(
+            batch_type=ResponseBatchType.DATA,
+            data=ResponseBatch(
+                data=data2, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160)])
+            ),
+        )
+
+        # Simulate a reorg event
+        reorg_response = ResponseBatchWithReorg(
+            batch_type=ResponseBatchType.REORG, invalidation_ranges=[BlockRange(network='ethereum', start=150, end=200)]
+        )
+
+        # Process the streaming data
+        stream = [response1, response2, reorg_response]
+        results = list(loader.load_stream_continuous(iter(stream), 'test_streaming_reorg'))
+
+        # Verify results
+        assert len(results) == 3
+        assert results[0].success
+        assert results[0].rows_loaded == 2
+        assert results[1].success
+        assert results[1].rows_loaded == 2
+        assert results[2].success
+        assert results[2].is_reorg
+
+        # Verify the reorg deleted the second batch
+        with loader.env.begin() as txn:
+            assert txn.get(b'1') is not None
+            assert txn.get(b'2') is not None
+            assert txn.get(b'3') is None  # Deleted by reorg
+            assert txn.get(b'4') is None  # Deleted by reorg
+
+        loader.disconnect()
+
 
 if __name__ == '__main__':
     pytest.main([__file__, '-v'])
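
For orientation, the streaming test above exercises `load_stream_continuous`, whose implementation is outside this diff. A minimal sketch of the dispatch it presumably performs, inferred only from the types and assertions in the test (DATA batches are loaded, REORG batches are routed to `_handle_reorg`); names and call shapes here are assumptions, not the library's actual code:

```python
import pyarrow as pa

from src.amp.streaming.types import ResponseBatchType


def process_stream(loader, stream, table_name):
    """Hypothetical outline of the routing inside load_stream_continuous,
    inferred from the test above; the real implementation is not in this diff."""
    for response in stream:
        if response.batch_type == ResponseBatchType.REORG:
            # A reorg batch carries no rows, only the ranges to invalidate
            loader._handle_reorg(response.invalidation_ranges, table_name)
        else:
            # A data batch carries Arrow rows plus the block ranges they cover;
            # persisting a _meta_block_ranges column alongside the rows is what
            # lets _handle_reorg match entries later (embedding omitted here)
            loader.load_table(pa.Table.from_batches([response.data.data]), table_name)
```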
