From fee1a323cbe1f728491b830232942f38409e0d0b Mon Sep 17 00:00:00 2001
From: AndreaBozzo
Date: Wed, 14 Jan 2026 11:22:08 +0100
Subject: [PATCH 1/5] feat: add lookup support for primary key tables

Implements lookup functionality to verify put KV operations (Issue #114).

- Add Lookup API key (1017) to api_key.rs
- Add lookup proto messages (LookupRequest, LookupResponse, etc.)
- Create lookup.rs message module with LookupRequest
- Add async lookup() method to FlussTable
---
 crates/fluss/src/client/table/mod.rs   | 89 +++++++++++++++++++++++++-
 crates/fluss/src/proto/fluss_api.proto | 28 ++++++++
 crates/fluss/src/rpc/api_key.rs        |  4 ++
 crates/fluss/src/rpc/message/lookup.rs | 67 +++++++++++++++++++
 crates/fluss/src/rpc/message/mod.rs    |  2 +
 5 files changed, 187 insertions(+), 3 deletions(-)
 create mode 100644 crates/fluss/src/rpc/message/lookup.rs

diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 26341d70..edebb0be 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -17,11 +17,12 @@
 
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
-use crate::metadata::{TableInfo, TablePath};
+use crate::error::{Error, Result};
+use crate::metadata::{TableBucket, TableInfo, TablePath};
+use crate::rpc::ApiError;
+use crate::rpc::message::LookupRequest;
 use std::sync::Arc;
 
-use crate::error::Result;
-
 pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
@@ -85,6 +86,88 @@ impl<'a> FlussTable<'a> {
     pub fn has_primary_key(&self) -> bool {
         self.has_primary_key
     }
+
+    /// Lookup values by primary key in a key-value table.
+    ///
+    /// This method performs a direct lookup to retrieve the value associated with the given key
+    /// in the specified bucket. The table must have a primary key (be a primary key table).
+    ///
+    /// # Arguments
+    /// * `bucket_id` - The bucket ID to look up the key in
+    /// * `key` - The encoded primary key bytes to look up
+    ///
+    /// # Returns
+    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
+    /// * `Ok(None)` - If the key does not exist
+    /// * `Err(Error)` - If the lookup fails or the table doesn't have a primary key
+    ///
+    /// # Example
+    /// ```ignore
+    /// let table = conn.get_table(&table_path).await?;
+    /// let key = /* encoded key bytes */;
+    /// if let Some(value) = table.lookup(0, key).await? {
+    ///     println!("Found value: {:?}", value);
+    /// }
+    /// ```
+    pub async fn lookup(&self, bucket_id: i32, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+        if !self.has_primary_key {
+            return Err(Error::UnsupportedOperation {
+                message: "Lookup is only supported for primary key tables".to_string(),
+            });
+        }
+
+        let table_id = self.table_info.get_table_id();
+        let table_bucket = TableBucket::new(table_id, bucket_id);
+
+        // Find the leader for this bucket
+        let cluster = self.metadata.get_cluster();
+        let leader =
+            cluster
+                .leader_for(&table_bucket)
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!("No leader found for table bucket: {table_bucket}"),
+                })?;
+
+        // Get connection to the tablet server
+        let tablet_server =
+            cluster
+                .get_tablet_server(leader.id())
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!(
+                        "Tablet server {} is not found in metadata cache",
+                        leader.id()
+                    ),
+                })?;
+
+        let connections = self.conn.get_connections();
+        let connection = connections.get_connection(tablet_server).await?;
+
+        // Send lookup request
+        let request = LookupRequest::new(table_id, None, bucket_id, vec![key]);
+        let response = connection.request(request).await?;
+
+        // Extract the value from response
+        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
+            // Check for errors
+            if let Some(error_code) = bucket_resp.error_code {
+                if error_code != 0 {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: error_code,
+                            message: bucket_resp.error_message.unwrap_or_default(),
+                        },
+                    });
+                }
+            }
+
+            // Get the first value (we only requested one key)
+            if let Some(pb_value) = bucket_resp.values.into_iter().next() {
+                return Ok(pb_value.values);
+            }
+        }
+
+        Ok(None)
+    }
 }
 
 impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index dbbb45da..b4ae8405 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -317,4 +317,32 @@ message GetFileSystemSecurityTokenResponse {
   required bytes token = 2;
   optional int64 expiration_time = 3;
   repeated PbKeyValue addition_info = 4;
+}
+
+// lookup request and response
+message LookupRequest {
+  required int64 table_id = 1;
+  repeated PbLookupReqForBucket buckets_req = 2;
+}
+
+message LookupResponse {
+  repeated PbLookupRespForBucket buckets_resp = 1;
+}
+
+message PbLookupReqForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  repeated bytes key = 3;
+}
+
+message PbLookupRespForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  optional int32 error_code = 3;
+  optional string error_message = 4;
+  repeated PbValue values = 5;
+}
+
+message PbValue {
+  optional bytes values = 1;
 }
\ No newline at end of file
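A note on the wire shapes defined above: they map one-to-one onto the prost-generated structs the Rust client uses. A minimal, hypothetical sketch of building a single-key request for bucket 0 (the `proto` module path is the prost output this series assumes; nothing here is new API):

```rust
// Hypothetical sketch: a single-key LookupRequest for bucket 0, expressed
// in terms of the prost-generated structs for the messages above.
fn single_key_request(table_id: i64, key: Vec<u8>) -> proto::LookupRequest {
    proto::LookupRequest {
        table_id,
        buckets_req: vec![proto::PbLookupReqForBucket {
            partition_id: None, // non-partitioned table
            bucket_id: 0,
            key: vec![key], // `repeated bytes key` permits batching several keys
        }],
    }
}
```

This is the shape `LookupRequest::new` in rpc/message/lookup.rs below produces for a single bucket.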
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index c5153964..9f9268e8 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -31,6 +31,7 @@ pub enum ApiKey {
     MetaData,
     ProduceLog,
     FetchLog,
+    Lookup,
     ListOffsets,
     GetFileSystemSecurityToken,
     GetDatabaseInfo,
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
             1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
+            1017 => ApiKey::Lookup,
             1021 => ApiKey::ListOffsets,
             1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
@@ -77,6 +79,7 @@ impl From<ApiKey> for i16 {
             ApiKey::MetaData => 1012,
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
+            ApiKey::Lookup => 1017,
             ApiKey::ListOffsets => 1021,
             ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
@@ -105,6 +108,7 @@ mod tests {
             (1012, ApiKey::MetaData),
             (1014, ApiKey::ProduceLog),
             (1015, ApiKey::FetchLog),
+            (1017, ApiKey::Lookup),
             (1021, ApiKey::ListOffsets),
             (1025, ApiKey::GetFileSystemSecurityToken),
             (1032, ApiKey::GetLatestLakeSnapshot),
diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs
new file mode 100644
index 00000000..3de47d64
--- /dev/null
+++ b/crates/fluss/src/rpc/message/lookup.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::LookupResponse;
+use crate::rpc::frame::ReadError;
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use prost::Message;
+
+use bytes::{Buf, BufMut};
+
+pub struct LookupRequest {
+    pub inner_request: proto::LookupRequest,
+}
+
+impl LookupRequest {
+    pub fn new(
+        table_id: i64,
+        partition_id: Option<i64>,
+        bucket_id: i32,
+        keys: Vec<Vec<u8>>,
+    ) -> Self {
+        let bucket_req = proto::PbLookupReqForBucket {
+            partition_id,
+            bucket_id,
+            key: keys,
+        };
+
+        let request = proto::LookupRequest {
+            table_id,
+            buckets_req: vec![bucket_req],
+        };
+
+        Self {
+            inner_request: request,
+        }
+    }
+}
+
+impl RequestBody for LookupRequest {
+    type ResponseBody = LookupResponse;
+
+    const API_KEY: ApiKey = ApiKey::Lookup;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(LookupRequest);
+impl_read_version_type!(LookupResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index b619ee40..2fe506bc 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -34,6 +34,7 @@ mod header;
 mod list_databases;
 mod list_offsets;
 mod list_tables;
+mod lookup;
 mod produce_log;
 mod table_exists;
 mod update_metadata;
@@ -53,6 +54,7 @@ pub use header::*;
 pub use list_databases::*;
 pub use list_offsets::*;
 pub use list_tables::*;
+pub use lookup::*;
 pub use produce_log::*;
 pub use table_exists::*;
 pub use update_metadata::*;
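Before the refactor in the next patch lands, it is worth seeing what Patch 1 asks of callers. A hedged, illustrative sketch of a caller-side helper (only the API added above is real; `verify_put` and its shape are hypothetical):

```rust
// Illustrative sketch, not part of the patch: exercising the Patch 1 API.
// The caller must already know the bucket id and hold a key encoded in the
// server's key format.
async fn verify_put(
    conn: &FlussConnection,
    table_path: &TablePath,
    bucket_id: i32,
    encoded_key: Vec<u8>,
) -> Result<bool> {
    let table = conn.get_table(table_path).await?;
    // Some(_) means the put was applied and is readable from the leader.
    Ok(table.lookup(bucket_id, encoded_key).await?.is_some())
}
```

Patches 2 and 3 move key encoding and bucket routing into the client, which removes both extra parameters.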
From 2a1e6ce14a49f9fbc711d0a374fa4b9b5969e8a8 Mon Sep 17 00:00:00 2001
From: AndreaBozzo
Date: Thu, 15 Jan 2026 17:54:05 +0100
Subject: [PATCH 2/5] Modify lookup implementation to align with the Java API

---
 crates/fluss/src/client/table/lookup.rs | 170 ++++++++++++++++++++++++
 crates/fluss/src/client/table/mod.rs    |  85 +++---------
 2 files changed, 188 insertions(+), 67 deletions(-)
 create mode 100644 crates/fluss/src/client/table/lookup.rs

diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
new file mode 100644
index 00000000..1b9171a8
--- /dev/null
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bucketing::BucketingFunction;
+use crate::client::connection::FlussConnection;
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::metadata::{TableBucket, TableInfo};
+use crate::rpc::ApiError;
+use crate::rpc::message::LookupRequest;
+use std::sync::Arc;
+
+/// Configuration and factory struct for creating lookup operations.
+///
+/// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`,
+/// providing a builder-style API for configuring lookup operations before
+/// creating the actual `Lookuper`.
+///
+/// # Example
+/// ```ignore
+/// let table = conn.get_table(&table_path).await?;
+/// let lookuper = table.new_lookup()?.create_lookuper()?;
+/// let value = lookuper.lookup(encoded_key).await?;
+/// ```
+// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
+// TODO: Add create_typed_lookuper() for typed lookups with POJO mapping
+pub struct TableLookup<'a> {
+    conn: &'a FlussConnection,
+    table_info: TableInfo,
+    metadata: Arc<Metadata>,
+}
+
+impl<'a> TableLookup<'a> {
+    pub(super) fn new(
+        conn: &'a FlussConnection,
+        table_info: TableInfo,
+        metadata: Arc<Metadata>,
+    ) -> Self {
+        Self {
+            conn,
+            table_info,
+            metadata,
+        }
+    }
+
+    /// Creates a `Lookuper` for performing key-based lookups.
+    ///
+    /// The lookuper will automatically compute the bucket for each key
+    /// using the appropriate bucketing function.
+    pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
+        let num_buckets = self.table_info.get_num_buckets();
+        let bucketing_function = <dyn BucketingFunction>::of(None);
+
+        Ok(Lookuper {
+            conn: self.conn,
+            table_info: self.table_info,
+            metadata: self.metadata,
+            bucketing_function,
+            num_buckets,
+        })
+    }
+}
+
+/// Performs key-based lookups against a primary key table.
+///
+/// The `Lookuper` automatically computes the target bucket from the key,
+/// finds the appropriate tablet server, and retrieves the value.
+///
+/// # Example
+/// ```ignore
+/// let lookuper = table.new_lookup()?.create_lookuper()?;
+/// let key = vec![1, 2, 3]; // encoded primary key bytes
+/// if let Some(value) = lookuper.lookup(key).await? {
+///     println!("Found value: {:?}", value);
+/// }
+/// ```
+// TODO: Support partitioned tables (extract partition from key)
+// TODO: Detect data lake format from table config for bucketing function
+pub struct Lookuper<'a> {
+    conn: &'a FlussConnection,
+    table_info: TableInfo,
+    metadata: Arc<Metadata>,
+    bucketing_function: Box<dyn BucketingFunction>,
+    num_buckets: i32,
+}
+
+impl<'a> Lookuper<'a> {
+    /// Looks up a value by its primary key.
+    ///
+    /// The bucket is automatically computed from the key using the table's
+    /// bucketing function.
+    ///
+    /// # Arguments
+    /// * `key` - The encoded primary key bytes
+    ///
+    /// # Returns
+    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
+    /// * `Ok(None)` - If the key does not exist
+    /// * `Err(Error)` - If the lookup fails
+    pub async fn lookup(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+        // Compute bucket from key
+        let bucket_id = self.bucketing_function.bucketing(&key, self.num_buckets)?;
+
+        let table_id = self.table_info.get_table_id();
+        let table_bucket = TableBucket::new(table_id, bucket_id);
+
+        // Find the leader for this bucket
+        let cluster = self.metadata.get_cluster();
+        let leader =
+            cluster
+                .leader_for(&table_bucket)
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!("No leader found for table bucket: {table_bucket}"),
+                })?;
+
+        // Get connection to the tablet server
+        let tablet_server =
+            cluster
+                .get_tablet_server(leader.id())
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!(
+                        "Tablet server {} is not found in metadata cache",
+                        leader.id()
+                    ),
+                })?;
+
+        let connections = self.conn.get_connections();
+        let connection = connections.get_connection(tablet_server).await?;
+
+        // Send lookup request
+        let request = LookupRequest::new(table_id, None, bucket_id, vec![key]);
+        let response = connection.request(request).await?;
+
+        // Extract the value from response
+        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
+            // Check for errors
+            if let Some(error_code) = bucket_resp.error_code {
+                if error_code != 0 {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: error_code,
+                            message: bucket_resp.error_message.unwrap_or_default(),
+                        },
+                    });
+                }
+            }
+
+            // Get the first value (we only requested one key)
+            if let Some(pb_value) = bucket_resp.values.into_iter().next() {
+                return Ok(pb_value.values);
+            }
+        }
+
+        Ok(None)
+    }
+}
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index edebb0be..f935a520 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -18,14 +18,13 @@
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
-use crate::metadata::{TableBucket, TableInfo, TablePath};
-use crate::rpc::ApiError;
-use crate::rpc::message::LookupRequest;
+use crate::metadata::{TableInfo, TablePath};
 use std::sync::Arc;
 
 pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
+mod lookup;
 mod log_fetch_buffer;
 mod remote_log;
@@ -33,6 +32,7 @@
 mod scanner;
 mod writer;
 
 pub use append::{AppendWriter, TableAppend};
+pub use lookup::{Lookuper, TableLookup};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 
 #[allow(dead_code)]
@@ -87,86 +87,37 @@ impl<'a> FlussTable<'a> {
         self.has_primary_key
     }
 
-    /// Lookup values by primary key in a key-value table.
+    /// Creates a new `TableLookup` for configuring lookup operations.
     ///
-    /// This method performs a direct lookup to retrieve the value associated with the given key
-    /// in the specified bucket. The table must have a primary key (be a primary key table).
+    /// This follows the same pattern as `new_scan()` and `new_append()`,
+    /// returning a configuration object that can be used to create a `Lookuper`.
    ///
-    /// # Arguments
-    /// * `bucket_id` - The bucket ID to look up the key in
-    /// * `key` - The encoded primary key bytes to look up
+    /// The table must have a primary key (be a primary key table).
     ///
     /// # Returns
-    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
-    /// * `Ok(None)` - If the key does not exist
-    /// * `Err(Error)` - If the lookup fails or the table doesn't have a primary key
+    /// * `Ok(TableLookup)` - A lookup configuration object
+    /// * `Err(Error)` - If the table doesn't have a primary key
     ///
     /// # Example
     /// ```ignore
     /// let table = conn.get_table(&table_path).await?;
-    /// let key = /* encoded key bytes */;
-    /// if let Some(value) = table.lookup(0, key).await? {
+    /// let lookuper = table.new_lookup()?.create_lookuper()?;
+    /// let key = vec![1, 2, 3]; // encoded primary key bytes
+    /// if let Some(value) = lookuper.lookup(key).await? {
     ///     println!("Found value: {:?}", value);
     /// }
     /// ```
-    pub async fn lookup(&self, bucket_id: i32, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+    pub fn new_lookup(&self) -> Result<TableLookup<'a>> {
         if !self.has_primary_key {
             return Err(Error::UnsupportedOperation {
                 message: "Lookup is only supported for primary key tables".to_string(),
             });
         }
-
-        let table_id = self.table_info.get_table_id();
-        let table_bucket = TableBucket::new(table_id, bucket_id);
-
-        // Find the leader for this bucket
-        let cluster = self.metadata.get_cluster();
-        let leader =
-            cluster
-                .leader_for(&table_bucket)
-                .ok_or_else(|| Error::LeaderNotAvailable {
-                    message: format!("No leader found for table bucket: {table_bucket}"),
-                })?;
-
-        // Get connection to the tablet server
-        let tablet_server =
-            cluster
-                .get_tablet_server(leader.id())
-                .ok_or_else(|| Error::LeaderNotAvailable {
-                    message: format!(
-                        "Tablet server {} is not found in metadata cache",
-                        leader.id()
-                    ),
-                })?;
-
-        let connections = self.conn.get_connections();
-        let connection = connections.get_connection(tablet_server).await?;
-
-        // Send lookup request
-        let request = LookupRequest::new(table_id, None, bucket_id, vec![key]);
-        let response = connection.request(request).await?;
-
-        // Extract the value from response
-        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
-            // Check for errors
-            if let Some(error_code) = bucket_resp.error_code {
-                if error_code != 0 {
-                    return Err(Error::FlussAPIError {
-                        api_error: ApiError {
-                            code: error_code,
-                            message: bucket_resp.error_message.unwrap_or_default(),
-                        },
-                    });
-                }
-            }
-
-            // Get the first value (we only requested one key)
-            if let Some(pb_value) = bucket_resp.values.into_iter().next() {
-                return Ok(pb_value.values);
-            }
-        }
-
-        Ok(None)
+        Ok(TableLookup::new(
+            self.conn,
+            self.table_info.clone(),
+            self.metadata.clone(),
+        ))
     }
 }
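The builder split above mirrors `TableScan`/`TableAppend`, and it makes the client responsible for bucket routing. A sketch of the invariant the `Lookuper` relies on (a fragment under the assumption that `of(None)` selects the default, non-datalake bucketing, as in `create_lookuper` above):

```rust
// Sketch of the routing invariant behind client-side lookups.
// `encoded_key` is a hypothetical key already in the server's key format.
use crate::bucketing::BucketingFunction;
use crate::error::Result;

fn route(encoded_key: &[u8], num_buckets: i32) -> Result<i32> {
    let f = <dyn BucketingFunction>::of(None);
    // Readers and writers must agree on both the function and num_buckets,
    // or lookups would probe a tablet server that never stored the key.
    f.bucketing(encoded_key, num_buckets)
}
```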
From 9dec8fd01aa2b1438758f57d5d4830fdb310cb1b Mon Sep 17 00:00:00 2001
From: AndreaBozzo
Date: Fri, 16 Jan 2026 09:14:50 +0100
Subject: [PATCH 3/5] Enhance Lookuper and apply further changes from review

---
 crates/fluss/src/client/table/lookup.rs | 145 +++++++++++++++++++-----
 crates/fluss/src/client/table/mod.rs    |   2 +-
 crates/fluss/src/metadata/table.rs      |   1 +
 crates/fluss/src/row/mod.rs             |   4 +-
 4 files changed, 124 insertions(+), 28 deletions(-)

diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index 1b9171a8..68ca89ec 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -20,10 +20,76 @@
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo};
+use crate::row::InternalRow;
+use crate::row::compacted::CompactedRow;
+use crate::row::encode::KeyEncoder;
 use crate::rpc::ApiError;
 use crate::rpc::message::LookupRequest;
 use std::sync::Arc;
 
+/// The result of a lookup operation.
+///
+/// Contains the rows returned from a lookup. For primary key lookups,
+/// this will contain at most one row. For prefix key lookups (future),
+/// this may contain multiple rows.
+pub struct LookupResult {
+    rows: Vec<Vec<u8>>,
+}
+
+impl LookupResult {
+    /// Creates a new LookupResult from a list of row bytes.
+    fn new(rows: Vec<Vec<u8>>) -> Self {
+        Self { rows }
+    }
+
+    /// Creates an empty LookupResult.
+    fn empty() -> Self {
+        Self { rows: Vec::new() }
+    }
+
+    /// Returns true if the lookup found no matching rows.
+    pub fn is_empty(&self) -> bool {
+        self.rows.is_empty()
+    }
+
+    /// Returns the number of rows in the result.
+    pub fn len(&self) -> usize {
+        self.rows.len()
+    }
+
+    /// Returns the raw bytes of all rows.
+    /// Use `get_row()` or `get_rows()` for decoded access.
+    pub fn raw_rows(&self) -> &[Vec<u8>] {
+        &self.rows
+    }
+
+    /// Returns the single row as a CompactedRow, or None if empty.
+    ///
+    /// # Panics
+    /// Panics if there are multiple rows. Use `get_rows()` for multi-row results.
+    pub fn get_row<'a>(
+        &'a self,
+        row_type: &'a [crate::metadata::DataType],
+    ) -> Option<CompactedRow<'a>> {
+        match self.rows.len() {
+            0 => None,
+            1 => Some(CompactedRow::from_bytes(row_type, &self.rows[0])),
+            _ => panic!("LookupResult contains multiple rows, use get_rows() instead"),
+        }
+    }
+
+    /// Returns all rows as CompactedRows.
+    pub fn get_rows<'a>(
+        &'a self,
+        row_type: &'a [crate::metadata::DataType],
+    ) -> Vec<CompactedRow<'a>> {
+        self.rows
+            .iter()
+            .map(|bytes| CompactedRow::from_bytes(row_type, bytes))
+            .collect()
+    }
+}
+
 /// Configuration and factory struct for creating lookup operations.
 ///
 /// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`,
@@ -34,7 +100,10 @@
 /// ```ignore
 /// let table = conn.get_table(&table_path).await?;
 /// let lookuper = table.new_lookup()?.create_lookuper()?;
-/// let value = lookuper.lookup(encoded_key).await?;
+/// let result = lookuper.lookup(&row).await?;
+/// if let Some(value) = result.get_row(table.table_info().row_type().fields_as_data_types()) {
+///     println!("Found: {:?}", value);
+/// }
 /// ```
 // TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
 // TODO: Add create_typed_lookuper() for typed lookups with POJO mapping
@@ -59,17 +128,29 @@ impl<'a> TableLookup<'a> {
     /// Creates a `Lookuper` for performing key-based lookups.
     ///
-    /// The lookuper will automatically compute the bucket for each key
-    /// using the appropriate bucketing function.
+    /// The lookuper will automatically encode the key and compute the bucket
+    /// for each lookup using the appropriate bucketing function.
     pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
         let num_buckets = self.table_info.get_num_buckets();
-        let bucketing_function = <dyn BucketingFunction>::of(None);
+
+        // Get data lake format from table config for bucketing function
+        let data_lake_format = self
+            .table_info
+            .get_table_config()
+            .get_datalake_format()?;
+        let bucketing_function = <dyn BucketingFunction>::of(data_lake_format.as_ref());
+
+        // Create key encoder for the primary key fields
+        let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
+        let key_encoder =
+            <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields, data_lake_format)?;
 
         Ok(Lookuper {
             conn: self.conn,
             table_info: self.table_info,
             metadata: self.metadata,
             bucketing_function,
+            key_encoder,
             num_buckets,
         })
     }
 }
 
@@ -77,43 +158,46 @@
 /// Performs key-based lookups against a primary key table.
 ///
-/// The `Lookuper` automatically computes the target bucket from the key,
-/// finds the appropriate tablet server, and retrieves the value.
+/// The `Lookuper` automatically encodes the lookup key, computes the target
+/// bucket, finds the appropriate tablet server, and retrieves the value.
 ///
 /// # Example
 /// ```ignore
 /// let lookuper = table.new_lookup()?.create_lookuper()?;
-/// let key = vec![1, 2, 3]; // encoded primary key bytes
-/// if let Some(value) = lookuper.lookup(key).await? {
-///     println!("Found value: {:?}", value);
-/// }
+/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
+/// let result = lookuper.lookup(&row).await?;
 /// ```
 // TODO: Support partitioned tables (extract partition from key)
-// TODO: Detect data lake format from table config for bucketing function
 pub struct Lookuper<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
     metadata: Arc<Metadata>,
     bucketing_function: Box<dyn BucketingFunction>,
+    key_encoder: Box<dyn KeyEncoder>,
     num_buckets: i32,
 }
 
 impl<'a> Lookuper<'a> {
     /// Looks up a value by its primary key.
     ///
-    /// The bucket is automatically computed from the key using the table's
-    /// bucketing function.
+    /// The key is encoded and the bucket is automatically computed using
+    /// the table's bucketing function.
     ///
     /// # Arguments
-    /// * `key` - The encoded primary key bytes
+    /// * `row` - The row containing the primary key field values
     ///
     /// # Returns
-    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
-    /// * `Ok(None)` - If the key does not exist
+    /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found)
     /// * `Err(Error)` - If the lookup fails
-    pub async fn lookup(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
-        // Compute bucket from key
-        let bucket_id = self.bucketing_function.bucketing(&key, self.num_buckets)?;
+    pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult> {
+        // Encode the key from the row
+        let encoded_key = self.key_encoder.encode_key(row)?;
+        let key_bytes = encoded_key.to_vec();
+
+        // Compute bucket from encoded key
+        let bucket_id = self
+            .bucketing_function
+            .bucketing(&key_bytes, self.num_buckets)?;
 
         let table_id = self.table_info.get_table_id();
         let table_bucket = TableBucket::new(table_id, bucket_id);
@@ -142,10 +226,10 @@
         let connection = connections.get_connection(tablet_server).await?;
 
         // Send lookup request
-        let request = LookupRequest::new(table_id, None, bucket_id, vec![key]);
+        let request = LookupRequest::new(table_id, None, bucket_id, vec![key_bytes]);
         let response = connection.request(request).await?;
 
-        // Extract the value from response
+        // Extract the values from response
         if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
             // Check for errors
             if let Some(error_code) = bucket_resp.error_code {
@@ -159,12 +243,21 @@
                 }
             }
 
-            // Get the first value (we only requested one key)
-            if let Some(pb_value) = bucket_resp.values.into_iter().next() {
-                return Ok(pb_value.values);
-            }
+            // Collect all values
+            let rows: Vec<Vec<u8>> = bucket_resp
+                .values
+                .into_iter()
+                .filter_map(|pb_value| pb_value.values)
+                .collect();
+
+            return Ok(LookupResult::new(rows));
         }
 
-        Ok(None)
+        Ok(LookupResult::empty())
+    }
+
+    /// Returns a reference to the table info.
+    pub fn table_info(&self) -> &TableInfo {
+        &self.table_info
     }
 }
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index f935a520..7356be23 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -32,7 +32,7 @@
 mod scanner;
 mod writer;
 
 pub use append::{AppendWriter, TableAppend};
-pub use lookup::{Lookuper, TableLookup};
+pub use lookup::{LookupResult, Lookuper, TableLookup};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 
 #[allow(dead_code)]
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index b1e8a90b..da85b0c2 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -729,6 +729,7 @@ impl TableConfig {
         ArrowCompressionInfo::from_conf(&self.properties)
     }
 
+    /// Returns the data lake format if configured, or None if not set.
     pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
         self.properties
             .get("table.datalake.format")
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 49960635..3477f1de 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -21,11 +21,13 @@ mod datum;
 
 pub mod binary;
 pub mod compacted;
-mod encode;
+pub mod encode;
 mod field_getter;
 
 pub use column::*;
+pub use compacted::CompactedRow;
 pub use datum::*;
+pub use encode::KeyEncoder;
 
 pub trait BinaryRow: InternalRow {
     /// Returns the binary representation of this row as a byte slice.
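Taken together, Patch 3 turns the lookup call into a typed, row-in/rows-out operation. A sketch of the full read path as of this patch (the `GenericRow`/`Datum` constructors follow the doc example in the diff above and should be treated as assumptions):

```rust
// Sketch against the Patch 3 API: key encoding and bucket routing now
// happen inside Lookuper::lookup().
let mut lookuper = table.new_lookup()?.create_lookuper()?;
let key_row = GenericRow::new(vec![Datum::Int32(42)]);
let result = lookuper.lookup(&key_row).await?;

// Decoding still needs the schema handed in explicitly at this stage;
// Patch 4 moves the RowType into the result itself.
let data_types = table.table_info().row_type().fields_as_data_types();
if let Some(row) = result.get_row(data_types) {
    println!("found: {:?}", row.get_string(1));
}
```

Note that `lookup` now takes `&mut self`, since the key encoder keeps internal state between calls.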
From 9a57376a4b9115c Mon Sep 17 00:00:00 2001
From: luoyuxia
Date: Fri, 16 Jan 2026 22:33:43 +0800
Subject: [PATCH 4/5] minor improvement

---
 crates/fluss/src/client/table/lookup.rs       | 78 ++++++++-----------
 crates/fluss/src/client/write/write_format.rs |  4 +-
 crates/fluss/src/record/kv/kv_record.rs       | 12 +--
 crates/fluss/src/record/kv/kv_record_batch.rs | 15 ++--
 .../src/record/kv/kv_record_batch_builder.rs  | 26 ++++---
 .../fluss/src/row/compacted/compacted_row.rs  | 37 +++++----
 .../src/row/compacted/compacted_row_reader.rs | 18 +++--
 .../src/row/encode/compacted_row_encoder.rs   | 19 ++---
 crates/fluss/src/row/encode/mod.rs            |  8 +-
 crates/fluss/src/util/varint.rs               | 14 ++--
 10 files changed, 106 insertions(+), 125 deletions(-)

diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index 68ca89ec..3a2e0a53 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -19,7 +19,7 @@
 use crate::bucketing::BucketingFunction;
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
-use crate::metadata::{TableBucket, TableInfo};
+use crate::metadata::{RowType, TableBucket, TableInfo};
 use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
 use crate::row::encode::KeyEncoder;
@@ -32,60 +32,51 @@
 /// Contains the rows returned from a lookup. For primary key lookups,
 /// this will contain at most one row. For prefix key lookups (future),
 /// this may contain multiple rows.
-pub struct LookupResult {
+pub struct LookupResult<'a> {
     rows: Vec<Vec<u8>>,
+    row_type: &'a RowType,
 }
 
-impl LookupResult {
+impl<'a> LookupResult<'a> {
     /// Creates a new LookupResult from a list of row bytes.
-    fn new(rows: Vec<Vec<u8>>) -> Self {
-        Self { rows }
+    fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self {
+        Self { rows, row_type }
     }
 
     /// Creates an empty LookupResult.
-    fn empty() -> Self {
-        Self { rows: Vec::new() }
-    }
-
-    /// Returns true if the lookup found no matching rows.
-    pub fn is_empty(&self) -> bool {
-        self.rows.is_empty()
-    }
-
-    /// Returns the number of rows in the result.
-    pub fn len(&self) -> usize {
-        self.rows.len()
-    }
-
-    /// Returns the raw bytes of all rows.
-    /// Use `get_row()` or `get_rows()` for decoded access.
-    pub fn raw_rows(&self) -> &[Vec<u8>] {
-        &self.rows
+    fn empty(row_type: &'a RowType) -> Self {
+        Self {
+            rows: Vec::new(),
+            row_type,
+        }
     }
 
-    /// Returns the single row as a CompactedRow, or None if empty.
+    /// Returns the only row in the result set as a [`CompactedRow`].
+    ///
+    /// This method provides a zero-copy view of the row data, which means the returned
+    /// `CompactedRow` borrows from this result set and cannot outlive it.
+    ///
+    /// # Returns
+    /// - `Ok(Some(row))`: If exactly one row exists.
+    /// - `Ok(None)`: If the result set is empty.
+    /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row.
     ///
-    /// # Panics
-    /// Panics if there are multiple rows. Use `get_rows()` for multi-row results.
-    pub fn get_row<'a>(
-        &'a self,
-        row_type: &'a [crate::metadata::DataType],
-    ) -> Option<CompactedRow<'a>> {
+    pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
-            0 => None,
-            1 => Some(CompactedRow::from_bytes(row_type, &self.rows[0])),
-            _ => panic!("LookupResult contains multiple rows, use get_rows() instead"),
+            0 => Ok(None),
+            1 => Ok(Some(CompactedRow::from_bytes(self.row_type, &self.rows[0]))),
+            _ => Err(Error::UnexpectedError {
+                message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
+                source: None,
+            }),
         }
     }
 
     /// Returns all rows as CompactedRows.
-    pub fn get_rows<'a>(
-        &'a self,
-        row_type: &'a [crate::metadata::DataType],
-    ) -> Vec<CompactedRow<'a>> {
+    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
         self.rows
             .iter()
-            .map(|bytes| CompactedRow::from_bytes(row_type, bytes))
+            .map(|bytes| CompactedRow::from_bytes(self.row_type, bytes))
             .collect()
     }
 }
@@ -101,7 +92,7 @@
 /// let table = conn.get_table(&table_path).await?;
 /// let lookuper = table.new_lookup()?.create_lookuper()?;
 /// let result = lookuper.lookup(&row).await?;
-/// if let Some(value) = result.get_row(table.table_info().row_type().fields_as_data_types()) {
+/// if let Some(value) = result.get_single_row() {
 ///     println!("Found: {:?}", value);
 /// }
 /// ```
@@ -134,10 +125,7 @@
         let num_buckets = self.table_info.get_num_buckets();
 
         // Get data lake format from table config for bucketing function
-        let data_lake_format = self
-            .table_info
-            .get_table_config()
-            .get_datalake_format()?;
+        let data_lake_format = self.table_info.get_table_config().get_datalake_format()?;
         let bucketing_function = <dyn BucketingFunction>::of(data_lake_format.as_ref());
 
         // Create key encoder for the primary key fields
@@ -250,10 +238,10 @@
                 .filter_map(|pb_value| pb_value.values)
                 .collect();
 
-            return Ok(LookupResult::new(rows));
+            return Ok(LookupResult::new(rows, self.table_info.row_type()));
         }
 
-        Ok(LookupResult::empty())
+        Ok(LookupResult::empty(self.table_info.row_type()))
     }
 
     /// Returns a reference to the table info.
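The lifetime added above changes how results are consumed: decoded rows borrow from the `LookupResult`, which in turn borrows the `RowType` from the lookuper's `TableInfo`. A sketch of the intended call shape (names from the diff; the error variant follows `get_single_row` above):

```rust
// Sketch of the Patch 4 call shape: `result` borrows the RowType from the
// lookuper, and each CompactedRow view borrows from `result` in turn.
let result = lookuper.lookup(&key_row).await?;
match result.get_single_row()? {
    Some(row) => println!("value: {:?}", row.get_string(1)),
    None => println!("key not found"),
}
// Multiple rows (possible once prefix lookups land) now surface as
// Err(Error::UnexpectedError) instead of the old panic in get_row().
```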
diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs
index d65e42de..4a0c0d8a 100644
--- a/crates/fluss/src/client/write/write_format.rs
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -39,7 +39,7 @@ impl WriteFormat {
         match self {
             WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED),
             other => Err(IllegalArgument {
-                message: format!("WriteFormat `{}` is not a KvFormat", other),
+                message: format!("WriteFormat `{other}` is not a KvFormat"),
             }),
         }
     }
@@ -48,7 +48,7 @@ impl WriteFormat {
         match kv_format {
             KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv),
             other => Err(IllegalArgument {
-                message: format!("Unknown KvFormat: `{}`", other),
+                message: format!("Unknown KvFormat: `{other}`"),
             }),
         }
     }
diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs
index 8c30713d..ab8c2ac1 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -101,7 +101,7 @@ impl KvRecord {
         let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
             io::Error::new(
                 io::ErrorKind::InvalidInput,
-                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+                format!("Record size {size_in_bytes} exceeds i32::MAX"),
             )
         })?;
         buf.put_i32_le(size_i32);
@@ -141,7 +141,7 @@ impl KvRecord {
         if size_in_bytes_i32 < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid record length: {}", size_in_bytes_i32),
+                format!("Invalid record length: {size_in_bytes_i32}"),
             ));
         }
@@ -150,10 +150,7 @@ impl KvRecord {
         let total_size = size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
             io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!(
-                    "Record size overflow: {} + {}",
-                    size_in_bytes, LENGTH_LENGTH
-                ),
+                format!("Record size overflow: {size_in_bytes} + {LENGTH_LENGTH}"),
             )
         })?;
@@ -162,8 +159,7 @@ impl KvRecord {
             return Err(io::Error::new(
                 io::ErrorKind::UnexpectedEof,
                 format!(
-                    "Not enough bytes to read record: expected {}, available {}",
-                    total_size, available
+                    "Not enough bytes to read record: expected {total_size}, available {available}"
                 ),
             ));
         }
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs
index 6ead6427..eb3c09ad 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -96,7 +96,7 @@ impl KvRecordBatch {
         if length_i32 < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid batch length: {}", length_i32),
+                format!("Invalid batch length: {length_i32}"),
             ));
         }
@@ -150,10 +150,7 @@ impl KvRecordBatch {
         if size < RECORD_BATCH_HEADER_SIZE {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!(
-                    "Batch size {} is less than header size {}",
-                    size, RECORD_BATCH_HEADER_SIZE
-                ),
+                format!("Batch size {size} is less than header size {RECORD_BATCH_HEADER_SIZE}"),
             ));
         }
@@ -276,7 +273,7 @@ impl KvRecordBatch {
         if count < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid record count: {}", count),
+                format!("Invalid record count: {count}"),
             ));
         }
         Ok(KvRecordIterator {
@@ -321,7 +318,7 @@ impl Iterator for KvRecordIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataTypes, KvFormat};
+    use crate::metadata::{DataTypes, KvFormat, RowType};
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::CompactedRow;
@@ -366,8 +363,8 @@ mod tests {
         let mut value1_writer = CompactedRowWriter::new(1);
         value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
 
-        let data_types = &[DataTypes::bytes()];
-        let row = &CompactedRow::from_bytes(data_types, value1_writer.buffer());
+        let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec());
+        let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer());
         builder.append_row(key1, Some(row)).unwrap();
 
         let key2 = b"key2";
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 7d1a7972..2c266c86 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -248,7 +248,7 @@ impl KvRecordBatchBuilder {
         let total_size = i32::try_from(size_without_length).map_err(|_| {
             io::Error::new(
                 io::ErrorKind::InvalidInput,
-                format!("Batch size {} exceeds i32::MAX", size_without_length),
+                format!("Batch size {size_without_length} exceeds i32::MAX"),
             )
         })?;
@@ -317,14 +317,16 @@ impl Drop for KvRecordBatchBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataType, DataTypes};
+    use crate::metadata::{DataTypes, RowType};
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::{CompactedRow, CompactedRowWriter};
+    use std::sync::LazyLock;
 
+    static TEST_ROW_TYPE: LazyLock<RowType> =
+        LazyLock::new(|| RowType::with_data_types(vec![DataTypes::bytes()]));
 
     // Helper function to create a CompactedRowWriter with a single bytes field for testing
     fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
-        const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
-        CompactedRow::from_bytes(DATA_TYPE, data)
+        CompactedRow::from_bytes(&TEST_ROW_TYPE, data)
     }
 
     #[test]
@@ -491,10 +493,10 @@ mod tests {
         let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
 
-        let types = vec![
+        let row_type = RowType::with_data_types(vec![
             DataType::Int(IntType::new()),
             DataType::String(StringType::new()),
-        ];
+        ]);
 
         // Create and append first record with CompactedRowWriter
         let mut row_writer1 = CompactedRowWriter::new(2);
@@ -502,7 +504,7 @@ mod tests {
         row_writer1.write_string("hello");
 
         let data_types = &[DataTypes::int(), DataTypes::string()];
-        let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());
+        let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
 
         let key1 = b"key1";
         assert!(builder.has_room_for_row(key1, Some(row1)));
@@ -513,7 +515,7 @@ mod tests {
         row_writer2.write_int(100);
         row_writer2.write_string("world");
 
-        let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());
+        let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
 
         let key2 = b"key2";
         builder.append_row(key2, Some(row2)).unwrap();
@@ -539,14 +541,14 @@ mod tests {
         // Verify first record
         let record1 = records[0].as_ref().unwrap();
         assert_eq!(record1.key().as_ref(), key1);
-        let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap());
+        let row1 = CompactedRow::from_bytes(&row_type, record1.value().unwrap());
         assert_eq!(row1.get_int(0), 42);
         assert_eq!(row1.get_string(1), "hello");
 
         // Verify second record
         let record2 = records[1].as_ref().unwrap();
         assert_eq!(record2.key().as_ref(), key2);
-        let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap());
+        let row2 = CompactedRow::from_bytes(&row_type, record2.value().unwrap());
         assert_eq!(row2.get_int(0), 100);
         assert_eq!(row2.get_string(1), "world");
 
@@ -561,8 +563,8 @@ mod tests {
         let mut row_writer = CompactedRowWriter::new(1);
         row_writer.write_int(42);
-        let data_types = &[DataTypes::int()];
-        let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());
+        let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
+        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
 
         // INDEXED format should reject append_row
         let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs
index 9ff3b5ff..d450a1b7 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::metadata::DataType;
+use crate::metadata::RowType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
 use crate::row::{BinaryRow, GenericRow, InternalRow};
 use std::sync::{Arc, OnceLock};
@@ -38,10 +38,10 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
 
 #[allow(dead_code)]
 impl<'a> CompactedRow<'a> {
-    pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
+    pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self {
         Self::deserialize(
-            Arc::new(CompactedRowDeserializer::new(data_types)),
-            data_types.len(),
+            Arc::new(CompactedRowDeserializer::new(row_type)),
+            row_type.fields().len(),
             data,
         )
     }
@@ -84,7 +84,10 @@ impl<'a> InternalRow for CompactedRow<'a> {
     }
 
     fn is_null_at(&self, pos: usize) -> bool {
-        self.deserializer.get_data_types()[pos].is_nullable() && self.reader.is_null_at(pos)
+        self.deserializer.get_row_type().fields().as_slice()[pos]
+            .data_type
+            .is_nullable()
+            && self.reader.is_null_at(pos)
     }
 
     fn get_boolean(&self, pos: usize) -> bool {
@@ -138,7 +141,7 @@ mod tests {
     use crate::row::binary::BinaryWriter;
 
     use crate::metadata::{
-        BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, SmallIntType,
+        BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, IntType, SmallIntType,
         StringType, TinyIntType,
     };
     use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
@@ -146,7 +149,7 @@ mod tests {
     #[test]
     fn test_compacted_row() {
         // Test all primitive types
-        let types = vec![
+        let row_type = RowType::with_data_types(vec![
             DataType::Boolean(BooleanType::new()),
             DataType::TinyInt(TinyIntType::new()),
             DataType::SmallInt(SmallIntType::new()),
@@ -156,9 +159,9 @@ mod tests {
             DataType::Double(DoubleType::new()),
             DataType::String(StringType::new()),
             DataType::Bytes(BytesType::new()),
-        ];
+        ]);
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
 
         writer.write_boolean(true);
         writer.write_byte(1);
@@ -171,7 +174,7 @@ mod tests {
         writer.write_bytes(&[1, 2, 3, 4, 5]);
 
         let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_field_count(), 9);
         assert!(row.get_boolean(0));
@@ -185,7 +188,7 @@ mod tests {
         assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
 
         // Test with nulls
-        let types = vec![
+        let types = [
             DataType::Int(IntType::new()),
             DataType::String(StringType::new()),
             DataType::Double(DoubleType::new()),
@@ -198,7 +201,7 @@ mod tests {
         writer.write_double(2.71);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert!(!row.is_null_at(0));
         assert!(row.is_null_at(1));
@@ -211,17 +214,17 @@ mod tests {
         assert_eq!(row.get_int(0), 100);
 
         // Test from_bytes
-        let types = vec![
+        let row_type = RowType::with_data_types(vec![
             DataType::Int(IntType::new()),
             DataType::String(StringType::new()),
-        ];
+        ]);
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
         writer.write_int(-1);
         writer.write_string("test");
 
         let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_int(0), -1);
         assert_eq!(row.get_string(1), "test");
@@ -239,7 +242,7 @@ mod tests {
         }
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         for i in 0..num_fields {
             assert_eq!(row.get_int(i), (i * 10) as i32);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 9ce50952..408706cc 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::metadata::RowType;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
@@ -27,31 +28,32 @@ use std::str::from_utf8;
 
 #[allow(dead_code)]
 #[derive(Clone)]
 pub struct CompactedRowDeserializer<'a> {
-    schema: Cow<'a, [DataType]>,
+    row_type: Cow<'a, RowType>,
 }
 
 #[allow(dead_code)]
 impl<'a> CompactedRowDeserializer<'a> {
-    pub fn new(schema: &'a [DataType]) -> Self {
+    pub fn new(row_type: &'a RowType) -> Self {
         Self {
-            schema: Cow::Borrowed(schema),
+            row_type: Cow::Borrowed(row_type),
         }
     }
 
-    pub fn new_from_owned(schema: Vec<DataType>) -> Self {
+    pub fn new_from_owned(row_type: RowType) -> Self {
         Self {
-            schema: Cow::Owned(schema),
+            row_type: Cow::Owned(row_type),
         }
     }
 
-    pub fn get_data_types(&self) -> &[DataType] {
-        self.schema.as_ref()
+    pub fn get_row_type(&self) -> &RowType {
+        self.row_type.as_ref()
     }
 
     pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
         let mut row = GenericRow::new();
         let mut cursor = reader.initial_position();
-        for (col_pos, dtype) in self.schema.iter().enumerate() {
+        for (col_pos, data_field) in self.row_type.fields().iter().enumerate() {
+            let dtype = &data_field.data_type;
             if dtype.is_nullable() && reader.is_null_at(col_pos) {
                 row.set_field(col_pos, Datum::Null);
                 continue;
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs b/crates/fluss/src/row/encode/compacted_row_encoder.rs
index fc39bb7a..48b9f3ff 100644
--- a/crates/fluss/src/row/encode/compacted_row_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -17,7 +17,7 @@
 
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::DataType;
+use crate::metadata::RowType;
 use crate::row::Datum;
 use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
 use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, CompactedRowWriter};
@@ -33,18 +33,18 @@ pub struct CompactedRowEncoder<'a> {
 }
 
 impl<'a> CompactedRowEncoder<'a> {
-    pub fn new(field_data_types: Vec<DataType>) -> Result<Self> {
-        let field_writers = field_data_types
-            .iter()
+    pub fn new(row_type: RowType) -> Result<Self> {
+        let field_writers = row_type
+            .field_types()
             .map(|d| ValueWriter::create_value_writer(d, Some(&BinaryRowFormat::Compacted)))
             .collect::<Result<Vec<_>>>()?;
 
         Ok(Self {
-            arity: field_data_types.len(),
-            writer: CompactedRowWriter::new(field_data_types.len()),
+            arity: field_writers.len(),
+            writer: CompactedRowWriter::new(field_writers.len()),
             field_writers,
             compacted_row_deserializer: Arc::new(CompactedRowDeserializer::new_from_owned(
-                field_data_types,
+                row_type,
             )),
         })
     }
@@ -60,10 +60,7 @@ impl RowEncoder for CompactedRowEncoder<'_> {
         self.field_writers
             .get(pos)
             .ok_or_else(|| IllegalArgument {
-                message: format!(
-                    "invalid position {} when attempting to encode value {}",
-                    pos, value
-                ),
+                message: format!("invalid position {pos} when attempting to encode value {value}"),
             })?
             .write_value(&mut self.writer, pos, &value)
     }
diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs
index 34863aba..c294ecf1 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -19,7 +19,7 @@ mod compacted_key_encoder;
 mod compacted_row_encoder;
 
 use crate::error::Result;
-use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType};
+use crate::metadata::{DataLakeFormat, KvFormat, RowType};
 use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
 use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
 use crate::row::{BinaryRow, Datum, InternalRow};
@@ -111,18 +111,18 @@ pub struct RowEncoderFactory {}
 #[allow(dead_code)]
 impl RowEncoderFactory {
     pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<CompactedRowEncoder<'static>> {
-        Self::create_for_field_types(kv_format, row_type.field_types().cloned().collect())
+        Self::create_for_field_types(kv_format, row_type.clone())
     }
 
     pub fn create_for_field_types(
         kv_format: KvFormat,
-        field_data_types: Vec<DataType>,
+        row_type: RowType,
     ) -> Result<CompactedRowEncoder<'static>> {
         match kv_format {
             KvFormat::INDEXED => {
                 todo!()
             }
-            KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types),
+            KvFormat::COMPACTED => CompactedRowEncoder::new(row_type),
         }
     }
 }
diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs
index 96fd1f50..83a75f6c 100644
--- a/crates/fluss/src/util/varint.rs
+++ b/crates/fluss/src/util/varint.rs
@@ -364,12 +364,11 @@ mod tests {
             let mut reader = Cursor::new(&buffer);
             let read_value = read_unsigned_varint(&mut reader).unwrap();
 
-            assert_eq!(value, read_value, "Round trip failed for value {}", value);
+            assert_eq!(value, read_value, "Round trip failed for value {value}");
             assert_eq!(
                 written,
                 buffer.len(),
-                "Bytes written mismatch for value {}",
-                value
+                "Bytes written mismatch for value {value}"
             );
 
             // Test with BufMut
@@ -382,22 +381,19 @@ mod tests {
             assert_eq!(
                 calculated_size,
                 buffer.len(),
-                "Size calculation failed for value {}",
-                value
+                "Size calculation failed for value {value}"
             );
 
             // Test reading from bytes
             let (read_value_bytes, bytes_read) = read_unsigned_varint_bytes(&buffer).unwrap();
             assert_eq!(
                 value, read_value_bytes,
-                "Bytes read failed for value {}",
-                value
+                "Bytes read failed for value {value}"
             );
             assert_eq!(
                 bytes_read,
                 buffer.len(),
-                "Bytes read count mismatch for value {}",
-                value
+                "Bytes read count mismatch for value {value}"
             );
         }
     }
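The thread running through Patch 4 is that decoding is now driven by `RowType` rather than a bare `&[DataType]` slice, so nullability and field metadata travel as one value. A sketch of a round-trip under the new signatures, assembled from the test code in the diff above:

```rust
// Sketch: round-trip a row through the compacted format using the
// RowType-driven API of this patch; pieced together from the tests above.
let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);

let mut writer = CompactedRowWriter::new(row_type.fields().len());
writer.write_int(7);
writer.write_string("seven");

let bytes = writer.to_bytes();
let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
assert_eq!(row.get_int(0), 7);
assert_eq!(row.get_string(1), "seven");
```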
From 5645eb9e158c800aaf0599ad74fdb0261560ba9c Mon Sep 17 00:00:00 2001
From: luoyuxia
Date: Sat, 17 Jan 2026 09:26:09 +0800
Subject: [PATCH 5/5] fix ci

---
 crates/fluss/src/client/table/lookup.rs       |  3 ++-
 .../src/record/kv/kv_record_batch_builder.rs  |  8 +------
 .../fluss/src/row/compacted/compacted_row.rs  | 23 +++++++++++--------
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index 3a2e0a53..1d32ebd7 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -177,7 +177,8 @@ impl<'a> Lookuper<'a> {
     /// # Returns
     /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found)
     /// * `Err(Error)` - If the lookup fails
-    pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult> {
+    pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult<'_>> {
+        // todo: support batch lookup
         // Encode the key from the row
         let encoded_key = self.key_encoder.encode_key(row)?;
         let key_bytes = encoded_key.to_vec();
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 2c266c86..c36a8612 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -485,7 +485,6 @@ mod tests {
     #[test]
     fn test_builder_with_compacted_row_writer() {
-        use crate::metadata::{DataType, IntType, StringType};
         use crate::record::kv::KvRecordBatch;
         use crate::row::InternalRow;
         use crate::row::compacted::CompactedRow;
@@ -493,17 +492,12 @@ mod tests {
         let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
 
-        let row_type = RowType::with_data_types(vec![
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
-        ]);
-
         // Create and append first record with CompactedRowWriter
         let mut row_writer1 = CompactedRowWriter::new(2);
         row_writer1.write_int(42);
         row_writer1.write_string("hello");
 
-        let data_types = &[DataTypes::int(), DataTypes::string()];
+        let row_type = RowType::with_data_types([DataTypes::int(), DataTypes::string()].to_vec());
         let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
 
         let key1 = b"key1";
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs
index d450a1b7..144f8985 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -188,13 +188,16 @@ mod tests {
         assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
 
         // Test with nulls
-        let types = [
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
-            DataType::Double(DoubleType::new()),
-        ];
+        let row_type = RowType::with_data_types(
+            [
+                DataType::Int(IntType::new()),
+                DataType::String(StringType::new()),
+                DataType::Double(DoubleType::new()),
+            ]
+            .to_vec(),
+        );
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
 
         writer.write_int(100);
         writer.set_null_at(1);
@@ -231,9 +234,11 @@ mod tests {
         // Test large row
         let num_fields = 100;
-        let types: Vec<DataType> = (0..num_fields)
-            .map(|_| DataType::Int(IntType::new()))
-            .collect();
+        let row_type = RowType::with_data_types(
+            (0..num_fields)
+                .map(|_| DataType::Int(IntType::new()))
+                .collect(),
+        );
 
         let mut writer = CompactedRowWriter::new(num_fields);