Skip to content

Commit ab1dd7e

Browse files
committed
Add crossbeam dependencies and initialize global Rayon pool
1 parent bdcb076 commit ab1dd7e

File tree

4 files changed

+43
-7
lines changed

4 files changed

+43
-7
lines changed

Cargo.lock

Lines changed: 24 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/context.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ use crate::udaf::PyAggregateUDF;
4545
use crate::udf::PyScalarUDF;
4646
use crate::udtf::PyTableFunction;
4747
use crate::udwf::PyWindowUDF;
48-
use crate::utils::{get_global_ctx, spawn_and_wait, validate_pycapsule, wait_for_future};
48+
use crate::utils::{
49+
get_global_ctx, init_global_rayon_pool, spawn_and_wait, validate_pycapsule,
50+
wait_for_future,
51+
};
4952
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
5053
use datafusion::arrow::pyarrow::PyArrowType;
5154
use datafusion::arrow::record_batch::RecordBatch;
@@ -313,6 +316,7 @@ impl PySessionContext {
313316
} else {
314317
SessionConfig::default().with_information_schema(true)
315318
};
319+
init_global_rayon_pool(config.target_partitions());
316320
let runtime_env_builder = if let Some(c) = runtime {
317321
c.builder
318322
} else {

src/dataframe.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ use crate::physical_plan::PyExecutionPlan;
5252
use crate::record_batch::PyRecordBatchStream;
5353
use crate::sql::logical::PyLogicalPlan;
5454
use crate::utils::{
55-
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_and_wait, validate_pycapsule,
56-
wait_for_future, wait_for_stream_next,
55+
get_tokio_runtime, init_global_rayon_pool, is_ipython_env, py_obj_to_scalar_value,
56+
spawn_and_wait, validate_pycapsule, wait_for_future, wait_for_stream_next,
5757
};
5858
use crate::{
5959
errors::PyDataFusionResult,
@@ -364,6 +364,7 @@ fn record_batches_to_pyarrow(
364364
record_batch_class: &Bound<'_, PyAny>,
365365
batches: Vec<RecordBatch>,
366366
) -> PyResult<Vec<PyObject>> {
367+
init_global_rayon_pool(std::thread::available_parallelism().map_or(1, |n| n.get()));
367368
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
368369
.allow_threads(|| {
369370
batches

src/utils.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use futures::StreamExt;
2828
use pyo3::prelude::*;
2929
use pyo3::{exceptions::PyValueError, types::PyCapsule};
3030
use std::{future::Future, sync::OnceLock, time::Duration};
31+
use rayon::ThreadPoolBuilder;
3132
use tokio::{runtime::Runtime, time::sleep};
3233
/// Utility to get the Tokio Runtime from Python
3334
#[inline]
@@ -59,6 +60,16 @@ pub(crate) fn get_global_ctx() -> &'static SessionContext {
5960
CTX.get_or_init(SessionContext::new)
6061
}
6162

63+
#[inline]
64+
pub(crate) fn init_global_rayon_pool(num_threads: usize) {
65+
static RAYON_POOL: OnceLock<()> = OnceLock::new();
66+
RAYON_POOL.get_or_init(|| {
67+
let _ = ThreadPoolBuilder::new()
68+
.num_threads(num_threads)
69+
.build_global();
70+
});
71+
}
72+
6273
/// Utility to collect rust futures with GIL released and respond to
6374
/// Python interrupts such as ``KeyboardInterrupt``. If a signal is
6475
/// received while the future is running, the future is aborted and the

0 commit comments

Comments
 (0)