Skip to content

Commit 0bc6e29

Browse files
committed
adding tests around examples to make sure they actually work
1 parent 9e2afc8 commit 0bc6e29

File tree

9 files changed

+1065
-590
lines changed

9 files changed

+1065
-590
lines changed

examples/README.md

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,7 @@ Demonstrates proper context manager usage:
107107
python context_manager_safety_demo.py
108108
```
109109

110-
### 8. [Thread Pool Configuration](thread_pool_configuration.py)
111-
112-
Shows how to configure thread pools for different workloads:
113-
- Web application configuration
114-
- Batch processing setup
115-
- Thread pool size comparison
116-
- Thread starvation demonstration
117-
- Thread pool monitoring
118-
119-
**Run:**
120-
```bash
121-
python thread_pool_configuration.py
122-
```
123-
124-
### 9. [Monitoring Configuration](monitoring/)
110+
### 8. [Monitoring Configuration](monitoring/)
125111

126112
Production-ready monitoring configurations:
127113
- **alerts.yml** - Prometheus alerting rules for:

examples/context_manager_safety_demo.py

Lines changed: 94 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
import logging
1111
import uuid
1212

13+
from cassandra import InvalidRequest
14+
1315
from async_cassandra import AsyncCluster
14-
from async_cassandra.exceptions import QueryError
1516

1617
# Set up logging
1718
logging.basicConfig(level=logging.INFO)
@@ -22,74 +23,69 @@ async def demonstrate_query_error_safety(cluster):
2223
"""Show that query errors don't close the session."""
2324
logger.info("\n=== Demonstrating Query Error Safety ===")
2425

25-
session = await cluster.connect()
26-
27-
try:
28-
# This will fail
29-
await session.execute("SELECT * FROM non_existent_table")
30-
except QueryError as e:
31-
logger.info(f"Query failed as expected: {e}")
32-
33-
# Session should still work
34-
logger.info("Session still works after error:")
35-
result = await session.execute("SELECT release_version FROM system.local")
36-
logger.info(f"Cassandra version: {result.one().release_version}")
26+
async with await cluster.connect() as session:
27+
try:
28+
# This will fail
29+
await session.execute("SELECT * FROM non_existent_table")
30+
except InvalidRequest as e:
31+
logger.info(f"Query failed as expected: {e}")
3732

38-
await session.close()
33+
# Session should still work
34+
logger.info("Session still works after error:")
35+
result = await session.execute("SELECT release_version FROM system.local")
36+
logger.info(f"Cassandra version: {result.one().release_version}")
3937

4038

4139
async def demonstrate_streaming_error_safety(cluster):
4240
"""Show that streaming errors don't close the session."""
4341
logger.info("\n=== Demonstrating Streaming Error Safety ===")
4442

45-
session = await cluster.connect()
46-
47-
# Create test keyspace and data
48-
await session.execute(
49-
"""
50-
CREATE KEYSPACE IF NOT EXISTS context_demo
51-
WITH REPLICATION = {
52-
'class': 'SimpleStrategy',
53-
'replication_factor': 1
54-
}
55-
"""
56-
)
57-
await session.set_keyspace("context_demo")
58-
59-
await session.execute(
60-
"""
61-
CREATE TABLE IF NOT EXISTS test_data (
62-
id UUID PRIMARY KEY,
63-
value TEXT
43+
async with await cluster.connect() as session:
44+
# Create test keyspace and data
45+
await session.execute(
46+
"""
47+
CREATE KEYSPACE IF NOT EXISTS context_demo
48+
WITH REPLICATION = {
49+
'class': 'SimpleStrategy',
50+
'replication_factor': 1
51+
}
52+
"""
53+
)
54+
await session.set_keyspace("context_demo")
55+
56+
await session.execute(
57+
"""
58+
CREATE TABLE IF NOT EXISTS test_data (
59+
id UUID PRIMARY KEY,
60+
value TEXT
61+
)
62+
"""
6463
)
65-
"""
66-
)
6764

68-
# Insert some data using prepared statement
69-
insert_stmt = await session.prepare("INSERT INTO test_data (id, value) VALUES (?, ?)")
70-
for i in range(10):
71-
await session.execute(insert_stmt, [uuid.uuid4(), f"value_{i}"])
65+
# Insert some data using prepared statement
66+
insert_stmt = await session.prepare("INSERT INTO test_data (id, value) VALUES (?, ?)")
67+
for i in range(10):
68+
await session.execute(insert_stmt, [uuid.uuid4(), f"value_{i}"])
7269

73-
# Try streaming from non-existent table (will fail)
74-
try:
75-
async with await session.execute_stream("SELECT * FROM non_existent_table") as stream:
76-
async for row in stream:
77-
pass
78-
except Exception as e:
79-
logger.info(f"Streaming failed as expected: {e}")
70+
# Try streaming from non-existent table (will fail)
71+
try:
72+
async with await session.execute_stream("SELECT * FROM non_existent_table") as stream:
73+
async for row in stream:
74+
pass
75+
except Exception as e:
76+
logger.info(f"Streaming failed as expected: {e}")
8077

81-
# Session should still work for new streaming
82-
logger.info("Starting new streaming operation after error:")
83-
count = 0
84-
async with await session.execute_stream("SELECT * FROM test_data") as stream:
85-
async for row in stream:
86-
count += 1
78+
# Session should still work for new streaming
79+
logger.info("Starting new streaming operation after error:")
80+
count = 0
81+
async with await session.execute_stream("SELECT * FROM test_data") as stream:
82+
async for row in stream:
83+
count += 1
8784

88-
logger.info(f"Successfully streamed {count} rows after error")
85+
logger.info(f"Successfully streamed {count} rows after error")
8986

90-
# Cleanup
91-
await session.execute("DROP KEYSPACE context_demo")
92-
await session.close()
87+
# Cleanup
88+
await session.execute("DROP KEYSPACE context_demo")
9389

9490

9591
async def demonstrate_context_manager_isolation(cluster):
@@ -108,69 +104,64 @@ async def demonstrate_context_manager_isolation(cluster):
108104

109105
# Cluster should still work
110106
logger.info("Creating new session from same cluster:")
111-
session2 = await cluster.connect()
112-
result = await session2.execute("SELECT now() FROM system.local")
113-
logger.info(f"New session works: {result.one()[0]}")
114-
await session2.close()
107+
async with await cluster.connect() as session2:
108+
result = await session2.execute("SELECT now() FROM system.local")
109+
logger.info(f"New session works: {result.one()[0]}")
115110

116111
# Scenario 2: Streaming context doesn't affect session
117112
logger.info("\nScenario 2: Streaming context with early exit")
118-
session3 = await cluster.connect()
119-
120-
# Stream with early exit
121-
count = 0
122-
async with await session3.execute_stream("SELECT * FROM system.local") as stream:
123-
async for row in stream:
124-
count += 1
125-
break # Early exit
126-
127-
logger.info(f"Exited streaming early after {count} row")
113+
async with await cluster.connect() as session3:
114+
# Stream with early exit
115+
count = 0
116+
async with await session3.execute_stream("SELECT * FROM system.local") as stream:
117+
async for row in stream:
118+
count += 1
119+
break # Early exit
128120

129-
# Session should still work
130-
result = await session3.execute("SELECT now() FROM system.local")
131-
logger.info(f"Session still works: {result.one()[0]}")
121+
logger.info(f"Exited streaming early after {count} row")
132122

133-
await session3.close()
123+
# Session should still work
124+
result = await session3.execute("SELECT now() FROM system.local")
125+
logger.info(f"Session still works: {result.one()[0]}")
134126

135127

136128
async def demonstrate_concurrent_safety(cluster):
137129
"""Show that multiple operations can use shared resources safely."""
138130
logger.info("\n=== Demonstrating Concurrent Safety ===")
139131

140132
# Create shared session
141-
session = await cluster.connect()
142-
143-
async def worker(worker_id, query_count):
144-
"""Worker that executes queries."""
145-
for i in range(query_count):
133+
async with await cluster.connect() as session:
134+
135+
async def worker(worker_id, query_count):
136+
"""Worker that executes queries."""
137+
for i in range(query_count):
138+
try:
139+
result = await session.execute("SELECT now() FROM system.local")
140+
logger.info(f"Worker {worker_id} query {i+1}: {result.one()[0]}")
141+
await asyncio.sleep(0.1)
142+
except Exception as e:
143+
logger.error(f"Worker {worker_id} error: {e}")
144+
145+
async def streamer():
146+
"""Worker that uses streaming."""
146147
try:
147-
result = await session.execute("SELECT now() FROM system.local")
148-
logger.info(f"Worker {worker_id} query {i+1}: {result.one()[0]}")
149-
await asyncio.sleep(0.1)
148+
async with await session.execute_stream(
149+
"SELECT * FROM system_schema.keyspaces"
150+
) as stream:
151+
count = 0
152+
async for row in stream:
153+
count += 1
154+
if count % 5 == 0:
155+
logger.info(f"Streamer: Processed {count} keyspaces")
156+
await asyncio.sleep(0.1)
157+
logger.info(f"Streamer: Total {count} keyspaces")
150158
except Exception as e:
151-
logger.error(f"Worker {worker_id} error: {e}")
152-
153-
async def streamer():
154-
"""Worker that uses streaming."""
155-
try:
156-
async with await session.execute_stream(
157-
"SELECT * FROM system_schema.keyspaces"
158-
) as stream:
159-
count = 0
160-
async for row in stream:
161-
count += 1
162-
if count % 5 == 0:
163-
logger.info(f"Streamer: Processed {count} keyspaces")
164-
await asyncio.sleep(0.1)
165-
logger.info(f"Streamer: Total {count} keyspaces")
166-
except Exception as e:
167-
logger.error(f"Streamer error: {e}")
159+
logger.error(f"Streamer error: {e}")
168160

169-
# Run workers concurrently
170-
await asyncio.gather(worker(1, 3), worker(2, 3), streamer(), return_exceptions=True)
161+
# Run workers concurrently
162+
await asyncio.gather(worker(1, 3), worker(2, 3), streamer(), return_exceptions=True)
171163

172-
logger.info("All concurrent operations completed")
173-
await session.close()
164+
logger.info("All concurrent operations completed")
174165

175166

176167
async def main():

examples/export_large_table.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@ async def count_table_rows(session, keyspace: str, table_name: str) -> int:
3636
"""Count total rows in a table (approximate for large tables)."""
3737
# Note: COUNT(*) can be slow on large tables
3838
# Consider using token ranges for very large tables
39-
# Using system schema to validate table exists and avoid SQL injection
40-
validation_query = await session.execute(
41-
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
42-
[keyspace, table_name],
39+
40+
# First validate that the table exists to prevent SQL injection
41+
validation_stmt = await session.prepare(
42+
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?"
4343
)
44-
if not validation_query.one():
44+
validation_result = await session.execute(validation_stmt, [keyspace, table_name])
45+
if not validation_result.one():
4546
raise ValueError(f"Table {keyspace}.{table_name} does not exist")
4647

47-
# Safe to use table name after validation - but still use qualified name
48-
# In production, consider using prepared statements even for COUNT queries
48+
# For COUNT queries, we can't use prepared statements with dynamic table names
49+
# Since we've validated the table exists, we can safely construct the query
50+
# In production, consider implementing a token range count for large tables
4951
result = await session.execute(f"SELECT COUNT(*) FROM {keyspace}.{table_name}")
5052
return result.one()[0]
5153

@@ -78,13 +80,15 @@ def progress_callback(page_num: int, rows_so_far: int):
7880

7981
# CRITICAL: Use context manager for streaming to prevent memory leaks
8082
# Validate table exists before streaming
81-
validation_query = await session.execute(
82-
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
83-
[keyspace, table_name],
83+
validation_stmt = await session.prepare(
84+
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?"
8485
)
85-
if not validation_query.one():
86+
validation_result = await session.execute(validation_stmt, [keyspace, table_name])
87+
if not validation_result.one():
8688
raise ValueError(f"Table {keyspace}.{table_name} does not exist")
8789

90+
# For SELECT * with dynamic table names, we can't use prepared statements
91+
# Since we've validated the table exists, we can safely construct the query
8892
async with await session.execute_stream(
8993
f"SELECT * FROM {keyspace}.{table_name}", stream_config=config
9094
) as result:
@@ -152,13 +156,15 @@ async def _export():
152156

153157
# Use context manager for proper streaming cleanup
154158
# Validate table exists before streaming
155-
validation_query = await session.execute(
156-
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
157-
[keyspace, table_name],
159+
validation_stmt = await session.prepare(
160+
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?"
158161
)
159-
if not validation_query.one():
162+
validation_result = await session.execute(validation_stmt, [keyspace, table_name])
163+
if not validation_result.one():
160164
raise ValueError(f"Table {keyspace}.{table_name} does not exist")
161165

166+
# For SELECT * with dynamic table names, we can't use prepared statements
167+
# Since we've validated the table exists, we can safely construct the query
162168
async with await session.execute_stream(
163169
f"SELECT * FROM {keyspace}.{table_name}", stream_config=config
164170
) as result:
@@ -285,7 +291,7 @@ async def main():
285291
"""Run the export example."""
286292
# Connect to Cassandra using context manager
287293
async with AsyncCluster(["localhost"]) as cluster:
288-
async with cluster.connect() as session:
294+
async with await cluster.connect() as session:
289295
# Setup sample data
290296
await setup_sample_data(session)
291297

0 commit comments

Comments
 (0)