diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index 9917c7b7..117ecaad 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -52,14 +52,11 @@ pub trait BinaryWriter { fn write_binary(&mut self, bytes: &[u8], length: usize); - // TODO Decimal type - // fn write_decimal(&mut self, pos: i32, value: f64); + fn write_decimal(&mut self, value: &rust_decimal::Decimal, precision: u32); - // TODO Timestamp type - // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + fn write_timestamp_ntz(&mut self, value: i64, precision: u32); - // TODO Timestamp type - // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + fn write_timestamp_ltz(&mut self, value: i64, precision: u32); // TODO InternalArray, ArraySerializer // fn write_array(&mut self, pos: i32, value: i64); @@ -125,7 +122,11 @@ pub enum InnerValueWriter { BigInt, Float, Double, - // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row + Decimal(u32), // precision + Date, + TimestampNtz(u32), // precision + TimestampLtz(u32), // precision + // TODO TimeWithoutTimeZone, Array, Row } /// Accessor for writing the fields/elements of a binary writer during runtime, the @@ -147,6 +148,10 @@ impl InnerValueWriter { DataType::BigInt(_) => Ok(InnerValueWriter::BigInt), DataType::Float(_) => Ok(InnerValueWriter::Float), DataType::Double(_) => Ok(InnerValueWriter::Double), + DataType::Decimal(d) => Ok(InnerValueWriter::Decimal(d.precision())), + DataType::Date(_) => Ok(InnerValueWriter::Date), + DataType::Timestamp(t) => Ok(InnerValueWriter::TimestampNtz(t.precision())), + DataType::TimestampLTz(t) => Ok(InnerValueWriter::TimestampLtz(t.precision())), _ => unimplemented!( "ValueWriter for DataType {:?} is currently not implemented", data_type @@ -194,6 +199,18 @@ impl InnerValueWriter { (InnerValueWriter::Double, Datum::Float64(v)) => { writer.write_double(v.into_inner()); } + (InnerValueWriter::Decimal(p), Datum::Decimal(v)) => { + writer.write_decimal(v, *p); + } + (InnerValueWriter::Date, Datum::Date(d)) => { + writer.write_int(d.get_inner()); + } + (InnerValueWriter::TimestampNtz(p), Datum::Timestamp(ts)) => { + writer.write_timestamp_ntz(ts.get_inner(), *p); + } + (InnerValueWriter::TimestampLtz(p), Datum::TimestampTz(ts)) => { + writer.write_timestamp_ltz(ts.get_inner(), *p); + } _ => { return Err(IllegalArgument { message: format!("{self:?} used to write value {value:?}"), diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 31f0fdf2..757cc6ba 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -126,6 +126,18 @@ impl InternalRow for ColumnarRow { .value(self.row_id) } + fn get_date(&self, pos: usize) -> i32 { + self.get_int(pos) + } + + fn get_timestamp_ntz(&self, pos: usize) -> i64 { + self.get_long(pos) + } + + fn get_timestamp_ltz(&self, pos: usize) -> i64 { + self.get_long(pos) + } + fn get_char(&self, pos: usize, _length: usize) -> &str { let array = self .record_batch diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs index 84a6b227..038a6157 100644 --- a/crates/fluss/src/row/compacted/compacted_key_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs @@ -22,6 +22,7 @@ use crate::error::Result; use crate::metadata::DataType; use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; use delegate::delegate; +use rust_decimal::Decimal; /// A wrapping of [`CompactedRowWriter`] used to encode key columns. /// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to @@ -87,6 +88,12 @@ impl BinaryWriter for CompactedKeyWriter { fn write_double(&mut self, value: f64); + fn write_decimal(&mut self, value: &Decimal, precision: u32); + + fn write_timestamp_ntz(&mut self, value: i64, precision: u32); + + fn write_timestamp_ltz(&mut self, value: i64, precision: u32); + } } diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs index 83451235..e44b54f0 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -153,4 +153,21 @@ impl CompactedRowWriter { pub fn write_double(&mut self, value: f64) { self.write_raw(&value.to_ne_bytes()); } + + pub fn write_decimal(&mut self, value: &rust_decimal::Decimal, _precision: u32) { + // For now, serialize decimal to its string representation and write as bytes + // TODO: implement compact decimal encoding based on precision similar to Java implementation + let s = value.to_string(); + self.write_bytes(s.as_bytes()); + } + + pub fn write_timestamp_ntz(&mut self, value: i64, _precision: u32) { + // Currently write timestamp as a long (epoch millis or other unit depending on upstream) + self.write_long(value); + } + + pub fn write_timestamp_ltz(&mut self, value: i64, _precision: u32) { + // Currently write timestamp as a long (epoch millis or other unit depending on upstream) + self.write_long(value); + } } diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index fa85ded4..af972474 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -352,9 +352,29 @@ pub struct Date(i32); #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct Timestamp(i64); +impl Timestamp { + pub const fn new(inner: i64) -> Self { + Timestamp(inner) + } + + pub fn get_inner(&self) -> i64 { + self.0 + } +} + #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct TimestampLtz(i64); +impl TimestampLtz { + pub const fn new(inner: i64) -> Self { + TimestampLtz(inner) + } + + pub fn get_inner(&self) -> i64 { + self.0 + } +} + pub type Blob<'a> = Cow<'a, [u8]>; impl<'a> From> for Datum<'a> { @@ -403,7 +423,7 @@ mod tests { #[test] fn datum_accessors_and_conversions() { - let datum = Datum::String("value"); + let datum = Datum::String(Cow::Borrowed("value")); assert_eq!(datum.as_str(), "value"); assert!(!datum.is_null()); diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs index ebe3da2a..eef71b32 100644 --- a/crates/fluss/src/row/encode/compacted_key_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs @@ -247,14 +247,14 @@ mod tests { DataTypes::bigint(), DataTypes::float(), DataTypes::double(), - // TODO Date - // TODO Time + DataTypes::date(), + // TIME is not yet represented in Datum + DataTypes::timestamp(), DataTypes::binary(20), DataTypes::bytes(), DataTypes::char(2), DataTypes::string(), // TODO Decimal - // TODO Timestamp // TODO Timestamp LTZ // TODO Array of Int // TODO Array of Float @@ -270,14 +270,13 @@ mod tests { Datum::from(-6101065172474983726i64), // from Java test case: new BigInteger("12345678901234567890").longValue() Datum::from(13.2f32), Datum::from(15.21f64), - // TODO Date - // TODO Time + Datum::Date(crate::row::datum::Date::new(5)), + Datum::Timestamp(crate::row::datum::Timestamp::new(13)), Datum::from("1234567890".as_bytes()), Datum::from("20".as_bytes()), Datum::from("1"), Datum::from("hello"), // TODO Decimal - // TODO Timestamp // TODO Timestamp LTZ // TODO Array of Int // TODO Array of Float @@ -304,6 +303,10 @@ mod tests { expected.extend(vec![0x33, 0x33, 0x53, 0x41]); // DOUBLE: 15.21 expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]); + // DATE: 5 + expected.extend(vec![0x05]); + // TIMESTAMP: 13 + expected.extend(vec![0x0D]); // BINARY(20): "1234567890".getBytes() expected.extend(vec![ 0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index 8e529e54..6ab9bf9c 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -55,6 +55,9 @@ impl FieldGetter { DataType::BigInt(_) => InnerFieldGetter::BigInt { pos }, DataType::Float(_) => InnerFieldGetter::Float { pos }, DataType::Double(_) => InnerFieldGetter::Double { pos }, + DataType::Date(_) => InnerFieldGetter::Date { pos }, + DataType::Timestamp(_) => InnerFieldGetter::Timestamp { pos }, + DataType::TimestampLTz(_) => InnerFieldGetter::TimestampLtz { pos }, _ => unimplemented!("DataType {:?} is currently unimplemented", data_type), }; @@ -78,6 +81,9 @@ pub enum InnerFieldGetter { BigInt { pos: usize }, Float { pos: usize }, Double { pos: usize }, + Date { pos: usize }, + Timestamp { pos: usize }, + TimestampLtz { pos: usize }, } impl InnerFieldGetter { @@ -94,7 +100,15 @@ impl InnerFieldGetter { InnerFieldGetter::BigInt { pos } => Datum::from(row.get_long(*pos)), InnerFieldGetter::Float { pos } => Datum::from(row.get_float(*pos)), InnerFieldGetter::Double { pos } => Datum::from(row.get_double(*pos)), - //TODO Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, Row + InnerFieldGetter::Date { pos } => { + Datum::Date(crate::row::datum::Date::new(row.get_date(*pos))) + } + InnerFieldGetter::Timestamp { pos } => Datum::Timestamp( + crate::row::datum::Timestamp::new(row.get_timestamp_ntz(*pos)), + ), + InnerFieldGetter::TimestampLtz { pos } => Datum::TimestampTz( + crate::row::datum::TimestampLtz::new(row.get_timestamp_ltz(*pos)), + ), //TODO Decimal, Time, Array, Map, Row } } @@ -110,7 +124,10 @@ impl InnerFieldGetter { | Self::Int { pos } | Self::BigInt { pos } | Self::Float { pos, .. } - | Self::Double { pos } => *pos, + | Self::Double { pos } + | Self::Date { pos } + | Self::Timestamp { pos } + | Self::TimestampLtz { pos } => *pos, } } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index c321ab9d..6521b277 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -65,11 +65,14 @@ pub trait InternalRow { // /// Returns the decimal value at the given position // fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Decimal; - // /// Returns the timestamp value at the given position - // fn get_timestamp_ntz(&self, pos: usize, precision: usize) -> TimestampNtz; + /// Returns the date value at the given position (date as days since epoch) + fn get_date(&self, pos: usize) -> i32; - // /// Returns the timestamp value at the given position - // fn get_timestamp_ltz(&self, pos: usize, precision: usize) -> TimestampLtz; + /// Returns the timestamp value at the given position (timestamp without timezone) + fn get_timestamp_ntz(&self, pos: usize) -> i64; + + /// Returns the timestamp value at the given position (timestamp with local timezone) + fn get_timestamp_ltz(&self, pos: usize) -> i64; /// Returns the binary value at the given position with fixed length fn get_binary(&self, pos: usize, length: usize) -> &[u8]; @@ -114,6 +117,30 @@ impl<'a> InternalRow for GenericRow<'a> { self.values.get(_pos).unwrap().try_into().unwrap() } + fn get_date(&self, pos: usize) -> i32 { + match self.values.get(pos).unwrap() { + Datum::Date(d) => d.get_inner(), + Datum::Int32(i) => *i, + other => panic!("Expected Date or Int32 at pos {pos:?}, got {other:?}"), + } + } + + fn get_timestamp_ntz(&self, pos: usize) -> i64 { + match self.values.get(pos).unwrap() { + Datum::Timestamp(t) => t.get_inner(), + Datum::Int64(i) => *i, + other => panic!("Expected Timestamp or Int64 at pos {pos:?}, got {other:?}"), + } + } + + fn get_timestamp_ltz(&self, pos: usize) -> i64 { + match self.values.get(pos).unwrap() { + Datum::TimestampTz(t) => t.get_inner(), + Datum::Int64(i) => *i, + other => panic!("Expected TimestampTz or Int64 at pos {pos:?}, got {other:?}"), + } + } + fn get_float(&self, pos: usize) -> f32 { self.values.get(pos).unwrap().try_into().unwrap() }