Skip to content

Commit 23d175d

Browse files
committed
Optimize RecordBatch conversion by releasing GIL and processing in parallel
1 parent 7087685 commit 23d175d

File tree

2 files changed

+64
-18
lines changed

2 files changed

+64
-18
lines changed

docs/source/user-guide/dataframe/collect-gil.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ effectively serialised.
88
For queries that return many batches, this limited CPU utilisation because only
99
one conversion could run at a time.
1010

11-
The implementation has been updated to release the GIL and convert batches in
12-
parallel using Rayon. This allows the CPU intensive portions of the conversion
13-
to run concurrently.
11+
The implementation now converts each batch to Arrow's C data (schema/array)
12+
while the GIL is released, acquiring the GIL only to wrap those pointers into
13+
PyArrow objects. This allows the CPU-intensive portions of the conversion to
14+
run fully in parallel.
1415

1516
A simple benchmark is provided in `benchmarks/collect_gil_bench.py`.
1617
Run it twice to compare serial and parallel conversions:

src/dataframe.rs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
use std::collections::HashMap;
1919
use std::ffi::CString;
20+
use std::ptr::addr_of;
2021
use std::sync::Arc;
2122

22-
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
23+
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader, StructArray};
2324
use arrow::compute::can_cast_types;
2425
use arrow::error::ArrowError;
25-
use arrow::ffi::FFI_ArrowSchema;
26+
use arrow::ffi::{self, FFI_ArrowArray, FFI_ArrowSchema};
2627
use arrow::ffi_stream::FFI_ArrowArrayStream;
2728
use arrow::pyarrow::FromPyArrow;
2829
use datafusion::arrow::datatypes::{Schema, SchemaRef};
@@ -39,6 +40,7 @@ use datafusion::prelude::*;
3940
use datafusion_ffi::table_provider::FFI_TableProvider;
4041
use futures::{StreamExt, TryStreamExt};
4142
use pyo3::exceptions::PyValueError;
43+
use pyo3::ffi::Py_uintptr_t;
4244
use pyo3::prelude::*;
4345
use pyo3::pybacked::PyBackedStr;
4446
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
@@ -526,18 +528,34 @@ impl PyDataFrame {
526528
let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
527529
.map_err(PyDataFusionError::from)?;
528530

529-
// Profiling `rb.to_pyarrow(py)` showed that the conversion holds the
530-
// Python GIL for almost all of its execution. Serially converting a
531-
// large number of batches therefore throttles CPU utilisation. Run the
532-
// conversions in Rayon threads and only acquire the GIL when creating
533-
// the final PyArrow objects so the CPU intensive work happens in
534-
// parallel.
535-
py.allow_threads(move || {
536-
batches
537-
.into_par_iter()
538-
.map(|rb| Python::with_gil(|py| rb.to_pyarrow(py)))
539-
.collect()
540-
})
531+
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
532+
.allow_threads(|| {
533+
batches
534+
.into_par_iter()
535+
.map(|rb| {
536+
let sa: StructArray = rb.into();
537+
ffi::to_ffi(&sa.to_data())
538+
})
539+
.collect()
540+
})
541+
.map_err(PyDataFusionError::from)?;
542+
543+
let module = py.import("pyarrow")?;
544+
let class = module.getattr("RecordBatch")?;
545+
ffi_batches
546+
.into_iter()
547+
.map(|(array, schema)| {
548+
class
549+
.call_method1(
550+
"_import_from_c",
551+
(
552+
addr_of!(array) as Py_uintptr_t,
553+
addr_of!(schema) as Py_uintptr_t,
554+
),
555+
)
556+
.map(Into::into)
557+
})
558+
.collect()
541559
}
542560

543561
/// Cache DataFrame.
@@ -554,7 +572,34 @@ impl PyDataFrame {
554572

555573
batches
556574
.into_iter()
557-
.map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
575+
.map(|rbs| {
576+
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
577+
.allow_threads(|| {
578+
rbs.into_par_iter()
579+
.map(|rb| {
580+
let sa: StructArray = rb.into();
581+
ffi::to_ffi(&sa.to_data())
582+
})
583+
.collect()
584+
})
585+
.map_err(PyDataFusionError::from)?;
586+
let module = py.import("pyarrow")?;
587+
let class = module.getattr("RecordBatch")?;
588+
ffi_batches
589+
.into_iter()
590+
.map(|(array, schema)| {
591+
class
592+
.call_method1(
593+
"_import_from_c",
594+
(
595+
addr_of!(array) as Py_uintptr_t,
596+
addr_of!(schema) as Py_uintptr_t,
597+
),
598+
)
599+
.map(Into::into)
600+
})
601+
.collect::<PyResult<Vec<_>>>()
602+
})
558603
.collect()
559604
}
560605

0 commit comments

Comments
 (0)