Skip to content

Commit a2e30ae

Browse files
committed
Modify benches to continue query execution even on failure
1 parent 551ff5d commit a2e30ae

File tree

3 files changed

+83
-40
lines changed

3 files changed

+83
-40
lines changed

benchmarks/src/clickbench.rs

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::path::Path;
1919
use std::path::PathBuf;
2020

21-
use crate::util::{BenchmarkRun, CommonOpt};
21+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
2222
use datafusion::{
2323
error::{DataFusionError, Result},
2424
prelude::SessionContext,
@@ -128,36 +128,70 @@ impl RunOpt {
128128
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
129129
self.register_hits(&ctx).await?;
130130

131-
let iterations = self.common.iterations;
132131
let mut benchmark_run = BenchmarkRun::new();
132+
let mut failed_queries: Vec<usize> =
133+
Vec::with_capacity(query_range.clone().count());
133134
for query_id in query_range {
134-
let mut millis = Vec::with_capacity(iterations);
135135
benchmark_run.start_new_case(&format!("Query {query_id}"));
136-
let sql = queries.get_query(query_id)?;
137-
println!("Q{query_id}: {sql}");
138-
139-
for i in 0..iterations {
140-
let start = Instant::now();
141-
let results = ctx.sql(sql).await?.collect().await?;
142-
let elapsed = start.elapsed();
143-
let ms = elapsed.as_secs_f64() * 1000.0;
144-
millis.push(ms);
145-
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
146-
println!(
147-
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
148-
);
149-
benchmark_run.write_iter(elapsed, row_count);
136+
let query_run = self.benchmark_query(&queries, query_id, &ctx).await;
137+
match query_run {
138+
Ok(query_results) => {
139+
for iter in query_results {
140+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
141+
}
142+
}
143+
Err(e) => {
144+
eprintln!("Query {query_id} failed: {e}");
145+
// TODO mark failure
146+
failed_queries.push(query_id);
147+
}
150148
}
151-
if self.common.debug {
152-
ctx.sql(sql).await?.explain(false, false)?.show().await?;
153-
}
154-
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
155-
println!("Query {query_id} avg time: {avg:.2} ms");
156149
}
157150
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
151+
if !failed_queries.is_empty() {
152+
println!(
153+
"Failed Queries: {}",
154+
failed_queries
155+
.iter()
156+
.map(|q| q.to_string())
157+
.collect::<Vec<_>>()
158+
.join(", ")
159+
);
160+
}
158161
Ok(())
159162
}
160163

164+
async fn benchmark_query(
165+
&self,
166+
queries: &AllQueries,
167+
query_id: usize,
168+
ctx: &SessionContext,
169+
) -> Result<Vec<QueryResult>> {
170+
let sql = queries.get_query(query_id)?;
171+
println!("Q{query_id}: {sql}");
172+
173+
let mut millis = Vec::with_capacity(self.iterations());
174+
let mut query_results = vec![];
175+
for i in 0..self.iterations() {
176+
let start = Instant::now();
177+
let results = ctx.sql(sql).await?.collect().await?;
178+
let elapsed = start.elapsed();
179+
let ms = elapsed.as_secs_f64() * 1000.0;
180+
millis.push(ms);
181+
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
182+
println!(
183+
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
184+
);
185+
query_results.push(QueryResult { elapsed, row_count })
186+
}
187+
if self.common.debug {
188+
ctx.sql(sql).await?.explain(false, false)?.show().await?;
189+
}
190+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
191+
println!("Query {query_id} avg time: {avg:.2} ms");
192+
Ok(query_results)
193+
}
194+
161195
/// Registers the `hits.parquet` as a table named `hits`
162196
async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
163197
let options = Default::default();
@@ -171,4 +205,8 @@ impl RunOpt {
171205
)
172206
})
173207
}
208+
209+
fn iterations(&self) -> usize {
210+
self.common.iterations
211+
}
174212
}

benchmarks/src/sort_tpch.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_common::instant::Instant;
4040
use datafusion_common::utils::get_available_parallelism;
4141
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
4242

43-
use crate::util::{BenchmarkRun, CommonOpt};
43+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
4444

4545
#[derive(Debug, StructOpt)]
4646
pub struct RunOpt {
@@ -74,11 +74,6 @@ pub struct RunOpt {
7474
limit: Option<usize>,
7575
}
7676

77-
struct QueryResult {
78-
elapsed: std::time::Duration,
79-
row_count: usize,
80-
}
81-
8277
impl RunOpt {
8378
const SORT_TABLES: [&'static str; 1] = ["lineitem"];
8479

@@ -189,9 +184,16 @@ impl RunOpt {
189184
for query_id in query_range {
190185
benchmark_run.start_new_case(&format!("{query_id}"));
191186

192-
let query_results = self.benchmark_query(query_id).await?;
193-
for iter in query_results {
194-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
187+
let query_results = self.benchmark_query(query_id).await;
188+
match query_results {
189+
Ok(query_results) => {
190+
for iter in query_results {
191+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
192+
}
193+
}
194+
Err(e) => {
195+
eprintln!("Query {query_id} failed: {e}");
196+
}
195197
}
196198
}
197199

benchmarks/src/tpch/run.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use super::{
2222
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
2323
};
24-
use crate::util::{BenchmarkRun, CommonOpt};
24+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
2525

2626
use arrow::record_batch::RecordBatch;
2727
use arrow::util::pretty::{self, pretty_format_batches};
@@ -111,9 +111,17 @@ impl RunOpt {
111111
let mut benchmark_run = BenchmarkRun::new();
112112
for query_id in query_range {
113113
benchmark_run.start_new_case(&format!("Query {query_id}"));
114-
let query_run = self.benchmark_query(query_id).await?;
115-
for iter in query_run {
116-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
114+
let query_run = self.benchmark_query(query_id).await;
115+
match query_run {
116+
Ok(query_results) => {
117+
for iter in query_results {
118+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
119+
}
120+
}
121+
Err(e) => {
122+
// TODO mark failure
123+
eprintln!("Query {query_id} failed: {e}");
124+
}
117125
}
118126
}
119127
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
@@ -317,11 +325,6 @@ impl RunOpt {
317325
}
318326
}
319327

320-
struct QueryResult {
321-
elapsed: std::time::Duration,
322-
row_count: usize,
323-
}
324-
325328
#[cfg(test)]
326329
// Only run with "ci" mode when we have the data
327330
#[cfg(feature = "ci")]

0 commit comments

Comments
 (0)