Commit 7620102

redis loader: Add reorg-aware streaming support
1 parent 8523877 commit 7620102

2 files changed: +457 -8 lines changed

src/amp/loaders/implementations/redis_loader.py

Lines changed: 162 additions & 8 deletions
@@ -10,6 +10,7 @@
 import pyarrow as pa
 import redis
 
+from ...streaming.types import BlockRange
 from ..base import DataLoader, LoadMode
 
 
@@ -79,6 +80,14 @@ class RedisLoader(DataLoader[RedisConfig]):
     - Comprehensive error handling
     - Connection pooling
     - Binary data support
+
+    Important Notes:
+    - For key-based data structures (hash, string, json) with {id} in key_pattern,
+      an 'id' field is REQUIRED in the data to ensure collision-proof keys across
+      job restarts. Without explicit IDs, keys could be overwritten when the job
+      is restarted.
+    - For streaming/reorg support, 'id' fields must be non-null to maintain
+      consistent secondary indexes.
     """
 
     # Declare loader capabilities
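
The new docstring notes amount to a contract on the input data. As a hedged illustration (the field values and key_pattern below are hypothetical, not from this commit):

# Illustrative only: a table destined for key_pattern='users:{id}'.
data = {
    'id': [101, 102],            # REQUIRED and non-null for key-based structures
    'name': ['alice', 'bob'],
}
# Rows land at keys 'users:101' and 'users:102'; on a job restart the same ids
# map to the same keys, so reloads overwrite rather than collide. A missing
# 'id' column (or a null id) now raises ValueError instead of silently falling
# back to the row index.
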
@@ -235,6 +244,9 @@ def _load_as_hashes_optimized(self, data_dict: Dict[str, List], num_rows: int, t
         pipe = self.redis_client.pipeline()
         commands_in_pipe = 0
 
+        # Maintain secondary indexes for streaming data (if metadata present)
+        self._maintain_block_range_indexes(data_dict, num_rows, table_name, pipe)
+
         # Execute remaining commands
         if commands_in_pipe > 0:
             pipe.execute()
@@ -276,6 +288,9 @@ def _load_as_strings_optimized(self, data_dict: Dict[str, List], num_rows: int,
         pipe = self.redis_client.pipeline()
         commands_in_pipe = 0
 
+        # Maintain secondary indexes for streaming data (if metadata present)
+        self._maintain_block_range_indexes(data_dict, num_rows, table_name, pipe)
+
         # Execute remaining commands
         if commands_in_pipe > 0:
             pipe.execute()
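
Both call sites queue the index writes on the same pipeline as the primary data writes, so index maintenance adds no extra round trips. A minimal standalone sketch of the pattern (illustrative keys; not the loader's actual code):

import redis

r = redis.Redis()
pipe = r.pipeline()
pipe.hset('users:101', mapping={'name': 'alice'})        # primary data write
pipe.sadd('block_index:users:ethereum:100-199', '101')   # secondary index entry
pipe.expire('block_index:users:ethereum:100-199', 3600)  # optional TTL, if configured
pipe.execute()                                           # one round trip for all of it
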
@@ -501,18 +516,23 @@ def _generate_key_optimized(self, data_dict: Dict[str, List], row_index: int, ta
 
             # Handle remaining {id} placeholder
             if '{id}' in key:
-                if 'id' in data_dict:
-                    id_value = data_dict['id'][row_index]
-                    key = key.replace('{id}', str(id_value) if id_value is not None else str(row_index))
-                else:
-                    key = key.replace('{id}', str(row_index))
+                if 'id' not in data_dict:
+                    raise ValueError(
+                        f"Key pattern contains {{id}} placeholder but no 'id' field found in data. "
+                        f'Available fields: {list(data_dict.keys())}. '
+                        f"Please provide an 'id' field or use a different key pattern."
+                    )
+                id_value = data_dict['id'][row_index]
+                if id_value is None:
+                    raise ValueError(f'ID value is None at row {row_index}. Redis keys require non-null IDs.')
+                key = key.replace('{id}', str(id_value))
 
             return key
 
         except Exception as e:
-            # Fallback to simple key generation
-            self.logger.warning(f'Key generation failed, using fallback: {e}')
-            return f'{table_name}:{row_index}'
+            # Re-raise to fail fast rather than silently using fallback
+            self.logger.error(f'Key generation failed: {e}')
+            raise
 
     def _clear_data(self, table_name: str) -> None:
         """Optimized data clearing for overwrite mode"""
@@ -676,3 +696,137 @@ def _get_loader_table_metadata(
         """Get Redis-specific metadata for table operation"""
         metadata = {'data_structure': self.data_structure.value}
         return metadata
+
+    def _maintain_block_range_indexes(self, data_dict: Dict[str, List], num_rows: int, table_name: str, pipe) -> None:
+        """
+        Maintain secondary indexes for efficient block range lookups.
+
+        Creates index entries of the form:
+            block_index:{table}:{network}:{start}-{end} -> SET of primary key IDs
+        """
+        # Check if this data has block range metadata
+        if '_meta_block_ranges' not in data_dict:
+            return
+
+        for i in range(num_rows):
+            # Get the primary key for this row
+            primary_key_id = self._extract_primary_key_id(data_dict, i, table_name)
+
+            # Parse block ranges from JSON metadata
+            ranges_json = data_dict['_meta_block_ranges'][i]
+            if ranges_json:
+                try:
+                    ranges_data = json.loads(ranges_json)
+                    for range_info in ranges_data:
+                        network = range_info['network']
+                        start = range_info['start']
+                        end = range_info['end']
+
+                        # Create index key
+                        index_key = f'block_index:{table_name}:{network}:{start}-{end}'
+
+                        # Add primary key to the index set
+                        pipe.sadd(index_key, primary_key_id)
+
+                        # Set TTL on index if configured
+                        if self.config.ttl:
+                            pipe.expire(index_key, self.config.ttl)
+
+                except (json.JSONDecodeError, KeyError) as e:
+                    self.logger.warning(f'Failed to parse block ranges for indexing: {e}')
+
+    def _extract_primary_key_id(self, data_dict: Dict[str, List], row_index: int, table_name: str) -> str:
+        """
+        Extract a primary key identifier from the row data for use in secondary indexes.
+        This should match the primary key used in the actual data storage.
+        """
+        # Require 'id' field for consistent key generation
+        if 'id' not in data_dict:
+            # This should have been caught by _generate_key_optimized already
+            # but double-check here for secondary index consistency
+            raise ValueError(
+                f"Secondary indexes require an 'id' field in the data. Available fields: {list(data_dict.keys())}"
+            )
+
+        id_value = data_dict['id'][row_index]
+        if id_value is None:
+            raise ValueError(f'ID value is None at row {row_index}. Redis secondary indexes require non-null IDs.')
+
+        return str(id_value)
+
+    def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
+        """
+        Handle blockchain reorganization by efficiently deleting affected data using secondary indexes.
+
+        Uses the block range indexes to quickly find and delete all data that overlaps
+        with the invalidation ranges, supporting multi-network scenarios.
+        """
+        if not invalidation_ranges:
+            return
+
+        try:
+            pipe = self.redis_client.pipeline()
+            total_deleted = 0
+
+            for invalidation_range in invalidation_ranges:
+                network = invalidation_range.network
+                reorg_start = invalidation_range.start
+
+                # Find all index keys for this network
+                index_pattern = f'block_index:{table_name}:{network}:*'
+
+                for index_key in self.redis_client.scan_iter(match=index_pattern, count=1000):
+                    # Parse the range from the index key
+                    # Format: block_index:{table}:{network}:{start}-{end}
+                    try:
+                        key_parts = index_key.decode('utf-8').split(':')
+                        range_part = key_parts[-1]  # "{start}-{end}"
+                        _start_str, end_str = range_part.split('-')
+                        range_end = int(end_str)
+
+                        # Check if this range should be invalidated
+                        # In blockchain reorgs: if reorg starts at block N, delete all data where range_end >= N
+                        if range_end >= reorg_start:
+                            # Get all affected primary keys from this index
+                            affected_keys = self.redis_client.smembers(index_key)
+
+                            # Delete the primary data keys
+                            for key_id in affected_keys:
+                                key_id_str = key_id.decode('utf-8') if isinstance(key_id, bytes) else str(key_id)
+                                primary_key = self._construct_primary_key(key_id_str, table_name)
+                                pipe.delete(primary_key)
+                                total_deleted += 1
+
+                            # Delete the index entry itself
+                            pipe.delete(index_key)
+
+                    except (ValueError, IndexError) as e:
+                        self.logger.warning(f'Failed to parse index key {index_key}: {e}')
+                        continue
+
+            # Execute all deletions
+            if total_deleted > 0:
+                pipe.execute()
+                self.logger.info(f"Blockchain reorg deleted {total_deleted} keys from table '{table_name}'")
+            else:
+                self.logger.info(f"No data to delete for reorg in table '{table_name}'")
+
+        except Exception as e:
+            self.logger.error(f"Failed to handle blockchain reorg for table '{table_name}': {str(e)}")
+            raise
+
+    def _construct_primary_key(self, key_id: str, table_name: str) -> str:
+        """
+        Construct the actual primary data key from the key ID used in indexes.
+        This should match the key generation logic used in data storage.
+        """
+        # Use the same pattern as the original key generation
+        # For most cases, this will be {table}:{id}
+        base_pattern = self.config.key_pattern.replace('{table}', table_name)
+
+        # Replace {id} with the actual key_id
+        if '{id}' in base_pattern:
+            return base_pattern.replace('{id}', key_id)
+        else:
+            # Fallback for custom patterns - use table:id format
+            return f'{table_name}:{key_id}'
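
Taken together, the three new helpers define the reorg path end to end. A hedged walkthrough with hypothetical values (table 'users', key_pattern '{table}:{id}', and assuming BlockRange is constructed with network/start/end keywords):

# Illustrative walkthrough, not code from this commit.
#
# 1. Load time: the row with id=101 carries '_meta_block_ranges' of
#    '[{"network": "ethereum", "start": 100, "end": 199}]', which queues
#    SADD block_index:users:ethereum:100-199 "101".
#
# 2. Reorg time: a reorg on ethereum starting at block 150 scans
#    block_index:users:ethereum:* and applies range_end >= reorg_start:
#      block_index:users:ethereum:0-99    -> kept    (99 < 150)
#      block_index:users:ethereum:100-199 -> deleted (199 >= 150)
#      block_index:users:polygon:100-199  -> kept    (different network)
#
# 3. Cleanup: for each member of a deleted index set,
#    _construct_primary_key('101', 'users') rebuilds 'users:101', so the
#    DELETE hits exactly the key _generate_key_optimized wrote at load time.
#    Patterns without an {id} placeholder fall back to '{table}:{id}' and may
#    not match the stored keys, so they are a poor fit for streaming tables.
loader._handle_reorg([BlockRange(network='ethereum', start=150, end=200)], 'users')
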
