
Commit f5d07f7

Commit message: init
1 parent 0ad2b7a commit f5d07f7

File tree

1 file changed: +206, -59 lines


libs/async-cassandra-dataframe/tests/integration/partitioning/test_automatic_partition_count.py

Lines changed: 206 additions & 59 deletions
@@ -29,77 +29,99 @@ class TestAutomaticPartitionCount:
     """Test automatic partition count calculations based on token ranges."""

     @pytest.mark.asyncio
-    async def test_automatic_partition_count_small_table(self, session):
+    async def test_automatic_partition_count_medium_table(self, session):
         """
-        Test that small tables get reasonable partition counts.
+        Test partition counts with medium-sized dataset.

-        Given: A table with 1000 rows across 10 Cassandra partitions
+        Given: A table with 20,000 rows across 100 Cassandra partitions
         When: Reading without specifying partition_count
-        Then: Should create a reasonable number of Dask partitions based on token ranges
+        Then: Should create multiple Dask partitions based on token ranges
         """

         # Create test table
         await session.execute(
             """
-            CREATE TABLE IF NOT EXISTS partition_test_small (
+            CREATE TABLE IF NOT EXISTS partition_test_medium (
                 partition_key INT,
                 cluster_key INT,
                 value TEXT,
+                data TEXT,
                 PRIMARY KEY (partition_key, cluster_key)
             )
             """
         )

-        # Insert data - 10 partitions with 100 rows each
+        # Insert data - 100 partitions with 200 rows each = 20,000 rows
         insert_stmt = await session.prepare(
             """
-            INSERT INTO partition_test_small (partition_key, cluster_key, value)
-            VALUES (?, ?, ?)
+            INSERT INTO partition_test_medium (partition_key, cluster_key, value, data)
+            VALUES (?, ?, ?, ?)
             """
         )

-        logger.info("Inserting 1000 rows across 10 partitions...")
-        for partition in range(10):
-            for cluster in range(100):
-                await session.execute(
-                    insert_stmt, (partition, cluster, f"value_{partition}_{cluster}")
-                )
+        logger.info("Inserting 20,000 rows across 100 partitions...")
+        # Use batching for efficiency
+        from cassandra.query import BatchStatement
+
+        batch_size = 25  # Cassandra batch size limit
+        rows_inserted = 0
+
+        for partition in range(100):
+            for batch_start in range(0, 200, batch_size):
+                batch = BatchStatement()
+                for cluster in range(batch_start, min(batch_start + batch_size, 200)):
+                    batch.add(
+                        insert_stmt,
+                        (
+                            partition,
+                            cluster,
+                            f"value_{partition}_{cluster}",
+                            "x" * 500,  # 500 bytes of data per row
+                        ),
+                    )
+                await session.execute(batch)
+                rows_inserted += min(batch_size, 200 - batch_start)
+
+            if partition % 10 == 0:
+                logger.info(f"Inserted partition {partition}/100 ({rows_inserted} total rows)")

         # Read without specifying partition_count - should auto-calculate
-        df = await cdf.read_cassandra_table("partition_test_small", session=session)
+        df = await cdf.read_cassandra_table("partition_test_medium", session=session)

-        logger.info(f"Created {df.npartitions} Dask partitions automatically")
+        logger.info(f"Created {df.npartitions} Dask partitions automatically for 20K rows")

         # Verify we got all data
-        result = df.compute()
-        assert len(result) == 1000, f"Expected 1000 rows, got {len(result)}"
+        total_rows = len(df)
+        assert total_rows == 20000, f"Expected 20000 rows, got {total_rows}"

-        # With a single node cluster, we typically get 16-256 token ranges
-        # The automatic calculation should create a reasonable number of partitions
-        assert df.npartitions >= 1, "Should have at least 1 partition"
+        # With 20K rows, should create multiple partitions
         assert (
-            df.npartitions <= 50
-        ), f"Should not create too many partitions for small data, got {df.npartitions}"
+            df.npartitions >= 2
+        ), f"Should have multiple partitions for 20K rows, got {df.npartitions}"

-        # Verify data is distributed across partitions
+        # Log partition distribution
         partition_sizes = []
         for i in range(df.npartitions):
             partition_data = df.get_partition(i).compute()
             partition_sizes.append(len(partition_data))
             logger.info(f"Partition {i}: {len(partition_data)} rows")

-        # At least some partitions should have data
+        # Check distribution
+        avg_size = sum(partition_sizes) / len(partition_sizes)
+        logger.info(f"Average partition size: {avg_size:.1f} rows")
+
+        # All partitions should have some data
         non_empty_partitions = sum(1 for size in partition_sizes if size > 0)
-        assert non_empty_partitions >= 1, "Should have at least one non-empty partition"
+        assert non_empty_partitions == df.npartitions, "All partitions should have data"

     @pytest.mark.asyncio
     async def test_automatic_partition_count_large_table(self, session):
         """
-        Test that large tables get appropriate partition counts.
+        Test partition counts with large dataset.

-        Given: A table with 50,000 rows across 100 Cassandra partitions
+        Given: A table with 100,000 rows across 200 Cassandra partitions
         When: Reading without specifying partition_count
-        Then: Should create more Dask partitions to handle the larger data volume
+        Then: Should create appropriate number of Dask partitions for parallel processing
         """

         # Create test table
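Note on the insert pattern introduced above: the new tests push their writes through the driver's BatchStatement, keeping each batch to 25 statements and to a single Cassandra partition key. A minimal standalone sketch of that pattern with the synchronous cassandra-driver API follows; the contact point, the keyspace name test_ks, and the table name partition_demo are illustrative assumptions, and the test file itself goes through the project's async session fixture instead.

# Minimal sketch of the batched-insert pattern (synchronous cassandra-driver).
# Assumptions: a local node at 127.0.0.1 and an existing keyspace "test_ks".
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

cluster = Cluster(["127.0.0.1"])
session = cluster.connect("test_ks")

session.execute(
    """
    CREATE TABLE IF NOT EXISTS partition_demo (
        partition_key INT,
        cluster_key INT,
        value TEXT,
        PRIMARY KEY (partition_key, cluster_key)
    )
    """
)
insert_stmt = session.prepare(
    "INSERT INTO partition_demo (partition_key, cluster_key, value) VALUES (?, ?, ?)"
)

batch_size = 25  # stay well under the server's batch size warn/fail thresholds
for partition in range(10):
    for batch_start in range(0, 100, batch_size):
        batch = BatchStatement()  # every batch targets a single partition key
        for ck in range(batch_start, batch_start + batch_size):
            batch.add(insert_stmt, (partition, ck, f"value_{partition}_{ck}"))
        session.execute(batch)

cluster.shutdown()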
@@ -109,26 +131,32 @@ async def test_automatic_partition_count_large_table(self, session):
                 partition_key INT,
                 cluster_key INT,
                 value TEXT,
-                data BLOB,
+                data TEXT,
+                timestamp TIMESTAMP,
                 PRIMARY KEY (partition_key, cluster_key)
             )
             """
         )

-        # Insert data - 100 partitions with 500 rows each
+        # Insert data - 200 partitions with 500 rows each = 100,000 rows
         insert_stmt = await session.prepare(
             """
-            INSERT INTO partition_test_large (partition_key, cluster_key, value, data)
-            VALUES (?, ?, ?, ?)
+            INSERT INTO partition_test_large (partition_key, cluster_key, value, data, timestamp)
+            VALUES (?, ?, ?, ?, ?)
             """
         )

-        logger.info("Inserting 50,000 rows across 100 partitions...")
+        logger.info("Inserting 100,000 rows across 200 partitions...")
         # Insert in batches for efficiency
+        from datetime import UTC, datetime
+
         from cassandra.query import BatchStatement

-        batch_size = 100
-        for partition in range(100):
+        batch_size = 25
+        rows_inserted = 0
+        now = datetime.now(UTC)
+
+        for partition in range(200):
             for batch_start in range(0, 500, batch_size):
                 batch = BatchStatement()
                 for cluster in range(batch_start, min(batch_start + batch_size, 500)):
@@ -138,36 +166,51 @@ async def test_automatic_partition_count_large_table(self, session):
                             partition,
                             cluster,
                             f"value_{partition}_{cluster}",
-                            b"x" * 100,  # 100 bytes of data
+                            "x" * 1000,  # 1KB of data per row
+                            now,
                         ),
                     )
                 await session.execute(batch)
+                rows_inserted += min(batch_size, 500 - batch_start)

-            if partition % 10 == 0:
-                logger.info(f"Inserted partition {partition}/100")
+            if partition % 20 == 0:
+                logger.info(f"Inserted partition {partition}/200 ({rows_inserted} total rows)")

         # Read without specifying partition_count
         df = await cdf.read_cassandra_table(
             "partition_test_large",
             session=session,
-            columns=["partition_key", "cluster_key", "value"],  # Skip blob for performance
+            columns=["partition_key", "cluster_key", "value"],  # Skip large data column
         )

-        logger.info(f"Created {df.npartitions} Dask partitions automatically for large table")
+        logger.info(f"Created {df.npartitions} Dask partitions automatically for 100K rows")

-        # Verify partition count is reasonable for larger data
-        # Should create more partitions for larger tables
+        # With 100K rows, should create multiple partitions for parallel processing
         assert (
             df.npartitions >= 2
-        ), f"Should have multiple partitions for large data, got {df.npartitions}"
+        ), f"Should have multiple partitions for 100K rows, got {df.npartitions}"

-        # Compute a sample to verify data
-        sample = df.head(1000)
-        assert len(sample) == 1000, f"Expected 1000 rows in sample, got {len(sample)}"
+        # Log partition statistics
+        partition_sizes = []
+        min_rows = float("inf")
+        max_rows = 0
+
+        for i in range(df.npartitions):
+            partition_data = df.get_partition(i).compute()
+            size = len(partition_data)
+            partition_sizes.append(size)
+            min_rows = min(min_rows, size)
+            max_rows = max(max_rows, size)
+            if i < 5 or i >= df.npartitions - 5:  # Log first and last 5 partitions
+                logger.info(f"Partition {i}: {size} rows")
+
+        # Calculate statistics
+        avg_size = sum(partition_sizes) / len(partition_sizes)
+        logger.info(f"Partition statistics: min={min_rows}, max={max_rows}, avg={avg_size:.1f}")

         # Check total count
-        total_rows = len(df)
-        assert total_rows == 50000, f"Expected 50000 rows, got {total_rows}"
+        total_rows = sum(partition_sizes)
+        assert total_rows == 100000, f"Expected 100000 rows, got {total_rows}"

     @pytest.mark.asyncio
     async def test_partition_count_with_token_ranges(self, session):
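The partition bookkeeping added in these tests (npartitions, get_partition, compute) is standard Dask DataFrame API. A self-contained sketch, with no Cassandra involved, that exercises the same calls on an in-memory frame:

# Self-contained sketch of the partition inspection used above (no Cassandra needed).
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"partition_key": range(1000), "value": ["x"] * 1000})
df = dd.from_pandas(pdf, npartitions=8)

partition_sizes = []
for i in range(df.npartitions):
    # get_partition(i) is a one-partition Dask DataFrame; compute() materializes it.
    partition_sizes.append(len(df.get_partition(i).compute()))

avg_size = sum(partition_sizes) / len(partition_sizes)
print(
    f"{df.npartitions} partitions, min={min(partition_sizes)}, "
    f"max={max(partition_sizes)}, avg={avg_size:.1f}"
)
assert sum(partition_sizes) == len(pdf)  # nothing lost across partitions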
@@ -198,12 +241,20 @@ async def test_partition_count_with_token_ranges(self, session):
             """
         )

-        logger.info("Inserting 5000 rows with random UUIDs for even token distribution...")
-        for i in range(5000):
-            await session.execute(insert_stmt, (uuid.uuid4(), f"value_{i}"))
+        logger.info("Inserting 20,000 rows with random UUIDs for even token distribution...")
+        # Batch inserts for better performance
+        from cassandra.query import BatchStatement
+
+        batch_size = 25
+        for i in range(0, 20000, batch_size):
+            batch = BatchStatement()
+            for j in range(batch_size):
+                if i + j < 20000:
+                    batch.add(insert_stmt, (uuid.uuid4(), f"value_{i + j}"))
+            await session.execute(batch)

-            if i % 1000 == 0:
-                logger.info(f"Inserted {i}/5000 rows")
+            if i % 2000 == 0:
+                logger.info(f"Inserted {i}/20000 rows")

         # Read and let it calculate partitions based on token ranges
         df = await cdf.read_cassandra_table("partition_test_tokens", session=session)
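The tests in this hunk deliberately leave partition_count unset and rely on the library's token-range-based calculation. As a rough illustration only, and not the library's actual algorithm, one way to derive a Dask partition count is to read Cassandra's system.size_estimates table and divide the estimated table size by a target bytes-per-partition; the 64 MB target below is an arbitrary assumption, and size estimates on a freshly loaded node may be empty until refreshed.

# Illustrative heuristic only - NOT necessarily what async-cassandra-dataframe does.
from cassandra.cluster import Cluster


def estimate_dask_partitions(session, keyspace, table, target_mb=64):
    """Estimate a Dask partition count from Cassandra's system.size_estimates."""
    rows = session.execute(
        "SELECT mean_partition_size, partitions_count "
        "FROM system.size_estimates "
        "WHERE keyspace_name = %s AND table_name = %s",
        (keyspace, table),
    )
    # Sum the estimated bytes over all locally owned token ranges.
    total_bytes = sum(r.mean_partition_size * r.partitions_count for r in rows)
    target_bytes = target_mb * 1024 * 1024
    # Ceiling division; always return at least one partition.
    return max(1, -(-total_bytes // target_bytes))


cluster = Cluster(["127.0.0.1"])
session = cluster.connect()
# Estimates may be empty or stale until the node refreshes them
# (e.g. via "nodetool refreshsizeestimates").
print(estimate_dask_partitions(session, "test_ks", "partition_test_tokens"))
cluster.shutdown()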
@@ -324,15 +375,35 @@ async def test_partition_count_with_filtering(self, session):
             """
         )

-        logger.info("Inserting data across multiple years...")
+        logger.info("Inserting 30,000+ rows across multiple years...")
+        # Batch inserts for efficiency - 3 years * 12 months * 28 days * 30 events = 30,240 rows
+        from cassandra.query import BatchStatement
+
+        batch_size = 25
+        total_rows = 0
+
         for year in [2022, 2023, 2024]:
             for month in range(1, 13):
                 for day in range(1, 29):  # Simplified - 28 days per month
-                    for _ in range(10):  # 10 events per day
-                        await session.execute(
+                    batch = BatchStatement()
+                    for event in range(30):  # 30 events per day
+                        batch.add(
                             insert_stmt,
-                            (year, month, day, uuid.uuid4(), f"event_{year}_{month}_{day}"),
+                            (year, month, day, uuid.uuid4(), f"event_{year}_{month}_{day}_{event}"),
                         )
+                        total_rows += 1
+
+                        # Execute batch when full
+                        if len(batch) >= batch_size:
+                            await session.execute(batch)
+                            batch = BatchStatement()
+
+                    # Execute remaining items in batch
+                    if batch:
+                        await session.execute(batch)
+
+                if month % 3 == 0:
+                    logger.info(f"Inserted {year}/{month} - {total_rows} total rows")

         # Read all data - should create multiple partitions
         df_all = await cdf.read_cassandra_table("partition_test_filtered", session=session)
@@ -354,7 +425,9 @@ async def test_partition_count_with_filtering(self, session):

         # Verify filtering worked
         assert len(df_filtered) < len(df_all)
-        assert len(df_filtered) == 28 * 12 * 10  # 28 days * 12 months * 10 events
+        assert (
+            len(df_filtered) == 28 * 12 * 30
+        )  # 28 days * 12 months * 30 events = 10,080 rows for 2024

     @pytest.mark.asyncio
     async def test_partition_memory_limits(self, session):
@@ -407,3 +480,77 @@ async def test_partition_memory_limits(self, session):
         # Verify we still get all data
         assert len(df_default) == 1000
         assert len(df_low_memory) == 1000
+
+    @pytest.mark.asyncio
+    async def test_partition_count_scales_with_data(self, session):
+        """
+        Test that partition count scales appropriately with data volume.
+
+        Given: Tables with different data volumes (1K, 10K, 50K rows)
+        When: Reading with automatic partition calculation
+        Then: Partition count should increase with data volume
+        """
+
+        # Test with three different data sizes
+        test_cases = [
+            (1000, "small"),  # 1K rows
+            (10000, "medium"),  # 10K rows
+            (50000, "large"),  # 50K rows
+        ]
+
+        partition_counts = {}
+
+        for row_count, size_name in test_cases:
+            table_name = f"partition_test_scale_{size_name}"
+
+            # Create table
+            await session.execute(
+                f"""
+                CREATE TABLE IF NOT EXISTS {table_name} (
+                    id INT PRIMARY KEY,
+                    data TEXT
+                )
+                """
+            )
+
+            # Insert data in batches
+            insert_stmt = await session.prepare(
+                f"""
+                INSERT INTO {table_name} (id, data) VALUES (?, ?)
+                """
+            )
+
+            logger.info(f"Inserting {row_count} rows for {size_name} dataset...")
+
+            from cassandra.query import BatchStatement
+
+            batch_size = 100
+
+            for i in range(0, row_count, batch_size):
+                batch = BatchStatement()
+                for j in range(min(batch_size, row_count - i)):
+                    batch.add(insert_stmt, (i + j, "x" * 200))  # 200 bytes per row
+                await session.execute(batch)
+
+                if i % 10000 == 0 and i > 0:
+                    logger.info(f"  Inserted {i}/{row_count} rows")
+
+            # Read with automatic partitioning
+            df = await cdf.read_cassandra_table(table_name, session=session)
+            partition_counts[size_name] = df.npartitions
+
+            logger.info(f"{size_name} dataset ({row_count} rows): {df.npartitions} partitions")
+
+            # Verify row count
+            assert len(df) == row_count, f"Expected {row_count} rows, got {len(df)}"
+
+        # Verify partition count scaling
+        logger.info(f"Partition count scaling: {partition_counts}")
+
+        # Larger datasets should have same or more partitions
+        assert (
+            partition_counts["medium"] >= partition_counts["small"]
+        ), f"Medium dataset should have >= partitions than small: {partition_counts}"
+        assert (
+            partition_counts["large"] >= partition_counts["medium"]
+        ), f"Large dataset should have >= partitions than medium: {partition_counts}"
