Skip to content

Commit 1c4a51b

Browse files
committed
updating streaming and architecture docs
1 parent 7453eea commit 1c4a51b

File tree

2 files changed

+256
-52
lines changed

2 files changed

+256
-52
lines changed

docs/architecture.md

Lines changed: 192 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,134 @@ sequenceDiagram
3535

3636
## Solution Architecture
3737

38-
async-cassandra wraps the driver's async operations to provide true async/await support:
38+
async-cassandra bridges the gap between the DataStax driver's thread-based callbacks and Python's asyncio event loop. Here's how it actually works:
39+
40+
### The Bridge Pattern
41+
42+
The DataStax driver uses a ThreadPoolExecutor for I/O operations and callbacks. We don't replace this - instead, we bridge between the driver's threading model and asyncio:
3943

4044
```mermaid
4145
sequenceDiagram
4246
participant App as Async Application
4347
participant AsyncWrapper as async-cassandra
4448
participant Driver as Cassandra Driver
45-
participant EventLoop as Event Loop
49+
participant ThreadPool as Driver Thread Pool
50+
participant EventLoop as Asyncio Event Loop
4651
participant Cassandra as Cassandra DB
4752
4853
App->>AsyncWrapper: await execute(query)
4954
AsyncWrapper->>Driver: execute_async(query)
50-
Note over AsyncWrapper: Create Future
51-
Driver->>Cassandra: Send query (non-blocking)
52-
AsyncWrapper-->>EventLoop: Register callback
53-
EventLoop-->>App: Control returned
54-
Note over App: Can handle other requests
55-
Cassandra-->>Driver: Response
56-
Driver-->>AsyncWrapper: Callback triggered
57-
AsyncWrapper-->>EventLoop: Set Future result
58-
EventLoop-->>App: Resume coroutine
55+
Driver->>ThreadPool: Submit I/O task
56+
ThreadPool->>Cassandra: Send query (in thread)
57+
AsyncWrapper->>EventLoop: Create asyncio.Future
58+
AsyncWrapper-->>App: Return control (await)
59+
60+
Note over App: Free to handle other requests
61+
62+
Cassandra-->>ThreadPool: Response data
63+
ThreadPool->>AsyncWrapper: Callback fired (in driver thread)
64+
AsyncWrapper->>EventLoop: call_soon_threadsafe(set_result)
65+
EventLoop-->>App: Resume coroutine with result
66+
```
67+
68+
### Key Implementation Details
69+
70+
1. **We DON'T create our own thread pool** - we use the driver's existing ThreadPoolExecutor
71+
2. **Thread-safe communication** - Callbacks from driver threads use `call_soon_threadsafe()` to safely communicate with the asyncio event loop
72+
3. **Futures bridging** - We create asyncio Futures that are resolved by callbacks from driver threads
73+
74+
## How the Bridge Works: Code Deep Dive
75+
76+
### 1. Query Execution Flow
77+
78+
When you call `await session.execute(query)`, here's what happens:
79+
80+
```python
81+
# In AsyncCassandraSession.execute() - src/async_cassandra/session.py:156-168
82+
response_future = self._session.execute_async(
83+
query, parameters, trace, custom_payload, timeout,
84+
execution_profile, paging_state, host, execute_as
85+
)
86+
87+
handler = AsyncResultHandler(response_future)
88+
result = await handler.get_result(timeout=query_timeout)
89+
```
90+
91+
The DataStax driver's `execute_async()` returns a `ResponseFuture` that will be completed by a driver thread.
92+
93+
### 2. The Bridge: AsyncResultHandler
94+
95+
The magic happens in `AsyncResultHandler` ([src/async_cassandra/result.py](../src/async_cassandra/result.py)):
96+
97+
```python
98+
class AsyncResultHandler:
99+
def __init__(self, response_future: ResponseFuture):
100+
self.response_future = response_future
101+
self.rows: List[Any] = []
102+
self._future: Optional[asyncio.Future[AsyncResultSet]] = None
103+
self._lock = threading.Lock() # Thread safety!
104+
105+
# Register callbacks with the driver
106+
self.response_future.add_callbacks(
107+
callback=self._handle_page,
108+
errback=self._handle_error
109+
)
110+
```
111+
112+
### 3. Thread-Safe Callback Handling
113+
114+
When the driver completes the query (in a driver thread), our callback is invoked:
115+
116+
```python
117+
def _handle_page(self, rows: List[Any]) -> None:
118+
"""Called from driver thread - must be thread-safe!"""
119+
with self._lock:
120+
if rows is not None:
121+
self.rows.extend(list(rows)) # Defensive copy
122+
123+
if self.response_future.has_more_pages:
124+
self.response_future.start_fetching_next_page()
125+
else:
126+
# All done - notify the asyncio Future
127+
final_result = AsyncResultSet(list(self.rows), self.response_future)
128+
129+
if self._future and not self._future.done():
130+
loop = getattr(self, "_loop", None)
131+
if loop:
132+
# CRITICAL: Use call_soon_threadsafe to bridge threads!
133+
loop.call_soon_threadsafe(self._future.set_result, final_result)
59134
```
60135

136+
### 4. The Asyncio Side
137+
138+
Meanwhile, the asyncio coroutine is waiting:
139+
140+
```python
141+
async def get_result(self, timeout: Optional[float] = None) -> "AsyncResultSet":
142+
# Create asyncio Future in the current event loop
143+
loop = asyncio.get_running_loop()
144+
self._future = loop.create_future()
145+
self._loop = loop # Store for callbacks to use
146+
147+
# Wait for the driver thread to complete
148+
if timeout is not None:
149+
return await asyncio.wait_for(self._future, timeout=timeout)
150+
else:
151+
return await self._future
152+
```
153+
154+
### 5. Driver Thread Pool Configuration
155+
156+
The driver's thread pool size is configurable ([src/async_cassandra/cluster.py](../src/async_cassandra/cluster.py)):
157+
158+
```python
159+
def __init__(self, ..., executor_threads: int = 2, ...):
160+
# This is passed to the DataStax Cluster constructor
161+
# The driver creates: ThreadPoolExecutor(max_workers=executor_threads)
162+
```
163+
164+
**Important**: This thread pool is shared for ALL I/O operations, so under high concurrency, you may need to increase `executor_threads`.
165+
61166
## Key Components
62167

63168
### AsyncCluster
@@ -94,22 +199,28 @@ sequenceDiagram
94199
participant User as User Code
95200
participant Session as AsyncCassandraSession
96201
participant Handler as AsyncResultHandler
202+
participant EventLoop as Event Loop
97203
participant Driver as Cassandra Driver
204+
participant ThreadPool as Driver Thread Pool
98205
participant DB as Cassandra
99206
100207
User->>Session: await execute(query)
101208
Session->>Driver: execute_async(query)
209+
Driver->>ThreadPool: Submit I/O task
210+
ThreadPool->>DB: Send CQL query
102211
Driver-->>Session: ResponseFuture
103212
Session->>Handler: new AsyncResultHandler(ResponseFuture)
104-
Handler->>Handler: Register callbacks
105-
Session-->>User: Return Future
213+
Handler->>Driver: add_callbacks(callback, errback)
214+
Handler->>EventLoop: create_future()
215+
Session-->>User: await handler.get_result()
106216
107-
Note over User,DB: Async execution in progress
217+
Note over User: Coroutine suspended, event loop free
108218
109-
DB-->>Driver: Query result
110-
Driver-->>Handler: Trigger callback
111-
Handler->>Handler: Process result/pages
112-
Handler-->>User: Resolve Future with AsyncResultSet
219+
DB-->>ThreadPool: Query result
220+
ThreadPool->>Handler: _handle_page(rows) [in driver thread]
221+
Handler->>Handler: Lock, process rows
222+
Handler->>EventLoop: call_soon_threadsafe(future.set_result)
223+
EventLoop-->>User: Resume coroutine with AsyncResultSet
113224
```
114225

115226
### Streaming Execution
@@ -121,26 +232,34 @@ sequenceDiagram
121232
participant App as Application
122233
participant Session as AsyncCassandraSession
123234
participant Stream as AsyncStreamingResultSet
124-
participant Handler as StreamingResultHandler
235+
participant EventLoop as Event Loop
125236
participant Driver as Cassandra Driver
237+
participant ThreadPool as Driver Thread Pool
126238
participant DB as Cassandra
127239
128240
App->>Session: await execute_stream(query, config)
129241
Session->>Driver: execute_async(query)
242+
Driver->>ThreadPool: Submit I/O task
243+
ThreadPool->>DB: Send CQL query
130244
Driver-->>Session: ResponseFuture
131-
Session->>Handler: StreamingResultHandler(future, config)
132-
Handler->>Stream: Create AsyncStreamingResultSet
245+
Session->>Stream: Create AsyncStreamingResultSet(future, config)
246+
Stream->>Driver: add_callbacks(callback, errback)
133247
Session-->>App: Return Stream
134248
135-
loop For each page request
249+
loop For each row iteration
136250
App->>Stream: async for row in stream
137-
Stream->>Handler: Request next page
138-
Handler->>Driver: Fetch page asynchronously
139-
Driver->>DB: Get page (fetch_size rows)
140-
DB-->>Driver: Page data
141-
Driver-->>Handler: Page received
142-
Handler-->>Stream: Yield rows
143-
Stream-->>App: Return row
251+
alt Current page has rows
252+
Stream-->>App: Return next row
253+
else Page exhausted, need next page
254+
Stream->>EventLoop: Create page_ready Event
255+
Note over App: Coroutine suspended
256+
DB-->>ThreadPool: Next page data
257+
ThreadPool->>Stream: _handle_page(rows) [in driver thread]
258+
Stream->>Stream: Lock, replace current page
259+
Stream->>EventLoop: call_soon_threadsafe(page_ready.set)
260+
EventLoop->>Stream: Resume iteration
261+
Stream-->>App: Return first row of new page
262+
end
144263
end
145264
```
146265

@@ -171,12 +290,55 @@ sequenceDiagram
171290
- Metrics and monitoring built-in
172291
- Battle-tested retry policies
173292

293+
## Thread Pool Architecture Summary
294+
295+
### The Two Worlds
296+
297+
async-cassandra bridges two distinct execution environments:
298+
299+
1. **Asyncio World** (Your Application)
300+
- Single-threaded event loop
301+
- Coroutines and async/await
302+
- Non-blocking I/O via event loop
303+
- Can handle thousands of concurrent operations
304+
305+
2. **Driver Thread Pool World** (DataStax Driver)
306+
- ThreadPoolExecutor with `executor_threads` workers (default: 2)
307+
- Blocking I/O operations
308+
- Callback-based completion
309+
- Limited by thread count
310+
311+
### The Bridge Mechanism
312+
313+
We don't "map" thread pools - we **bridge** between them:
314+
315+
```
316+
Your Async Code → asyncio.Future → Callback Bridge → Driver ResponseFuture → Thread Pool
317+
↑ ↓
318+
└──────── call_soon_threadsafe() ←──────────────────────┘
319+
```
320+
321+
Key bridging components:
322+
- **asyncio.Future**: Created in the event loop, awaited by your code
323+
- **ResponseFuture**: Created by the driver, completed by driver threads
324+
- **Callbacks**: Registered with ResponseFuture, called from driver threads
325+
- **call_soon_threadsafe()**: The critical method that safely notifies the event loop from driver threads
326+
327+
### Performance Implications
328+
329+
Since we use the driver's thread pool:
330+
- Maximum concurrent I/O operations = `executor_threads` (default: 2)
331+
- Under high load, increase `executor_threads` when creating the cluster
332+
- Each blocking operation ties up a thread until completion
333+
- The event loop stays free, but total throughput is limited by thread count
334+
174335
## Important Limitations
175336

176337
While async-cassandra provides async/await syntax, it's important to understand:
177338

178339
1. **The underlying I/O is still synchronous** - The DataStax driver uses blocking sockets in threads
179-
2. **Thread pool constraints apply** - Concurrency is limited by the driver's thread pool size
340+
2. **Thread pool constraints apply** - Concurrency is limited by the driver's thread pool size (default: 2 threads)
180341
3. **Not a true async driver** - This is a compatibility layer, not a ground-up async implementation
342+
4. **No thread pool multiplication** - We use the driver's thread pool as-is, we don't add additional threads
181343

182344
For more details on these limitations and when to use this wrapper, see [Why an Async Wrapper is Necessary](why-async-wrapper.md).

0 commit comments

Comments
 (0)