diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index f3e0bfcb8..0824f3ecd 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -216,13 +216,10 @@ impl FlightService for AirServiceImpl {
            })?;
        let time = Instant::now();

-        let stream_name_clone = stream_name.clone();
-        let (records, _) =
-            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
-                Ok(Ok((records, fields))) => (records, fields),
-                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
-                Err(err) => return Err(Status::internal(err.to_string())),
-            };
+        let (records, _) = query
+            .execute(&stream_name)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;

        /*
         * INFO: No returning the schema with the data.
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 0d7d0b340..4eb986341 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,7 +19,6 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
-use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -131,7 +130,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
-) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
-    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
-        Ok(Ok(result)) => Ok(result),
-        Ok(Err(e)) => Err(QueryError::Execute(e)),
-        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-    }
-}
-
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 8b799ac00..c24a11209 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -131,12 +131,11 @@ impl Query {
         SessionContext::new_with_state(state)
     }

-    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
-        stream_name: String,
+        stream_name: &str,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
+        let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();

         let df = QUERY_SESSION
             .execute_logical_plan(self.final_logical_plan(&time_partition))
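
The substance of this change: `Query::execute` used to carry `#[tokio::main(flavor = "multi_thread")]`, which turns an `async fn` into a blocking function that spins up its own nested runtime, so both the Flight handler and the HTTP handler had to push the call onto the blocking pool with `tokio::task::spawn_blocking` and unwrap a doubly nested `Result` (the outer join error plus the inner execution error). With the attribute removed, `execute` is an ordinary `async fn` that borrows the stream name, and callers simply `.await` it on the runtime they are already on. Below is a minimal sketch of the before/after calling pattern; `Query`, `ExecuteError`, and the record type are simplified stand-ins, not the actual Parseable definitions.

// Illustrative sketch only: `Query`, `ExecuteError`, and the record type are
// simplified placeholders, not the real Parseable types.
use tokio::task;

#[derive(Debug)]
struct ExecuteError(String);

#[derive(Clone)]
struct Query;

impl Query {
    // BEFORE (conceptually): `#[tokio::main(flavor = "multi_thread")]` turned the
    // async fn into a blocking fn that owns its own runtime, so callers had to
    // treat it as synchronous code.
    fn execute_blocking(&self, stream_name: String) -> Result<Vec<String>, ExecuteError> {
        Ok(vec![format!("records for {stream_name}")]) // stand-in for the DataFusion work
    }

    // AFTER: a plain async fn that borrows the stream name and runs on the
    // caller's runtime; no nested runtime, no hop to the blocking thread pool.
    async fn execute(&self, stream_name: &str) -> Result<Vec<String>, ExecuteError> {
        Ok(vec![format!("records for {stream_name}")])
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let query = Query;
    let stream_name = String::from("demo");

    // Old calling pattern: clone owned values into the closure, then unwrap two
    // layers of Result (join error from spawn_blocking, then the execution error).
    let query_clone = query.clone();
    let stream_name_clone = stream_name.clone();
    let records =
        match task::spawn_blocking(move || query_clone.execute_blocking(stream_name_clone)).await {
            Ok(Ok(records)) => records,
            Ok(Err(e)) => return Err(format!("query failed: {e:?}").into()),
            Err(join_err) => return Err(format!("blocking task failed: {join_err}").into()),
        };
    println!("old pattern -> {records:?}");

    // New calling pattern: await directly and map the single error layer.
    let records = query
        .execute(&stream_name)
        .await
        .map_err(|e| format!("query failed: {e:?}"))?;
    println!("new pattern -> {records:?}");

    Ok(())
}

A side effect visible in the hunks above is that the intermediate spawn_blocking wrapper in src/handlers/http/query.rs, along with the `arrow_array::RecordBatch` import it needed, can be deleted outright.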