11 changes: 10 additions & 1 deletion .gitignore
@@ -17,4 +17,13 @@ Cargo.lock
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
-.vscode/
+.vscode/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
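For readers less familiar with gitignore globs: `[cod]` is a character class, so `*.py[cod]` covers `.pyc`, `.pyo`, and `.pyd` in one pattern. A rough illustration using Python's `fnmatch`, which treats these simple patterns the same way (full gitignore matching has extra rules this sketch ignores):

```python
# Rough sketch: check the new ignore patterns against typical build artifacts.
# Directory patterns are listed without their trailing "/" for this name check.
from fnmatch import fnmatch

patterns = ["__pycache__", "*.py[cod]", "*$py.class", "*.so", "*.egg-info", "dist", "build"]

for name in ["module.pyc", "module.pyo", "module.pyd", "native.so", "fluss.egg-info"]:
    matched = any(fnmatch(name, p) for p in patterns)
    print(f"{name}: {'ignored' if matched else 'kept'}")
```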
14 changes: 12 additions & 2 deletions bindings/python/example/example.py
@@ -118,11 +118,21 @@ async def main():
     append_writer.write_arrow_batch(pa_record_batch)
     print("Successfully wrote PyArrow RecordBatch")

-    # Test 3: Write Pandas DataFrame
+    # Test 3: Append single rows
+    print("\n--- Testing single row append ---")
+    # Dict input
+    await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26})
+    print("Successfully appended row (dict)")
+
+    # List input
+    await append_writer.append([9, "Ivan", 90.0, 31])
+    print("Successfully appended row (list)")
+
+    # Test 4: Write Pandas DataFrame
     print("\n--- Testing Pandas DataFrame write ---")
     df = pd.DataFrame(
         {
-            "id": [6, 7],
+            "id": [10, 11],
             "name": ["Frank", "Grace"],
             "score": [89.3, 94.7],
             "age": [29, 27],
26 changes: 26 additions & 0 deletions bindings/python/fluss/__init__.pyi
@@ -68,6 +68,32 @@ class FlussTable:
     def __repr__(self) -> str: ...

 class AppendWriter:
+    async def append(self, row: dict | list | tuple) -> None:
+        """Append a single row to the table.
+
+        Args:
+            row: Dictionary mapping field names to values, or
+                list/tuple of values in schema order
+
+        Supported Types:
+            Currently supports primitive types only:
+            - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
+            - Float, Double (floating point)
+            - String, Char (text)
+            - Bytes, Binary (binary data)
+            - Null values
+
+            Temporal and decimal types (Date, Timestamp, Decimal) are not yet supported.
+
+        Example:
+            await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+            await writer.append([1, 'Alice', 95.5])
+
+        Note:
+            For high-throughput bulk loading, prefer write_arrow_batch().
+            Use flush() to ensure all queued records are sent and acknowledged.
+        """
+        ...
     def write_arrow(self, table: pa.Table) -> None: ...
     def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
     def write_pandas(self, df: pd.DataFrame) -> None: ...
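A minimal usage sketch of the `append` stub documented above, assuming an `AppendWriter` already obtained from a table (connection setup as in the example script is omitted; the id/name/score/age schema is the one that script uses):

```python
async def append_rows(writer) -> None:
    # By field name: dict keys must match the table schema exactly.
    await writer.append({"id": 1, "name": "Alice", "score": 95.5, "age": 30})
    # By position: values in schema order; tuples work the same as lists.
    await writer.append([2, "Bob", 88.0, 25])
    # flush() is synchronous and returns once queued records are
    # sent and acknowledged.
    writer.flush()
```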
250 changes: 235 additions & 15 deletions bindings/python/src/table.rs
@@ -43,15 +43,15 @@ impl FlussTable {
         let table_info = self.table_info.clone();

         future_into_py(py, async move {
-            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone());

             let table_append = fluss_table
                 .new_append()
                 .map_err(|e| FlussError::new_err(e.to_string()))?;

             let rust_writer = table_append.create_writer();

-            let py_writer = AppendWriter::from_core(rust_writer);
+            let py_writer = AppendWriter::from_core(rust_writer, table_info);

             Python::attach(|py| Py::new(py, py_writer))
         })
@@ -131,13 +131,14 @@ impl FlussTable {
 /// Writer for appending data to a Fluss table
 #[pyclass]
 pub struct AppendWriter {
-    inner: fcore::client::AppendWriter,
+    inner: Arc<fcore::client::AppendWriter>,
+    table_info: fcore::metadata::TableInfo,
 }

 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -149,22 +149,40 @@ impl AppendWriter {
     }

     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: RecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;

+        let inner = self.inner.clone();
         // Release the GIL before blocking on async operation
         let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { self.inner.append_arrow_batch(rust_batch).await })
+            TOKIO_RUNTIME.block_on(async { inner.append_arrow_batch(rust_batch).await })
         });

         result.map_err(|e| FlussError::new_err(e.to_string()))
     }

+    /// Append a single row to the table
+    pub fn append<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'py, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_to_generic_row(row, &self.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            inner
+                .append(generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+    pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
@@ -179,12 +198,16 @@
     }

     /// Flush any pending data
-    pub fn flush(&mut self) -> PyResult<()> {
-        TOKIO_RUNTIME.block_on(async {
-            self.inner
-                .flush()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+    pub fn flush(&self, py: Python) -> PyResult<()> {
+        let inner = self.inner.clone();
+        // Release the GIL before blocking on I/O
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })
         })
     }

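With `flush()` now detaching from the GIL while it blocks on the Tokio runtime, other Python threads can make progress during a flush. A sketch of the pattern this enables (the threading wrapper is illustrative, not part of this PR):

```python
import threading

def flush_in_background(writer):
    # flush() releases the GIL while blocking, so running it on a worker
    # thread no longer stalls the rest of the interpreter.
    t = threading.Thread(target=writer.flush)
    t.start()
    return t

# t = flush_in_background(writer)
# ... other Python work proceeds concurrently ...
# t.join()
```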
@@ -195,8 +218,205 @@ impl AppendWriter {

 impl AppendWriter {
     /// Create an AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: Arc::new(append),
+            table_info,
+        }
     }
 }

+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
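The length check above rejects short or long rows client-side, before anything is buffered. A sketch of what a caller sees (the message text comes from the format string above; the 4-field schema is the example script's):

```python
async def demo_positional_append(writer):
    # Tuples are accepted the same as lists (see RowInput above).
    await writer.append((3, "Carol", 91.2, 28))

    # Wrong arity fails fast with: "Expected 4 values, got 2"
    try:
        await writer.append([4, "Dave"])
    except Exception as e:  # surfaces as the binding's FlussError
        print(f"rejected: {e}")
```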

+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
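Dict rows are validated strictly: non-string keys, unknown keys, and missing fields each produce a targeted error before any row is built. A sketch of the observable behavior, with messages copied from the format strings above (schema again id/name/score/age):

```python
async def demo_dict_validation(writer):
    # OK: keys exactly match the schema fields.
    await writer.append({"id": 5, "name": "Erin", "score": 87.0, "age": 33})

    # Fails: "Unknown field 'nam'. Expected fields: id, name, score, age"
    # await writer.append({"id": 6, "nam": "typo", "score": 70.0, "age": 40})

    # Fails: "Missing field: age"
    # await writer.append({"id": 7, "name": "Gus", "score": 64.5})
```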

+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies of the underlying data.
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for row-level operations: {:?}",
+            data_type
+        ))),
+    }
+}
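One subtlety the explicit `PyBool` checks address: Python's `bool` is a subclass of `int`, so without them `True` would extract as `1` into integer columns. A sketch of the resulting contract (column names follow the example schema, assuming `age` is an Int column; the binary column is hypothetical):

```python
async def demo_type_strictness(writer):
    # Fails: "Expected int for Int column, got bool. Use 0 or 1 explicitly."
    try:
        await writer.append({"id": 12, "name": "Judy", "score": 75.0, "age": True})
    except Exception as e:
        print(f"rejected: {e}")

    # None becomes a Null datum for any column type; whether NULL is
    # actually allowed is enforced by the table, not by this conversion.
    await writer.append({"id": 13, "name": None, "score": 60.5, "age": 35})

    # bytes and bytearray are both accepted for Bytes/Binary columns,
    # e.g. {"payload": b"\x00\x01"} for a hypothetical binary column.
```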
