Commit 1d72b74

Commit message: tests
1 parent: fe90109

38 files changed: +5452 -4594 lines

libs/async-cassandra-bulk/src/async_cassandra_bulk/operators/bulk_operator.py

Lines changed: 18 additions & 3 deletions
@@ -110,13 +110,16 @@ def _parse_timestamp_to_micros(self, timestamp: Union[str, int, float, datetime]
             if timestamp < 0:
                 raise ValueError("Timestamp cannot be negative")
 
-            # Detect if it's seconds or milliseconds
+            # Detect if it's seconds, milliseconds, or microseconds
             # If timestamp is less than year 3000 in seconds, assume seconds
             if timestamp < 32503680000:  # Jan 1, 3000 in seconds
                 return int(timestamp * 1_000_000)
-            else:
-                # Assume milliseconds
+            # If timestamp is less than year 3000 in milliseconds, assume milliseconds
+            elif timestamp < 32503680000000:  # Jan 1, 3000 in milliseconds
                 return int(timestamp * 1_000)
+            else:
+                # Assume microseconds (already in the correct unit)
+                return int(timestamp)
 
         else:
             raise TypeError(f"Unsupported timestamp type: {type(timestamp)}")
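
A minimal, standalone sketch of the unit-detection heuristic this hunk introduces (the to_micros name and module-level constants are illustrative; in the library this logic lives inside _parse_timestamp_to_micros):

    YEAR_3000_SECONDS = 32503680000          # Jan 1, 3000 in seconds
    YEAR_3000_MILLISECONDS = 32503680000000  # Jan 1, 3000 in milliseconds

    def to_micros(ts: float) -> int:
        # Negative timestamps are rejected, matching the diff above.
        if ts < 0:
            raise ValueError("Timestamp cannot be negative")
        if ts < YEAR_3000_SECONDS:           # small enough to be seconds
            return int(ts * 1_000_000)
        elif ts < YEAR_3000_MILLISECONDS:    # otherwise plausible as milliseconds
            return int(ts * 1_000)
        else:                                # already microseconds
            return int(ts)

    assert to_micros(1_700_000_000) == 1_700_000_000_000_000          # seconds
    assert to_micros(1_700_000_000_000) == 1_700_000_000_000_000      # milliseconds
    assert to_micros(1_700_000_000_000_000) == 1_700_000_000_000_000  # microseconds
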
@@ -212,6 +215,9 @@ async def export(
                 - writetime_before: Export rows where ANY column was written before this time
                 - writetime_filter_mode: "any" (default) or "all" - whether ANY or ALL
                   writetime columns must match the filter criteria
+                - include_ttl: Include TTL (time to live) for columns (default: False)
+                - ttl_columns: List of columns to get TTL for
+                  (default: None, use ["*"] for all non-key columns)
             csv_options: CSV-specific options
             json_options: JSON-specific options
             parquet_options: Parquet-specific options
@@ -264,6 +270,14 @@ async def export(
         if export_options.get("include_writetime") and not writetime_columns:
             # Default to all columns if include_writetime is True
             writetime_columns = ["*"]
+            # Update the options dict so validation sees it
+            export_options["writetime_columns"] = writetime_columns
+
+        # Extract TTL options
+        ttl_columns = export_options.get("ttl_columns")
+        if export_options.get("include_ttl") and not ttl_columns:
+            # Default to all columns if include_ttl is True
+            ttl_columns = ["*"]
 
         # Validate writetime options
         self._validate_writetime_options(export_options)
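
Combined with the docstring hunk above, a hypothetical sketch of the export_options keys this commit reads (how the dict is ultimately passed to export() is not shown in this diff and is assumed):

    # Hypothetical options sketch; only keys visible in the diff are used.
    export_options = {
        "include_writetime": True,   # emit <col>_writetime columns
        "writetime_columns": ["*"],  # "*" = all non-key, non-counter columns
        "include_ttl": True,         # new in this commit: emit <col>_ttl columns
        "ttl_columns": None,         # None + include_ttl=True defaults to ["*"]
    }
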
@@ -287,6 +301,7 @@ async def export(
             resume_from=resume_from,
             columns=columns,
             writetime_columns=writetime_columns,
+            ttl_columns=ttl_columns,
             writetime_after_micros=writetime_after_micros,
             writetime_before_micros=writetime_before_micros,
             writetime_filter_mode=writetime_filter_mode,

libs/async-cassandra-bulk/src/async_cassandra_bulk/parallel_export.py

Lines changed: 64 additions & 20 deletions
@@ -45,6 +45,7 @@ def __init__(
         resume_from: Optional[Dict[str, Any]] = None,
         columns: Optional[List[str]] = None,
         writetime_columns: Optional[List[str]] = None,
+        ttl_columns: Optional[List[str]] = None,
         writetime_after_micros: Optional[int] = None,
         writetime_before_micros: Optional[int] = None,
         writetime_filter_mode: str = "any",

@@ -64,6 +65,7 @@ def __init__(
             resume_from: Previous checkpoint to resume from
             columns: Optional list of columns to export (default: all)
             writetime_columns: Optional list of columns to get writetime for
+            ttl_columns: Optional list of columns to get TTL for
             writetime_after_micros: Only export rows with writetime after this (microseconds)
             writetime_before_micros: Only export rows with writetime before this (microseconds)
             writetime_filter_mode: "any" or "all" - how to combine writetime filters

@@ -79,6 +81,7 @@ def __init__(
         self.resume_from = resume_from
         self.columns = columns
         self.writetime_columns = writetime_columns
+        self.ttl_columns = ttl_columns
         self.writetime_after_micros = writetime_after_micros
         self.writetime_before_micros = writetime_before_micros
         self.writetime_filter_mode = writetime_filter_mode

@@ -129,6 +132,11 @@ def _load_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
                 f"Writetime columns changed from {config['writetime_columns']} to {self.writetime_columns}"
             )
 
+        if config.get("ttl_columns") != self.ttl_columns:
+            logger.warning(
+                f"TTL columns changed from {config['ttl_columns']} to {self.ttl_columns}"
+            )
+
         # Check writetime filter changes
         if config.get("writetime_after_micros") != self.writetime_after_micros:
             logger.warning(
@@ -220,11 +228,19 @@ def _should_filter_row(self, row_dict: Dict[str, Any]) -> bool:
             else:
                 writetime_values.append(value)
 
+        # DEBUG
+        if row_dict.get("id") == 4:
+            logger.info(f"DEBUG: Row 4 writetime values: {writetime_values}")
+            logger.info(f"DEBUG: Filtering with after={self.writetime_after_micros}")
+            logger.info(f"DEBUG: Row 4 full dict keys: {list(row_dict.keys())}")
+            wt_entries = {k: v for k, v in row_dict.items() if "_writetime" in k}
+            logger.info(f"DEBUG: Row 4 writetime entries: {wt_entries}")
+
         if not writetime_values:
-            # No writetime values found - this shouldn't happen if writetime filtering is enabled
-            # but if it does, we'll include the row to be safe
-            logger.warning("No writetime values found in row for filtering")
-            return False
+            # No writetime values found - all columns are NULL or primary keys
+            # When filtering by writetime, rows with no writetime values should be excluded
+            # as they cannot match any writetime criteria
+            return True  # Filter out the row
 
         # Apply filtering based on mode
         if self.writetime_filter_mode == "any":
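
The behavioural change here: rows with no writetime values are now excluded rather than kept. A simplified sketch of the decision (the helper name and the exact "any"/"all" bound checks are paraphrased assumptions; the real comparisons are outside this hunk):

    def should_filter(writetime_values, after=None, before=None, mode="any"):
        if not writetime_values:
            # New behaviour: no writetime values (all columns NULL or primary keys)
            # means the row cannot satisfy any writetime criterion, so exclude it.
            return True

        def in_window(wt):
            return (after is None or wt > after) and (before is None or wt < before)

        if mode == "any":
            keep = any(in_window(wt) for wt in writetime_values)
        else:  # "all"
            keep = all(in_window(wt) for wt in writetime_values)
        return not keep
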
@@ -290,6 +306,7 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                 ),
                 self._resolved_columns or self.columns,
                 self.writetime_columns,
+                self.ttl_columns,
                 clustering_keys,
                 counter_columns,
             )

@@ -302,7 +319,10 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                     row_dict[field] = getattr(row, field)
 
                 # Apply writetime filtering if enabled
-                if not self._should_filter_row(row_dict):
+                should_filter = self._should_filter_row(row_dict)
+                if row_dict.get("id") == 4:
+                    logger.info(f"DEBUG: Row 4 should_filter={should_filter}")
+                if not should_filter:
                     await self.exporter.write_row(row_dict)
                     row_count += 1
                     stats.rows_processed += 1

@@ -315,6 +335,7 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                 TokenRange(start=MIN_TOKEN, end=token_range.end, replicas=token_range.replicas),
                 self._resolved_columns or self.columns,
                 self.writetime_columns,
+                self.ttl_columns,
                 clustering_keys,
                 counter_columns,
             )

@@ -327,7 +348,10 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                     row_dict[field] = getattr(row, field)
 
                 # Apply writetime filtering if enabled
-                if not self._should_filter_row(row_dict):
+                should_filter = self._should_filter_row(row_dict)
+                if row_dict.get("id") == 4:
+                    logger.info(f"DEBUG: Row 4 should_filter={should_filter}")
+                if not should_filter:
                     await self.exporter.write_row(row_dict)
                     row_count += 1
                     stats.rows_processed += 1

@@ -340,6 +364,7 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                 token_range,
                 self._resolved_columns or self.columns,
                 self.writetime_columns,
+                self.ttl_columns,
                 clustering_keys,
                 counter_columns,
             )

@@ -352,7 +377,10 @@ async def _export_range(self, token_range: TokenRange, stats: BulkOperationStats
                     row_dict[field] = getattr(row, field)
 
                 # Apply writetime filtering if enabled
-                if not self._should_filter_row(row_dict):
+                should_filter = self._should_filter_row(row_dict)
+                if row_dict.get("id") == 4:
+                    logger.info(f"DEBUG: Row 4 should_filter={should_filter}")
+                if not should_filter:
                     await self.exporter.write_row(row_dict)
                     row_count += 1
                     stats.rows_processed += 1
@@ -424,6 +452,7 @@ async def _save_checkpoint(self, stats: BulkOperationStats) -> None:
             "table": self.table,
             "columns": self.columns,
             "writetime_columns": self.writetime_columns,
+            "ttl_columns": self.ttl_columns,
             "batch_size": self.batch_size,
             "concurrency": self.concurrency,
             "writetime_after_micros": self.writetime_after_micros,
@@ -527,21 +556,22 @@ async def export(self) -> BulkOperationStats:
 
         # Write header including writetime columns
         header_columns = columns.copy()
-        if self.writetime_columns:
-            # Get key columns and counter columns to exclude
-            cluster = self.session._session.cluster
-            metadata = cluster.metadata
-            table_meta = metadata.keyspaces[self.keyspace].tables[self.table_name]
-            partition_keys = {col.name for col in table_meta.partition_key}
-            clustering_keys = {col.name for col in table_meta.clustering_key}
-            key_columns = partition_keys | clustering_keys
 
-            # Get counter columns (they don't support writetime)
-            counter_columns = set()
-            for col_name, col_meta in table_meta.columns.items():
-                if col_meta.cql_type == "counter":
-                    counter_columns.add(col_name)
+        # Get key columns and counter columns to exclude (needed for both writetime and TTL)
+        cluster = self.session._session.cluster
+        metadata = cluster.metadata
+        table_meta = metadata.keyspaces[self.keyspace].tables[self.table_name]
+        partition_keys = {col.name for col in table_meta.partition_key}
+        clustering_keys = {col.name for col in table_meta.clustering_key}
+        key_columns = partition_keys | clustering_keys
+
+        # Get counter columns (they don't support writetime or TTL)
+        counter_columns = set()
+        for col_name, col_meta in table_meta.columns.items():
+            if col_meta.cql_type == "counter":
+                counter_columns.add(col_name)
 
+        if self.writetime_columns:
             # Add writetime columns to header
             if self.writetime_columns == ["*"]:
                 # Add writetime for all non-key, non-counter columns
@@ -554,6 +584,20 @@ async def export(self) -> BulkOperationStats:
                     if col not in key_columns and col not in counter_columns:
                         header_columns.append(f"{col}_writetime")
 
+        # Add TTL columns to header
+        if self.ttl_columns:
+            # TTL uses same exclusions as writetime
+            if self.ttl_columns == ["*"]:
+                # Add TTL for all non-key, non-counter columns
+                for col in columns:
+                    if col not in key_columns and col not in counter_columns:
+                        header_columns.append(f"{col}_ttl")
+            else:
+                # Add TTL for specific columns (excluding keys and counters)
+                for col in self.ttl_columns:
+                    if col not in key_columns and col not in counter_columns:
+                        header_columns.append(f"{col}_ttl")
+
         # Write header only if not resuming
         if not self._header_written:
             await self.exporter.write_header(header_columns)
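
Under assumed table metadata, the writetime and TTL blocks above produce a header like this (table and column names are made up for illustration):

    # Hypothetical table: id is the partition key, views is a counter column.
    columns = ["id", "name", "payload", "views"]
    key_columns = {"id"}
    counter_columns = {"views"}

    header_columns = list(columns)
    for suffix in ("_writetime", "_ttl"):    # both selections set to ["*"]
        for col in columns:
            if col not in key_columns and col not in counter_columns:
                header_columns.append(f"{col}{suffix}")

    # header_columns == ["id", "name", "payload", "views",
    #                    "name_writetime", "payload_writetime",
    #                    "name_ttl", "payload_ttl"]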

libs/async-cassandra-bulk/src/async_cassandra_bulk/serializers/writetime.py

Lines changed: 18 additions & 12 deletions
@@ -45,18 +45,24 @@ def serialize(self, value: Any, context: SerializationContext) -> Any:
         else:
             return None
 
-        # Convert microseconds to datetime
-        # Cassandra writetime is microseconds since epoch
-        timestamp = datetime.fromtimestamp(value / 1_000_000, tz=timezone.utc)
-
-        if context.format == "csv":
-            # For CSV, use configurable format or ISO
-            fmt = context.options.get("writetime_format")
-            if fmt is None:
-                fmt = "%Y-%m-%d %H:%M:%S.%f"
-            return timestamp.strftime(fmt)
-        elif context.format == "json":
-            # For JSON, use ISO format with timezone
+        # Check if raw writetime values are requested
+        if context.options.get("writetime_raw", False):
+            # Return raw microsecond value for exact precision
+            return value
+
+        # For maximum precision, we need to handle large microsecond values carefully
+        # Python's datetime has limitations with very large timestamps
+
+        if context.format in ("csv", "json"):
+            # Convert to seconds and microseconds separately to avoid float precision loss
+            seconds = value // 1_000_000
+            microseconds = value % 1_000_000
+
+            # Create datetime from seconds, then adjust microseconds
+            timestamp = datetime.fromtimestamp(seconds, tz=timezone.utc)
+            timestamp = timestamp.replace(microsecond=microseconds)
+
+            # Return ISO format for both CSV and JSON
            return timestamp.isoformat()
         else:
             # For other formats, return as-is
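
The integer split matters because dividing a large microsecond count by 1e6 as a float can shave off the trailing microsecond on large timestamps. A small sketch of the difference (the writetime value is illustrative):

    from datetime import datetime, timezone

    writetime = 1_700_000_000_123_457  # microseconds since epoch

    # Float path (old code): may round the final microsecond on large values.
    approx = datetime.fromtimestamp(writetime / 1_000_000, tz=timezone.utc)

    # Integer path (new code): split into whole seconds and leftover microseconds.
    seconds, micros = divmod(writetime, 1_000_000)
    exact = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micros)

    assert exact.microsecond == 123457  # exact regardless of magnitude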
