From 6d190c12b7a12c9dc5cbaadb86436f8870378fba Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 11 Jan 2026 18:34:42 +0000 Subject: [PATCH 1/5] [ISSUE-149] Add column projection support to Python LogScanner --- bindings/python/example/example.py | 22 ++++++ bindings/python/src/table.rs | 105 +++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 0523f943..9e5f9b3f 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -178,6 +178,28 @@ async def main(): except Exception as e: print(f"Error during scanning: {e}") + # Demo: Column projection + print("\n--- Testing Column Projection ---") + try: + # Project specific columns by index (C++ parity) + print("\n1. Projection by index [0, 1] (id, name):") + scanner_index = await table.new_log_scanner_with_projection([0, 1]) + scanner_index.subscribe(None, None) + df_projected = scanner_index.to_pandas() + print(df_projected.head()) + print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}") + + # Project specific columns by name (Python-specific, more idiomatic!) + print("\n2. Projection by name ['name', 'score'] (Pythonic):") + scanner_names = await table.new_log_scanner_with_column_names(["name", "score"]) + scanner_names.subscribe(None, None) + df_named = scanner_names.to_pandas() + print(df_named.head()) + print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") + + except Exception as e: + print(f"Error during projection: {e}") + # Close connection conn.close() print("\nConnection closed") diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8a116485..2030d1dc 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -85,6 +85,111 @@ impl FlussTable { }) } + /// Create a new log scanner with column projection (by index). + /// + /// Args: + /// column_indices: List of column indices to include in the scan (0-based) + /// + /// Returns: + /// LogScanner with projection applied + /// + /// Example: + /// >>> scanner = await table.new_log_scanner_with_projection([0, 2, 4]) + pub fn new_log_scanner_with_projection<'py>( + &self, + py: Python<'py>, + column_indices: Vec, + ) -> PyResult> { + // Validate early with Python-friendly error + if column_indices.is_empty() { + return Err(FlussError::new_err( + "column_indices cannot be empty".to_string(), + )); + } + + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + let table_scan = fluss_table.new_scan(); + let table_scan = table_scan + .project(&column_indices) + .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; + + let rust_scanner = table_scan.create_log_scanner().map_err(|e| { + FlussError::new_err(format!("Failed to create log scanner: {e}")) + })?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } + + /// Create a new log scanner with column projection (by name). + /// + /// This is the more Pythonic way to specify projections using column names + /// instead of indices. + /// + /// Args: + /// column_names: List of column names to include in the scan + /// + /// Returns: + /// LogScanner with projection applied + /// + /// Example: + /// >>> scanner = await table.new_log_scanner_with_column_names(["id", "name", "email"]) + pub fn new_log_scanner_with_column_names<'py>( + &self, + py: Python<'py>, + column_names: Vec, + ) -> PyResult> { + // Validate early with Python-friendly error + if column_names.is_empty() { + return Err(FlussError::new_err( + "column_names cannot be empty".to_string(), + )); + } + + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + // Convert Vec to Vec<&str> for the API + // Safe: project_by_name validates names immediately, doesn't store refs + let column_name_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect(); + + let table_scan = fluss_table.new_scan(); + let table_scan = table_scan + .project_by_name(&column_name_refs) + .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; + + let rust_scanner = table_scan.create_log_scanner().map_err(|e| { + FlussError::new_err(format!("Failed to create log scanner: {e}")) + })?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } + /// Get table information pub fn get_table_info(&self) -> TableInfo { TableInfo::from_core(self.table_info.clone()) From 5f128af956772045d598b86af52a947d4a499999 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 11 Jan 2026 22:26:56 +0000 Subject: [PATCH 2/5] fix PyError to FlussError --- bindings/python/src/table.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 2030d1dc..36bb9b5e 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -70,9 +70,7 @@ impl FlussTable { let table_scan = fluss_table.new_scan(); let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - PyErr::new::(format!( - "Failed to create log scanner: {e:?}" - )) + FlussError::new_err(format!("Failed to create log scanner: {e}")) })?; let admin = conn From 0ff57cdf5962869f2422c08541c93b6c0c56f4f1 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 15 Jan 2026 15:22:13 +0000 Subject: [PATCH 3/5] cleanup --- bindings/python/example/example.py | 8 +- bindings/python/src/table.rs | 265 ++++++++++++++++------------- 2 files changed, 152 insertions(+), 121 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 9e5f9b3f..0b1e67d3 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -181,17 +181,17 @@ async def main(): # Demo: Column projection print("\n--- Testing Column Projection ---") try: - # Project specific columns by index (C++ parity) + # Project specific columns by index print("\n1. Projection by index [0, 1] (id, name):") - scanner_index = await table.new_log_scanner_with_projection([0, 1]) + scanner_index = await table.new_log_scanner(project=[0, 1]) scanner_index.subscribe(None, None) df_projected = scanner_index.to_pandas() print(df_projected.head()) print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}") - # Project specific columns by name (Python-specific, more idiomatic!) + # Project specific columns by name (Pythonic!) print("\n2. Projection by name ['name', 'score'] (Pythonic):") - scanner_names = await table.new_log_scanner_with_column_names(["name", "score"]) + scanner_names = await table.new_log_scanner(columns=["name", "score"]) scanner_names.subscribe(None, None) df_named = scanner_names.to_pandas() print(df_named.head()) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 36bb9b5e..dc7cf592 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -34,6 +34,12 @@ pub struct FlussTable { has_primary_key: bool, } +/// Internal enum to represent different projection types +enum ProjectionType { + Indices(Vec), + Names(Vec), +} + #[pymethods] impl FlussTable { /// Create a new append writer for the table @@ -57,135 +63,64 @@ impl FlussTable { }) } - /// Create a new log scanner for the table - fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult> { - let conn = self.connection.clone(); - let metadata = self.metadata.clone(); - let table_info = self.table_info.clone(); - - future_into_py(py, async move { - let fluss_table = - fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); - - let table_scan = fluss_table.new_scan(); - - let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - FlussError::new_err(format!("Failed to create log scanner: {e}")) - })?; - - let admin = conn - .get_admin() - .await - .map_err(|e| FlussError::new_err(e.to_string()))?; - - let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); - Python::attach(|py| Py::new(py, py_scanner)) - }) - } - - /// Create a new log scanner with column projection (by index). + /// Create a new log scanner for the table. /// /// Args: - /// column_indices: List of column indices to include in the scan (0-based) + /// project: Optional list of column indices (0-based) to include in the scan. + /// Empty list means all columns. + /// columns: Optional list of column names to include in the scan. + /// Empty list means all columns. /// /// Returns: - /// LogScanner with projection applied + /// LogScanner, optionally with projection applied /// - /// Example: - /// >>> scanner = await table.new_log_scanner_with_projection([0, 2, 4]) - pub fn new_log_scanner_with_projection<'py>( - &self, - py: Python<'py>, - column_indices: Vec, - ) -> PyResult> { - // Validate early with Python-friendly error - if column_indices.is_empty() { - return Err(FlussError::new_err( - "column_indices cannot be empty".to_string(), - )); - } - - let conn = self.connection.clone(); - let metadata = self.metadata.clone(); - let table_info = self.table_info.clone(); - - future_into_py(py, async move { - let fluss_table = - fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); - - let table_scan = fluss_table.new_scan(); - let table_scan = table_scan - .project(&column_indices) - .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; - - let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - FlussError::new_err(format!("Failed to create log scanner: {e}")) - })?; - - let admin = conn - .get_admin() - .await - .map_err(|e| FlussError::new_err(e.to_string()))?; - - let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); - Python::attach(|py| Py::new(py, py_scanner)) - }) - } - - /// Create a new log scanner with column projection (by name). - /// - /// This is the more Pythonic way to specify projections using column names - /// instead of indices. + /// Note: + /// Specify only one of 'project' or 'columns'. + /// If neither is specified, all columns are included. /// - /// Args: - /// column_names: List of column names to include in the scan + /// Examples: + /// >>> # Scan all columns + /// >>> scanner = await table.new_log_scanner() /// - /// Returns: - /// LogScanner with projection applied + /// >>> # Scan specific columns by index + /// >>> scanner = await table.new_log_scanner(project=[0, 2, 4]) /// - /// Example: - /// >>> scanner = await table.new_log_scanner_with_column_names(["id", "name", "email"]) - pub fn new_log_scanner_with_column_names<'py>( + /// >>> # Scan specific columns by name (more Pythonic) + /// >>> scanner = await table.new_log_scanner(columns=["id", "name", "email"]) + #[pyo3(signature = (project=None, columns=None))] + pub fn new_log_scanner<'py>( &self, py: Python<'py>, - column_names: Vec, + project: Option>, + columns: Option>, ) -> PyResult> { - // Validate early with Python-friendly error - if column_names.is_empty() { - return Err(FlussError::new_err( - "column_names cannot be empty".to_string(), - )); - } - - let conn = self.connection.clone(); - let metadata = self.metadata.clone(); - let table_info = self.table_info.clone(); - - future_into_py(py, async move { - let fluss_table = - fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); - - // Convert Vec to Vec<&str> for the API - // Safe: project_by_name validates names immediately, doesn't store refs - let column_name_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect(); - - let table_scan = fluss_table.new_scan(); - let table_scan = table_scan - .project_by_name(&column_name_refs) - .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; - - let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - FlussError::new_err(format!("Failed to create log scanner: {e}")) - })?; - - let admin = conn - .get_admin() - .await - .map_err(|e| FlussError::new_err(e.to_string()))?; + // Validate mutually exclusive parameters and normalize empty lists + let projection = match (project, columns) { + (Some(_), Some(_)) => { + return Err(FlussError::new_err( + "Specify only one of 'project' or 'columns'".to_string(), + )); + } + (Some(indices), None) => { + if indices.is_empty() { + None // Empty list = all columns + } else { + let deduped = self.validate_and_dedupe_indices(&indices)?; + Some(ProjectionType::Indices(deduped)) + } + } + (None, Some(names)) => { + if names.is_empty() { + None // Empty list = all columns + } else { + let deduped = self.validate_and_dedupe_names(&names)?; + Some(ProjectionType::Names(deduped)) + } + } + (None, None) => None, // No projection - all columns + }; - let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); - Python::attach(|py| Py::new(py, py_scanner)) - }) + self.create_log_scanner_internal(py, projection) } /// Get table information @@ -229,6 +164,102 @@ impl FlussTable { has_primary_key, } } + + /// Validate and deduplicate column indices (preserving order) + fn validate_and_dedupe_indices(&self, indices: &[usize]) -> PyResult> { + use std::collections::HashSet; + + let field_count = self.table_info.row_type().fields().len(); + let mut seen = HashSet::new(); + let mut deduped = Vec::with_capacity(indices.len()); + + for &idx in indices { + if idx >= field_count { + return Err(FlussError::new_err(format!( + "Column index {idx} out of range (field count: {field_count})" + ))); + } + if seen.insert(idx) { + deduped.push(idx); + } + } + + Ok(deduped) + } + + /// Validate and deduplicate column names (preserving order) + fn validate_and_dedupe_names(&self, names: &[String]) -> PyResult> { + use std::collections::HashSet; + + let fields = self.table_info.row_type().fields(); + let valid_names: HashSet<&str> = fields.iter().map(|f| f.name()).collect(); + + let mut seen: HashSet = HashSet::new(); + let mut deduped = Vec::with_capacity(names.len()); + + for name in names { + if !valid_names.contains(name.as_str()) { + return Err(FlussError::new_err(format!( + "Column '{}' not found in table schema", + name + ))); + } + if seen.insert(name.clone()) { + deduped.push(name.clone()); + } + } + + Ok(deduped) + } + + /// Internal helper to create log scanner with optional projection + fn create_log_scanner_internal<'py>( + &self, + py: Python<'py>, + projection: Option, + ) -> PyResult> { + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + let mut table_scan = fluss_table.new_scan(); + + // Apply projection if specified + if let Some(proj) = projection { + table_scan = match proj { + ProjectionType::Indices(indices) => { + table_scan.project(&indices).map_err(|e| { + FlussError::new_err(format!("Failed to project columns: {e}")) + })? + } + ProjectionType::Names(names) => { + // Convert Vec to Vec<&str> for the API + let column_name_refs: Vec<&str> = + names.iter().map(|s| s.as_str()).collect(); + table_scan.project_by_name(&column_name_refs).map_err(|e| { + FlussError::new_err(format!("Failed to project columns: {e}")) + })? + } + }; + } + + let rust_scanner = table_scan + .create_log_scanner() + .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } } /// Writer for appending data to a Fluss table From f587309c702873572131b024edf9e4960e7cabfa Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 15 Jan 2026 16:21:30 +0000 Subject: [PATCH 4/5] empty list -> error --- bindings/python/src/table.rs | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index dc7cf592..ef9a9a10 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -67,9 +67,9 @@ impl FlussTable { /// /// Args: /// project: Optional list of column indices (0-based) to include in the scan. - /// Empty list means all columns. + /// Must be non-empty if specified. /// columns: Optional list of column names to include in the scan. - /// Empty list means all columns. + /// Must be non-empty if specified. /// /// Returns: /// LogScanner, optionally with projection applied @@ -78,15 +78,6 @@ impl FlussTable { /// Specify only one of 'project' or 'columns'. /// If neither is specified, all columns are included. /// - /// Examples: - /// >>> # Scan all columns - /// >>> scanner = await table.new_log_scanner() - /// - /// >>> # Scan specific columns by index - /// >>> scanner = await table.new_log_scanner(project=[0, 2, 4]) - /// - /// >>> # Scan specific columns by name (more Pythonic) - /// >>> scanner = await table.new_log_scanner(columns=["id", "name", "email"]) #[pyo3(signature = (project=None, columns=None))] pub fn new_log_scanner<'py>( &self, @@ -94,7 +85,7 @@ impl FlussTable { project: Option>, columns: Option>, ) -> PyResult> { - // Validate mutually exclusive parameters and normalize empty lists + // Validate mutually exclusive parameters let projection = match (project, columns) { (Some(_), Some(_)) => { return Err(FlussError::new_err( @@ -103,19 +94,21 @@ impl FlussTable { } (Some(indices), None) => { if indices.is_empty() { - None // Empty list = all columns - } else { - let deduped = self.validate_and_dedupe_indices(&indices)?; - Some(ProjectionType::Indices(deduped)) + return Err(FlussError::new_err( + "project list cannot be empty".to_string(), + )); } + let deduped = self.validate_and_dedupe_indices(&indices)?; + Some(ProjectionType::Indices(deduped)) } (None, Some(names)) => { if names.is_empty() { - None // Empty list = all columns - } else { - let deduped = self.validate_and_dedupe_names(&names)?; - Some(ProjectionType::Names(deduped)) + return Err(FlussError::new_err( + "columns list cannot be empty".to_string(), + )); } + let deduped = self.validate_and_dedupe_names(&names)?; + Some(ProjectionType::Names(deduped)) } (None, None) => None, // No projection - all columns }; From 2b10dc4b5e92b129f08d68f50adf83a64bb17605 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 16 Jan 2026 02:09:10 +0000 Subject: [PATCH 5/5] remove validation and dedupe --- bindings/python/src/table.rs | 73 ++---------------------------------- 1 file changed, 4 insertions(+), 69 deletions(-) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index ef9a9a10..6cd13c4f 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -67,9 +67,7 @@ impl FlussTable { /// /// Args: /// project: Optional list of column indices (0-based) to include in the scan. - /// Must be non-empty if specified. /// columns: Optional list of column names to include in the scan. - /// Must be non-empty if specified. /// /// Returns: /// LogScanner, optionally with projection applied @@ -77,6 +75,7 @@ impl FlussTable { /// Note: /// Specify only one of 'project' or 'columns'. /// If neither is specified, all columns are included. + /// Rust side will validate the projection parameters. /// #[pyo3(signature = (project=None, columns=None))] pub fn new_log_scanner<'py>( @@ -85,32 +84,15 @@ impl FlussTable { project: Option>, columns: Option>, ) -> PyResult> { - // Validate mutually exclusive parameters let projection = match (project, columns) { (Some(_), Some(_)) => { return Err(FlussError::new_err( "Specify only one of 'project' or 'columns'".to_string(), )); } - (Some(indices), None) => { - if indices.is_empty() { - return Err(FlussError::new_err( - "project list cannot be empty".to_string(), - )); - } - let deduped = self.validate_and_dedupe_indices(&indices)?; - Some(ProjectionType::Indices(deduped)) - } - (None, Some(names)) => { - if names.is_empty() { - return Err(FlussError::new_err( - "columns list cannot be empty".to_string(), - )); - } - let deduped = self.validate_and_dedupe_names(&names)?; - Some(ProjectionType::Names(deduped)) - } - (None, None) => None, // No projection - all columns + (Some(indices), None) => Some(ProjectionType::Indices(indices)), + (None, Some(names)) => Some(ProjectionType::Names(names)), + (None, None) => None, }; self.create_log_scanner_internal(py, projection) @@ -158,53 +140,6 @@ impl FlussTable { } } - /// Validate and deduplicate column indices (preserving order) - fn validate_and_dedupe_indices(&self, indices: &[usize]) -> PyResult> { - use std::collections::HashSet; - - let field_count = self.table_info.row_type().fields().len(); - let mut seen = HashSet::new(); - let mut deduped = Vec::with_capacity(indices.len()); - - for &idx in indices { - if idx >= field_count { - return Err(FlussError::new_err(format!( - "Column index {idx} out of range (field count: {field_count})" - ))); - } - if seen.insert(idx) { - deduped.push(idx); - } - } - - Ok(deduped) - } - - /// Validate and deduplicate column names (preserving order) - fn validate_and_dedupe_names(&self, names: &[String]) -> PyResult> { - use std::collections::HashSet; - - let fields = self.table_info.row_type().fields(); - let valid_names: HashSet<&str> = fields.iter().map(|f| f.name()).collect(); - - let mut seen: HashSet = HashSet::new(); - let mut deduped = Vec::with_capacity(names.len()); - - for name in names { - if !valid_names.contains(name.as_str()) { - return Err(FlussError::new_err(format!( - "Column '{}' not found in table schema", - name - ))); - } - if seen.insert(name.clone()) { - deduped.push(name.clone()); - } - } - - Ok(deduped) - } - /// Internal helper to create log scanner with optional projection fn create_log_scanner_internal<'py>( &self,