Conversation

@zhaohaidao
Contributor

Purpose

Linked issue: close #141

Brief change log

Tests

API and Format

Documentation

@zhaohaidao
Contributor Author

@luoyuxia Hi, PTAL if you have time.


Copilot AI left a comment


Pull request overview

This pull request improves error handling in the read path of the Fluss Rust client, focusing on fetch and scan operations. The changes add robust error handling for API errors, corrupt messages, and transient failures during log fetching and scanning.

Changes:

  • Enhanced error handling in list_offsets.rs to properly convert API error codes to Fluss errors with descriptive messages
  • Implemented comprehensive error handling in the scanner with proper metadata refresh scheduling for transient errors
  • Added error propagation through the LogFetchBuffer with new ErrorCompletedFetch type to handle errors at the fetch level
  • Improved projection validation in arrow.rs with bounds checking and better error messages
  • Added extensive test coverage for new error paths, including tests for wakeup errors, API errors, and metadata refresh scheduling

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

File Description
crates/fluss/src/error.rs Added new WakeupError variant for handling wakeup interruptions
crates/fluss/src/rpc/message/list_offsets.rs Improved error handling to properly convert error codes to API errors with descriptive messages
crates/fluss/src/client/table/scanner.rs Added MetadataRefreshScheduler for automatic metadata refresh on transient errors, enhanced error handling throughout fetch operations
crates/fluss/src/client/table/log_fetch_buffer.rs Implemented ErrorCompletedFetch type, added error propagation methods, improved corrupt record handling
crates/fluss/src/record/arrow.rs Added projection validation with bounds checking, improved error messages for projection failures
crates/fluss/src/util/mod.rs Added tests for FairBucketStatusMap
crates/fluss/src/row/column.rs Added tests for ColumnarRow
crates/fluss/src/record/mod.rs Added tests for ChangeType, ScanRecords, and ScanRecord
crates/fluss/src/client/write/sender.rs Added HashMap import


Comment on lines 835 to 840
                );
            }
            FlussError::UnknownTableOrBucketException => {
                warn!(
                    "Received unknown table or bucket error in fetch for bucket {table_bucket}"
                );

Copilot AI Jan 11, 2026


In the handle_fetch_response function, errors like NotLeaderOrFollower, LogStorageException, KvStorageException, StorageException, FencedLeaderEpochException, and UnknownTableOrBucketException are logged but do not trigger metadata refresh. However, in the initialize_fetch function (lines 1071-1085), these same errors do trigger metadata refresh via schedule_metadata_update. This inconsistency could lead to delayed recovery from transient errors. Consider adding metadata refresh scheduling for these error cases in handle_fetch_response as well.

Suggested change
-            );
-        }
-        FlussError::UnknownTableOrBucketException => {
-            warn!(
-                "Received unknown table or bucket error in fetch for bucket {table_bucket}"
-            );
+            );
+            self.schedule_metadata_update();
+        }
+        FlussError::UnknownTableOrBucketException => {
+            warn!(
+                "Received unknown table or bucket error in fetch for bucket {table_bucket}"
+            );
+            self.schedule_metadata_update();

Comment on lines 500 to 543
        tokio::spawn(async move {
            loop {
                let (delay, error_for_log) = {
                    let mut guard = state.lock();
                    if !guard.pending {
                        guard.running = false;
                        return;
                    }
                    guard.pending = false;

                    let now = Instant::now();
                    let delay = match guard.last_refresh {
                        Some(last) => {
                            let earliest = last + min_interval;
                            if now < earliest {
                                earliest - now
                            } else {
                                Duration::from_millis(0)
                            }
                        }
                        None => Duration::from_millis(0),
                    };
                    (delay, guard.last_error.take())
                };

                if !delay.is_zero() {
                    sleep(delay).await;
                }

                if let Err(e) = (refresh)().await {
                    if let Some(error) = error_for_log {
                        warn!(
                            "Failed to update metadata for {table_path} after fetch error {error:?}: {e:?}"
                        );
                    } else {
                        warn!("Failed to update metadata for {table_path}: {e:?}");
                    }
                }

                let mut guard = state.lock();
                guard.last_refresh = Some(Instant::now());
            }
        });
    }

Copilot AI Jan 11, 2026


The MetadataRefreshScheduler spawns a background tokio task (line 500) that is not explicitly tracked or canceled. When the LogFetcher (and its metadata_refresh field) is dropped, this background task will continue running until it naturally exits, potentially holding references to shared state longer than necessary. Consider using JoinHandle to track the spawned task and implement a cancellation mechanism (e.g., via CancellationToken from tokio_util) to ensure clean shutdown when the LogFetcher is dropped.


    fn fetch_error(&self) -> Error {
        let mut message = format!(
            "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.",

Copilot AI Jan 11, 2026


The phrase "back to past the record" is grammatically incorrect. It should be "go back past the record" or "skip past the record" to be clear and correct.

Suggested change
-            "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.",
+            "Received exception when fetching the next record from {table_bucket}. If needed, please skip past the record to continue scanning.",

Contributor

@luoyuxia luoyuxia left a comment


@zhaohaidao Thanks for the PR. Left some comments. PTAL

    corrupt_last_record: bool,
}

impl DefaultCompletedFetch {
Contributor


Actually, we don't need to make it return Result:

pub fn new(
        table_bucket: TableBucket,
        log_record_batch: LogRecordsBatches,
        size_in_bytes: usize,
        read_context: ReadContext,
        fetch_offset: i64,
        high_watermark: i64,
    ) -> Self {
        Self {
            table_bucket,
            log_record_batch,
            read_context,
            next_fetch_offset: fetch_offset,
            high_watermark,
            size_in_bytes,
            consumed: false,
            initialized: false,
            records_read: 0,
            current_record_iterator: None,
            current_record_batch: None,
            last_record: None,
            cached_record_error: None,
            corrupt_last_record: false,
        }
    }

scan_records.push(record);
} else {
break;
if self.cached_record_error.is_none() {
Contributor


I'm wondering whether the following code is clearer?

for _ in 0..max_records {
            if self.cached_record_error.is_some() {
                break;
            }
            
            self.corrupt_last_record = true;
            match self.next_fetched_record() {
                Ok(Some(record)) => {
                    self.corrupt_last_record = false;
                    self.next_fetch_offset = record.offset() + 1;
                    self.records_read += 1;
                    scan_records.push(record);
                }
                Ok(None) => {
                    self.corrupt_last_record = false;
                    self.last_record = None;
                    break;
                }
                Err(e) => {
                    self.cached_record_error = Some(e);
                    break;
                }
            }
        }

Contributor Author


From your suggestion, I see there's no logic related to last_record.take. last_record is used to preserve the "partial success followed by error" behavior: the records already read are returned first, and the error is reported on the next attempt.
This also keeps the logic consistent with the Java client.

Contributor


Thanks for taking it into consideration.

@zhaohaidao
Contributor Author

@luoyuxia Thank you so much for your thorough and patient review. I have addressed the clear comments, but a few points remain unclear and need your further consideration. PTAL if you have time.

@luoyuxia
Contributor

@zhaohaidao Thanks for the update. I have gone through the code, and it looks clearer to me now. Will review it carefully in the next few days.



Development

Successfully merging this pull request may close these issues.

Improve Read Path Fault Tolerance Abilities
