-
Notifications
You must be signed in to change notification settings - Fork 21
feat: Improve read path error handling logic #143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…eat/error-handle-reader
|
@luoyuxia Hi, PTAL if u have time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request improves error handling in the read path of the Fluss Rust client, focusing on fetch and scan operations. The changes add robust error handling for API errors, corrupt messages, and transient failures during log fetching and scanning.
Changes:
- Enhanced error handling in
list_offsets.rsto properly convert API error codes to Fluss errors with descriptive messages - Implemented comprehensive error handling in the scanner with proper metadata refresh scheduling for transient errors
- Added error propagation through the
LogFetchBufferwith newErrorCompletedFetchtype to handle errors at the fetch level - Improved projection validation in
arrow.rswith bounds checking and better error messages - Added extensive test coverage for new error paths, including tests for wakeup errors, API errors, and metadata refresh scheduling
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
crates/fluss/src/error.rs |
Added new WakeupError variant for handling wakeup interruptions |
crates/fluss/src/rpc/message/list_offsets.rs |
Improved error handling to properly convert error codes to API errors with descriptive messages |
crates/fluss/src/client/table/scanner.rs |
Added MetadataRefreshScheduler for automatic metadata refresh on transient errors, enhanced error handling throughout fetch operations |
crates/fluss/src/client/table/log_fetch_buffer.rs |
Implemented ErrorCompletedFetch type, added error propagation methods, improved corrupt record handling |
crates/fluss/src/record/arrow.rs |
Added projection validation with bounds checking, improved error messages for projection failures |
crates/fluss/src/util/mod.rs |
Added tests for FairBucketStatusMap |
crates/fluss/src/row/column.rs |
Added tests for ColumnarRow |
crates/fluss/src/record/mod.rs |
Added tests for ChangeType, ScanRecords, and ScanRecord |
crates/fluss/src/client/write/sender.rs |
Added HashMap import |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ); | ||
| } | ||
| FlussError::UnknownTableOrBucketException => { | ||
| warn!( | ||
| "Received unknown table or bucket error in fetch for bucket {table_bucket}" | ||
| ); |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the handle_fetch_response function, errors like NotLeaderOrFollower, LogStorageException, KvStorageException, StorageException, FencedLeaderEpochException, and UnknownTableOrBucketException are logged but do not trigger metadata refresh. However, in the initialize_fetch function (lines 1071-1085), these same errors do trigger metadata refresh via schedule_metadata_update. This inconsistency could lead to delayed recovery from transient errors. Consider adding metadata refresh scheduling for these error cases in handle_fetch_response as well.
| ); | |
| } | |
| FlussError::UnknownTableOrBucketException => { | |
| warn!( | |
| "Received unknown table or bucket error in fetch for bucket {table_bucket}" | |
| ); | |
| ); | |
| self.schedule_metadata_update(); | |
| } | |
| FlussError::UnknownTableOrBucketException => { | |
| warn!( | |
| "Received unknown table or bucket error in fetch for bucket {table_bucket}" | |
| ); | |
| self.schedule_metadata_update(); |
| tokio::spawn(async move { | ||
| loop { | ||
| let (delay, error_for_log) = { | ||
| let mut guard = state.lock(); | ||
| if !guard.pending { | ||
| guard.running = false; | ||
| return; | ||
| } | ||
| guard.pending = false; | ||
|
|
||
| let now = Instant::now(); | ||
| let delay = match guard.last_refresh { | ||
| Some(last) => { | ||
| let earliest = last + min_interval; | ||
| if now < earliest { | ||
| earliest - now | ||
| } else { | ||
| Duration::from_millis(0) | ||
| } | ||
| } | ||
| None => Duration::from_millis(0), | ||
| }; | ||
| (delay, guard.last_error.take()) | ||
| }; | ||
|
|
||
| if !delay.is_zero() { | ||
| sleep(delay).await; | ||
| } | ||
|
|
||
| if let Err(e) = (refresh)().await { | ||
| if let Some(error) = error_for_log { | ||
| warn!( | ||
| "Failed to update metadata for {table_path} after fetch error {error:?}: {e:?}" | ||
| ); | ||
| } else { | ||
| warn!("Failed to update metadata for {table_path}: {e:?}"); | ||
| } | ||
| } | ||
|
|
||
| let mut guard = state.lock(); | ||
| guard.last_refresh = Some(Instant::now()); | ||
| } | ||
| }); | ||
| } |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MetadataRefreshScheduler spawns a background tokio task (line 500) that is not explicitly tracked or canceled. When the LogFetcher (and its metadata_refresh field) is dropped, this background task will continue running until it naturally exits, potentially holding references to shared state longer than necessary. Consider using JoinHandle to track the spawned task and implement a cancellation mechanism (e.g., via CancellationToken from tokio_util) to ensure clean shutdown when the LogFetcher is dropped.
|
|
||
| fn fetch_error(&self) -> Error { | ||
| let mut message = format!( | ||
| "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.", |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The phrase "back to past the record" is grammatically incorrect. It should be "go back past the record" or "skip past the record" to be clear and correct.
| "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.", | |
| "Received exception when fetching the next record from {table_bucket}. If needed, please skip past the record to continue scanning.", |
luoyuxia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaohaidao Thanks for the pr. Left some comments. PTAL
| corrupt_last_record: bool, | ||
| } | ||
|
|
||
| impl DefaultCompletedFetch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, we don't make it return Result
pub fn new(
table_bucket: TableBucket,
log_record_batch: LogRecordsBatches,
size_in_bytes: usize,
read_context: ReadContext,
fetch_offset: i64,
high_watermark: i64,
) -> Self {
Self {
table_bucket,
log_record_batch,
read_context,
next_fetch_offset: fetch_offset,
high_watermark,
size_in_bytes,
consumed: false,
initialized: false,
records_read: 0,
current_record_iterator: None,
current_record_batch: None,
last_record: None,
cached_record_error: None,
corrupt_last_record: false,
}
}
| scan_records.push(record); | ||
| } else { | ||
| break; | ||
| if self.cached_record_error.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether the following code more clear?
for _ in 0..max_records {
if self.cached_record_error.is_some() {
break;
}
self.corrupt_last_record = true;
match self.next_fetched_record() {
Ok(Some(record)) => {
self.corrupt_last_record = false;
self.next_fetch_offset = record.offset() + 1;
self.records_read += 1;
scan_records.push(record);
}
Ok(None) => {
self.corrupt_last_record = false;
self.last_record = None;
break;
}
Err(e) => {
self.cached_record_error = Some(e);
break;
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From your suggestion, I see there's no logic related to last_record.take. Using last_record is to preserve the behavior of "partial success followed by error" (returning read records first, then reporting the error on the next attempt).
This logic itself is also to maintain consistency with Java.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for take it into consideration.
|
@luoyuxia Thank you so much for your thorough and patient review. I have processed the clear comments, but there are still some unclear points that require your further consideration. PTAL if u have time. |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…eat/error-handle-reader
|
@zhaohaidao Thanks for update. Have went through the code, look more clear to me. Will review it carefully in next few days. |
Purpose
Linked issue: close #141
Brief change log
Tests
API and Format
Documentation