Skip to content

Commit 81e4204

Browse files
committed
fix: enhance error handling and improve execution plan retrieval in PyDataFrame
1 parent cca1cf8 commit 81e4204

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

src/dataframe.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ impl PyDataFrame {
514514
/// Get the execution plan for this `DataFrame`
515515
fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
516516
let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())?;
517-
Ok(plan.into())
517+
Ok(PyExecutionPlan::new(plan))
518518
}
519519

520520
/// Repartition a `DataFrame` based on a logical partitioning scheme.
@@ -736,7 +736,8 @@ impl PyDataFrame {
736736
batches = batches
737737
.into_iter()
738738
.map(|record_batch| record_batch_into_schema(record_batch, &schema))
739-
.collect::<Result<Vec<RecordBatch>, ArrowError>>()?;
739+
.collect::<Result<Vec<RecordBatch>, ArrowError>>()
740+
.map_err(PyDataFusionError::from)?;
740741
}
741742

742743
let batches_wrapped = batches.into_iter().map(Ok);
@@ -756,9 +757,7 @@ impl PyDataFrame {
756757
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
757758
rt.spawn(async move { df.execute_stream().await });
758759
let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
759-
Ok(PyRecordBatchStream::new(
760-
stream.map_err(PyDataFusionError::from)?,
761-
))
760+
Ok(PyRecordBatchStream::new(stream?))
762761
}
763762

764763
fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
@@ -771,9 +770,9 @@ impl PyDataFrame {
771770

772771
match stream {
773772
Ok(batches) => Ok(batches.into_iter().map(PyRecordBatchStream::new).collect()),
774-
_ => Err(PyValueError::new_err(
775-
"Unable to execute stream partitioned",
776-
)),
773+
Err(e) => Err(PyValueError::new_err(format!(
774+
"Unable to execute stream partitioned: {e}"
775+
))),
777776
}
778777
}
779778

@@ -846,7 +845,8 @@ impl PyDataFrame {
846845
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
847846
// Get string representation of record batches
848847
let batches = wait_for_future(py, df.collect())?;
849-
let batches_as_string = pretty::pretty_format_batches(&batches)?;
848+
let batches_as_string =
849+
pretty::pretty_format_batches(&batches).map_err(PyDataFusionError::from)?;
850850
let result = format!("DataFrame()\n{batch}", batch = batches_as_string);
851851

852852
// Import the Python 'builtins' module to access the print function

0 commit comments

Comments
 (0)