Skip to content

Commit 5594436

Browse files
authored
chore: remove format! in heavy IO loop (#19077)
* save * refactor: reduce overhead when log level is not DEBUG * fmt * refactor: ThreadTrackerFilter with report_issues mode * refactor: ThreadTrackerFilter with report_issues mode * refactor: ThreadTrackerFilter with report_issues mode * refactor: ThreadTrackerFilter with report_issues mode * Revert "refactor: ThreadTrackerFilter with report_issues mode" This reverts commit 4358828. * Revert "refactor: ThreadTrackerFilter with report_issues mode" This reverts commit c3df019. * Revert "refactor: ThreadTrackerFilter with report_issues mode" This reverts commit 13a2296. * Revert "refactor: ThreadTrackerFilter with report_issues mode" This reverts commit 93ea2ab. * Revert "fmt" This reverts commit 5755f28. * Revert "refactor: reduce overhead when log level is not DEBUG" This reverts commit e3bc664. * Revert "save" This reverts commit ef5100a. * fix: pre-assemble spawn task name, to avoid calling format! in heavy read loop
1 parent d7bf3cf commit 5594436

File tree

1 file changed

+59
-24
lines changed

1 file changed

+59
-24
lines changed

src/common/storage/src/runtime_layer.rs

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::fmt::Formatter;
1717
use std::sync::Arc;
1818

1919
use databend_common_base::runtime::Runtime;
20+
use databend_common_base::runtime::ThreadTracker;
2021
use databend_common_base::runtime::TrySpawn;
2122
use opendal::raw::oio;
2223
use opendal::raw::Access;
@@ -183,13 +184,23 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
183184
pub struct RuntimeIO<R: 'static> {
184185
inner: Option<R>,
185186
runtime: Arc<Runtime>,
187+
spawn_task_name: String,
186188
}
187189

188190
impl<R> RuntimeIO<R> {
189191
fn new(inner: R, runtime: Arc<Runtime>) -> Self {
192+
// pre-assemble spawn task name, to avoid calling format! in heavy read loop
193+
let query_id = ThreadTracker::query_id();
194+
let spawn_task_name = if let Some(id) = query_id {
195+
format!("Running query {} IO task", id)
196+
} else {
197+
String::from("Running IO task")
198+
};
199+
190200
Self {
191201
inner: Some(inner),
192202
runtime,
203+
spawn_task_name,
193204
}
194205
}
195206
}
@@ -200,10 +211,14 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
200211
let runtime = self.runtime.clone();
201212

202213
let (r, res) = runtime
203-
.spawn(async move {
204-
let res = r.read().await;
205-
(r, res)
206-
})
214+
.try_spawn(
215+
async move {
216+
let res = r.read().await;
217+
(r, res)
218+
},
219+
Some(self.spawn_task_name.clone()),
220+
)
221+
.expect("spawn must success")
207222
.await
208223
.expect("join must success");
209224
self.inner = Some(r);
@@ -217,10 +232,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
217232
let runtime = self.runtime.clone();
218233

219234
let (r, res) = runtime
220-
.spawn(async move {
221-
let res = r.write(bs).await;
222-
(r, res)
223-
})
235+
.try_spawn(
236+
async move {
237+
let res = r.write(bs).await;
238+
(r, res)
239+
},
240+
Some(self.spawn_task_name.clone()),
241+
)
242+
.expect("spawn must success")
224243
.await
225244
.expect("join must success");
226245
self.inner = Some(r);
@@ -232,10 +251,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
232251
let runtime = self.runtime.clone();
233252

234253
let (r, res) = runtime
235-
.spawn(async move {
236-
let res = r.close().await;
237-
(r, res)
238-
})
254+
.try_spawn(
255+
async move {
256+
let res = r.close().await;
257+
(r, res)
258+
},
259+
Some(self.spawn_task_name.clone()),
260+
)
261+
.expect("spawn must success")
239262
.await
240263
.expect("join must success");
241264
self.inner = Some(r);
@@ -247,10 +270,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
247270
let runtime = self.runtime.clone();
248271

249272
let (r, res) = runtime
250-
.spawn(async move {
251-
let res = r.abort().await;
252-
(r, res)
253-
})
273+
.try_spawn(
274+
async move {
275+
let res = r.abort().await;
276+
(r, res)
277+
},
278+
Some(self.spawn_task_name.clone()),
279+
)
280+
.expect("spawn must success")
254281
.await
255282
.expect("join must success");
256283
self.inner = Some(r);
@@ -264,10 +291,14 @@ impl<R: oio::List> oio::List for RuntimeIO<R> {
264291
let runtime = self.runtime.clone();
265292

266293
let (r, res) = runtime
267-
.spawn(async move {
268-
let res = r.next().await;
269-
(r, res)
270-
})
294+
.try_spawn(
295+
async move {
296+
let res = r.next().await;
297+
(r, res)
298+
},
299+
Some(self.spawn_task_name.clone()),
300+
)
301+
.expect("spawn must success")
271302
.await
272303
.expect("join must success");
273304
self.inner = Some(r);
@@ -285,10 +316,14 @@ impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
285316
let runtime = self.runtime.clone();
286317

287318
let (r, res) = runtime
288-
.spawn(async move {
289-
let res = r.flush().await;
290-
(r, res)
291-
})
319+
.try_spawn(
320+
async move {
321+
let res = r.flush().await;
322+
(r, res)
323+
},
324+
Some(self.spawn_task_name.clone()),
325+
)
326+
.expect("spawn must success")
292327
.await
293328
.expect("join must success");
294329
self.inner = Some(r);

0 commit comments

Comments
 (0)