diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs new file mode 100644 index 00000000..1d32ebd7 --- /dev/null +++ b/crates/fluss/src/client/table/lookup.rs @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::bucketing::BucketingFunction; +use crate::client::connection::FlussConnection; +use crate::client::metadata::Metadata; +use crate::error::{Error, Result}; +use crate::metadata::{RowType, TableBucket, TableInfo}; +use crate::row::InternalRow; +use crate::row::compacted::CompactedRow; +use crate::row::encode::KeyEncoder; +use crate::rpc::ApiError; +use crate::rpc::message::LookupRequest; +use std::sync::Arc; + +/// The result of a lookup operation. +/// +/// Contains the rows returned from a lookup. For primary key lookups, +/// this will contain at most one row. For prefix key lookups (future), +/// this may contain multiple rows. +pub struct LookupResult<'a> { + rows: Vec<Vec<u8>>, + row_type: &'a RowType, +} + +impl<'a> LookupResult<'a> { + /// Creates a new LookupResult from a list of row bytes. + fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self { + Self { rows, row_type } + } + + /// Creates an empty LookupResult. + fn empty(row_type: &'a RowType) -> Self { + Self { + rows: Vec::new(), + row_type, + } + } + + /// Returns the only row in the result set as a [`CompactedRow`]. + /// + /// This method provides a zero-copy view of the row data, which means the returned + /// `CompactedRow` borrows from this result set and cannot outlive it. + /// + /// # Returns + /// - `Ok(Some(row))`: If exactly one row exists. + /// - `Ok(None)`: If the result set is empty. + /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row. + /// + pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> { + match self.rows.len() { + 0 => Ok(None), + 1 => Ok(Some(CompactedRow::from_bytes(self.row_type, &self.rows[0]))), + _ => Err(Error::UnexpectedError { + message: "LookupResult contains multiple rows, use get_rows() instead".to_string(), + source: None, + }), + } + } + + /// Returns all rows as CompactedRows. + pub fn get_rows(&self) -> Vec<CompactedRow<'_>> { + self.rows + .iter() + .map(|bytes| CompactedRow::from_bytes(self.row_type, bytes)) + .collect() + } +} + +/// Configuration and factory struct for creating lookup operations. +/// +/// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`, +/// providing a builder-style API for configuring lookup operations before +/// creating the actual `Lookuper`.
+/// +/// # Example +/// ```ignore +/// let table = conn.get_table(&table_path).await?; +/// let lookuper = table.new_lookup()?.create_lookuper()?; +/// let result = lookuper.lookup(&row).await?; +/// if let Some(value) = result.get_single_row()? { +/// println!("Found: {:?}", value); +/// } +/// ``` +// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper) +// TODO: Add create_typed_lookuper() for typed lookups with POJO mapping +pub struct TableLookup<'a> { + conn: &'a FlussConnection, + table_info: TableInfo, + metadata: Arc<Metadata>, +} + +impl<'a> TableLookup<'a> { + pub(super) fn new( + conn: &'a FlussConnection, + table_info: TableInfo, + metadata: Arc<Metadata>, + ) -> Self { + Self { + conn, + table_info, + metadata, + } + } + + /// Creates a `Lookuper` for performing key-based lookups. + /// + /// The lookuper will automatically encode the key and compute the bucket + /// for each lookup using the appropriate bucketing function. + pub fn create_lookuper(self) -> Result<Lookuper<'a>> { + let num_buckets = self.table_info.get_num_buckets(); + + // Get data lake format from table config for bucketing function + let data_lake_format = self.table_info.get_table_config().get_datalake_format()?; + let bucketing_function = <dyn BucketingFunction>::of(data_lake_format.as_ref()); + + // Create key encoder for the primary key fields + let pk_fields = self.table_info.get_physical_primary_keys().to_vec(); + let key_encoder = + <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields, data_lake_format)?; + + Ok(Lookuper { + conn: self.conn, + table_info: self.table_info, + metadata: self.metadata, + bucketing_function, + key_encoder, + num_buckets, + }) + } +} + +/// Performs key-based lookups against a primary key table. +/// +/// The `Lookuper` automatically encodes the lookup key, computes the target +/// bucket, finds the appropriate tablet server, and retrieves the value. +/// +/// # Example +/// ```ignore +/// let lookuper = table.new_lookup()?.create_lookuper()?; +/// let mut row = GenericRow::new(); // populate with the primary key value(s) +/// let result = lookuper.lookup(&row).await?; +/// ``` +// TODO: Support partitioned tables (extract partition from key) +pub struct Lookuper<'a> { + conn: &'a FlussConnection, + table_info: TableInfo, + metadata: Arc<Metadata>, + bucketing_function: Box<dyn BucketingFunction>, + key_encoder: Box<dyn KeyEncoder>, + num_buckets: i32, +} + +impl<'a> Lookuper<'a> { + /// Looks up a value by its primary key. + /// + /// The key is encoded and the bucket is automatically computed using + /// the table's bucketing function.
+ /// + /// # Arguments + /// * `row` - The row containing the primary key field values + /// + /// # Returns + /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found) + /// * `Err(Error)` - If the lookup fails + pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult<'_>> { + // todo: support batch lookup + // Encode the key from the row + let encoded_key = self.key_encoder.encode_key(row)?; + let key_bytes = encoded_key.to_vec(); + + // Compute bucket from encoded key + let bucket_id = self + .bucketing_function + .bucketing(&key_bytes, self.num_buckets)?; + + let table_id = self.table_info.get_table_id(); + let table_bucket = TableBucket::new(table_id, bucket_id); + + // Find the leader for this bucket + let cluster = self.metadata.get_cluster(); + let leader = + cluster + .leader_for(&table_bucket) + .ok_or_else(|| Error::LeaderNotAvailable { + message: format!("No leader found for table bucket: {table_bucket}"), + })?; + + // Get connection to the tablet server + let tablet_server = + cluster + .get_tablet_server(leader.id()) + .ok_or_else(|| Error::LeaderNotAvailable { + message: format!( + "Tablet server {} is not found in metadata cache", + leader.id() + ), + })?; + + let connections = self.conn.get_connections(); + let connection = connections.get_connection(tablet_server).await?; + + // Send lookup request + let request = LookupRequest::new(table_id, None, bucket_id, vec![key_bytes]); + let response = connection.request(request).await?; + + // Extract the values from response + if let Some(bucket_resp) = response.buckets_resp.into_iter().next() { + // Check for errors + if let Some(error_code) = bucket_resp.error_code { + if error_code != 0 { + return Err(Error::FlussAPIError { + api_error: ApiError { + code: error_code, + message: bucket_resp.error_message.unwrap_or_default(), + }, + }); + } + } + + // Collect all values + let rows: Vec<Vec<u8>> = bucket_resp + .values + .into_iter() + .filter_map(|pb_value| pb_value.values) + .collect(); + + return Ok(LookupResult::new(rows, self.table_info.row_type())); + } + + Ok(LookupResult::empty(self.table_info.row_type())) + } + + /// Returns a reference to the table info. + pub fn table_info(&self) -> &TableInfo { + &self.table_info + } +} diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 26341d70..7356be23 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -17,14 +17,14 @@ use crate::client::connection::FlussConnection; use crate::client::metadata::Metadata; +use crate::error::{Error, Result}; use crate::metadata::{TableInfo, TablePath}; use std::sync::Arc; -use crate::error::Result; - pub const EARLIEST_OFFSET: i64 = -2; mod append; +mod lookup; mod log_fetch_buffer; mod remote_log; @@ -32,6 +32,7 @@ mod scanner; mod writer; pub use append::{AppendWriter, TableAppend}; +pub use lookup::{LookupResult, Lookuper, TableLookup}; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; #[allow(dead_code)] @@ -85,6 +86,39 @@ impl<'a> FlussTable<'a> { pub fn has_primary_key(&self) -> bool { self.has_primary_key } + + /// Creates a new `TableLookup` for configuring lookup operations. + /// + /// This follows the same pattern as `new_scan()` and `new_append()`, + /// returning a configuration object that can be used to create a `Lookuper`. + /// + /// The table must be a primary key table.
+ /// + /// # Returns + /// * `Ok(TableLookup)` - A lookup configuration object + /// * `Err(Error)` - If the table doesn't have a primary key + /// + /// # Example + /// ```ignore + /// let table = conn.get_table(&table_path).await?; + /// let lookuper = table.new_lookup()?.create_lookuper()?; + /// let result = lookuper.lookup(&key_row).await?; // key_row holds the primary key values + /// if let Some(value) = result.get_single_row()? { + /// println!("Found value: {:?}", value); + /// } + /// ``` + pub fn new_lookup(&self) -> Result<TableLookup<'a>> { + if !self.has_primary_key { + return Err(Error::UnsupportedOperation { + message: "Lookup is only supported for primary key tables".to_string(), + }); + } + Ok(TableLookup::new( + self.conn, + self.table_info.clone(), + self.metadata.clone(), + )) + } } impl<'a> Drop for FlussTable<'a> { diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs index d65e42de..4a0c0d8a 100644 --- a/crates/fluss/src/client/write/write_format.rs +++ b/crates/fluss/src/client/write/write_format.rs @@ -39,7 +39,7 @@ impl WriteFormat { match self { WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED), other => Err(IllegalArgument { - message: format!("WriteFormat `{}` is not a KvFormat", other), + message: format!("WriteFormat `{other}` is not a KvFormat"), }), } } @@ -48,7 +48,7 @@ impl WriteFormat { match kv_format { KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv), other => Err(IllegalArgument { - message: format!("Unknown KvFormat: `{}`", other), + message: format!("Unknown KvFormat: `{other}`"), }), } } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index b1e8a90b..da85b0c2 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -729,6 +729,7 @@ impl TableConfig { ArrowCompressionInfo::from_conf(&self.properties) } + /// Returns the data lake format if configured, or None if not set.
pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> { self.properties .get("table.datalake.format") diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index dbbb45da..b4ae8405 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -317,4 +317,32 @@ message GetFileSystemSecurityTokenResponse { required bytes token = 2; optional int64 expiration_time = 3; repeated PbKeyValue addition_info = 4; +} + +// lookup request and response +message LookupRequest { + required int64 table_id = 1; + repeated PbLookupReqForBucket buckets_req = 2; +} + +message LookupResponse { + repeated PbLookupRespForBucket buckets_resp = 1; +} + +message PbLookupReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + repeated bytes key = 3; +} + +message PbLookupRespForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 error_code = 3; + optional string error_message = 4; + repeated PbValue values = 5; +} + +message PbValue { + optional bytes values = 1; } \ No newline at end of file diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs index 8c30713d..ab8c2ac1 100644 --- a/crates/fluss/src/record/kv/kv_record.rs +++ b/crates/fluss/src/record/kv/kv_record.rs @@ -101,7 +101,7 @@ impl KvRecord { let size_i32 = i32::try_from(size_in_bytes).map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, - format!("Record size {} exceeds i32::MAX", size_in_bytes), + format!("Record size {size_in_bytes} exceeds i32::MAX"), ) })?; buf.put_i32_le(size_i32); @@ -141,7 +141,7 @@ impl KvRecord { if size_in_bytes_i32 < 0 { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!("Invalid record length: {}", size_in_bytes_i32), + format!("Invalid record length: {size_in_bytes_i32}"), )); } @@ -150,10 +150,7 @@ impl KvRecord { let total_size = size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidData, - format!( - "Record size overflow: {} + {}", - size_in_bytes, LENGTH_LENGTH - ), + format!("Record size overflow: {size_in_bytes} + {LENGTH_LENGTH}"), ) })?; @@ -162,8 +159,7 @@ impl KvRecord { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, format!( - "Not enough bytes to read record: expected {}, available {}", - total_size, available + "Not enough bytes to read record: expected {total_size}, available {available}" ), )); } diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index 6ead6427..eb3c09ad 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -96,7 +96,7 @@ impl KvRecordBatch { if length_i32 < 0 { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!("Invalid batch length: {}", length_i32), + format!("Invalid batch length: {length_i32}"), )); } @@ -150,10 +150,7 @@ impl KvRecordBatch { if size < RECORD_BATCH_HEADER_SIZE { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!( - "Batch size {} is less than header size {}", - size, RECORD_BATCH_HEADER_SIZE - ), + format!("Batch size {size} is less than header size {RECORD_BATCH_HEADER_SIZE}"), )); } @@ -276,7 +273,7 @@ impl KvRecordBatch { if count < 0 { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!("Invalid record count: {}", count), + format!("Invalid record count: {count}"), )); } Ok(KvRecordIterator { @@ -321,7 +318,7 @@ impl Iterator for KvRecordIterator { #[cfg(test)] mod
tests { use super::*; - use crate::metadata::{DataTypes, KvFormat}; + use crate::metadata::{DataTypes, KvFormat, RowType}; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; use crate::row::binary::BinaryWriter; use crate::row::compacted::CompactedRow; @@ -366,8 +363,8 @@ mod tests { let mut value1_writer = CompactedRowWriter::new(1); value1_writer.write_bytes(&[1, 2, 3, 4, 5]); - let data_types = &[DataTypes::bytes()]; - let row = &CompactedRow::from_bytes(data_types, value1_writer.buffer()); + let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec()); + let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer()); builder.append_row(key1, Some(row)).unwrap(); let key2 = b"key2"; diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index 7d1a7972..c36a8612 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -248,7 +248,7 @@ impl KvRecordBatchBuilder { let total_size = i32::try_from(size_without_length).map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, - format!("Batch size {} exceeds i32::MAX", size_without_length), + format!("Batch size {size_without_length} exceeds i32::MAX"), ) })?; @@ -317,14 +317,16 @@ impl Drop for KvRecordBatchBuilder { #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataType, DataTypes}; + use crate::metadata::{DataTypes, RowType}; use crate::row::binary::BinaryWriter; use crate::row::compacted::{CompactedRow, CompactedRowWriter}; + use std::sync::LazyLock; + static TEST_ROW_TYPE: LazyLock<RowType> = + LazyLock::new(|| RowType::with_data_types(vec![DataTypes::bytes()])); // Helper function to create a CompactedRowWriter with a single bytes field for testing fn create_test_row(data: &[u8]) -> CompactedRow<'_> { - const DATA_TYPE: &[DataType] = &[DataTypes::bytes()]; - CompactedRow::from_bytes(DATA_TYPE, data) + CompactedRow::from_bytes(&TEST_ROW_TYPE, data) } #[test] @@ -483,7 +485,6 @@ mod tests { #[test] fn test_builder_with_compacted_row_writer() { - use crate::metadata::{DataType, IntType, StringType}; use crate::record::kv::KvRecordBatch; use crate::row::InternalRow; use crate::row::compacted::CompactedRow; @@ -491,18 +492,13 @@ mod tests { let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED); builder.set_writer_state(100, 5); - let types = vec![ - DataType::Int(IntType::new()), - DataType::String(StringType::new()), - ]; - // Create and append first record with CompactedRowWriter let mut row_writer1 = CompactedRowWriter::new(2); row_writer1.write_int(42); row_writer1.write_string("hello"); - let data_types = &[DataTypes::int(), DataTypes::string()]; - let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer()); + let row_type = RowType::with_data_types([DataTypes::int(), DataTypes::string()].to_vec()); + let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer()); let key1 = b"key1"; assert!(builder.has_room_for_row(key1, Some(row1))); @@ -513,7 +509,7 @@ mod tests { row_writer2.write_int(100); row_writer2.write_string("world"); - let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer()); + let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer()); let key2 = b"key2"; builder.append_row(key2, Some(row2)).unwrap(); @@ -539,14 +535,14 @@ mod tests { // Verify first record let record1 = records[0].as_ref().unwrap(); assert_eq!(record1.key().as_ref(), key1); - let row1 =
CompactedRow::from_bytes(&types, record1.value().unwrap()); + let row1 = CompactedRow::from_bytes(&row_type, record1.value().unwrap()); assert_eq!(row1.get_int(0), 42); assert_eq!(row1.get_string(1), "hello"); // Verify second record let record2 = records[1].as_ref().unwrap(); assert_eq!(record2.key().as_ref(), key2); - let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap()); + let row2 = CompactedRow::from_bytes(&row_type, record2.value().unwrap()); assert_eq!(row2.get_int(0), 100); assert_eq!(row2.get_string(1), "world"); @@ -561,8 +557,8 @@ mod tests { let mut row_writer = CompactedRowWriter::new(1); row_writer.write_int(42); - let data_types = &[DataTypes::int()]; - let row = &CompactedRow::from_bytes(data_types, row_writer.buffer()); + let row_type = RowType::with_data_types([DataTypes::int()].to_vec()); + let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer()); // INDEXED format should reject append_row let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED); diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 9ff3b5ff..144f8985 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::metadata::DataType; +use crate::metadata::RowType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; use crate::row::{BinaryRow, GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; @@ -38,10 +38,10 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize { #[allow(dead_code)] impl<'a> CompactedRow<'a> { - pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self { + pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self { Self::deserialize( - Arc::new(CompactedRowDeserializer::new(data_types)), - data_types.len(), + Arc::new(CompactedRowDeserializer::new(row_type)), + row_type.fields().len(), data, ) } @@ -84,7 +84,10 @@ impl<'a> InternalRow for CompactedRow<'a> { } fn is_null_at(&self, pos: usize) -> bool { - self.deserializer.get_data_types()[pos].is_nullable() && self.reader.is_null_at(pos) + self.deserializer.get_row_type().fields().as_slice()[pos] + .data_type + .is_nullable() + && self.reader.is_null_at(pos) } fn get_boolean(&self, pos: usize) -> bool { @@ -138,7 +141,7 @@ mod tests { use crate::row::binary::BinaryWriter; use crate::metadata::{ - BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, SmallIntType, + BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, IntType, SmallIntType, StringType, TinyIntType, }; use crate::row::compacted::compacted_row_writer::CompactedRowWriter; @@ -146,7 +149,7 @@ mod tests { #[test] fn test_compacted_row() { // Test all primitive types - let types = vec![ + let row_type = RowType::with_data_types(vec![ DataType::Boolean(BooleanType::new()), DataType::TinyInt(TinyIntType::new()), DataType::SmallInt(SmallIntType::new()), @@ -156,9 +159,9 @@ mod tests { DataType::Double(DoubleType::new()), DataType::String(StringType::new()), DataType::Bytes(BytesType::new()), - ]; + ]); - let mut writer = CompactedRowWriter::new(types.len()); + let mut writer = CompactedRowWriter::new(row_type.fields().len()); writer.write_boolean(true); writer.write_byte(1); @@ -171,7 +174,7 @@ mod tests { writer.write_bytes(&[1, 2, 3, 4, 5]); let bytes = writer.to_bytes(); - let mut row = 
CompactedRow::from_bytes(types.as_slice(), bytes.as_ref()); + let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); assert_eq!(row.get_field_count(), 9); assert!(row.get_boolean(0)); @@ -185,20 +188,23 @@ mod tests { assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]); // Test with nulls - let types = vec![ - DataType::Int(IntType::new()), - DataType::String(StringType::new()), - DataType::Double(DoubleType::new()), - ]; + let row_type = RowType::with_data_types( + [ + DataType::Int(IntType::new()), + DataType::String(StringType::new()), + DataType::Double(DoubleType::new()), + ] + .to_vec(), + ); - let mut writer = CompactedRowWriter::new(types.len()); + let mut writer = CompactedRowWriter::new(row_type.fields().len()); writer.write_int(100); writer.set_null_at(1); writer.write_double(2.71); let bytes = writer.to_bytes(); - row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref()); + row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); assert!(!row.is_null_at(0)); assert!(row.is_null_at(1)); @@ -211,26 +217,28 @@ mod tests { assert_eq!(row.get_int(0), 100); // Test from_bytes - let types = vec![ + let row_type = RowType::with_data_types(vec![ DataType::Int(IntType::new()), DataType::String(StringType::new()), - ]; + ]); - let mut writer = CompactedRowWriter::new(types.len()); + let mut writer = CompactedRowWriter::new(row_type.fields().len()); writer.write_int(-1); writer.write_string("test"); let bytes = writer.to_bytes(); - let mut row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref()); + let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); assert_eq!(row.get_int(0), -1); assert_eq!(row.get_string(1), "test"); // Test large row let num_fields = 100; - let types: Vec<DataType> = (0..num_fields) - .map(|_| DataType::Int(IntType::new())) - .collect(); + let row_type = RowType::with_data_types( + (0..num_fields) + .map(|_| DataType::Int(IntType::new())) + .collect(), + ); let mut writer = CompactedRowWriter::new(num_fields); @@ -239,7 +247,7 @@ mod tests { } let bytes = writer.to_bytes(); - row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref()); + row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); for i in 0..num_fields { assert_eq!(row.get_int(i), (i * 10) as i32); diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 9ce50952..408706cc 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License.
+use crate::metadata::RowType; use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes; use crate::{ metadata::DataType, @@ -27,31 +28,32 @@ use std::str::from_utf8; #[allow(dead_code)] #[derive(Clone)] pub struct CompactedRowDeserializer<'a> { - schema: Cow<'a, [DataType]>, + row_type: Cow<'a, RowType>, } #[allow(dead_code)] impl<'a> CompactedRowDeserializer<'a> { - pub fn new(schema: &'a [DataType]) -> Self { + pub fn new(row_type: &'a RowType) -> Self { Self { - schema: Cow::Borrowed(schema), + row_type: Cow::Borrowed(row_type), } } - pub fn new_from_owned(schema: Vec<DataType>) -> Self { + pub fn new_from_owned(row_type: RowType) -> Self { Self { - schema: Cow::Owned(schema), + row_type: Cow::Owned(row_type), } } - pub fn get_data_types(&self) -> &[DataType] { - self.schema.as_ref() + pub fn get_row_type(&self) -> &RowType { + self.row_type.as_ref() } pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> { let mut row = GenericRow::new(); let mut cursor = reader.initial_position(); - for (col_pos, dtype) in self.schema.iter().enumerate() { + for (col_pos, data_field) in self.row_type.fields().iter().enumerate() { + let dtype = &data_field.data_type; if dtype.is_nullable() && reader.is_null_at(col_pos) { row.set_field(col_pos, Datum::Null); continue; } diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs b/crates/fluss/src/row/encode/compacted_row_encoder.rs index fc39bb7a..48b9f3ff 100644 --- a/crates/fluss/src/row/encode/compacted_row_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs @@ -17,7 +17,7 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::DataType; +use crate::metadata::RowType; use crate::row::Datum; use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, CompactedRowWriter}; @@ -33,18 +33,18 @@ pub struct CompactedRowEncoder<'a> { } impl<'a> CompactedRowEncoder<'a> { - pub fn new(field_data_types: Vec<DataType>) -> Result<Self> { - let field_writers = field_data_types - .iter() + pub fn new(row_type: RowType) -> Result<Self> { + let field_writers = row_type + .field_types() .map(|d| ValueWriter::create_value_writer(d, Some(&BinaryRowFormat::Compacted))) .collect::<Result<Vec<_>>>()?; Ok(Self { - arity: field_data_types.len(), - writer: CompactedRowWriter::new(field_data_types.len()), + arity: field_writers.len(), + writer: CompactedRowWriter::new(field_writers.len()), field_writers, compacted_row_deserializer: Arc::new(CompactedRowDeserializer::new_from_owned( - field_data_types, + row_type, )), }) } @@ -60,10 +60,7 @@ impl RowEncoder for CompactedRowEncoder<'_> { self.field_writers .get(pos) .ok_or_else(|| IllegalArgument { - message: format!( - "invalid position {} when attempting to encode value {}", - pos, value - ), + message: format!("invalid position {pos} when attempting to encode value {value}"), })?
.write_value(&mut self.writer, pos, &value) } diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 34863aba..c294ecf1 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -19,7 +19,7 @@ mod compacted_key_encoder; mod compacted_row_encoder; use crate::error::Result; -use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType}; +use crate::metadata::{DataLakeFormat, KvFormat, RowType}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; use crate::row::{BinaryRow, Datum, InternalRow}; @@ -111,18 +111,18 @@ pub struct RowEncoderFactory {} #[allow(dead_code)] impl RowEncoderFactory { pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result { - Self::create_for_field_types(kv_format, row_type.field_types().cloned().collect()) + Self::create_for_field_types(kv_format, row_type.clone()) } pub fn create_for_field_types( kv_format: KvFormat, - field_data_types: Vec<DataType>, + row_type: RowType, ) -> Result { match kv_format { KvFormat::INDEXED => { todo!() } - KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types), + KvFormat::COMPACTED => CompactedRowEncoder::new(row_type), } } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 49960635..3477f1de 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -21,11 +21,13 @@ mod datum; pub mod binary; pub mod compacted; -mod encode; +pub mod encode; mod field_getter; pub use column::*; +pub use compacted::CompactedRow; pub use datum::*; +pub use encode::KeyEncoder; pub trait BinaryRow: InternalRow { /// Returns the binary representation of this row as a byte slice. diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs index c5153964..9f9268e8 100644 --- a/crates/fluss/src/rpc/api_key.rs +++ b/crates/fluss/src/rpc/api_key.rs @@ -31,6 +31,7 @@ pub enum ApiKey { MetaData, ProduceLog, FetchLog, + Lookup, ListOffsets, GetFileSystemSecurityToken, GetDatabaseInfo, @@ -53,6 +54,7 @@ impl From<i16> for ApiKey { 1012 => ApiKey::MetaData, 1014 => ApiKey::ProduceLog, 1015 => ApiKey::FetchLog, + 1017 => ApiKey::Lookup, 1021 => ApiKey::ListOffsets, 1025 => ApiKey::GetFileSystemSecurityToken, 1032 => ApiKey::GetLatestLakeSnapshot, @@ -77,6 +79,7 @@ impl From<ApiKey> for i16 { ApiKey::MetaData => 1012, ApiKey::ProduceLog => 1014, ApiKey::FetchLog => 1015, + ApiKey::Lookup => 1017, ApiKey::ListOffsets => 1021, ApiKey::GetFileSystemSecurityToken => 1025, ApiKey::GetLatestLakeSnapshot => 1032, @@ -105,6 +108,7 @@ mod tests { (1012, ApiKey::MetaData), (1014, ApiKey::ProduceLog), (1015, ApiKey::FetchLog), + (1017, ApiKey::Lookup), (1021, ApiKey::ListOffsets), (1025, ApiKey::GetFileSystemSecurityToken), (1032, ApiKey::GetLatestLakeSnapshot), diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs new file mode 100644 index 00000000..3de47d64 --- /dev/null +++ b/crates/fluss/src/rpc/message/lookup.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::LookupResponse; +use crate::rpc::frame::ReadError; + +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::frame::WriteError; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type, proto}; +use prost::Message; + +use bytes::{Buf, BufMut}; + +pub struct LookupRequest { + pub inner_request: proto::LookupRequest, +} + +impl LookupRequest { + pub fn new( + table_id: i64, + partition_id: Option<i64>, + bucket_id: i32, + keys: Vec<Vec<u8>>, + ) -> Self { + let bucket_req = proto::PbLookupReqForBucket { + partition_id, + bucket_id, + key: keys, + }; + + let request = proto::LookupRequest { + table_id, + buckets_req: vec![bucket_req], + }; + + Self { + inner_request: request, + } + } +} + +impl RequestBody for LookupRequest { + type ResponseBody = LookupResponse; + + const API_KEY: ApiKey = ApiKey::Lookup; + + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(LookupRequest); +impl_read_version_type!(LookupResponse); diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index b619ee40..2fe506bc 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -34,6 +34,7 @@ mod header; mod list_databases; mod list_offsets; mod list_tables; +mod lookup; mod produce_log; mod table_exists; mod update_metadata; @@ -53,6 +54,7 @@ pub use header::*; pub use list_databases::*; pub use list_offsets::*; pub use list_tables::*; +pub use lookup::*; pub use produce_log::*; pub use table_exists::*; pub use update_metadata::*; diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs index 96fd1f50..83a75f6c 100644 --- a/crates/fluss/src/util/varint.rs +++ b/crates/fluss/src/util/varint.rs @@ -364,12 +364,11 @@ mod tests { let mut reader = Cursor::new(&buffer); let read_value = read_unsigned_varint(&mut reader).unwrap(); - assert_eq!(value, read_value, "Round trip failed for value {}", value); + assert_eq!(value, read_value, "Round trip failed for value {value}"); assert_eq!( written, buffer.len(), - "Bytes written mismatch for value {}", - value + "Bytes written mismatch for value {value}" ); // Test with BufMut @@ -382,22 +381,19 @@ mod tests { assert_eq!( calculated_size, buffer.len(), - "Size calculation failed for value {}", - value + "Size calculation failed for value {value}" ); // Test reading from bytes let (read_value_bytes, bytes_read) = read_unsigned_varint_bytes(&buffer).unwrap(); assert_eq!( value, read_value_bytes, - "Bytes read failed for value {}", - value + "Bytes read failed for value {value}" ); assert_eq!( bytes_read, buffer.len(), - "Bytes read count mismatch for value {}", - value + "Bytes read count mismatch for value {value}" ); } }
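For reviewers, a minimal end-to-end sketch of how the lookup path added in this patch is intended to be used. It only calls methods introduced or touched here (`new_lookup`, `create_lookuper`, `lookup`, `get_single_row`); the `lookup_one` function name, the import paths, the pre-built `key_row`, and the int/string column accessors are illustrative assumptions, not part of the diff:

```rust
use fluss::client::table::FlussTable; // assumed re-export path
use fluss::error::Result;
use fluss::row::InternalRow;

/// Illustrative only: look up a single primary key in a PK table and print two columns.
/// `key_row` is assumed to already contain the primary key field values.
async fn lookup_one(table: &FlussTable<'_>, key_row: &dyn InternalRow) -> Result<()> {
    // new_lookup() fails with UnsupportedOperation for non primary key (log) tables.
    let mut lookuper = table.new_lookup()?.create_lookuper()?;

    // Key encoding, bucket computation and leader/tablet-server resolution happen inside lookup().
    let result = lookuper.lookup(key_row).await?;

    // A primary key lookup yields at most one row; the CompactedRow borrows from `result`.
    match result.get_single_row()? {
        Some(row) => println!("found: {} / {}", row.get_int(0), row.get_string(1)),
        None => println!("key not found"),
    }
    Ok(())
}
```

Note that `lookup` takes `&mut self`, so the lookuper binding must be mutable, and the row returned by `get_single_row` is a zero-copy view that cannot outlive the `LookupResult`.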