
Commit f468390

tidying up docs and examples around data paging and streaming
1 parent e540612

8 files changed (+301 −66 lines changed)

docs/streaming.md

Lines changed: 12 additions & 0 deletions

@@ -15,6 +15,7 @@ When you query Cassandra for potentially large result sets, you need to understa
 - [Advanced Patterns](#advanced-patterns)
 - [Performance Guidelines](#performance-guidelines)
 - [Common Pitfalls](#common-pitfalls)
+- [True Async Paging](#true-async-paging)

 ## How Cassandra Paging Works

@@ -848,6 +849,17 @@ async def load_batch(prepared_stmt, batch):
     )
 ```

+## True Async Paging
+
+For detailed information about True Async Paging behavior, common misconceptions, and best practices, see our dedicated guide: [True Async Paging](true-async-paging.md).
+
+Key points covered:
+- Critical importance of context managers
+- How paging actually works (on-demand, not pre-fetched)
+- When LIMIT is needed (hint: rarely with paging!)
+- Page size recommendations for different use cases
+- Common patterns and anti-patterns
+
 ## Conclusion

 The key to understanding streaming in async-cassandra is recognizing that:

docs/true-async-paging.md

Lines changed: 118 additions & 0 deletions (new file)

# True Async Paging in async-cassandra

## Key Concepts

### 1. Always Use Context Managers (CRITICAL)

```python
# ✅ CORRECT - Prevents resource leaks
async with await session.execute_stream("SELECT * FROM table") as result:
    async for row in result:
        await process_row(row)

# ❌ WRONG - Will leak resources!
result = await session.execute_stream("SELECT * FROM table")
async for row in result:  # Missing context manager!
    await process_row(row)
```

### 2. How Paging Actually Works

The Cassandra driver implements **true streaming** with these characteristics:

- **On-Demand Fetching**: Pages are fetched as you consume data, NOT all at once
- **Async Fetching**: While you process page N, the driver can fetch page N+1
- **Memory Efficient**: Only one page is held in memory at a time
- **No Pre-fetching All Data**: The driver doesn't load the entire result set
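
To see the on-demand behavior for yourself, here is a minimal sketch. It reuses the `page_callback` hook shown in the performance tips below; the `events` table, the import path, and the timings are assumptions for illustration, not part of the library's documented surface:

```python
import asyncio
import time

from async_cassandra import AsyncCluster, StreamConfig  # import path assumed

START = time.monotonic()


async def main() -> None:
    async with AsyncCluster(["localhost"]) as cluster:
        async with await cluster.connect() as session:
            # Log when each page actually arrives from the server.
            config = StreamConfig(
                fetch_size=1000,
                page_callback=lambda page_num, total_rows: print(
                    f"page {page_num} at +{time.monotonic() - START:.1f}s "
                    f"({total_rows} rows so far)"
                ),
            )
            async with await session.execute_stream(
                "SELECT * FROM events", stream_config=config  # hypothetical table
            ) as result:
                async for row in result:
                    await asyncio.sleep(0.001)  # deliberately slow consumer


asyncio.run(main())
```

If the driver pre-fetched everything, all page callbacks would fire almost immediately; because pages are fetched on demand, the timestamps spread out to match the consumer's pace.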

### 3. Page Size Recommendations

```python
# Small Pages (1000-5000 rows)
# ✅ Best for: Real-time processing, low memory usage, better responsiveness
# ❌ Trade-off: More network round trips
config = StreamConfig(fetch_size=1000)

# Medium Pages (5000-10000 rows)
# ✅ Best for: General purpose, good balance
config = StreamConfig(fetch_size=5000)

# Large Pages (10000-50000 rows)
# ✅ Best for: Bulk exports, batch processing, fewer round trips
# ❌ Trade-off: Higher memory usage, slower first results
config = StreamConfig(fetch_size=20000)
```

### 4. LIMIT vs Paging

**You don't need LIMIT with paging!**

```python
# ❌ UNNECESSARY - fetch_size already controls data flow
stmt = await session.prepare("SELECT * FROM users LIMIT ?")
async with await session.execute_stream(stmt, [1000]) as result:
    ...  # This limits total results, not page size!

# ✅ CORRECT - Let paging handle the data flow
stmt = await session.prepare("SELECT * FROM users")
config = StreamConfig(fetch_size=1000)  # This controls page size
async with await session.execute_stream(stmt, stream_config=config) as result:
    ...  # Process all data efficiently, page by page
```

### 5. Processing Patterns

#### Row-by-Row Processing
```python
# Process each row as it arrives
async with await session.execute_stream("SELECT * FROM large_table") as result:
    async for row in result:
        await process_row(row)  # Non-blocking, pages fetched as needed
```

#### Page-by-Page Processing
```python
# Process entire pages at once (e.g., for batch operations)
config = StreamConfig(fetch_size=5000)
async with await session.execute_stream(
    "SELECT * FROM large_table", stream_config=config
) as result:
    async for page in result.pages():
        # Process entire page (list of rows)
        await bulk_insert_to_warehouse(page)
```

### 6. Common Misconceptions

**Myth**: "The driver pre-fetches all pages"
**Reality**: Pages are fetched on-demand as you consume data

**Myth**: "I need LIMIT to control memory usage"
**Reality**: `fetch_size` controls memory usage; LIMIT just limits total results

**Myth**: "Larger pages are always better"
**Reality**: It depends on your use case - see the recommendations above

**Myth**: "I can skip the context manager"
**Reality**: Context managers are MANDATORY to prevent resource leaks
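
A corollary of the LIMIT myth: if you only want the first N rows, breaking out of the loop has the same effect as LIMIT while `fetch_size` keeps memory bounded. A small sketch (the row count and table name are illustrative), using the same manual count tracking as the FastAPI examples in this commit:

```python
config = StreamConfig(fetch_size=1000)  # memory bound: ~one 1000-row page

rows_seen = 0
async with await session.execute_stream(
    "SELECT * FROM users", stream_config=config
) as result:
    async for row in result:
        await process_row(row)
        rows_seen += 1
        if rows_seen >= 10_000:
            break  # stop consuming; at most the in-flight page is still fetched
```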

### 7. Performance Tips

1. **Match fetch_size to your processing speed**
   - Fast processing → larger pages
   - Slow processing → smaller pages

2. **Use page callbacks for monitoring**
   ```python
   config = StreamConfig(
       fetch_size=5000,
       page_callback=lambda page_num, total_rows: logger.info(
           f"Processing page {page_num}, total: {total_rows:,}"
       ),
   )
   ```

3. **Consider network latency**
   - High latency → larger pages (fewer round trips)
   - Low latency → smaller pages are fine

4. **Monitor memory usage**
   - Each page holds `fetch_size` rows in memory
   - Adjust based on row size and available memory
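
For the last point, a back-of-the-envelope estimate is `fetch_size × average row size`. A tiny helper (the numbers are illustrative, not measured):

```python
def approx_page_memory_mb(fetch_size: int, avg_row_bytes: int) -> float:
    """Rough upper bound for one page of rows held in memory."""
    return fetch_size * avg_row_bytes / 1_048_576


# e.g. 20,000-row pages of ~1 KB rows hold roughly 20 MB per page
print(f"{approx_page_memory_mb(20_000, 1_024):.1f} MB")  # -> 19.5 MB
```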

examples/README.md

Lines changed: 8 additions & 3 deletions

@@ -144,11 +144,16 @@ All examples follow the required pattern:
 ```python
 # ALWAYS use context managers for resource management
 async with AsyncCluster(["localhost"]) as cluster:
-    async with cluster.connect() as session:
-        # Your code here
-        pass
+    async with await cluster.connect() as session:
+        # For streaming, ALWAYS use context manager:
+        async with await session.execute_stream("SELECT * FROM table") as result:
+            async for row in result:
+                # Process row
+                pass
 ```

+**⚠️ CRITICAL**: See [True Async Paging](../docs/true-async-paging.md) for important details about streaming patterns and common mistakes.
+
 ### MANDATORY: Always Use PreparedStatements
 For any query with parameters:
 ```python

examples/fastapi_app/main.py

Lines changed: 6 additions & 8 deletions

@@ -337,10 +337,9 @@ async def stream_users(
     stream_config = StreamConfig(fetch_size=fetch_size)

     # Use context manager for proper resource cleanup
-    stmt = await session.prepare("SELECT * FROM users LIMIT ?")
-    async with await session.execute_stream(
-        stmt, [limit], stream_config=stream_config
-    ) as result:
+    # Note: LIMIT not needed - fetch_size controls data flow
+    stmt = await session.prepare("SELECT * FROM users")
+    async with await session.execute_stream(stmt, stream_config=stream_config) as result:
         users = []
         async for row in result:
             # Handle both dict-like and object-like row access
@@ -420,10 +419,9 @@ async def stream_users_by_pages(
     stream_config = StreamConfig(fetch_size=fetch_size, max_pages=max_pages)

     # Use context manager for automatic cleanup
-    stmt = await session.prepare("SELECT * FROM users LIMIT ?")
-    async with await session.execute_stream(
-        stmt, [limit], stream_config=stream_config
-    ) as result:
+    # Note: LIMIT not needed - fetch_size controls data flow
+    stmt = await session.prepare("SELECT * FROM users")
+    async with await session.execute_stream(stmt, stream_config=stream_config) as result:
         pages_info = []
         total_processed = 0

examples/fastapi_app/main_enhanced.py

Lines changed: 9 additions & 9 deletions

@@ -309,17 +309,16 @@ def page_callback(page_number: int, page_size: int):
     stream_config.page_callback = page_callback

     # Execute streaming query with prepared statement
-    stmt = await session.session.prepare("SELECT * FROM users LIMIT ?")
-    result = await session.session.execute_stream(
-        stmt,
-        [limit],
-        stream_config=stream_config,
-    )
+    # Note: LIMIT is not needed with paging - fetch_size controls data flow
+    stmt = await session.session.prepare("SELECT * FROM users")

     users = []

-    # Use context manager for proper cleanup
-    async with result as stream:
+    # CRITICAL: Always use context manager to prevent resource leaks
+    async with await session.session.execute_stream(
+        stmt,
+        stream_config=stream_config,
+    ) as stream:
         async for row in stream:
             users.append(
                 {
@@ -329,7 +328,8 @@ def page_callback(page_number: int, page_size: int):
                 }
             )

-            # Check if we've reached the limit
+            # Note: If you need to limit results, track count manually
+            # The fetch_size in StreamConfig controls page size efficiently
             if limit and len(users) >= limit:
                 break
