diff --git a/.gitignore b/.gitignore
index c6edfb70..8202bbca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,13 @@ Cargo.lock
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
-.vscode/
\ No newline at end of file
+.vscode/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
\ No newline at end of file
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 0523f943..3bb60c2c 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -118,11 +118,21 @@ async def main():
         append_writer.write_arrow_batch(pa_record_batch)
         print("Successfully wrote PyArrow RecordBatch")
 
-        # Test 3: Write Pandas DataFrame
+        # Test 3: Append single rows
+        print("\n--- Testing single row append ---")
+        # Dict input
+        await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26})
+        print("Successfully appended row (dict)")
+
+        # List input
+        await append_writer.append([9, "Ivan", 90.0, 31])
+        print("Successfully appended row (list)")
+
+        # Test 4: Write Pandas DataFrame
         print("\n--- Testing Pandas DataFrame write ---")
         df = pd.DataFrame(
             {
-                "id": [6, 7],
+                "id": [10, 11],
                 "name": ["Frank", "Grace"],
                 "score": [89.3, 94.7],
                 "age": [29, 27],
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 45652425..6073070c 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -68,6 +68,32 @@ class FlussTable:
     def __repr__(self) -> str: ...
 
 class AppendWriter:
+    async def append(self, row: dict | list | tuple) -> None:
+        """Append a single row to the table.
+
+        Args:
+            row: Dictionary mapping field names to values, or
+                list/tuple of values in schema order.
+
+        Supported Types:
+            Currently supports primitive types only:
+            - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
+            - Float, Double (floating point)
+            - String, Char (text)
+            - Bytes, Binary (binary data)
+            - Null values
+
+            Temporal and decimal types (Date, Timestamp, Decimal) are not yet supported.
+
+        Example:
+            await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+            await writer.append([1, 'Alice', 95.5])
+
+        Note:
+            For high-throughput bulk loading, prefer write_arrow_batch().
+            Use flush() to ensure all queued records are sent and acknowledged.
+        """
+        ...
     def write_arrow(self, table: pa.Table) -> None: ...
     def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
     def write_pandas(self, df: pd.DataFrame) -> None: ...
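Usage sketch for the new append() API (illustrative only; assumes an append_writer obtained as in example.py above, against the same id/name/score/age schema):

    # Hypothetical session; connection and writer setup omitted (see example.py).
    await append_writer.append({"id": 1, "name": "Alice", "score": 95.5, "age": 30})  # dict keyed by field name
    await append_writer.append((2, "Bob", 88.0, 25))  # tuple (or list) in schema order
    append_writer.flush()  # plain blocking call; returns once queued records are acknowledged

append() returns an awaitable, while flush() stays synchronous and releases the GIL while it waits, matching the Rust implementation below.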
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8a116485..a3f24441 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -43,7 +43,7 @@ impl FlussTable {
         let table_info = self.table_info.clone();
 
         future_into_py(py, async move {
-            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone());
 
             let table_append = fluss_table
                 .new_append()
@@ -51,7 +51,7 @@ impl FlussTable {
 
             let rust_writer = table_append.create_writer();
 
-            let py_writer = AppendWriter::from_core(rust_writer);
+            let py_writer = AppendWriter::from_core(rust_writer, table_info);
 
             Python::attach(|py| Py::new(py, py_writer))
         })
@@ -131,13 +131,14 @@ impl FlussTable {
 /// Writer for appending data to a Fluss table
 #[pyclass]
 pub struct AppendWriter {
-    inner: fcore::client::AppendWriter,
+    inner: Arc<fcore::client::AppendWriter>,
+    table_info: fcore::metadata::TableInfo,
 }
 
 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -149,22 +150,40 @@ impl AppendWriter {
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: RecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;
 
+        let inner = self.inner.clone();
         // Release the GIL before blocking on async operation
         let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { self.inner.append_arrow_batch(rust_batch).await })
+            TOKIO_RUNTIME.block_on(async { inner.append_arrow_batch(rust_batch).await })
         });
 
         result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
+    /// Append a single row to the table
+    pub fn append<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'py, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_to_generic_row(row, &self.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            inner
+                .append(generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+    pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
@@ -179,12 +198,16 @@ impl AppendWriter {
     }
 
     /// Flush any pending data
-    pub fn flush(&mut self) -> PyResult<()> {
-        TOKIO_RUNTIME.block_on(async {
-            self.inner
-                .flush()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+    pub fn flush(&self, py: Python) -> PyResult<()> {
+        let inner = self.inner.clone();
+        // Release the GIL before blocking on I/O
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })
         })
     }
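With AppendWriter now holding an Arc<fcore::client::AppendWriter>, every method takes &self, and the blocking ones detach from the GIL before parking on the Tokio runtime. A minimal sketch of what that buys on the Python side (hypothetical snippet; append_writer as in example.py, do_other_work is a stand-in):

    import threading

    def blocking_flush():
        append_writer.flush()  # blocks this thread, but the GIL is released first

    t = threading.Thread(target=blocking_flush)
    t.start()
    do_other_work()  # other Python code keeps running while flush() waits
    t.join()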
@@ -195,8 +218,205 @@ impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: Arc::new(append),
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys, and reject non-string keys with a clear error
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to the concrete Python type, then copy
+            // the buffer in one pass. PyBytes::as_bytes() borrows the data without
+            // copying; to_vec() then performs a single O(n) bulk copy, as does
+            // PyByteArray::to_vec().
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for row-level operations: {:?}",
+            data_type
+        ))),
     }
 }
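For reference, a sketch of how the strict validation above surfaces in Python (hypothetical calls against the example.py schema of id/name/score/age; error texts abbreviated from the format strings in python_to_generic_row and python_value_to_datum):

    # Each call raises FlussError from append() itself, before the returned
    # future is ever awaited:
    writer.append({"id": 1, "nom": "Alice"})  # Unknown field 'nom'. Expected fields: ...
    writer.append([1, "Alice"])               # Expected 4 values, got 2
    writer.append({"id": True, "name": "x", "score": 1.0, "age": 2})
                                              # Field 'id': Expected int ... got bool.
    writer.append("not a row")                # Row must be a dict, list, or tuple; got str

Rejecting bool for integer columns is deliberate: Python's bool is a subclass of int, so a plain extract::<i64>() would silently coerce True to 1 instead of failing.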