From 0223aa014af58c94c94efcf1219a8e8857f7904d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 15 Jan 2026 16:52:57 +0800 Subject: [PATCH] fix read deadlock --- .../src/client/table/log_fetch_buffer.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index e9bac53f..c55c994b 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -211,19 +211,27 @@ impl LogFetchBuffer { pub fn buffered_buckets(&self) -> Vec { let mut buckets = Vec::new(); - let next_in_line_fetch = self.next_in_line_fetch.lock(); - if let Some(complete_fetch) = next_in_line_fetch.as_ref() { - if !complete_fetch.is_consumed() { - buckets.push(complete_fetch.table_bucket().clone()); + // Avoid holding multiple locks at once to prevent lock-order inversion. + { + let next_in_line_fetch = self.next_in_line_fetch.lock(); + if let Some(complete_fetch) = next_in_line_fetch.as_ref() { + if !complete_fetch.is_consumed() { + buckets.push(complete_fetch.table_bucket().clone()); + } } } - let completed = self.completed_fetches.lock(); - for fetch in completed.iter() { - buckets.push(fetch.table_bucket().clone()); + { + let completed = self.completed_fetches.lock(); + for fetch in completed.iter() { + buckets.push(fetch.table_bucket().clone()); + } + } + + { + let pending = self.pending_fetches.lock(); + buckets.extend(pending.keys().cloned()); } - let pending = self.pending_fetches.lock(); - buckets.extend(pending.keys().cloned()); buckets } }