From da129cbd0eb6b6b07b44abaff6f5eb27c26310b1 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 11 Jan 2026 05:30:19 +0000 Subject: [PATCH 1/4] [ISSUE-137] Python bindings row-based append API --- .gitignore | 11 +- bindings/python/example/example.py | 14 +- bindings/python/fluss/__init__.pyi | 29 ++++ bindings/python/src/table.rs | 229 ++++++++++++++++++++++++++++- crates/fluss/src/row/datum.rs | 2 +- 5 files changed, 277 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index c6edfb70..8202bbca 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,13 @@ Cargo.lock # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ -.vscode/ \ No newline at end of file +.vscode/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ \ No newline at end of file diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 0523f943..8af5f9d9 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -118,11 +118,21 @@ async def main(): append_writer.write_arrow_batch(pa_record_batch) print("Successfully wrote PyArrow RecordBatch") - # Test 3: Write Pandas DataFrame + # Test 3: Append single rows + print("\n--- Testing single row append ---") + # Dict input + append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26}) + print("Successfully appended row (dict)") + + # List input + append_writer.append([9, "Ivan", 90.0, 31]) + print("Successfully appended row (list)") + + # Test 4: Write Pandas DataFrame print("\n--- Testing Pandas DataFrame write ---") df = pd.DataFrame( { - "id": [6, 7], + "id": [10, 11], "name": ["Frank", "Grace"], "score": [89.3, 94.7], "age": [29, 27], diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 45652425..afe21db9 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -68,6 +68,35 @@ class FlussTable: def __repr__(self) -> str: ... class AppendWriter: + def append(self, row: dict | list | tuple) -> None: + """Append a single row to the table. + + Args: + row: Dictionary mapping field names to values, or + list/tuple of values in schema order + + Supported Types: + Currently supports primitive types only: + - Boolean, TinyInt, SmallInt, Int, BigInt (integers) + - Float, Double (floating point) + - String, Char (text) + - Bytes, Binary (binary data) + - Null values + + Temporal types (Date, Timestamp, Decimal) are not yet supported. + + Example: + # Dict input + writer.append({'id': 1, 'name': 'Alice', 'score': 95.5}) + + # List input + writer.append([1, 'Alice', 95.5]) + + Note: + For high-throughput bulk loading, prefer write_arrow_batch(). + Use append() for streaming, CDC, or real-time single-record ingestion. + """ + ... def write_arrow(self, table: pa.Table) -> None: ... def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ... def write_pandas(self, df: pd.DataFrame) -> None: ... 
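Usage sketch (illustrative, not part of the diff): the .pyi stub above is the whole user-facing surface of this change. The sketch below targets a table with the example's four-column schema (id, name, score, age); the connection helpers are hypothetical stand-ins for the setup code in example.py:

    import asyncio
    import fluss

    async def demo():
        # Hypothetical setup; see example.py for the real connection flow.
        conn = await fluss.connect("127.0.0.1:9123")
        table = await conn.get_table("fluss.demo_table")
        writer = await table.new_append_writer()

        # Dict input: keys are matched by name against the table schema.
        writer.append({"id": 1, "name": "Alice", "score": 95.5, "age": 30})
        # List/tuple input: values are taken positionally, in schema order.
        writer.append([2, "Bob", 87.0, 25])

        # append() is synchronous at this point in the series (patch 3 makes it async).
        writer.flush()

    asyncio.run(demo())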
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8a116485..27ece8e5 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -43,7 +43,7 @@ impl FlussTable {
         let table_info = self.table_info.clone();
 
         future_into_py(py, async move {
-            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone());
 
             let table_append = fluss_table
                 .new_append()
@@ -51,7 +51,7 @@
 
             let rust_writer = table_append.create_writer();
 
-            let py_writer = AppendWriter::from_core(rust_writer);
+            let py_writer = AppendWriter::from_core(rust_writer, table_info);
 
             Python::attach(|py| Py::new(py, py_writer))
         })
@@ -132,6 +132,7 @@ impl FlussTable {
 #[pyclass]
 pub struct AppendWriter {
     inner: fcore::client::AppendWriter,
+    table_info: fcore::metadata::TableInfo,
 }
 
 #[pymethods]
@@ -163,6 +164,16 @@ impl AppendWriter {
         result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
+    /// Append a single row to the table
+    pub fn append<'py>(&mut self, py: Python<'py>, row: &Bound<'py, PyAny>) -> PyResult<()> {
+        let generic_row = python_to_generic_row(row, &self.table_info)?;
+
+        let result =
+            py.detach(|| TOKIO_RUNTIME.block_on(async { self.inner.append(generic_row).await }));
+
+        result.map_err(|e| FlussError::new_err(e.to_string()))
+    }
+
     /// Write Pandas DataFrame data
     pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
@@ -195,8 +206,218 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: append,
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+    let mut datums = Vec::with_capacity(fields.len());
+
+    match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::List(list) => {
+            if list.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    list.len()
+                )));
+            }
+
+            for (i, (field, value)) in fields.iter().zip(list.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::Tuple(tuple) => {
+            if tuple.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    tuple.len()
+                )));
+            }
+
+            for (i, (field, value)) in fields.iter().zip(tuple.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }
+    }
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk copy
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies,
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for row-level operations: {:?}",
+            data_type
+        ))),
     }
 }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index fa85ded4..fbb24b3c 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -403,7 +403,7 @@ mod tests {
 
     #[test]
     fn datum_accessors_and_conversions() {
-        let datum = Datum::String("value");
+        let datum: Datum = "value".into();
         assert_eq!(datum.as_str(), "value");
 
         assert!(!datum.is_null());
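The conversion layer above is deliberately strict rather than coercive; a sketch of the failure modes it produces, with the messages taken from the format! strings in the Rust code (the four-column schema comes from example.py):

    # Wrong arity for positional input:
    writer.append([1, "Alice"])
    # -> FlussError: Expected 4 values, got 2

    # Unknown dict keys fail loudly instead of being silently dropped:
    writer.append({"id": 1, "nmae": "Alice", "score": 95.5, "age": 30})
    # -> FlussError: Unknown field 'nmae'. Expected fields: id, name, score, age

    # bool is a subclass of int in Python, so it is rejected for int columns:
    writer.append({"id": True, "name": "Alice", "score": 95.5, "age": 30})
    # -> FlussError: Field 'id': Expected int for Int column, got bool. Use 0 or 1 explicitly.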
From cf093109e620fe5c3f7e8f26fd0ac6d8d8d20ce7 Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Thu, 15 Jan 2026 16:41:56 +0000
Subject: [PATCH 2/4] cleanup and address review

---
 bindings/python/src/table.rs | 85 +++++++++++++++---------------------
 1 file changed, 36 insertions(+), 49 deletions(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 27ece8e5..cdaa4ae5 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -225,6 +225,34 @@ enum RowInput<'py> {
     List(Bound<'py, pyo3::types::PyList>),
 }
 
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'a>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
 /// Convert Python row (dict/list/tuple) to GenericRow based on schema
 fn python_to_generic_row(
     row: &Bound<PyAny>,
@@ -244,9 +272,8 @@ fn python_to_generic_row(
     })?;
     let schema = table_info.row_type();
     let fields = schema.fields();
-    let mut datums = Vec::with_capacity(fields.len());
 
-    match row_input {
+    let datums = match row_input {
         RowInput::Dict(dict) => {
             // Strict: reject unknown keys (and also reject non-str keys nicely)
             for (k, _) in dict.iter() {
@@ -272,6 +299,7 @@ fn python_to_generic_row(
                 }
             }
 
+            let mut datums = Vec::with_capacity(fields.len());
             for field in fields {
                 let value = dict.get_item(field.name())?.ok_or_else(|| {
                     FlussError::new_err(format!("Missing field: {}", field.name()))
@@ -282,54 +310,13 @@ fn python_to_generic_row(
                     })?,
                 );
             }
+            datums
         }
 
-        RowInput::List(list) => {
-            if list.len() != fields.len() {
-                return Err(FlussError::new_err(format!(
-                    "Expected {} values, got {}",
-                    fields.len(),
-                    list.len()
-                )));
-            }
-
-            for (i, (field, value)) in fields.iter().zip(list.iter()).enumerate() {
-                datums.push(
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!(
-                            "Field '{}' (index {}): {}",
-                            field.name(),
-                            i,
-                            e
-                        ))
-                    })?,
-                );
-            }
-        }
-
-        RowInput::Tuple(tuple) => {
-            if tuple.len() != fields.len() {
-                return Err(FlussError::new_err(format!(
-                    "Expected {} values, got {}",
-                    fields.len(),
-                    tuple.len()
-                )));
-            }
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
 
-            for (i, (field, value)) in fields.iter().zip(tuple.iter()).enumerate() {
-                datums.push(
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!(
-                            "Field '{}' (index {}): {}",
-                            field.name(),
-                            i,
-                            e
-                        ))
-                    })?,
-                );
-            }
-        }
-    }
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
+    };
 
     Ok(fcore::row::GenericRow { values: datums })
 }
@@ -401,8 +388,8 @@ fn python_value_to_datum(
             Ok(v.into())
         }
         fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => {
-            // Efficient extraction: downcast to specific type and use bulk copy
-            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies,
+            // Efficient extraction: downcast to specific type and use bulk copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies of the underlying data.
             if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
                 Ok(bytes.as_bytes().to_vec().into())
             } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() {

From fbacfc0e94c6aae9c544f389ef337377abd1508d Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Fri, 16 Jan 2026 12:05:05 +0000
Subject: [PATCH 3/4] make async

---
 bindings/python/example/example.py |  4 +--
 bindings/python/fluss/__init__.pyi | 11 +++-----
 bindings/python/src/table.rs       | 42 ++++++++++++++++++------------
 3 files changed, 31 insertions(+), 26 deletions(-)

diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 8af5f9d9..3bb60c2c 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -121,11 +121,11 @@ async def main():
     # Test 3: Append single rows
     print("\n--- Testing single row append ---")
     # Dict input
-    append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26})
+    await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26})
     print("Successfully appended row (dict)")
 
     # List input
-    append_writer.append([9, "Ivan", 90.0, 31])
+    await append_writer.append([9, "Ivan", 90.0, 31])
     print("Successfully appended row (list)")
 
     # Test 4: Write Pandas DataFrame
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index afe21db9..6073070c 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -68,7 +68,7 @@ class FlussTable:
     def __repr__(self) -> str: ...
 
 class AppendWriter:
-    def append(self, row: dict | list | tuple) -> None:
+    async def append(self, row: dict | list | tuple) -> None:
         """Append a single row to the table.
 
         Args:
@@ -86,15 +86,12 @@ class AppendWriter:
 
         Temporal types (Date, Timestamp, Decimal) are not yet supported.
 
         Example:
-            # Dict input
-            writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
-
-            # List input
-            writer.append([1, 'Alice', 95.5])
+            await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+            await writer.append([1, 'Alice', 95.5])
 
         Note:
             For high-throughput bulk loading, prefer write_arrow_batch().
-            Use append() for streaming, CDC, or real-time single-record ingestion.
+            Use flush() to ensure all queued records are sent and acknowledged.
         """
         ...
     def write_arrow(self, table: pa.Table) -> None: ...
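With append() now a coroutine, call sites await each row. Note the division of labor spelled out in the stub: awaiting append() hands the row to the client-side writer, while flush() (still synchronous in this patch) is what guarantees queued records are sent and acknowledged. A minimal ingestion-loop sketch, assuming writer and records already exist:

    async def ingest(writer, records):
        for rec in records:           # each rec may be a dict, list, or tuple
            await writer.append(rec)
        writer.flush()                # synchronous barrier: send + acknowledge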
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index cdaa4ae5..0c5b493f 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -131,14 +131,14 @@ impl FlussTable {
 /// Writer for appending data to a Fluss table
 #[pyclass]
 pub struct AppendWriter {
-    inner: fcore::client::AppendWriter,
+    inner: Arc<fcore::client::AppendWriter>,
     table_info: fcore::metadata::TableInfo,
 }
 
 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -150,32 +150,36 @@ impl AppendWriter {
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: RecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;
 
+        let inner = self.inner.clone();
         // Release the GIL before blocking on async operation
         let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { self.inner.append_arrow_batch(rust_batch).await })
+            TOKIO_RUNTIME.block_on(async { inner.append_arrow_batch(rust_batch).await })
         });
 
         result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
     /// Append a single row to the table
-    pub fn append<'py>(&mut self, py: Python<'py>, row: &Bound<'py, PyAny>) -> PyResult<()> {
+    pub fn append<'py>(&self, py: Python<'py>, row: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
         let generic_row = python_to_generic_row(row, &self.table_info)?;
+        let inner = self.inner.clone();
 
-        let result =
-            py.detach(|| TOKIO_RUNTIME.block_on(async { self.inner.append(generic_row).await }));
-
-        result.map_err(|e| FlussError::new_err(e.to_string()))
+        future_into_py(py, async move {
+            inner
+                .append(generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
     }
 
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+    pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
 
@@ -190,12 +194,16 @@ impl AppendWriter {
     }
 
     /// Flush any pending data
-    pub fn flush(&mut self) -> PyResult<()> {
-        TOKIO_RUNTIME.block_on(async {
-            self.inner
-                .flush()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+    pub fn flush(&self, py: Python) -> PyResult<()> {
+        let inner = self.inner.clone();
+        // Release the GIL before blocking on I/O
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })
         })
     }
 
@@ -211,7 +219,7 @@ impl AppendWriter {
         table_info: fcore::metadata::TableInfo,
     ) -> Self {
         Self {
-            inner: append,
+            inner: Arc::new(append),
             table_info,
         }
     }

From 560b8999d0bb71375fc667b087bfc85f8339be97 Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Sat, 17 Jan 2026 01:47:49 +0000
Subject: [PATCH 4/4] fix fmt

---
 bindings/python/src/table.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 0c5b493f..a3f24441 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -166,7 +166,11 @@ impl AppendWriter {
     }
 
     /// Append a single row to the table
-    pub fn append<'py>(&self, py: Python<'py>, row: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
+    pub fn append<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'py, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
         let generic_row = python_to_generic_row(row, &self.table_info)?;
         let inner = self.inner.clone();
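End state of the series as a usage sketch: every AppendWriter method now takes &self and clones an Arc-wrapped core writer, so a single writer object can be shared by several asyncio tasks on one event loop. Whether cross-thread sharing is also supported depends on the core client and is not claimed here:

    import asyncio

    async def worker(writer, rows):
        for row in rows:
            await writer.append(row)

    async def run(writer):
        await asyncio.gather(
            worker(writer, [{"id": 1, "name": "A", "score": 1.0, "age": 20}]),
            worker(writer, [(2, "B", 2.0, 21)]),  # tuples work too
        )
        writer.flush()  # drain and acknowledge everything queued above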