     logger.warning("aiofiles not installed - using synchronous file I/O")
 
 
-async def count_table_rows(session, table_name: str) -> int:
+async def count_table_rows(session, keyspace: str, table_name: str) -> int:
     """Count total rows in a table (approximate for large tables)."""
     # Note: COUNT(*) can be slow on large tables
     # Consider using token ranges for very large tables
-    result = await session.execute(f"SELECT COUNT(*) FROM {table_name}")
+    # Using system schema to validate table exists and avoid SQL injection
+    validation_query = await session.execute(
+        "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
+        [keyspace, table_name],
+    )
+    if not validation_query.one():
+        raise ValueError(f"Table {keyspace}.{table_name} does not exist")
+
+    # Safe to use table name after validation - but still use qualified name
+    # In production, consider using prepared statements even for COUNT queries
+    result = await session.execute(f"SELECT COUNT(*) FROM {keyspace}.{table_name}")
     return result.one()[0]
 
 
-async def export_table_async(session, table_name: str, output_file: str):
+async def export_table_async(session, keyspace: str, table_name: str, output_file: str):
     """Export table using async file I/O (requires aiofiles)."""
-    logger.info(f"Starting async export of {table_name} to {output_file}")
+    logger.info(f"Starting async export of {keyspace}.{table_name} to {output_file}")
 
     # Get approximate row count for progress tracking
-    total_rows = await count_table_rows(session, table_name)
+    total_rows = await count_table_rows(session, keyspace, table_name)
     logger.info(f"Table has approximately {total_rows:,} rows")
 
     # Configure streaming with progress callback
@@ -67,8 +77,16 @@ def progress_callback(page_num: int, rows_so_far: int):
     start_time = datetime.now()
 
     # CRITICAL: Use context manager for streaming to prevent memory leaks
+    # Validate table exists before streaming
+    validation_query = await session.execute(
+        "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
+        [keyspace, table_name],
+    )
+    if not validation_query.one():
+        raise ValueError(f"Table {keyspace}.{table_name} does not exist")
+
     async with await session.execute_stream(
-        f"SELECT * FROM {table_name}", stream_config=config
+        f"SELECT * FROM {keyspace}.{table_name}", stream_config=config
     ) as result:
         # Export to CSV
         async with aiofiles.open(output_file, "w", newline="") as f:
@@ -111,13 +129,13 @@ def progress_callback(page_num: int, rows_so_far: int):
     logger.info(f"- File size: {os.path.getsize(output_file):,} bytes")
 
 
-def export_table_sync(session, table_name: str, output_file: str):
+def export_table_sync(session, keyspace: str, table_name: str, output_file: str):
     """Export table using synchronous file I/O."""
-    logger.info(f"Starting sync export of {table_name} to {output_file}")
+    logger.info(f"Starting sync export of {keyspace}.{table_name} to {output_file}")
 
     async def _export():
         # Get approximate row count
-        total_rows = await count_table_rows(session, table_name)
+        total_rows = await count_table_rows(session, keyspace, table_name)
         logger.info(f"Table has approximately {total_rows:,} rows")
 
         # Configure streaming
@@ -133,8 +151,16 @@ async def _export():
         start_time = datetime.now()
 
         # Use context manager for proper streaming cleanup
+        # Validate table exists before streaming
+        validation_query = await session.execute(
+            "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?",
+            [keyspace, table_name],
+        )
+        if not validation_query.one():
+            raise ValueError(f"Table {keyspace}.{table_name} does not exist")
+
         async with await session.execute_stream(
-            f"SELECT * FROM {table_name}", stream_config=config
+            f"SELECT * FROM {keyspace}.{table_name}", stream_config=config
         ) as result:
             # Export to CSV synchronously
             with open(output_file, "w", newline="") as f:
@@ -272,9 +298,13 @@ async def main():
 
     # Export using async I/O if available
     if ASYNC_FILE_IO:
-        await export_table_async(session, "products", str(output_dir / "products_async.csv"))
+        await export_table_async(
+            session, "export_example", "products", str(output_dir / "products_async.csv")
+        )
     else:
-        await export_table_sync(session, "products", str(output_dir / "products_sync.csv"))
+        await export_table_sync(
+            session, "export_example", "products", str(output_dir / "products_sync.csv")
+        )
 
     # Cleanup (optional)
     logger.info("\nCleaning up...")
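
Note on the "Consider using token ranges" comment in count_table_rows: the sketch below illustrates that idea under stated assumptions and is not part of this commit. It assumes the same async session.execute API used above (including the positional-parameter style of the validation queries), a Murmur3 token ring, and a single partition-key column, hypothetically named "id"; it simply divides the full token range into fixed slices rather than reading the cluster's actual ranges.

# Sketch only: approximate COUNT(*) by summing per-token-range counts.
# Assumes the Murmur3Partitioner token space and a partition key column "id" (hypothetical).
MIN_TOKEN = -(2**63)
MAX_TOKEN = 2**63 - 1


async def count_table_rows_by_token(session, keyspace, table_name, partition_key="id", splits=16):
    total = 0
    step = (MAX_TOKEN - MIN_TOKEN) // splits
    for i in range(splits):
        lo = MIN_TOKEN + i * step
        hi = MAX_TOKEN if i == splits - 1 else lo + step
        op = ">=" if i == 0 else ">"  # include the very first token in the first slice
        result = await session.execute(
            f"SELECT COUNT(*) FROM {keyspace}.{table_name} "
            f"WHERE token({partition_key}) {op} ? AND token({partition_key}) <= ?",
            [lo, hi],
        )
        total += result.one()[0]
    return total

Each sub-range query stays within a bounded slice of the ring, so no single COUNT has to scan the whole table; smaller slices (a larger splits value) trade more round trips for shorter individual queries.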