Skip to content

Commit 482a936

Browse files
committed
init
1 parent 32f5416 commit 482a936

File tree

8 files changed

+1639
-15
lines changed

8 files changed

+1639
-15
lines changed

examples/bulk_operations/bulk_operations/bulk_operator.py

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -333,21 +333,6 @@ async def export_by_token_ranges(
333333

334334
stats.end_time = time.time()
335335

336-
async def export_to_iceberg(
337-
self,
338-
source_keyspace: str,
339-
source_table: str,
340-
iceberg_warehouse_path: str,
341-
iceberg_table: str,
342-
partition_by: list[str] | None = None,
343-
split_count: int | None = None,
344-
batch_size: int = 10000,
345-
progress_callback: Callable[[BulkOperationStats], None] | None = None,
346-
) -> BulkOperationStats:
347-
"""Export Cassandra table to Iceberg format."""
348-
# This will be implemented when we add Iceberg integration
349-
raise NotImplementedError("Iceberg export will be implemented in next phase")
350-
351336
async def import_from_iceberg(
352337
self,
353338
iceberg_warehouse_path: str,
@@ -519,3 +504,69 @@ async def export_to_parquet(
519504
parallelism=parallelism,
520505
progress_callback=progress_callback,
521506
)
507+
508+
async def export_to_iceberg(
509+
self,
510+
keyspace: str,
511+
table: str,
512+
namespace: str | None = None,
513+
table_name: str | None = None,
514+
catalog: Any | None = None,
515+
catalog_config: dict[str, Any] | None = None,
516+
warehouse_path: str | Path | None = None,
517+
partition_spec: Any | None = None,
518+
table_properties: dict[str, str] | None = None,
519+
compression: str = "snappy",
520+
row_group_size: int = 100000,
521+
columns: list[str] | None = None,
522+
split_count: int | None = None,
523+
parallelism: int | None = None,
524+
progress_callback: Any | None = None,
525+
) -> Any:
526+
"""Export table data to Apache Iceberg format.
527+
528+
This enables modern data lakehouse features like ACID transactions,
529+
time travel, and schema evolution.
530+
531+
Args:
532+
keyspace: Cassandra keyspace to export from
533+
table: Cassandra table to export
534+
namespace: Iceberg namespace (default: keyspace name)
535+
table_name: Iceberg table name (default: Cassandra table name)
536+
catalog: Pre-configured Iceberg catalog (optional)
537+
catalog_config: Custom catalog configuration (optional)
538+
warehouse_path: Path to Iceberg warehouse (for filesystem catalog)
539+
partition_spec: Iceberg partition specification
540+
table_properties: Additional Iceberg table properties
541+
compression: Parquet compression (default: snappy)
542+
row_group_size: Rows per Parquet file (default: 100000)
543+
columns: Columns to export (default: all)
544+
split_count: Number of token range splits
545+
parallelism: Max concurrent operations
546+
progress_callback: Progress callback function
547+
548+
Returns:
549+
ExportProgress with Iceberg metadata
550+
"""
551+
from .iceberg import IcebergExporter
552+
553+
exporter = IcebergExporter(
554+
self,
555+
catalog=catalog,
556+
catalog_config=catalog_config,
557+
warehouse_path=warehouse_path,
558+
compression=compression,
559+
row_group_size=row_group_size,
560+
)
561+
return await exporter.export(
562+
keyspace=keyspace,
563+
table=table,
564+
namespace=namespace,
565+
table_name=table_name,
566+
partition_spec=partition_spec,
567+
table_properties=table_properties,
568+
columns=columns,
569+
split_count=split_count,
570+
parallelism=parallelism,
571+
progress_callback=progress_callback,
572+
)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""Apache Iceberg integration for Cassandra bulk operations.
2+
3+
This module provides functionality to export Cassandra data to Apache Iceberg tables,
4+
enabling modern data lakehouse capabilities including:
5+
- ACID transactions
6+
- Schema evolution
7+
- Time travel
8+
- Hidden partitioning
9+
- Efficient analytics
10+
"""
11+
12+
from bulk_operations.iceberg.exporter import IcebergExporter
13+
from bulk_operations.iceberg.schema_mapper import CassandraToIcebergSchemaMapper
14+
15+
__all__ = ["IcebergExporter", "CassandraToIcebergSchemaMapper"]
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""Iceberg catalog configuration for filesystem-based tables."""
2+
3+
from pathlib import Path
4+
from typing import Any
5+
6+
from pyiceberg.catalog import Catalog, load_catalog
7+
from pyiceberg.catalog.sql import SqlCatalog
8+
9+
10+
def create_filesystem_catalog(
11+
name: str = "cassandra_export",
12+
warehouse_path: str | Path | None = None,
13+
) -> Catalog:
14+
"""Create a filesystem-based Iceberg catalog.
15+
16+
What this does:
17+
--------------
18+
1. Creates a local filesystem catalog using SQLite
19+
2. Stores table metadata in SQLite database
20+
3. Stores actual data files in warehouse directory
21+
4. No external dependencies (S3, Hive, etc.)
22+
23+
Why this matters:
24+
----------------
25+
- Simple setup for development and testing
26+
- No cloud dependencies
27+
- Easy to inspect and debug
28+
- Can be migrated to production catalogs later
29+
30+
Args:
31+
name: Catalog name
32+
warehouse_path: Path to warehouse directory (default: ./iceberg_warehouse)
33+
34+
Returns:
35+
Iceberg catalog instance
36+
"""
37+
if warehouse_path is None:
38+
warehouse_path = Path.cwd() / "iceberg_warehouse"
39+
else:
40+
warehouse_path = Path(warehouse_path)
41+
42+
# Create warehouse directory if it doesn't exist
43+
warehouse_path.mkdir(parents=True, exist_ok=True)
44+
45+
# SQLite catalog configuration
46+
catalog_config = {
47+
"type": "sql",
48+
"uri": f"sqlite:///{warehouse_path / 'catalog.db'}",
49+
"warehouse": str(warehouse_path),
50+
}
51+
52+
# Create catalog
53+
catalog = SqlCatalog(name, **catalog_config)
54+
55+
return catalog
56+
57+
58+
def get_or_create_catalog(
59+
catalog_name: str = "cassandra_export",
60+
warehouse_path: str | Path | None = None,
61+
config: dict[str, Any] | None = None,
62+
) -> Catalog:
63+
"""Get existing catalog or create a new one.
64+
65+
This allows for custom catalog configurations while providing
66+
sensible defaults for filesystem-based catalogs.
67+
68+
Args:
69+
catalog_name: Name of the catalog
70+
warehouse_path: Path to warehouse (for filesystem catalogs)
71+
config: Custom catalog configuration (overrides defaults)
72+
73+
Returns:
74+
Iceberg catalog instance
75+
"""
76+
if config is not None:
77+
# Use custom configuration
78+
return load_catalog(catalog_name, **config)
79+
else:
80+
# Use filesystem catalog
81+
return create_filesystem_catalog(catalog_name, warehouse_path)

0 commit comments

Comments
 (0)