Skip to content

Commit bf5949e

Browse files
committed
Improve error handling in spawn_future function for better integration with Python exceptions
1 parent dadeed2 commit bf5949e

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

src/utils.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,24 @@ where
9494
{
9595
let rt = &get_tokio_runtime().0;
9696
let handle: JoinHandle<datafusion::common::Result<T>> = rt.spawn(fut);
97-
Ok(wait_for_future(py, async {
98-
handle.await.map_err(to_datafusion_err)
99-
})??)
97+
// Wait for the join handle while respecting Python signal handling.
98+
// We handle errors in two steps so `?` maps the error types correctly:
99+
// 1) convert any Python-related error from `wait_for_future` into `PyDataFusionError`
100+
// 2) convert any DataFusion error (inner result) into `PyDataFusionError`
101+
let inner_result = wait_for_future(py, async {
102+
// handle.await yields `Result<datafusion::common::Result<T>, JoinError>`
103+
// map JoinError into a DataFusion error so the async block returns
104+
// `datafusion::common::Result<T>` (i.e. Result<T, DataFusionError>)
105+
match handle.await {
106+
Ok(inner) => inner,
107+
Err(join_err) => Err(to_datafusion_err(join_err)),
108+
}
109+
})?; // converts PyErr -> PyDataFusionError
110+
111+
// `inner_result` is `datafusion::common::Result<T>`; use `?` to convert
112+
// the inner DataFusion error into `PyDataFusionError` via `From` and
113+
// return the inner `T` on success.
114+
Ok(inner_result?)
100115
}
101116

102117
/// Spawn a [`SendableRecordBatchStream`] on the Tokio runtime and wait for completion

0 commit comments

Comments
 (0)