Skip to content

Commit f874f4d

Browse files
committed
updating examples and docs
1 parent e17e262 commit f874f4d

File tree

10 files changed

+322
-289
lines changed

10 files changed

+322
-289
lines changed

README.md

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,33 @@ async def get_user(user_id: str):
5050

5151
```python
5252
# ✅ Async code with our wrapper - NON-BLOCKING
53+
from contextlib import asynccontextmanager
5354
from fastapi import FastAPI
5455
from async_cassandra import AsyncCluster
5556

56-
app = FastAPI()
57-
cluster = AsyncCluster(['localhost'])
5857
session = None
59-
60-
@app.on_event("startup")
61-
async def startup():
62-
global session
63-
session = await cluster.connect()
58+
user_stmt = None
59+
60+
@asynccontextmanager
61+
async def lifespan(app: FastAPI):
62+
global session, user_stmt
63+
# Use context managers for proper resource management
64+
async with AsyncCluster(['localhost']) as cluster:
65+
async with cluster.connect() as session:
66+
# Prepare statement for better performance
67+
user_stmt = await session.prepare(
68+
"SELECT * FROM users WHERE id = ?"
69+
)
70+
yield # App runs here
71+
# Cleanup happens automatically
72+
73+
app = FastAPI(lifespan=lifespan)
6474

6575
@app.get("/users/{user_id}")
6676
async def get_user(user_id: str):
6777
# This doesn't block! The event loop remains free to handle
6878
# other requests while waiting for the database response
69-
result = await session.execute("SELECT * FROM users WHERE id = %s", [user_id])
79+
result = await session.execute(user_stmt, [user_id])
7080
return {"user": result.one()}
7181
```
7282

docs/getting-started.md

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,17 @@ import asyncio
2727
from async_cassandra import AsyncCluster
2828

2929
async def main():
30-
# 1. Create a cluster object (doesn't connect yet)
31-
cluster = AsyncCluster(['localhost'])
32-
33-
# 2. Connect and get a session (this is where connection happens)
34-
session = await cluster.connect('my_keyspace')
35-
36-
# 3. Execute a query (waits for results without blocking)
37-
result = await session.execute("SELECT * FROM users LIMIT 10")
30+
# Use context managers for automatic cleanup (REQUIRED)
31+
async with AsyncCluster(['localhost']) as cluster:
32+
async with cluster.connect('my_keyspace') as session:
33+
# Execute a query (waits for results without blocking)
34+
result = await session.execute("SELECT * FROM users LIMIT 10")
3835

39-
# 4. Process results (rows are like dictionaries)
40-
for row in result:
41-
print(f"User: {row.name}, Email: {row.email}")
36+
# Process results (rows are like dictionaries)
37+
for row in result:
38+
print(f"User: {row.name}, Email: {row.email}")
4239

43-
# 5. Clean up (IMPORTANT: always close connections)
44-
await cluster.shutdown()
40+
# No manual cleanup needed - context managers handle it!
4541

4642
# Run the async function
4743
asyncio.run(main())
@@ -353,42 +349,39 @@ FastAPI is an async web framework. If you use the regular Cassandra driver, it w
353349

354350
### FastAPI Example
355351

356-
#### The Setup
352+
#### The Setup (Using Lifespan Context Manager)
357353

358354
```python
355+
from contextlib import asynccontextmanager
359356
from fastapi import FastAPI, HTTPException
360357
from async_cassandra import AsyncCluster
361358
import uuid
362359

363-
app = FastAPI()
364-
365-
# Global variables for cluster and session
366-
cluster = None
360+
# Global variables for session and prepared statements
367361
session = None
368362
prepared_statements = {}
369363

370-
@app.on_event("startup")
371-
async def startup():
372-
"""Initialize Cassandra connection when server starts."""
373-
global cluster, session, prepared_statements
364+
@asynccontextmanager
365+
async def lifespan(app: FastAPI):
366+
"""Manage Cassandra connection lifecycle."""
367+
global session, prepared_statements
374368

375-
# Create cluster connection
376-
cluster = AsyncCluster(['localhost'])
377-
session = await cluster.connect('my_keyspace')
369+
# Startup: Create connection with context manager
370+
async with AsyncCluster(['localhost']) as cluster:
371+
async with cluster.connect('my_keyspace') as session:
372+
# Prepare statements once at startup (more efficient)
373+
prepared_statements['get_user'] = await session.prepare(
374+
"SELECT * FROM users WHERE id = ?"
375+
)
376+
prepared_statements['create_user'] = await session.prepare(
377+
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
378+
)
378379

379-
# Prepare statements once at startup (more efficient)
380-
prepared_statements['get_user'] = await session.prepare(
381-
"SELECT * FROM users WHERE id = ?"
382-
)
383-
prepared_statements['create_user'] = await session.prepare(
384-
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
385-
)
380+
yield # Server runs here
381+
382+
# Cleanup happens automatically when context exits
386383

387-
@app.on_event("shutdown")
388-
async def shutdown():
389-
"""Clean up when server stops."""
390-
if cluster:
391-
await cluster.shutdown()
384+
app = FastAPI(lifespan=lifespan)
392385
```
393386

394387
#### API Endpoints

examples/README.md

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,46 @@ python metrics_simple.py
8282

8383
### 6. [Advanced Metrics](metrics_example.py)
8484

85-
More comprehensive metrics example (requires updates to work with current API).
85+
Comprehensive metrics and observability example:
86+
- Multiple metrics collectors setup
87+
- Query performance monitoring
88+
- Connection health tracking
89+
- Prometheus integration example
90+
- FastAPI integration patterns
8691

87-
### 7. [Monitoring Configuration](monitoring/)
92+
**Run:**
93+
```bash
94+
python metrics_example.py
95+
```
96+
97+
### 7. [Context Manager Safety](context_manager_safety_demo.py)
98+
99+
Demonstrates proper context manager usage:
100+
- Context manager isolation
101+
- Error safety in queries and streaming
102+
- Concurrent operations with shared resources
103+
- Resource cleanup guarantees
104+
105+
**Run:**
106+
```bash
107+
python context_manager_safety_demo.py
108+
```
109+
110+
### 8. [Thread Pool Configuration](thread_pool_configuration.py)
111+
112+
Shows how to configure thread pools for different workloads:
113+
- Web application configuration
114+
- Batch processing setup
115+
- Thread pool size comparison
116+
- Thread starvation demonstration
117+
- Thread pool monitoring
118+
119+
**Run:**
120+
```bash
121+
python thread_pool_configuration.py
122+
```
123+
124+
### 9. [Monitoring Configuration](monitoring/)
88125

89126
Production-ready monitoring configurations:
90127
- **alerts.yml** - Prometheus alerting rules for:
@@ -114,25 +151,48 @@ All examples require:
114151
# pip install async-cassandra
115152
```
116153

117-
## Common Patterns Demonstrated
154+
## Best Practices Demonstrated
155+
156+
### MANDATORY: Always Use Context Managers
157+
All examples follow the required pattern:
158+
```python
159+
# ALWAYS use context managers for resource management
160+
async with AsyncCluster(["localhost"]) as cluster:
161+
async with cluster.connect() as session:
162+
# Your code here
163+
pass
164+
```
165+
166+
### MANDATORY: Always Use PreparedStatements
167+
For any query with parameters:
168+
```python
169+
# Prepare statement once
170+
stmt = await session.prepare(
171+
"INSERT INTO users (id, name) VALUES (?, ?)"
172+
)
173+
# Execute many times
174+
await session.execute(stmt, [user_id, name])
175+
```
176+
177+
### Common Patterns Demonstrated
118178

119-
### Connection Management
120-
- Using context managers for automatic cleanup
179+
#### Connection Management
180+
- Using context managers for automatic cleanup (REQUIRED)
121181
- Proper cluster and session lifecycle
122182
- Connection health monitoring
123183

124-
### Error Handling
184+
#### Error Handling
125185
- Catching and handling Cassandra exceptions
126186
- Retry strategies with idempotency
127187
- Graceful degradation
128188

129-
### Performance Optimization
130-
- Prepared statements for repeated queries
189+
#### Performance Optimization
190+
- Prepared statements for repeated queries (REQUIRED)
131191
- Concurrent query execution
132-
- Streaming for large datasets
192+
- Streaming for large datasets with context managers
133193
- Appropriate fetch sizes
134194

135-
### Monitoring & Observability
195+
#### Monitoring & Observability
136196
- Metrics collection
137197
- Performance tracking
138198
- Health checks
@@ -159,11 +219,14 @@ Examples use local Cassandra by default. Network latency may vary with remote cl
159219
## Contributing
160220

161221
We welcome new examples! When contributing:
222+
- **MUST use context managers** for all cluster/session/streaming operations
223+
- **MUST use PreparedStatements** for all parameterized queries
162224
- Include clear documentation in the code
163225
- Handle errors appropriately
164226
- Clean up resources (drop keyspaces/tables)
165227
- Test with Python 3.12
166228
- Update this README
229+
- Follow the patterns shown in existing examples
167230

168231
## Support
169232

examples/context_manager_safety_demo.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,10 @@ async def demonstrate_streaming_error_safety(cluster):
6565
"""
6666
)
6767

68-
# Insert some data
68+
# Insert some data using prepared statement
69+
insert_stmt = await session.prepare("INSERT INTO test_data (id, value) VALUES (?, ?)")
6970
for i in range(10):
70-
await session.execute(
71-
"INSERT INTO test_data (id, value) VALUES (%s, %s)", [uuid.uuid4(), f"value_{i}"]
72-
)
71+
await session.execute(insert_stmt, [uuid.uuid4(), f"value_{i}"])
7372

7473
# Try streaming from non-existent table (will fail)
7574
try:

examples/export_large_table.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -283,35 +283,29 @@ async def setup_sample_data(session):
283283

284284
async def main():
285285
"""Run the export example."""
286-
# Connect to Cassandra
287-
cluster = AsyncCluster(["localhost"])
288-
289-
try:
290-
session = await cluster.connect()
291-
292-
# Setup sample data
293-
await setup_sample_data(session)
294-
295-
# Create output directory
296-
output_dir = Path("exports")
297-
output_dir.mkdir(exist_ok=True)
298-
299-
# Export using async I/O if available
300-
if ASYNC_FILE_IO:
301-
await export_table_async(
302-
session, "export_example", "products", str(output_dir / "products_async.csv")
303-
)
304-
else:
305-
await export_table_sync(
306-
session, "export_example", "products", str(output_dir / "products_sync.csv")
307-
)
308-
309-
# Cleanup (optional)
310-
logger.info("\nCleaning up...")
311-
await session.execute("DROP KEYSPACE export_example")
286+
# Connect to Cassandra using context manager
287+
async with AsyncCluster(["localhost"]) as cluster:
288+
async with cluster.connect() as session:
289+
# Setup sample data
290+
await setup_sample_data(session)
291+
292+
# Create output directory
293+
output_dir = Path("exports")
294+
output_dir.mkdir(exist_ok=True)
295+
296+
# Export using async I/O if available
297+
if ASYNC_FILE_IO:
298+
await export_table_async(
299+
session, "export_example", "products", str(output_dir / "products_async.csv")
300+
)
301+
else:
302+
await export_table_sync(
303+
session, "export_example", "products", str(output_dir / "products_sync.csv")
304+
)
312305

313-
finally:
314-
await cluster.shutdown()
306+
# Cleanup (optional)
307+
logger.info("\nCleaning up...")
308+
await session.execute("DROP KEYSPACE export_example")
315309

316310

317311
if __name__ == "__main__":

examples/metrics_example.py

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,10 @@ async def main():
2828
)
2929

3030
# 2. Create cluster and session with metrics
31-
cluster = AsyncCluster(contact_points=["localhost"])
32-
session = await cluster.connect()
33-
34-
# Inject metrics middleware into session
35-
session._metrics = metrics
36-
37-
try:
31+
async with AsyncCluster(contact_points=["localhost"]) as cluster:
32+
async with cluster.connect() as session:
33+
# Inject metrics middleware into session
34+
session._metrics = metrics
3835
# 3. Set up test environment
3936
await session.execute(
4037
"""
@@ -135,10 +132,6 @@ async def main():
135132
print(f" Total Queries: {health['total_queries']}")
136133
print(f" Errors: {health['error_count']}")
137134

138-
finally:
139-
await session.close()
140-
await cluster.shutdown()
141-
142135

143136
async def prometheus_example():
144137
"""Example showing Prometheus integration."""
@@ -153,20 +146,20 @@ async def prometheus_example():
153146
print("📊 Prometheus metrics server started on http://localhost:8000")
154147

155148
# Run some queries with metrics
156-
cluster = AsyncCluster(contact_points=["localhost"])
157-
session = await cluster.connect()
158-
session._metrics = metrics
149+
async with AsyncCluster(contact_points=["localhost"]) as cluster:
150+
async with cluster.connect() as session:
151+
session._metrics = metrics
159152

160-
# Execute some queries
161-
for i in range(5):
162-
await session.execute("SELECT release_version FROM system.local")
153+
# Prepare statement
154+
stmt = await session.prepare("SELECT release_version FROM system.local")
163155

164-
print("📈 Metrics available at http://localhost:8000/metrics")
165-
print("Sample metrics output:")
166-
print(generate_latest().decode("utf-8")[:500] + "...")
156+
# Execute some queries
157+
for i in range(5):
158+
await session.execute(stmt, [])
167159

168-
await session.close()
169-
await cluster.shutdown()
160+
print("📈 Metrics available at http://localhost:8000/metrics")
161+
print("Sample metrics output:")
162+
print(generate_latest().decode("utf-8")[:500] + "...")
170163

171164
except ImportError:
172165
print("❌ prometheus_client not installed. Install with: pip install prometheus_client")

0 commit comments

Comments
 (0)