Skip to content

Commit b168afb

Browse files
committed
feat: implement async execution for execution plans in PySessionContext
1 parent 60ff7f2 commit b168afb

File tree

1 file changed

+18
-19
lines changed

1 file changed

+18
-19
lines changed

src/context.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,24 @@ impl PySessionContext {
832832
Ok(PyDataFrame::new(x))
833833
}
834834

835+
pub fn execute(
836+
&self,
837+
plan: PyExecutionPlan,
838+
part: usize,
839+
py: Python,
840+
) -> PyDataFusionResult<PyRecordBatchStream> {
841+
let ctx: TaskContext = TaskContext::from(&self.ctx.state());
842+
// create a Tokio runtime to run the async code
843+
let rt = &get_tokio_runtime().0;
844+
let plan = plan.plan.clone();
845+
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
846+
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
847+
let join_result = wait_for_future(py, fut)
848+
.map_err(|e| PyDataFusionError::Common(format!("Task failed: {}", e)))?;
849+
let stream = join_result.map_err(PyDataFusionError::from)?;
850+
Ok(PyRecordBatchStream::new(stream))
851+
}
852+
835853
pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
836854
Ok(self.ctx.table_exist(name)?)
837855
}
@@ -1009,25 +1027,6 @@ impl PySessionContext {
10091027
config_entries.join("\n\t")
10101028
))
10111029
}
1012-
1013-
/// Execute a partition of an execution plan and return a stream of record batches
1014-
pub fn execute(
1015-
&self,
1016-
plan: PyExecutionPlan,
1017-
part: usize,
1018-
py: Python,
1019-
) -> PyDataFusionResult<PyRecordBatchStream> {
1020-
let ctx: TaskContext = TaskContext::from(&self.ctx.state());
1021-
// create a Tokio runtime to run the async code
1022-
let rt = &get_tokio_runtime().0;
1023-
let plan = plan.plan.clone();
1024-
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
1025-
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
1026-
let stream = wait_for_future(py, fut)
1027-
.map_err(py_datafusion_err)?
1028-
.map_err(PyDataFusionError::from)?;
1029-
Ok(PyRecordBatchStream::new(stream))
1030-
}
10311030
}
10321031

10331032
impl PySessionContext {

0 commit comments

Comments
 (0)