Skip to content

Commit dad425d

Browse files
committed
tests again
1 parent cf7d75e commit dad425d

File tree

4 files changed

+1185
-2
lines changed

4 files changed

+1185
-2
lines changed

src/async_cassandra/result.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def _handle_page(self, rows: List[Any]) -> None:
5858
else:
5959
# All pages fetched
6060
# Create a copy of rows to avoid reference issues
61-
final_result = AsyncResultSet(list(self.rows))
61+
final_result = AsyncResultSet(list(self.rows), self.response_future)
6262

6363
if self._future and not self._future.done():
6464
loop = getattr(self, "_loop", None)
@@ -141,9 +141,10 @@ class AsyncResultSet:
141141
Provides async iteration over result rows and metadata access.
142142
"""
143143

144-
def __init__(self, rows: List[Any]):
144+
def __init__(self, rows: List[Any], response_future: Any = None):
145145
self._rows = rows
146146
self._index = 0
147+
self._response_future = response_future
147148

148149
def __aiter__(self) -> AsyncIterator[Any]:
149150
"""Return async iterator for the result set."""
@@ -189,3 +190,14 @@ def all(self) -> List[Any]:
189190
List of all rows in the result set.
190191
"""
191192
return self._rows
193+
194+
def get_query_trace(self) -> Any:
195+
"""
196+
Get the query trace if available.
197+
198+
Returns:
199+
Query trace object or None if tracing wasn't enabled.
200+
"""
201+
if self._response_future and hasattr(self._response_future, "get_query_trace"):
202+
return self._response_future.get_query_trace()
203+
return None
Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
"""
2+
Integration tests for batch operations.
3+
4+
Tests logged batches, unlogged batches, and counter batches.
5+
"""
6+
7+
import asyncio
8+
import uuid
9+
10+
import pytest
11+
from cassandra import ConsistencyLevel
12+
from cassandra.query import BatchStatement, BatchType, SimpleStatement
13+
14+
15+
@pytest.mark.integration
16+
class TestBatchOperations:
17+
"""Test various batch operation types."""
18+
19+
@pytest.fixture
20+
async def batch_tables(self, session_with_keyspace):
21+
"""Create tables for batch testing."""
22+
session, keyspace = session_with_keyspace
23+
24+
# Regular table
25+
regular_table = f"batch_regular_{uuid.uuid4().hex[:8]}"
26+
await session.execute(
27+
f"""
28+
CREATE TABLE {regular_table} (
29+
id int PRIMARY KEY,
30+
name text,
31+
value int
32+
)
33+
"""
34+
)
35+
36+
# Counter table
37+
counter_table = f"batch_counters_{uuid.uuid4().hex[:8]}"
38+
await session.execute(
39+
f"""
40+
CREATE TABLE {counter_table} (
41+
id text PRIMARY KEY,
42+
count1 counter,
43+
count2 counter
44+
)
45+
"""
46+
)
47+
48+
yield regular_table, counter_table
49+
50+
# Cleanup
51+
await session.execute(f"DROP TABLE IF EXISTS {regular_table}")
52+
await session.execute(f"DROP TABLE IF EXISTS {counter_table}")
53+
54+
@pytest.mark.asyncio
55+
async def test_logged_batch(self, session_with_keyspace, batch_tables):
56+
"""Test logged batch operations (default)."""
57+
session, _ = session_with_keyspace
58+
regular_table, _ = batch_tables
59+
60+
# Create logged batch (default)
61+
batch = BatchStatement()
62+
63+
# Prepare statement
64+
insert_stmt = await session.prepare(
65+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
66+
)
67+
68+
# Add multiple statements
69+
batch.add(insert_stmt, (1, "Alice", 100))
70+
batch.add(insert_stmt, (2, "Bob", 200))
71+
batch.add(insert_stmt, (3, "Charlie", 300))
72+
73+
# Execute batch
74+
await session.execute(batch)
75+
76+
# Verify all inserts
77+
for id_val in [1, 2, 3]:
78+
result = await session.execute(f"SELECT * FROM {regular_table} WHERE id = {id_val}")
79+
row = result.one()
80+
assert row is not None
81+
assert row.id == id_val
82+
83+
@pytest.mark.asyncio
84+
async def test_unlogged_batch(self, session_with_keyspace, batch_tables):
85+
"""Test unlogged batch operations for performance."""
86+
session, _ = session_with_keyspace
87+
regular_table, _ = batch_tables
88+
89+
# Create unlogged batch
90+
batch = BatchStatement(batch_type=BatchType.UNLOGGED)
91+
92+
# Prepare statement
93+
insert_stmt = await session.prepare(
94+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
95+
)
96+
97+
# Add statements - unlogged batches are good for multiple partitions
98+
for i in range(10):
99+
batch.add(insert_stmt, (100 + i, f"User{i}", i * 10))
100+
101+
# Execute batch
102+
await session.execute(batch)
103+
104+
# Verify inserts
105+
result = await session.execute(
106+
f"SELECT COUNT(*) FROM {regular_table} WHERE id >= 100 AND id < 110 ALLOW FILTERING"
107+
)
108+
count = result.one()[0]
109+
assert count == 10
110+
111+
@pytest.mark.asyncio
112+
async def test_counter_batch(self, session_with_keyspace, batch_tables):
113+
"""Test counter batch operations."""
114+
session, _ = session_with_keyspace
115+
_, counter_table = batch_tables
116+
117+
# Create counter batch
118+
batch = BatchStatement(batch_type=BatchType.COUNTER)
119+
120+
# Prepare counter statements
121+
update_count1_stmt = await session.prepare(
122+
f"UPDATE {counter_table} SET count1 = count1 + ? WHERE id = ?"
123+
)
124+
update_count2_stmt = await session.prepare(
125+
f"UPDATE {counter_table} SET count2 = count2 + ? WHERE id = ?"
126+
)
127+
128+
# Add counter updates
129+
batch.add(update_count1_stmt, (5, "counter1"))
130+
batch.add(update_count2_stmt, (10, "counter1"))
131+
batch.add(update_count1_stmt, (3, "counter2"))
132+
133+
# Execute counter batch
134+
await session.execute(batch)
135+
136+
# Verify counters
137+
result = await session.execute(f"SELECT * FROM {counter_table} WHERE id = 'counter1'")
138+
row = result.one()
139+
assert row.count1 == 5
140+
assert row.count2 == 10
141+
142+
result = await session.execute(f"SELECT * FROM {counter_table} WHERE id = 'counter2'")
143+
row = result.one()
144+
assert row.count1 == 3
145+
assert row.count2 is None # Not updated
146+
147+
@pytest.mark.asyncio
148+
async def test_mixed_batch_types_error(self, session_with_keyspace, batch_tables):
149+
"""Test that mixing regular and counter operations in same batch fails."""
150+
session, _ = session_with_keyspace
151+
regular_table, counter_table = batch_tables
152+
153+
# Try to mix regular and counter operations
154+
batch = BatchStatement()
155+
156+
# This test needs to use SimpleStatement to demonstrate the error
157+
batch.add(
158+
SimpleStatement(f"INSERT INTO {regular_table} (id, name, value) VALUES (%s, %s, %s)"),
159+
(999, "Test", 999),
160+
)
161+
162+
batch.add(
163+
SimpleStatement(f"UPDATE {counter_table} SET count1 = count1 + %s WHERE id = %s"),
164+
(1, "error-test"),
165+
)
166+
167+
# Should fail
168+
with pytest.raises(Exception) as exc_info:
169+
await session.execute(batch)
170+
171+
assert "counter" in str(exc_info.value).lower() or "batch" in str(exc_info.value).lower()
172+
173+
@pytest.mark.asyncio
174+
async def test_batch_with_prepared_statements(self, session_with_keyspace, batch_tables):
175+
"""Test batches with prepared statements."""
176+
session, _ = session_with_keyspace
177+
regular_table, _ = batch_tables
178+
179+
# Prepare statements
180+
insert_stmt = await session.prepare(
181+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
182+
)
183+
update_stmt = await session.prepare(f"UPDATE {regular_table} SET value = ? WHERE id = ?")
184+
185+
# Create batch with prepared statements
186+
batch = BatchStatement()
187+
188+
# Add prepared statements
189+
batch.add(insert_stmt, (200, "PrepUser1", 1000))
190+
batch.add(insert_stmt, (201, "PrepUser2", 2000))
191+
batch.add(update_stmt, (3000, 200)) # Update first one
192+
193+
await session.execute(batch)
194+
195+
# Verify
196+
result = await session.execute(f"SELECT * FROM {regular_table} WHERE id = 200")
197+
row = result.one()
198+
assert row.name == "PrepUser1"
199+
assert row.value == 3000 # Updated value
200+
201+
@pytest.mark.asyncio
202+
async def test_batch_consistency_levels(self, session_with_keyspace, batch_tables):
203+
"""Test batch operations with different consistency levels."""
204+
session, _ = session_with_keyspace
205+
regular_table, _ = batch_tables
206+
207+
# Prepare statement
208+
insert_stmt = await session.prepare(
209+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
210+
)
211+
212+
# Create batch with specific consistency
213+
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
214+
215+
batch.add(insert_stmt, (300, "ConsistencyTest", 300))
216+
217+
await session.execute(batch)
218+
219+
# Read with different consistency
220+
select_stmt = await session.prepare(f"SELECT * FROM {regular_table} WHERE id = ?")
221+
select_stmt.consistency_level = ConsistencyLevel.ONE
222+
223+
result = await session.execute(select_stmt, [300])
224+
assert result.one() is not None
225+
226+
@pytest.mark.asyncio
227+
async def test_large_batch_warning(self, session_with_keyspace, batch_tables):
228+
"""Test that large batches work but may generate warnings."""
229+
session, _ = session_with_keyspace
230+
regular_table, _ = batch_tables
231+
232+
# Create large unlogged batch
233+
batch = BatchStatement(batch_type=BatchType.UNLOGGED)
234+
235+
# Prepare statement
236+
insert_stmt = await session.prepare(
237+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
238+
)
239+
240+
# Add many statements (Cassandra warns at 50KB)
241+
for i in range(100):
242+
batch.add(insert_stmt, (1000 + i, f"LargeUser{i}" * 10, i)) # Make data larger
243+
244+
# Should succeed but may warn in logs
245+
await session.execute(batch)
246+
247+
# Verify some inserts
248+
result = await session.execute(
249+
f"SELECT COUNT(*) FROM {regular_table} WHERE id >= 1000 AND id < 1100 ALLOW FILTERING"
250+
)
251+
count = result.one()[0]
252+
assert count == 100
253+
254+
@pytest.mark.asyncio
255+
async def test_conditional_batch(self, session_with_keyspace, batch_tables):
256+
"""Test batch with conditional statements (LWT)."""
257+
session, _ = session_with_keyspace
258+
regular_table, _ = batch_tables
259+
260+
# Insert initial data
261+
insert_stmt = await session.prepare(
262+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?)"
263+
)
264+
await session.execute(insert_stmt, [400, "Conditional", 400])
265+
266+
# Test conditional insert (single statement, not batch)
267+
insert_if_not_exists = await session.prepare(
268+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?) IF NOT EXISTS"
269+
)
270+
result = await session.execute(insert_if_not_exists, (401, "NewConditional", 401))
271+
assert result.one().applied # Should succeed
272+
273+
# Test conditional update
274+
update_if_value = await session.prepare(
275+
f"UPDATE {regular_table} SET value = ? WHERE id = ? IF value = ?"
276+
)
277+
result = await session.execute(update_if_value, (500, 400, 400))
278+
assert result.one().applied # Should succeed
279+
280+
# Test failed conditional (current value is now 500, not 400)
281+
result = await session.execute(update_if_value, (600, 400, 400)) # Wrong current value
282+
assert not result.one().applied # Should fail
283+
284+
@pytest.mark.asyncio
285+
async def test_counter_batch_concurrent(self, session_with_keyspace, batch_tables):
286+
"""Test concurrent counter batch operations."""
287+
session, _ = session_with_keyspace
288+
_, counter_table = batch_tables
289+
290+
# Prepare statement outside the async function
291+
update_stmt = await session.prepare(
292+
f"UPDATE {counter_table} SET count1 = count1 + ? WHERE id = ?"
293+
)
294+
295+
async def increment_batch(batch_id):
296+
batch = BatchStatement(batch_type=BatchType.COUNTER)
297+
298+
# Each batch increments different counters
299+
for i in range(10):
300+
batch.add(update_stmt, (1, f"concurrent_{i}"))
301+
302+
await session.execute(batch)
303+
304+
# Execute 10 batches concurrently
305+
tasks = [increment_batch(i) for i in range(10)]
306+
await asyncio.gather(*tasks)
307+
308+
# Verify all counters
309+
for i in range(10):
310+
result = await session.execute(
311+
f"SELECT count1 FROM {counter_table} WHERE id = 'concurrent_{i}'"
312+
)
313+
row = result.one()
314+
assert row.count1 == 10 # Each was incremented 10 times
315+
316+
@pytest.mark.asyncio
317+
async def test_batch_with_custom_timestamp(self, session_with_keyspace, batch_tables):
318+
"""Test batch operations with custom timestamp."""
319+
session, _ = session_with_keyspace
320+
regular_table, _ = batch_tables
321+
322+
# Create batch with custom timestamp
323+
custom_timestamp = 1234567890123456 # microseconds
324+
batch = BatchStatement()
325+
326+
# Add statement with USING TIMESTAMP
327+
# Prepare statement with timestamp
328+
insert_with_timestamp = await session.prepare(
329+
f"INSERT INTO {regular_table} (id, name, value) VALUES (?, ?, ?) USING TIMESTAMP ?"
330+
)
331+
332+
# Add statement with custom timestamp
333+
batch.add(insert_with_timestamp, (500, "TimestampTest", 500, custom_timestamp))
334+
335+
await session.execute(batch)
336+
337+
# Verify insert (we can't easily verify the timestamp was used)
338+
result = await session.execute(f"SELECT * FROM {regular_table} WHERE id = 500")
339+
assert result.one() is not None

0 commit comments

Comments
 (0)