From afd854d682c3872f4406838ef11fbfd0102fabfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 10 Jan 2026 17:43:57 +0800 Subject: [PATCH] fix: correct log record batch checksum bounds --- crates/fluss/src/record/arrow.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index c166ebe..b331ae9 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -86,6 +86,8 @@ pub enum LogMagicValue { V0 = 0, } +// NOTE: Rust layout/offsets currently match Java only for V0. +// TODO: Add V1 layout/offsets to keep parity with Java's V1 format. pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8; /// Value used if writer ID is not available or non-idempotent. @@ -457,8 +459,7 @@ impl LogRecordBatch { fn compute_checksum(&self) -> u32 { let start = SCHEMA_ID_OFFSET; - let end = start + self.data.len(); - crc32c(&self.data[start..end]) + crc32c(&self.data[start..]) } fn attributes(&self) -> u8 { @@ -471,12 +472,12 @@ impl LogRecordBatch { pub fn checksum(&self) -> u32 { let offset = CRC_OFFSET; - LittleEndian::read_u32(&self.data[offset..offset + CRC_OFFSET]) + LittleEndian::read_u32(&self.data[offset..offset + CRC_LENGTH]) } pub fn schema_id(&self) -> i16 { let offset = SCHEMA_ID_OFFSET; - LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_OFFSET]) + LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_LENGTH]) } pub fn base_log_offset(&self) -> i64 { @@ -1240,6 +1241,27 @@ mod tests { assert!(matches!(result, Err(Error::IllegalArgument { .. }))); } + #[test] + fn checksum_and_schema_id_read_minimum_header() { + // Header-only batches with record_count == 0 are valid; this covers the minimal bytes + // needed for checksum/schema_id access. 
+ let mut data = vec![0u8; SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH]; + let crc = 0xA1B2C3D4u32; + let schema_id = 42i16; + LittleEndian::write_u32(&mut data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc); + LittleEndian::write_i16( + &mut data[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH], + schema_id, + ); + + let batch = LogRecordBatch::new(Bytes::from(data)); + assert_eq!(batch.checksum(), crc); + assert_eq!(batch.schema_id(), schema_id); + + let expected = crc32c(&batch.data[SCHEMA_ID_OFFSET..]); + assert_eq!(batch.compute_checksum(), expected); + } + fn le_bytes(vals: &[u32]) -> Vec<u8> { let mut out = Vec::with_capacity(vals.len() * 4); for &v in vals {