diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 0523f943..0b1e67d3 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -178,6 +178,28 @@ async def main(): except Exception as e: print(f"Error during scanning: {e}") + # Demo: Column projection + print("\n--- Testing Column Projection ---") + try: + # Project specific columns by index + print("\n1. Projection by index [0, 1] (id, name):") + scanner_index = await table.new_log_scanner(project=[0, 1]) + scanner_index.subscribe(None, None) + df_projected = scanner_index.to_pandas() + print(df_projected.head()) + print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}") + + # Project specific columns by name (Pythonic!) + print("\n2. Projection by name ['name', 'score'] (Pythonic):") + scanner_names = await table.new_log_scanner(columns=["name", "score"]) + scanner_names.subscribe(None, None) + df_named = scanner_names.to_pandas() + print(df_named.head()) + print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") + + except Exception as e: + print(f"Error during projection: {e}") + # Close connection conn.close() print("\nConnection closed") diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8a116485..6cd13c4f 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -34,6 +34,12 @@ pub struct FlussTable { has_primary_key: bool, } +/// Internal enum to represent different projection types +enum ProjectionType { + Indices(Vec), + Names(Vec), +} + #[pymethods] impl FlussTable { /// Create a new append writer for the table @@ -57,32 +63,39 @@ impl FlussTable { }) } - /// Create a new log scanner for the table - fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult> { - let conn = self.connection.clone(); - let metadata = self.metadata.clone(); - let table_info = self.table_info.clone(); - - future_into_py(py, async move { - let fluss_table = - fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); - - let table_scan = fluss_table.new_scan(); - - let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - PyErr::new::(format!( - "Failed to create log scanner: {e:?}" - )) - })?; - - let admin = conn - .get_admin() - .await - .map_err(|e| FlussError::new_err(e.to_string()))?; + /// Create a new log scanner for the table. + /// + /// Args: + /// project: Optional list of column indices (0-based) to include in the scan. + /// columns: Optional list of column names to include in the scan. + /// + /// Returns: + /// LogScanner, optionally with projection applied + /// + /// Note: + /// Specify only one of 'project' or 'columns'. + /// If neither is specified, all columns are included. + /// Rust side will validate the projection parameters. + /// + #[pyo3(signature = (project=None, columns=None))] + pub fn new_log_scanner<'py>( + &self, + py: Python<'py>, + project: Option>, + columns: Option>, + ) -> PyResult> { + let projection = match (project, columns) { + (Some(_), Some(_)) => { + return Err(FlussError::new_err( + "Specify only one of 'project' or 'columns'".to_string(), + )); + } + (Some(indices), None) => Some(ProjectionType::Indices(indices)), + (None, Some(names)) => Some(ProjectionType::Names(names)), + (None, None) => None, + }; - let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); - Python::attach(|py| Py::new(py, py_scanner)) - }) + self.create_log_scanner_internal(py, projection) } /// Get table information @@ -126,6 +139,55 @@ impl FlussTable { has_primary_key, } } + + /// Internal helper to create log scanner with optional projection + fn create_log_scanner_internal<'py>( + &self, + py: Python<'py>, + projection: Option, + ) -> PyResult> { + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + let mut table_scan = fluss_table.new_scan(); + + // Apply projection if specified + if let Some(proj) = projection { + table_scan = match proj { + ProjectionType::Indices(indices) => { + table_scan.project(&indices).map_err(|e| { + FlussError::new_err(format!("Failed to project columns: {e}")) + })? + } + ProjectionType::Names(names) => { + // Convert Vec to Vec<&str> for the API + let column_name_refs: Vec<&str> = + names.iter().map(|s| s.as_str()).collect(); + table_scan.project_by_name(&column_name_refs).map_err(|e| { + FlussError::new_err(format!("Failed to project columns: {e}")) + })? + } + }; + } + + let rust_scanner = table_scan + .create_log_scanner() + .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } } /// Writer for appending data to a Fluss table