
Commit 30fd99f ("examples")

1 parent 6eaa8c7

10 files changed, +703 -328 lines

Makefile

Lines changed: 187 additions & 43 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 44 additions & 0 deletions
@@ -287,6 +287,50 @@ We welcome contributions! Please see:
 - [FastAPI Integration](examples/fastapi_app/README.md) - Complete REST API example
 - [More Examples](examples/) - Additional usage patterns

+## 🎯 Running the Examples
+
+The project includes comprehensive examples demonstrating various features and use cases. Each example can be run using the provided Makefile, which automatically handles Cassandra setup if needed.
+
+### Available Examples
+
+Run any example with: `make example-<name>`
+
+- **`make example-basic`** - Basic connection and query execution
+- **`make example-streaming`** - Memory-efficient streaming of large result sets with True Async Paging
+- **`make example-context-safety`** - Demonstrates proper context manager usage and resource isolation
+- **`make example-export-large-table`** - Export large tables to CSV with progress tracking
+- **`make example-export-parquet`** - Export data to Parquet format with complex data types
+- **`make example-metrics`** - Comprehensive metrics collection and performance monitoring
+- **`make example-metrics-simple`** - Basic metrics collection example
+- **`make example-realtime`** - Real-time data processing with sliding window analytics
+- **`make example-streaming-demo`** - Visual demonstration that streaming doesn't block the event loop
+
+### Running with External Cassandra
+
+If you have Cassandra running elsewhere:
+
+```bash
+# Single node
+CASSANDRA_CONTACT_POINTS=10.0.0.1 make example-streaming
+
+# Multiple nodes
+CASSANDRA_CONTACT_POINTS=10.0.0.1,10.0.0.2,10.0.0.3 make example-streaming
+
+# With custom port
+CASSANDRA_CONTACT_POINTS=cassandra.example.com CASSANDRA_PORT=9043 make example-basic
+```
+
+### Example Descriptions
+
+- **Basic Example**: Shows fundamental operations like connecting, executing queries, and using prepared statements
+- **Streaming Examples**: Demonstrate True Async Paging for processing millions of rows without memory issues
+- **Export Examples**: Show how to export Cassandra data to various formats (CSV, Parquet) with progress tracking
+- **Metrics Examples**: Illustrate performance monitoring, query tracking, and connection health checking
+- **Real-time Processing**: Demonstrates processing time-series IoT data with concurrent operations
+- **Context Safety Demo**: Proves that errors in one operation don't affect others when using context managers
+
+Each example includes detailed comments explaining the concepts and best practices. Start with `example-basic` if you're new to the library.
+
 ## ⚡ Performance

 async-cassandra enables your async Python application to work with Cassandra without blocking the event loop. While it doesn't eliminate the underlying driver's thread pool, it prevents those blocking operations from freezing your entire application. This is crucial for web servers where a blocked event loop means no requests can be processed.
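
As a quick orientation for the section added above, here is a rough sketch of the pattern the basic example covers (connecting, running a query, and using a prepared statement). It is pieced together from the API calls that appear in the demo script in this commit; the `from async_cassandra import AsyncCluster` import path and the `demo_ks.users` keyspace/table are assumptions for illustration, not taken from the repository.

```python
# Hedged sketch of the "basic" usage pattern; import path and schema names are assumed.
import asyncio
import os
import uuid

from async_cassandra import AsyncCluster  # assumption: package exposes AsyncCluster here


async def main():
    # Same environment variables the Makefile-driven examples honour.
    contact_points = os.environ.get("CASSANDRA_CONTACT_POINTS", "localhost").split(",")
    port = int(os.environ.get("CASSANDRA_PORT", "9042"))

    # Cluster and session are async context managers, so cleanup is automatic.
    async with AsyncCluster(contact_points, port=port) as cluster:
        async with await cluster.connect() as session:
            # Plain query
            result = await session.execute("SELECT release_version FROM system.local")
            print(f"Connected to Cassandra {result.one().release_version}")

            # Hypothetical keyspace/table, created just for this sketch
            await session.execute(
                """
                CREATE KEYSPACE IF NOT EXISTS demo_ks
                WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
                """
            )
            await session.execute(
                "CREATE TABLE IF NOT EXISTS demo_ks.users (id UUID PRIMARY KEY, name TEXT)"
            )

            # Prepared statement with positional parameters
            insert = await session.prepare("INSERT INTO demo_ks.users (id, name) VALUES (?, ?)")
            await session.execute(insert, [uuid.uuid4(), "alice"])


if __name__ == "__main__":
    asyncio.run(main())
```

If this matches the real API, `make example-basic` walks through essentially the same steps with more commentary.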
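The streaming examples revolve around `execute_stream`, which the demo script below uses as both an async context manager and an async iterator so rows are paged in rather than loaded all at once. A minimal sketch of that pattern, with the same assumed import path, querying the system schema so no extra setup is needed:

```python
# Hedged sketch of the streaming pattern; import path is assumed.
import asyncio
import os

from async_cassandra import AsyncCluster  # assumption: package exposes AsyncCluster here


async def main():
    contact_points = os.environ.get("CASSANDRA_CONTACT_POINTS", "localhost").split(",")
    port = int(os.environ.get("CASSANDRA_PORT", "9042"))

    async with AsyncCluster(contact_points, port=port) as cluster:
        async with await cluster.connect() as session:
            count = 0
            # Rows are fetched page by page while iterating, not materialised up front.
            async with await session.execute_stream(
                "SELECT keyspace_name FROM system_schema.keyspaces"
            ) as stream:
                async for _row in stream:
                    count += 1
            print(f"Streamed {count} keyspaces")


if __name__ == "__main__":
    asyncio.run(main())
```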

examples/context_manager_safety_demo.py

Lines changed: 64 additions & 39 deletions
@@ -40,26 +40,33 @@

 async def demonstrate_query_error_safety(cluster):
     """Show that query errors don't close the session."""
-    logger.info("\n=== Demonstrating Query Error Safety ===")
+    logger.info("\n" + "=" * 80)
+    logger.info("🛡️ QUERY ERROR SAFETY DEMONSTRATION")
+    logger.info("=" * 80)

     async with await cluster.connect() as session:
+        logger.info("\n🧪 Test 1: Execute a failing query")
         try:
             # This will fail
             await session.execute("SELECT * FROM non_existent_table")
         except InvalidRequest as e:
-            logger.info(f"Query failed as expected: {e}")
+            logger.info(f"Query failed as expected: {type(e).__name__}")

         # Session should still work
-        logger.info("Session still works after error:")
+        logger.info("\n🧪 Test 2: Verify session still works after error")
         result = await session.execute("SELECT release_version FROM system.local")
-        logger.info(f"Cassandra version: {result.one().release_version}")
+        logger.info(f" ✅ Session is healthy! Cassandra version: {result.one().release_version}")
+        logger.info("\n💡 Key insight: Query errors are isolated - they don't affect the session!")


 async def demonstrate_streaming_error_safety(cluster):
     """Show that streaming errors don't close the session."""
-    logger.info("\n=== Demonstrating Streaming Error Safety ===")
+    logger.info("\n" + "=" * 80)
+    logger.info("🌊 STREAMING ERROR SAFETY DEMONSTRATION")
+    logger.info("=" * 80)

     async with await cluster.connect() as session:
+        logger.info("\n🛠️ Setting up test data...")
         # Create test keyspace and data
         await session.execute(
             """
@@ -70,96 +77,108 @@ async def demonstrate_streaming_error_safety(cluster):
             }
             """
         )
-        await session.set_keyspace("context_demo")
-
         await session.execute(
             """
-            CREATE TABLE IF NOT EXISTS test_data (
+            CREATE TABLE IF NOT EXISTS context_demo.test_data (
                 id UUID PRIMARY KEY,
                 value TEXT
             )
             """
         )

         # Insert some data using prepared statement
-        insert_stmt = await session.prepare("INSERT INTO test_data (id, value) VALUES (?, ?)")
+        insert_stmt = await session.prepare(
+            "INSERT INTO context_demo.test_data (id, value) VALUES (?, ?)"
+        )
         for i in range(10):
             await session.execute(insert_stmt, [uuid.uuid4(), f"value_{i}"])
+        logger.info(" ✓ Created 10 test records")

         # Try streaming from non-existent table (will fail)
+        logger.info("\n🧪 Test 1: Stream from non-existent table")
         try:
             async with await session.execute_stream("SELECT * FROM non_existent_table") as stream:
                 async for row in stream:
                     pass
         except Exception as e:
-            logger.info(f"Streaming failed as expected: {e}")
+            logger.info(f"Streaming failed as expected: {type(e).__name__}")

         # Session should still work for new streaming
-        logger.info("Starting new streaming operation after error:")
+        logger.info("\n🧪 Test 2: Start new streaming operation after error")
         count = 0
-        async with await session.execute_stream("SELECT * FROM test_data") as stream:
+        async with await session.execute_stream("SELECT * FROM context_demo.test_data") as stream:
             async for row in stream:
                 count += 1

-        logger.info(f"Successfully streamed {count} rows after error")
+        logger.info(f" ✅ Successfully streamed {count} rows after error!")
+        logger.info("\n💡 Key insight: Streaming errors are isolated - session remains healthy!")

         # Cleanup
         await session.execute("DROP KEYSPACE context_demo")


 async def demonstrate_context_manager_isolation(cluster):
     """Show how context managers isolate resource cleanup."""
-    logger.info("\n=== Demonstrating Context Manager Isolation ===")
+    logger.info("\n" + "=" * 80)
+    logger.info("🔒 CONTEXT MANAGER ISOLATION DEMONSTRATION")
+    logger.info("=" * 80)

     # Scenario 1: Session context doesn't affect cluster
-    logger.info("\nScenario 1: Session context with error")
+    logger.info("\n🧪 Scenario 1: Session error doesn't affect cluster")
     try:
         async with await cluster.connect() as session:
             result = await session.execute("SELECT now() FROM system.local")
-            logger.info(f"Query succeeded: {result.one()[0]}")
+            logger.info(f" ✓ Query succeeded: {result.one()[0]}")
+            logger.info(" 💥 Simulating error...")
             raise ValueError("Simulated error in session context")
     except ValueError:
-        logger.info("Error handled, session was closed by context manager")
+        logger.info("Error handled, session closed by context manager")

     # Cluster should still work
-    logger.info("Creating new session from same cluster:")
+    logger.info("\n🧪 Creating new session from same cluster:")
     async with await cluster.connect() as session2:
         result = await session2.execute("SELECT now() FROM system.local")
-        logger.info(f"New session works: {result.one()[0]}")
+        logger.info(f"New session works perfectly: {result.one()[0]}")

     # Scenario 2: Streaming context doesn't affect session
-    logger.info("\nScenario 2: Streaming context with early exit")
+    logger.info("\n🧪 Scenario 2: Early streaming exit doesn't affect session")
     async with await cluster.connect() as session3:
         # Stream with early exit
         count = 0
+        logger.info(" 🔄 Starting streaming with early exit...")
         async with await session3.execute_stream("SELECT * FROM system.local") as stream:
             async for row in stream:
                 count += 1
+                logger.info(f" ✓ Read {count} row, exiting early...")
                 break # Early exit

-        logger.info(f"Exited streaming early after {count} row")
-
         # Session should still work
+        logger.info("\n 🧪 Testing session after early streaming exit:")
         result = await session3.execute("SELECT now() FROM system.local")
-        logger.info(f"Session still works: {result.one()[0]}")
+        logger.info(f" ✅ Session still healthy: {result.one()[0]}")
+
+    logger.info("\n💡 Key insight: Context managers provide proper isolation!")


 async def demonstrate_concurrent_safety(cluster):
     """Show that multiple operations can use shared resources safely."""
-    logger.info("\n=== Demonstrating Concurrent Safety ===")
+    logger.info("\n" + "=" * 80)
+    logger.info("🚀 CONCURRENT OPERATIONS SAFETY DEMONSTRATION")
+    logger.info("=" * 80)

     # Create shared session
+    logger.info("\n🔄 Running multiple concurrent operations on shared session...")
     async with await cluster.connect() as session:

         async def worker(worker_id, query_count):
             """Worker that executes queries."""
             for i in range(query_count):
                 try:
-                    result = await session.execute("SELECT now() FROM system.local")
-                    logger.info(f"Worker {worker_id} query {i+1}: {result.one()[0]}")
+                    await session.execute("SELECT now() FROM system.local")
+                    logger.info(f" 👷 Worker {worker_id} query {i+1}: Success")
                     await asyncio.sleep(0.1)
                 except Exception as e:
-                    logger.error(f"Worker {worker_id} error: {e}")
+                    logger.error(f"Worker {worker_id} error: {e}")

         async def streamer():
             """Worker that uses streaming."""
@@ -171,27 +190,30 @@ async def streamer():
                     async for row in stream:
                         count += 1
                         if count % 5 == 0:
-                            logger.info(f"Streamer: Processed {count} keyspaces")
+                            logger.info(f" 🌊 Streamer: Processed {count} keyspaces")
                             await asyncio.sleep(0.1)
-                logger.info(f"Streamer: Total {count} keyspaces")
+                logger.info(f"Streamer: Completed ({count} keyspaces)")
             except Exception as e:
-                logger.error(f"Streamer error: {e}")
+                logger.error(f"Streamer error: {e}")

         # Run workers concurrently
         await asyncio.gather(worker(1, 3), worker(2, 3), streamer(), return_exceptions=True)

-        logger.info("All concurrent operations completed")
+        logger.info("\n✅ All concurrent operations completed successfully!")
+        logger.info("\n💡 Key insight: Multiple operations can safely share a session!")


 async def main():
     """Run all demonstrations."""
-    logger.info("Starting Context Manager Safety Demonstration")
+    logger.info("\n" + "=" * 80)
+    logger.info("🛡️ CONTEXT MANAGER SAFETY DEMONSTRATION")
+    logger.info("=" * 80)

     # Get contact points from environment or use localhost
     contact_points = os.environ.get("CASSANDRA_CONTACT_POINTS", "localhost").split(",")
     port = int(os.environ.get("CASSANDRA_PORT", "9042"))

-    logger.info(f"Connecting to Cassandra at {contact_points}:{port}")
+    logger.info(f"\n📡 Connecting to Cassandra at {contact_points}:{port}")

     # Use cluster in context manager for automatic cleanup
     async with AsyncCluster(contact_points, port=port) as cluster:
@@ -200,12 +222,15 @@ async def main():
         await demonstrate_context_manager_isolation(cluster)
         await demonstrate_concurrent_safety(cluster)

-        logger.info("\nAll demonstrations completed successfully!")
-        logger.info("Key takeaways:")
-        logger.info("1. Query errors don't close sessions")
-        logger.info("2. Streaming errors don't close sessions")
-        logger.info("3. Context managers only close their own resources")
-        logger.info("4. Multiple operations can safely share sessions and clusters")
+        logger.info("\n" + "=" * 80)
+        logger.info("✅ ALL DEMONSTRATIONS COMPLETED SUCCESSFULLY!")
+        logger.info("=" * 80)
+        logger.info("\n🎯 Key Takeaways:")
+        logger.info(" 1. Query errors don't close sessions")
+        logger.info(" 2. Streaming errors don't close sessions")
+        logger.info(" 3. Context managers only close their own resources")
+        logger.info(" 4. Multiple operations can safely share sessions and clusters")
+        logger.info("\n💡 Best Practice: Always use context managers for proper resource management!")


 if __name__ == "__main__":
