diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index e9bac53f..c55c994b 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -211,19 +211,27 @@ impl LogFetchBuffer { pub fn buffered_buckets(&self) -> Vec { let mut buckets = Vec::new(); - let next_in_line_fetch = self.next_in_line_fetch.lock(); - if let Some(complete_fetch) = next_in_line_fetch.as_ref() { - if !complete_fetch.is_consumed() { - buckets.push(complete_fetch.table_bucket().clone()); + // Avoid holding multiple locks at once to prevent lock-order inversion. + { + let next_in_line_fetch = self.next_in_line_fetch.lock(); + if let Some(complete_fetch) = next_in_line_fetch.as_ref() { + if !complete_fetch.is_consumed() { + buckets.push(complete_fetch.table_bucket().clone()); + } } } - let completed = self.completed_fetches.lock(); - for fetch in completed.iter() { - buckets.push(fetch.table_bucket().clone()); + { + let completed = self.completed_fetches.lock(); + for fetch in completed.iter() { + buckets.push(fetch.table_bucket().clone()); + } + } + + { + let pending = self.pending_fetches.lock(); + buckets.extend(pending.keys().cloned()); } - let pending = self.pending_fetches.lock(); - buckets.extend(pending.keys().cloned()); buckets } }