Skip to content

Commit 6519960

Browse files
committed
init
1 parent f5d07f7 commit 6519960

File tree

8 files changed

+1790
-2
lines changed

8 files changed

+1790
-2
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# SPLIT Partitioning Strategy
2+
3+
The SPLIT strategy provides manual control over Dask partition count by splitting each Cassandra token range into N sub-partitions.
4+
5+
## When to Use
6+
7+
Use the SPLIT strategy when:
8+
- Automatic partition calculations are too conservative
9+
- You need more parallelism for large datasets
10+
- Token ranges contain uneven data distribution
11+
- You want fine-grained control over partition count
12+
13+
## Usage
14+
15+
```python
16+
import async_cassandra_dataframe as cdf
17+
18+
# Split each token range into 3 sub-partitions
19+
df = await cdf.read_cassandra_table(
20+
"my_table",
21+
session=session,
22+
partitioning_strategy="split", # Use SPLIT strategy
23+
split_factor=3, # Split each range into 3
24+
)
25+
26+
# Example: 17 token ranges * 3 splits = 51 Dask partitions
27+
```
28+
29+
## How It Works
30+
31+
1. Discovers natural token ranges from Cassandra cluster
32+
2. Splits each token range into N equal sub-ranges
33+
3. Creates one Dask partition per sub-range
34+
35+
## Examples
36+
37+
### Basic Usage
38+
```python
39+
# Default AUTO strategy (conservative)
40+
df_auto = await cdf.read_cassandra_table("my_table", session=session)
41+
# Result: 2 partitions for medium dataset
42+
43+
# SPLIT strategy with factor 5
44+
df_split = await cdf.read_cassandra_table(
45+
"my_table",
46+
session=session,
47+
partitioning_strategy="split",
48+
split_factor=5,
49+
)
50+
# Result: 85 partitions (17 ranges * 5)
51+
```
52+
53+
### High Parallelism
54+
```python
55+
# For CPU-intensive processing, increase parallelism
56+
df = await cdf.read_cassandra_table(
57+
"large_table",
58+
session=session,
59+
partitioning_strategy="split",
60+
split_factor=10, # 10x more partitions
61+
)
62+
63+
# Process with Dask
64+
result = df.map_partitions(expensive_computation).compute()
65+
```
66+
67+
### Comparison with Other Strategies
68+
69+
| Strategy | Use Case | Partition Count |
70+
|----------|----------|-----------------|
71+
| AUTO | General purpose | Conservative (2-10) |
72+
| NATURAL | Maximum parallelism | One per token range |
73+
| COMPACT | Memory-bounded | Based on target size |
74+
| FIXED | Specific count | User-specified |
75+
| SPLIT | Manual control | Token ranges * split_factor |
76+
77+
## Performance Considerations
78+
79+
- Higher split_factor = more parallelism but also more overhead
80+
- Each partition requires a separate Cassandra query
81+
- Optimal split_factor depends on:
82+
- Data volume per token range
83+
- Available CPU cores
84+
- Processing complexity
85+
- Network latency
86+
87+
## Recommendations
88+
89+
- Start with split_factor=2-5 for most cases
90+
- Use 10+ for CPU-intensive processing on large clusters
91+
- Monitor partition sizes with logging
92+
- Adjust based on performance measurements

libs/async-cassandra-dataframe/src/async_cassandra_dataframe/partition_strategy.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class PartitioningStrategy(str, Enum):
2323
NATURAL = "natural" # One partition per token range
2424
COMPACT = "compact" # Balance parallelism and overhead
2525
FIXED = "fixed" # User-specified partition count
26+
SPLIT = "split" # Split each token range into N sub-partitions
2627

2728

2829
@dataclass
@@ -70,6 +71,7 @@ def group_token_ranges(
7071
strategy: PartitioningStrategy = PartitioningStrategy.AUTO,
7172
target_partition_count: int | None = None,
7273
target_partition_size_mb: int | None = None,
74+
split_factor: int | None = None,
7375
) -> list[PartitionGroup]:
7476
"""
7577
Group token ranges into partitions based on strategy.
@@ -79,6 +81,7 @@ def group_token_ranges(
7981
strategy: Partitioning strategy to use
8082
target_partition_count: Desired number of partitions (for FIXED strategy)
8183
target_partition_size_mb: Target size per partition
84+
split_factor: Number of sub-partitions per token range (for SPLIT strategy)
8285
8386
Returns:
8487
List of partition groups
@@ -96,6 +99,10 @@ def group_token_ranges(
9699
if target_partition_count is None:
97100
raise ValueError("FIXED strategy requires target_partition_count")
98101
return self._fixed_grouping(token_ranges, target_partition_count)
102+
elif strategy == PartitioningStrategy.SPLIT:
103+
if split_factor is None:
104+
raise ValueError("SPLIT strategy requires split_factor")
105+
return self._split_grouping(token_ranges, split_factor)
99106
else: # AUTO
100107
return self._auto_grouping(token_ranges, target_size)
101108

@@ -260,6 +267,47 @@ def _auto_grouping(
260267
target_partitions = max(len(token_ranges) // 2, unique_nodes * 4)
261268
return self._fixed_grouping(token_ranges, target_partitions)
262269

270+
def _split_grouping(
    self, token_ranges: list[TokenRange], split_factor: int
) -> list[PartitionGroup]:
    """
    Build one partition group per sub-range, splitting every token range N ways.

    Args:
        token_ranges: Original token ranges from Cassandra
        split_factor: Number of sub-partitions to carve out of each range

    Returns:
        List of partition groups, one per sub-range
    """
    groups: list[PartitionGroup] = []

    for token_range in token_ranges:
        # Each natural range contributes split_factor sub-ranges.
        for sub_range in token_range.split(split_factor):
            # NOTE(review): assumes TokenRange carries a `fraction`
            # attribute (not set explicitly by split()) — confirm it has
            # a usable default on the dataclass.
            size_estimate = self.default_partition_size_mb * sub_range.fraction
            primary = sub_range.replicas[0] if sub_range.replicas else None

            # Sequential append means len(groups) is the next partition id.
            groups.append(
                PartitionGroup(
                    partition_id=len(groups),
                    token_ranges=[sub_range],
                    estimated_size_mb=size_estimate,
                    primary_replica=primary,
                )
            )

    logger.info(
        f"Split partitioning: {len(token_ranges)} ranges split by {split_factor} "
        f"= {len(groups)} partitions"
    )

    return groups
263311
def _group_by_replica(self, token_ranges: list[TokenRange]) -> dict[str, list[TokenRange]]:
264312
"""Group token ranges by their primary replica."""
265313
ranges_by_replica: dict[str, list[TokenRange]] = {}

libs/async-cassandra-dataframe/src/async_cassandra_dataframe/reader.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ async def read(
172172
# Partitioning strategy
173173
partition_strategy: str = "auto",
174174
target_partition_size_mb: int = 1024,
175+
split_factor: int | None = None,
175176
# Validation
176177
require_partition_key_predicate: bool = False,
177178
# Progress
@@ -224,6 +225,7 @@ async def read(
224225
pushdown_predicates,
225226
partition_strategy,
226227
target_partition_size_mb,
228+
split_factor,
227229
)
228230

229231
# Normalize snapshot time
@@ -454,6 +456,7 @@ async def _create_partitions(
454456
pushdown_predicates: list,
455457
partition_strategy: str,
456458
target_partition_size_mb: int,
459+
split_factor: int | None,
457460
) -> list[dict[str, Any]]:
458461
"""Create partition definitions."""
459462
# Create partition strategy
@@ -482,6 +485,7 @@ async def _create_partitions(
482485
columns,
483486
None, # writetime_columns
484487
None, # ttl_columns
488+
split_factor,
485489
)
486490
except Exception as e:
487491
logger.warning(f"Could not apply partitioning strategy: {e}")
@@ -497,6 +501,7 @@ async def _create_grouped_partitions(
497501
columns: list[str],
498502
writetime_columns: list[str] | None,
499503
ttl_columns: list[str] | None,
504+
split_factor: int | None,
500505
) -> list[dict[str, Any]]:
501506
"""Create grouped partitions based on partitioning strategy."""
502507
# Get natural token ranges
@@ -513,6 +518,7 @@ async def _create_grouped_partitions(
513518
strategy=strategy_enum,
514519
target_partition_count=partition_count,
515520
target_partition_size_mb=target_partition_size_mb,
521+
split_factor=split_factor,
516522
)
517523

518524
# Log partitioning info
@@ -650,7 +656,9 @@ async def read_cassandra_table(
650656
adaptive_page_size: bool = False,
651657
# Partitioning strategy
652658
partition_strategy: str = "auto",
659+
partitioning_strategy: str | None = None, # Alias for backward compatibility
653660
target_partition_size_mb: int = 1024,
661+
split_factor: int | None = None,
654662
# Validation
655663
require_partition_key_predicate: bool = False,
656664
# Progress
@@ -687,8 +695,9 @@ async def read_cassandra_table(
687695
max_concurrent_partitions=max_concurrent_partitions,
688696
page_size=page_size,
689697
adaptive_page_size=adaptive_page_size,
690-
partition_strategy=partition_strategy,
698+
partition_strategy=partitioning_strategy or partition_strategy, # Use alias if provided
691699
target_partition_size_mb=target_partition_size_mb,
700+
split_factor=split_factor,
692701
require_partition_key_predicate=require_partition_key_predicate,
693702
progress_callback=progress_callback,
694703
client=client,

libs/async-cassandra-dataframe/src/async_cassandra_dataframe/token_ranges.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,68 @@ def contains_token(self, token: int) -> bool:
6363
# Wraparound: token is either after start OR before end
6464
return token >= self.start or token <= self.end
6565

66+
def split(self, split_factor: int) -> list["TokenRange"]:
    """
    Split this token range into *split_factor* contiguous sub-ranges.

    Args:
        split_factor: Number of sub-ranges to create (must be >= 1)

    Returns:
        List of exactly ``split_factor`` sub-ranges that tile this range.
        Sub-ranges inherit this range's replicas.
        NOTE(review): the created sub-ranges do not set a ``fraction``
        field, but _split_grouping reads ``sub_range.fraction`` — confirm
        TokenRange defines a sensible default for it.

    Raises:
        ValueError: If split_factor is not positive
    """
    if split_factor < 1:
        raise ValueError("split_factor must be positive")

    if split_factor == 1:
        return [self]

    if self.is_wraparound:
        # A wraparound range crosses the MAX_TOKEN/MIN_TOKEN boundary;
        # split it into two linear halves and share the splits between
        # them in proportion to their size.
        first_part = TokenRange(start=self.start, end=MAX_TOKEN, replicas=self.replicas)
        second_part = TokenRange(start=MIN_TOKEN, end=self.end, replicas=self.replicas)

        first_size = first_part.size
        second_size = second_part.size
        total_size = first_size + second_size

        # Allocate splits proportionally, clamped so each half receives at
        # least one split and the total is exactly split_factor.
        # (Fix: the previous max(1, round(...)) / max(1, remainder) scheme
        # could return split_factor + 1 sub-ranges when rounding assigned
        # all splits to the first half.)
        first_splits = min(
            split_factor - 1,
            max(1, round(split_factor * first_size / total_size)),
        )
        second_splits = split_factor - first_splits

        return first_part.split(first_splits) + second_part.split(second_splits)

    # Linear (non-wraparound) range: carve into split_factor pieces with
    # integer arithmetic; the final piece ends at self.end so any rounding
    # remainder is absorbed and the sub-ranges tile [start, end] exactly.
    range_size = self.size
    splits = []
    for i in range(split_factor):
        start = self.start + (range_size * i // split_factor)
        if i == split_factor - 1:
            end = self.end
        else:
            end = self.start + (range_size * (i + 1) // split_factor)
        splits.append(TokenRange(start=start, end=end, replicas=self.replicas))

    return splits
66128

67129
async def discover_token_ranges(session: Any, keyspace: str) -> list[TokenRange]:
68130
"""

0 commit comments

Comments
 (0)