Commit 4ebc2ca

k8s updates and add the app we want to run
1 parent b86f23b commit 4ebc2ca

File tree

2 files changed (+233, -1 lines)

2 files changed

+233
-1
lines changed
apps/test_erc20_labeled_parallel.py

Lines changed: 232 additions & 0 deletions

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
Real-world test: Load ERC20 transfers into Snowflake with token labels using parallel streaming.

This test demonstrates the CSV label joining feature by enriching ERC20 transfer data
with token metadata (symbol, name, decimals) from a CSV file.

Usage:
    python apps/test_erc20_labeled_parallel.py [--blocks BLOCKS] [--workers WORKERS]

Example:
    python apps/test_erc20_labeled_parallel.py --blocks 100000 --workers 4
"""

import argparse
import os
import time
from datetime import datetime
from pathlib import Path

from amp.client import Client
from amp.streaming.parallel import ParallelConfig


def get_recent_block_range(client: Client, num_blocks: int = 100_000):
    """Query amp server to get recent block range."""
    print(f'\n🔍 Detecting recent block range ({num_blocks:,} blocks)...')

    query = 'SELECT MAX(block_num) as max_block FROM eth_firehose.logs'
    result = client.get_sql(query, read_all=True)

    if result.num_rows == 0:
        raise RuntimeError('No data found in eth_firehose.logs')

    max_block = result.column('max_block')[0].as_py()
    if max_block is None:
        raise RuntimeError('No blocks found in eth_firehose.logs')

    min_block = max(0, max_block - num_blocks)

    print(f'✅ Block range: {min_block:,} to {max_block:,} ({max_block - min_block:,} blocks)')
    return min_block, max_block


def load_erc20_transfers_with_labels(
    num_blocks: int = 100_000, num_workers: int = 4, flush_interval: float = 1.0
):
    """Load ERC20 transfers with token labels using Snowpipe Streaming and parallel streaming."""

    # Initialize client
    server_url = os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80')
    client = Client(server_url)
    print(f'📡 Connected to amp server: {server_url}')

    # Configure token metadata labels
    project_root = Path(__file__).parent.parent
    token_csv_path = project_root / 'data' / 'eth_mainnet_token_metadata.csv'

    if not token_csv_path.exists():
        raise FileNotFoundError(
            f'Token metadata CSV not found at {token_csv_path}. Please ensure the file exists in the data directory.'
        )

    print(f'\n🏷️ Configuring token metadata labels from: {token_csv_path}')
    client.configure_label('token_metadata', str(token_csv_path))
    print(f'✅ Loaded token labels: {len(client.label_manager.get_label("token_metadata"))} tokens')

    # Get recent block range
    min_block, max_block = get_recent_block_range(client, num_blocks)

    # Generate unique table name
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    table_name = f'erc20_labeled_{timestamp}'
    print(f'\n📊 Target table: {table_name}')
    print('🌊 Using Snowpipe Streaming with label joining')

    # ERC20 Transfer event signature
    transfer_sig = 'Transfer(address indexed from, address indexed to, uint256 value)'

    # ERC20 transfer query - decode from raw logs and include token address
    # The address is binary, but our join logic will auto-convert to match CSV hex strings
    erc20_query = f"""
    select
        pc.block_num,
        pc.block_hash,
        pc.timestamp,
        pc.tx_hash,
        pc.tx_index,
        pc.log_index,
        pc.address as token_address,
        pc.dec['from'] as from_address,
        pc.dec['to'] as to_address,
        pc.dec['value'] as value
    from (
        select
            l.block_num,
            l.block_hash,
            l.tx_hash,
            l.tx_index,
            l.log_index,
            l.timestamp,
            l.address,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{transfer_sig}') as dec
        from eth_firehose.logs l
        where
            l.topic0 = evm_topic('{transfer_sig}') and
            l.topic3 IS NULL) pc
    """

    # Configure Snowflake connection with Snowpipe Streaming
    snowflake_config = {
        'account': os.getenv('SNOWFLAKE_ACCOUNT'),
        'user': os.getenv('SNOWFLAKE_USER'),
        'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
        'database': os.getenv('SNOWFLAKE_DATABASE'),
        'private_key': os.getenv('SNOWFLAKE_PRIVATE_KEY'),
        'loading_method': 'snowpipe_streaming',  # Use Snowpipe Streaming
        'pool_size': num_workers + 2,  # Set pool size to match workers + buffer
        'streaming_buffer_flush_interval': int(flush_interval),  # Buffer flush interval in seconds
    }

    client.configure_connection(name='snowflake_snowpipe_labeled', loader='snowflake', config=snowflake_config)

    # Configure parallel execution
    parallel_config = ParallelConfig(
        num_workers=num_workers,
        table_name='eth_firehose.logs',
        min_block=min_block,
        max_block=max_block,
        block_column='block_num',
    )

    print(f'\n🚀 Starting parallel Snowpipe Streaming load with {num_workers} workers...')
    print('🏷️ Joining with token labels on token_address column')
    print(' Only transfers from tokens in the metadata CSV will be loaded (inner join)\n')

    start_time = time.time()

    # Load data in parallel with label joining
    results = list(
        client.sql(erc20_query).load(
            connection='snowflake_snowpipe_labeled',
            destination=table_name,
            stream=True,
            parallel_config=parallel_config,
            # Label joining parameters
            label='token_metadata',
            label_key_column='token_address',  # Key in CSV
            stream_key_column='token_address',  # Key in streaming data
        )
    )

    duration = time.time() - start_time

    # Calculate statistics
    total_rows = sum(r.rows_loaded for r in results if r.success)
    failures = [r for r in results if not r.success]
    rows_per_sec = total_rows / duration if duration > 0 else 0
    partitions = [r for r in results if 'partition_id' in r.metadata]
    successful_workers = len(set(r.metadata.get('partition_id') for r in partitions))
    failed_count = len(failures)

    # Print results
    print(f'\n{"=" * 70}')
    if failures:
        print(f'⚠️ ERC20 Labeled Load Complete (with {failed_count} failures)')
    else:
        print('🎉 ERC20 Labeled Load Complete!')
    print(f'{"=" * 70}')
    print(f'📊 Table name: {table_name}')
    print(f'📦 Block range: {min_block:,} to {max_block:,}')
    print(f'📈 Rows loaded: {total_rows:,}')
    print('🏷️ Label columns: symbol, name, decimals (from CSV)')
    print(f'⏱️ Duration: {duration:.2f}s')
    print(f'🚀 Throughput: {rows_per_sec:,.0f} rows/sec')
    print(f'👷 Workers: {num_workers} configured')
    print(f'✅ Successful: {len(results) - failed_count}/{len(results)} batches')
    if failed_count > 0:
        print(f'❌ Failed batches: {failed_count}')
        print('\nFirst 3 errors:')
        for f in failures[:3]:
            print(f' - {f.error}')
    if total_rows > 0:
        print(f'📊 Avg rows/block: {total_rows / (max_block - min_block):.0f}')
    print(f'{"=" * 70}')

    print(f'\n✅ Table "{table_name}" is ready in Snowflake with token labels!')
    print('\n💡 Sample queries:')
    print(' -- View transfers with token info')
    print(' SELECT token_address, symbol, name, decimals, from_address, to_address, value')
    print(f' FROM {table_name} LIMIT 10;')
    print('\n -- Top tokens by transfer count')
    print(' SELECT symbol, name, COUNT(*) as transfer_count')
    print(f' FROM {table_name}')
    print(' GROUP BY symbol, name')
    print(' ORDER BY transfer_count DESC')
    print(' LIMIT 10;')
    print('\n💡 Note: Snowpipe Streaming data may take a few moments to be queryable')
    print('💡 Note: Only transfers for tokens in the metadata CSV are included (inner join)')

    return table_name, total_rows, duration


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Load ERC20 transfers with token labels into Snowflake using Snowpipe Streaming with parallel streaming'
    )
    parser.add_argument(
        '--blocks', type=int, default=100_000, help='Number of recent blocks to load (default: 100,000)'
    )
    parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers (default: 4)')
    parser.add_argument(
        '--flush-interval',
        type=float,
        default=1.0,
        help='Snowpipe Streaming buffer flush interval in seconds (default: 1.0)',
    )

    args = parser.parse_args()

    try:
        load_erc20_transfers_with_labels(
            num_blocks=args.blocks, num_workers=args.workers, flush_interval=args.flush_interval
        )
    except KeyboardInterrupt:
        print('\n\n⚠️ Interrupted by user')
    except Exception as e:
        print(f'\n\n❌ Error: {e}')
        import traceback

        traceback.print_exc()
        raise
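
For a quick programmatic smoke test, rather than the CLI entry point above, something like the following sketch could drive the loader directly. It assumes the script is importable as apps.test_erc20_labeled_parallel (i.e. the repository root is on PYTHONPATH) and that the AMP_SERVER_URL and SNOWFLAKE_* variables the script reads via os.getenv point at real services; every value shown is a placeholder, not a real endpoint or credential.

# Hypothetical smoke-test driver for the loader above; a sketch, not part of the commit.
import os

# Placeholder values -- replace with real credentials/endpoints before running.
os.environ.setdefault('AMP_SERVER_URL', 'grpc://your-amp-server:80')
os.environ.setdefault('SNOWFLAKE_ACCOUNT', 'your_account')
os.environ.setdefault('SNOWFLAKE_USER', 'your_user')
os.environ.setdefault('SNOWFLAKE_WAREHOUSE', 'your_warehouse')
os.environ.setdefault('SNOWFLAKE_DATABASE', 'your_database')
os.environ.setdefault('SNOWFLAKE_PRIVATE_KEY', 'your_private_key_pem')

from apps.test_erc20_labeled_parallel import load_erc20_transfers_with_labels

# Small block range and worker count for a quick end-to-end check.
table_name, total_rows, duration = load_erc20_transfers_with_labels(
    num_blocks=10_000, num_workers=2, flush_interval=1.0
)
print(f'Loaded {total_rows} rows into {table_name} in {duration:.1f}s')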

k8s/deployment.yaml

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ spec:
     spec:
       containers:
         - name: loader
-          image: ghcr.io/edgeandnode/amp-python:latest
+          image: ghcr.io/edgeandnode/amp-python:sha-b2b10aa
           imagePullPolicy: Always

           # Command line arguments for the loader
