Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,11 @@ pub trait BinaryWriter {

fn write_binary(&mut self, bytes: &[u8], length: usize);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for missing that; can we also support write_time in this PR?

// TODO Decimal type
// fn write_decimal(&mut self, pos: i32, value: f64);
fn write_decimal(&mut self, value: &rust_decimal::Decimal, precision: u32);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: `decimal` as the name should suffice; no other args within this trait start with `rust_`


// TODO Timestamp type
// fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
fn write_timestamp_ntz(&mut self, value: i64, precision: u32);

// TODO Timestamp type
// fn write_timestamp_ltz(&mut self, pos: i32, value: i64);
fn write_timestamp_ltz(&mut self, value: i64, precision: u32);

// TODO InternalArray, ArraySerializer
// fn write_array(&mut self, pos: i32, value: i64);
Expand Down Expand Up @@ -125,7 +122,11 @@ pub enum InnerValueWriter {
BigInt,
Float,
Double,
// TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row
Decimal(u32), // precision
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Decimal(u32), // precision
Decimal(u32, u32), // precision, scale

Date,
TimestampNtz(u32), // precision
TimestampLtz(u32), // precision
// TODO TimeWithoutTimeZone, Array, Row
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also support Time in this pr?

}

/// Accessor for writing the fields/elements of a binary writer during runtime, the
Expand All @@ -147,6 +148,10 @@ impl InnerValueWriter {
DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
DataType::Float(_) => Ok(InnerValueWriter::Float),
DataType::Double(_) => Ok(InnerValueWriter::Double),
DataType::Decimal(d) => Ok(InnerValueWriter::Decimal(d.precision())),
DataType::Date(_) => Ok(InnerValueWriter::Date),
DataType::Timestamp(t) => Ok(InnerValueWriter::TimestampNtz(t.precision())),
DataType::TimestampLTz(t) => Ok(InnerValueWriter::TimestampLtz(t.precision())),
_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
Expand Down Expand Up @@ -194,6 +199,18 @@ impl InnerValueWriter {
(InnerValueWriter::Double, Datum::Float64(v)) => {
writer.write_double(v.into_inner());
}
(InnerValueWriter::Decimal(p), Datum::Decimal(v)) => {
writer.write_decimal(v, *p);
}
(InnerValueWriter::Date, Datum::Date(d)) => {
writer.write_int(d.get_inner());
}
(InnerValueWriter::TimestampNtz(p), Datum::Timestamp(ts)) => {
writer.write_timestamp_ntz(ts.get_inner(), *p);
}
(InnerValueWriter::TimestampLtz(p), Datum::TimestampTz(ts)) => {
writer.write_timestamp_ltz(ts.get_inner(), *p);
}
_ => {
return Err(IllegalArgument {
message: format!("{self:?} used to write value {value:?}"),
Expand Down
12 changes: 12 additions & 0 deletions crates/fluss/src/row/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ impl InternalRow for ColumnarRow {
.value(self.row_id)
}

fn get_date(&self, pos: usize) -> i32 {
self.get_int(pos)
}

fn get_timestamp_ntz(&self, pos: usize) -> i64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can return Timestamp directly

self.get_long(pos)
}

fn get_timestamp_ltz(&self, pos: usize) -> i64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn get_timestamp_ltz(&self, pos: usize) -> i64 {
fn get_timestamp_ltz(&self, pos: usize) -> TimestampLtz {

self.get_long(pos)
}

fn get_char(&self, pos: usize, _length: usize) -> &str {
let array = self
.record_batch
Expand Down
7 changes: 7 additions & 0 deletions crates/fluss/src/row/compacted/compacted_key_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::Result;
use crate::metadata::DataType;
use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
use delegate::delegate;
use rust_decimal::Decimal;

/// A wrapping of [`CompactedRowWriter`] used to encode key columns.
/// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to
Expand Down Expand Up @@ -87,6 +88,12 @@ impl BinaryWriter for CompactedKeyWriter {

fn write_double(&mut self, value: f64);

fn write_decimal(&mut self, value: &Decimal, precision: u32);

fn write_timestamp_ntz(&mut self, value: i64, precision: u32);

fn write_timestamp_ltz(&mut self, value: i64, precision: u32);


}
}
Expand Down
17 changes: 17 additions & 0 deletions crates/fluss/src/row/compacted/compacted_row_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,21 @@ impl CompactedRowWriter {
/// Appends `value` to the row as the eight bytes of its native-endian
/// representation.
pub fn write_double(&mut self, value: f64) {
    let native_bytes = value.to_ne_bytes();
    self.write_raw(&native_bytes);
}

pub fn write_decimal(&mut self, value: &rust_decimal::Decimal, _precision: u32) {
// For now, serialize decimal to its string representation and write as bytes
// TODO: implement compact decimal encoding based on precision similar to Java implementation
let s = value.to_string();
self.write_bytes(s.as_bytes());
Comment on lines +158 to +161
Copy link
Contributor

@leekeiabstraction leekeiabstraction Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really have an implementation that is consistent with Java's for this task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

pub fn write_timestamp_ntz(&mut self, value: i64, _precision: u32) {
// Currently write timestamp as a long (epoch millis or other unit depending on upstream)
self.write_long(value);
Comment on lines +165 to +166
Copy link
Contributor

@leekeiabstraction leekeiabstraction Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really have an implementation that is consistent with Java's for this task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

pub fn write_timestamp_ltz(&mut self, value: i64, _precision: u32) {
// Currently write timestamp as a long (epoch millis or other unit depending on upstream)
self.write_long(value);
Comment on lines +170 to +171
Copy link
Contributor

@leekeiabstraction leekeiabstraction Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really have an implementation that is consistent with Java's for this task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
}
22 changes: 21 additions & 1 deletion crates/fluss/src/row/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,29 @@ pub struct Date(i32);
#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct Timestamp(i64);

impl Timestamp {
pub const fn new(inner: i64) -> Self {
Timestamp(inner)
}

pub fn get_inner(&self) -> i64 {
self.0
}
}

#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct TimestampLtz(i64);

impl TimestampLtz {
pub const fn new(inner: i64) -> Self {
TimestampLtz(inner)
}

pub fn get_inner(&self) -> i64 {
self.0
}
}

pub type Blob<'a> = Cow<'a, [u8]>;

impl<'a> From<Vec<u8>> for Datum<'a> {
Expand Down Expand Up @@ -403,7 +423,7 @@ mod tests {

#[test]
fn datum_accessors_and_conversions() {
let datum = Datum::String("value");
let datum = Datum::String(Cow::Borrowed("value"));
assert_eq!(datum.as_str(), "value");
assert!(!datum.is_null());

Expand Down
15 changes: 9 additions & 6 deletions crates/fluss/src/row/encode/compacted_key_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ mod tests {
DataTypes::bigint(),
DataTypes::float(),
DataTypes::double(),
// TODO Date
// TODO Time
DataTypes::date(),
// TIME is not yet represented in Datum
DataTypes::timestamp(),
DataTypes::binary(20),
DataTypes::bytes(),
DataTypes::char(2),
DataTypes::string(),
// TODO Decimal
// TODO Timestamp
// TODO Timestamp LTZ
// TODO Array of Int
// TODO Array of Float
Expand All @@ -270,14 +270,13 @@ mod tests {
Datum::from(-6101065172474983726i64), // from Java test case: new BigInteger("12345678901234567890").longValue()
Datum::from(13.2f32),
Datum::from(15.21f64),
// TODO Date
// TODO Time
Datum::Date(crate::row::datum::Date::new(5)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does 5 mean?

I suggest using the date 2023-10-25 to keep test case parity with java side.

Ref here: https://github.com/apache/fluss/blob/2b0e2b1b08ede61874e71e21877134d8945fe8c8/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java#L196

Datum::Timestamp(crate::row::datum::Timestamp::new(13)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Datum::from("1234567890".as_bytes()),
Datum::from("20".as_bytes()),
Datum::from("1"),
Datum::from("hello"),
// TODO Decimal
// TODO Timestamp
// TODO Timestamp LTZ
// TODO Array of Int
// TODO Array of Float
Expand All @@ -304,6 +303,10 @@ mod tests {
expected.extend(vec![0x33, 0x33, 0x53, 0x41]);
// DOUBLE: 15.21
expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]);
// DATE: 5
expected.extend(vec![0x05]);
// TIMESTAMP: 13
expected.extend(vec![0x0D]);
// BINARY(20): "1234567890".getBytes()
expected.extend(vec![
0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30,
Expand Down
21 changes: 19 additions & 2 deletions crates/fluss/src/row/field_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl FieldGetter {
DataType::BigInt(_) => InnerFieldGetter::BigInt { pos },
DataType::Float(_) => InnerFieldGetter::Float { pos },
DataType::Double(_) => InnerFieldGetter::Double { pos },
DataType::Date(_) => InnerFieldGetter::Date { pos },
DataType::Timestamp(_) => InnerFieldGetter::Timestamp { pos },
DataType::TimestampLTz(_) => InnerFieldGetter::TimestampLtz { pos },
_ => unimplemented!("DataType {:?} is currently unimplemented", data_type),
};

Expand All @@ -78,6 +81,9 @@ pub enum InnerFieldGetter {
BigInt { pos: usize },
Float { pos: usize },
Double { pos: usize },
Date { pos: usize },
Timestamp { pos: usize },
TimestampLtz { pos: usize },
}

impl InnerFieldGetter {
Expand All @@ -94,7 +100,15 @@ impl InnerFieldGetter {
InnerFieldGetter::BigInt { pos } => Datum::from(row.get_long(*pos)),
InnerFieldGetter::Float { pos } => Datum::from(row.get_float(*pos)),
InnerFieldGetter::Double { pos } => Datum::from(row.get_double(*pos)),
//TODO Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, Row
InnerFieldGetter::Date { pos } => {
Datum::Date(crate::row::datum::Date::new(row.get_date(*pos)))
}
InnerFieldGetter::Timestamp { pos } => Datum::Timestamp(
crate::row::datum::Timestamp::new(row.get_timestamp_ntz(*pos)),
),
InnerFieldGetter::TimestampLtz { pos } => Datum::TimestampTz(
crate::row::datum::TimestampLtz::new(row.get_timestamp_ltz(*pos)),
), //TODO Decimal, Time, Array, Map, Row
}
}

Expand All @@ -110,7 +124,10 @@ impl InnerFieldGetter {
| Self::Int { pos }
| Self::BigInt { pos }
| Self::Float { pos, .. }
| Self::Double { pos } => *pos,
| Self::Double { pos }
| Self::Date { pos }
| Self::Timestamp { pos }
| Self::TimestampLtz { pos } => *pos,
}
}
}
35 changes: 31 additions & 4 deletions crates/fluss/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ pub trait InternalRow {
// /// Returns the decimal value at the given position
// fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Decimal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please also support this method?


// /// Returns the timestamp value at the given position
// fn get_timestamp_ntz(&self, pos: usize, precision: usize) -> TimestampNtz;
/// Returns the date value at the given position (date as days since epoch)
fn get_date(&self, pos: usize) -> i32;

// /// Returns the timestamp value at the given position
// fn get_timestamp_ltz(&self, pos: usize, precision: usize) -> TimestampLtz;
/// Returns the timestamp value at the given position (timestamp without timezone)
fn get_timestamp_ntz(&self, pos: usize) -> i64;

/// Returns the timestamp value at the given position (timestamp with local timezone)
fn get_timestamp_ltz(&self, pos: usize) -> i64;

/// Returns the binary value at the given position with fixed length
fn get_binary(&self, pos: usize, length: usize) -> &[u8];
Expand Down Expand Up @@ -114,6 +117,30 @@ impl<'a> InternalRow for GenericRow<'a> {
self.values.get(_pos).unwrap().try_into().unwrap()
}

/// Returns the date at `pos` as a raw `i32`.
///
/// Accepts either a `Datum::Date` (unwrapped via `get_inner`) or a plain
/// `Datum::Int32`.
///
/// # Panics
///
/// Panics if there is no value at `pos`, or if the value there is neither a
/// `Date` nor an `Int32`.
fn get_date(&self, pos: usize) -> i32 {
    let datum = self.values.get(pos).unwrap();
    if let Datum::Date(date) = datum {
        date.get_inner()
    } else if let Datum::Int32(days) = datum {
        *days
    } else {
        let other = datum;
        panic!("Expected Date or Int32 at pos {pos:?}, got {other:?}")
    }
}

/// Returns the timestamp (without time zone) at `pos` as a raw `i64`.
///
/// Accepts either a `Datum::Timestamp` (unwrapped via `get_inner`) or a plain
/// `Datum::Int64`.
///
/// # Panics
///
/// Panics if there is no value at `pos`, or if the value there is neither a
/// `Timestamp` nor an `Int64`.
fn get_timestamp_ntz(&self, pos: usize) -> i64 {
    let datum = self.values.get(pos).unwrap();
    if let Datum::Timestamp(timestamp) = datum {
        timestamp.get_inner()
    } else if let Datum::Int64(raw) = datum {
        *raw
    } else {
        let other = datum;
        panic!("Expected Timestamp or Int64 at pos {pos:?}, got {other:?}")
    }
}

/// Returns the timestamp (with local time zone) at `pos` as a raw `i64`.
///
/// Accepts either a `Datum::TimestampTz` (unwrapped via `get_inner`) or a
/// plain `Datum::Int64`.
///
/// # Panics
///
/// Panics if there is no value at `pos`, or if the value there is neither a
/// `TimestampTz` nor an `Int64`.
fn get_timestamp_ltz(&self, pos: usize) -> i64 {
    let datum = self.values.get(pos).unwrap();
    if let Datum::TimestampTz(timestamp) = datum {
        timestamp.get_inner()
    } else if let Datum::Int64(raw) = datum {
        *raw
    } else {
        let other = datum;
        panic!("Expected TimestampTz or Int64 at pos {pos:?}, got {other:?}")
    }
}

/// Returns the value at `pos` converted to `f32`.
///
/// # Panics
///
/// Panics if there is no value at `pos`, or if the value there cannot be
/// converted to `f32`.
fn get_float(&self, pos: usize) -> f32 {
    let datum = self.values.get(pos).unwrap();
    datum.try_into().unwrap()
}
Expand Down
Loading