From fe4f798fcccca65cad6df59ead9c37810b155328 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 6 Mar 2025 15:01:20 +0530 Subject: [PATCH 1/2] suggestions --- src/handlers/airplane.rs | 11 ++++------- src/handlers/http/query.rs | 14 +------------- src/query/mod.rs | 5 ++--- 3 files changed, 7 insertions(+), 23 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index f3e0bfcb8..f712a4958 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -216,13 +216,10 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let stream_name_clone = stream_name.clone(); - let (records, _) = - match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await { - Ok(Ok((records, fields))) => (records, fields), - Ok(Err(e)) => return Err(Status::internal(e.to_string())), - Err(err) => return Err(Status::internal(err.to_string())), - }; + let (records, _) = match query.execute(&stream_name).await { + Ok((records, fields)) => (records, fields), + Err(err) => return Err(Status::internal(err.to_string())), + }; /* * INFO: No returning the schema with the data. diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 0d7d0b340..4eb986341 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,7 +19,6 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; -use arrow_array::RecordBatch; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -131,7 +130,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result<(Vec, Vec), QueryError> { - match tokio::task::spawn_blocking(move || query.execute(stream_name)).await { - Ok(Ok(result)) => Ok(result), - Ok(Err(e)) => Err(QueryError::Execute(e)), - Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))), - } -} - pub async fn get_counts( req: HttpRequest, counts_request: Json, diff --git a/src/query/mod.rs b/src/query/mod.rs index 8b799ac00..c24a11209 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -131,12 +131,11 @@ impl Query { SessionContext::new_with_state(state) } - #[tokio::main(flavor = "multi_thread")] pub async fn execute( &self, - stream_name: String, + stream_name: &str, ) -> Result<(Vec, Vec), ExecuteError> { - let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); + let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan(&time_partition)) From df571e6dd0867c9a48d6791f41dfd2f3209b963b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 6 Mar 2025 22:31:30 +0530 Subject: [PATCH 2/2] refactor: execute and `map_err` --- src/handlers/airplane.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index f712a4958..0824f3ecd 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -216,10 +216,10 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let (records, _) = match query.execute(&stream_name).await { - Ok((records, fields)) => (records, fields), - Err(err) => return Err(Status::internal(err.to_string())), - }; + let (records, _) = query + .execute(&stream_name) + .await + .map_err(|err| Status::internal(err.to_string()))?; /* * INFO: No returning the schema with the data.