Skip to content

Commit cca1cf8

Browse files
committed
fix: improve error message for execution failures in PySessionContext
1 parent b168afb commit cca1cf8

File tree

3 files changed

+10
-10
lines changed

3 files changed

+10
-10
lines changed

src/context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,9 @@ impl PySessionContext {
846846
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
847847
let join_result = wait_for_future(py, fut)
848848
.map_err(|e| PyDataFusionError::Common(format!("Task failed: {}", e)))?;
849-
let stream = join_result.map_err(PyDataFusionError::from)?;
850-
Ok(PyRecordBatchStream::new(stream))
849+
let stream = join_result
850+
.map_err(|e| PyDataFusionError::Common(format!("Execution error: {}", e)))?;
851+
Ok(PyRecordBatchStream::new(stream?))
851852
}
852853

853854
pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {

src/dataframe.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,9 @@ impl PyDataFrame {
756756
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
757757
rt.spawn(async move { df.execute_stream().await });
758758
let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
759-
Ok(PyRecordBatchStream::new(stream?))
759+
Ok(PyRecordBatchStream::new(
760+
stream.map_err(PyDataFusionError::from)?,
761+
))
760762
}
761763

762764
fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
@@ -817,7 +819,7 @@ impl PyDataFrame {
817819

818820
// Executes this DataFrame to get the total number of rows.
819821
fn count(&self, py: Python) -> PyDataFusionResult<usize> {
820-
Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
822+
Ok(wait_for_future(py, self.df.as_ref().clone().count())??)
821823
}
822824

823825
/// Fill null values with a specified value for specific columns
@@ -844,11 +846,8 @@ impl PyDataFrame {
844846
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
845847
// Get string representation of record batches
846848
let batches = wait_for_future(py, df.collect())?;
847-
let batches_as_string = pretty::pretty_format_batches(&batches);
848-
let result = match batches_as_string {
849-
Ok(batch) => format!("DataFrame()\n{batch}"),
850-
Err(err) => format!("Error: {:?}", err.to_string()),
851-
};
849+
let batches_as_string = pretty::pretty_format_batches(&batches)?;
850+
let result = format!("DataFrame()\n{batch}", batch = batches_as_string);
852851

853852
// Import the Python 'builtins' module to access the print function
854853
// Note that println! does not print to the Python debug console and is not visible in notebooks for instance

src/record_batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl PyRecordBatchStream {
6363
impl PyRecordBatchStream {
6464
fn next(&mut self, py: Python) -> PyResult<PyRecordBatch> {
6565
let stream = self.stream.clone();
66-
wait_for_future(py, next_stream(stream, true))
66+
wait_for_future(py, next_stream(stream, true))?
6767
}
6868

6969
fn __next__(&mut self, py: Python) -> PyResult<PyRecordBatch> {

0 commit comments

Comments
 (0)