
Commit c98a69f

revert branch UNPICK
1 parent 23d175d commit c98a69f


13 files changed: +374 additions, -637 deletions


Cargo.lock

Lines changed: 285 additions & 297 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 2 deletions
@@ -26,7 +26,7 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = ["/src", "/datafusion", "/LICENSE.txt", "/build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
+include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]

 [features]
 default = ["mimalloc"]
@@ -48,7 +48,6 @@ uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
 async-trait = "0.1.89"
 futures = "0.3"
-rayon = "1.10"
 object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
 log = "0.4.27"

benchmarks/collect_gil_bench.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

docs/source/user-guide/configuration.rst

Lines changed: 0 additions & 21 deletions
@@ -47,26 +47,5 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
     print(ctx)


-.. _target_partitions:
-
-Target partitions and threads
------------------------------
-
-The :py:meth:`~datafusion.context.SessionConfig.with_target_partitions` method
-controls how many partitions DataFusion uses when executing a query. Each
-partition is processed on its own thread, so this setting effectively limits
-the number of threads that will be scheduled.
-
-For most workloads a good starting value is the number of logical CPU cores on
-your machine. You can use :func:`os.cpu_count` to automatically configure this::
-
-    import os
-    config = SessionConfig().with_target_partitions(os.cpu_count())
-
-Choosing a value significantly higher than the available cores can lead to
-excessive context switching without performance gains, while a much lower value
-may underutilize the machine.
-
-
 You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
 and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
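
For context, the setting documented in the removed section remains available through the public SessionConfig API. A minimal sketch, assuming os.cpu_count() as a sensible default (this commit does not prescribe a value):

    import os

    from datafusion import SessionConfig, SessionContext

    # Sketch only: cap target partitions at the number of logical cores.
    config = SessionConfig().with_target_partitions(os.cpu_count() or 4)
    ctx = SessionContext(config)
    print(ctx)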

docs/source/user-guide/dataframe/collect-gil.md

Lines changed: 0 additions & 26 deletions
This file was deleted.

docs/source/user-guide/dataframe/index.rst

Lines changed: 9 additions & 32 deletions
@@ -25,10 +25,8 @@ The ``DataFrame`` class is the core abstraction in DataFusion that represents ta
 on that data. DataFrames provide a flexible API for transforming data through various operations such as
 filtering, projection, aggregation, joining, and more.

-A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
-terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called. ``collect()`` loads
-all record batches into Python memory; for large results you may want to stream data instead using
-``execute_stream()`` or ``__arrow_c_stream__()``.
+A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
+terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called.

 Creating DataFrames
 -------------------
@@ -130,47 +128,27 @@ DataFusion's DataFrame API offers a wide range of operations:

 Terminal Operations
 -------------------
-``collect()`` materializes every record batch in Python. While convenient, this
-eagerly loads the full result set into memory and can overwhelm the Python
-process for large queries. Alternatives that stream data from Rust avoid this
-memory growth:
+
+To materialize the results of your DataFrame operations:

 .. code-block:: python

-    # Collect all data as PyArrow RecordBatches (loads entire result set)
+    # Collect all data as PyArrow RecordBatches
     result_batches = df.collect()
-
-    # Stream batches using the native API
-    stream = df.execute_stream()
-    for batch in stream:
-        ...  # process each RecordBatch
-
-    # Stream via the Arrow C Data Interface
-    import pyarrow as pa
-    reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...
-
-    # Convert to various formats (also load all data into memory)
+
+    # Convert to various formats
     pandas_df = df.to_pandas()         # Pandas DataFrame
     polars_df = df.to_polars()         # Polars DataFrame
     arrow_table = df.to_arrow_table()  # PyArrow Table
     py_dict = df.to_pydict()           # Python dictionary
     py_list = df.to_pylist()           # Python list of dictionaries
-
+
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

-For large outputs, prefer engine-level writers such as ``df.write_parquet()``
-or other DataFusion writers. These stream data directly to the destination and
-avoid buffering the entire dataset in Python.
-
-For more on parallel record batch conversion and the Python GIL, see
-:doc:`collect-gil`.
-
 HTML Rendering
 --------------

@@ -229,4 +207,3 @@ For a complete list of available functions, see the :py:mod:`datafusion.function
    :maxdepth: 1

    rendering
-   collect-gil
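
A rough sketch contrasting the eager and streaming terminal operations; the execute_stream() pattern is taken from the documentation text removed above, not asserted by this commit:

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Eager: collect() materializes every RecordBatch in Python memory.
    batches = df.collect()

    # Incremental: iterate record batches as they are produced
    # (pattern from the removed documentation).
    for batch in df.execute_stream():
        ...  # process each RecordBatch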

docs/source/user-guide/io/arrow.rst

Lines changed: 7 additions & 24 deletions
@@ -57,34 +57,17 @@ and returns a ``StructArray``. Common pyarrow sources you can use are:
 Exporting from DataFusion
 -------------------------

-DataFusion DataFrames implement ``__arrow_c_stream__`` so any Python library
-that accepts this interface can import a DataFusion ``DataFrame`` directly.
+DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
+Python library that accepts these can import a DataFusion DataFrame directly.

-``collect()`` or ``pa.table(df)`` will materialize every record batch in
-Python. For large results this can quickly exhaust memory. Instead, stream the
-output incrementally:
+.. warning::
+    It is important to note that this will cause the DataFrame execution to happen, which may be
+    a time consuming task. That is, you will cause a
+    :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.

-.. ipython:: python
-
-    # Stream batches with DataFusion's native API
-    stream = df.execute_stream()
-    for batch in stream:
-        ...  # process each RecordBatch as it arrives
-
-.. ipython:: python
-
-    # Expose a C stream that PyArrow can consume lazily
-    import pyarrow as pa
-    reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...  # process each batch without buffering the entire table
-
-If the goal is simply to persist results, prefer engine-level writers such as
-``df.write_parquet()``. These writers stream data from Rust directly to the
-destination and avoid Python-side memory growth.

 .. ipython:: python

     df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
-    pa.table(df)  # loads all batches into memory
+    pa.table(df)
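
A self-contained sketch of the export path the kept docs describe, assuming the top-level col/lit helpers; pa.table() consumes the __arrow_c_stream__ capsule, which triggers execution as the warning above notes:

    import pyarrow as pa

    from datafusion import SessionContext, col, lit

    ctx = SessionContext()
    df = ctx.sql("SELECT 1.0 AS a")
    df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))

    # Importing into pyarrow executes the DataFrame's plan.
    table = pa.table(df)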

python/datafusion/context.py

Lines changed: 1 addition & 7 deletions
@@ -161,13 +161,7 @@ def with_batch_size(self, batch_size: int) -> SessionConfig:
     def with_target_partitions(self, target_partitions: int) -> SessionConfig:
         """Customize the number of target partitions for query execution.

-        Each partition is processed on its own thread, so this value controls
-        the degree of parallelism. A good starting point is the number of
-        logical CPU cores on your machine, for example
-        ``SessionConfig().with_target_partitions(os.cpu_count())``.
-
-        See the :ref:`configuration guide <target_partitions>` for more
-        discussion on choosing a value.
+        Increasing partitions can increase concurrency.

         Args:
             target_partitions: Number of target partitions.
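
Both setters shown in this hunk return SessionConfig, so calls can be chained; a sketch with illustrative values only:

    from datafusion import SessionConfig

    # Sketch: chain the builder-style setters (values are illustrative).
    config = SessionConfig().with_batch_size(8192).with_target_partitions(8)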

python/tests/test_dataframe.py

Lines changed: 23 additions & 26 deletions
@@ -20,7 +20,6 @@
 import re
 import threading
 import time
-import tracemalloc
 from typing import Any

 import pyarrow as pa
@@ -253,6 +252,13 @@ def test_filter(df):
     assert result.column(2) == pa.array([5])


+def test_show_empty(df, capsys):
+    df_empty = df.filter(column("a") > literal(3))
+    df_empty.show()
+    captured = capsys.readouterr()
+    assert "DataFrame has no rows" in captured.out
+
+
 def test_sort(df):
     df = df.sort(column("b").sort(ascending=False))

@@ -1464,14 +1470,6 @@ def test_empty_to_pandas(df):
     assert set(pandas_df.columns) == {"a", "b", "c"}


-def test_show_empty_dataframe(df, capsys):
-    """Ensure showing an empty DataFrame prints a helpful message."""
-    empty_df = df.limit(0)
-    empty_df.show()
-    captured = capsys.readouterr()
-    assert "Empty DataFrame" in captured.out
-
-
 def test_to_polars(df):
     # Skip test if polars is not installed
     pl = pytest.importorskip("polars")
@@ -1576,23 +1574,6 @@ async def test_execute_stream_partitioned_async(df):
     assert not remaining_batches


-def test_arrow_c_stream_streaming(large_df):
-    df = large_df.repartition(4)
-    capsule = df.__arrow_c_stream__()
-    ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
-    ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
-    ptr = ctypes.pythonapi.PyCapsule_GetPointer(capsule, b"arrow_array_stream")
-    reader = pa.RecordBatchReader._import_from_c(ptr)
-
-    tracemalloc.start()
-    batch_count = sum(1 for _ in reader)
-    current, peak = tracemalloc.get_traced_memory()
-    tracemalloc.stop()
-
-    assert batch_count > 1
-    assert peak < 50 * MB
-
-
 def test_empty_to_arrow_table(df):
     # Convert empty datafusion dataframe to pyarrow Table
     pyarrow_table = df.limit(0).to_arrow_table()
@@ -2683,3 +2664,19 @@ def trigger_interrupt():

     # Make sure the interrupt thread has finished
     interrupt_thread.join(timeout=1.0)
+
+
+def test_show_select_where_no_rows(capsys) -> None:
+    ctx = SessionContext()
+    df = ctx.sql("SELECT 1 WHERE 1=0")
+    df.show()
+    out = capsys.readouterr().out
+    assert "DataFrame has no rows" in out
+
+
+def test_show_from_empty_batch(capsys) -> None:
+    ctx = SessionContext()
+    batch = pa.record_batch([pa.array([], type=pa.int32())], names=["a"])
+    ctx.create_dataframe([[batch]]).show()
+    out = capsys.readouterr().out
+    assert "| a |" in out
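
The behaviour the added tests exercise can be reproduced directly; per the assertions above, show() on a zero-row result prints a "DataFrame has no rows" message rather than an empty table:

    from datafusion import SessionContext

    ctx = SessionContext()
    # Expected (per the tests above): "DataFrame has no rows"
    ctx.sql("SELECT 1 WHERE 1=0").show()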

src/context.rs

Lines changed: 9 additions & 3 deletions
@@ -34,7 +34,7 @@ use pyo3::prelude::*;
 use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider};
 use crate::dataframe::PyDataFrame;
 use crate::dataset::Dataset;
-use crate::errors::{py_datafusion_err, PyDataFusionResult};
+use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
 use crate::expr::sort_expr::PySortExpr;
 use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::PyRecordBatchStream;
@@ -45,7 +45,7 @@ use crate::udaf::PyAggregateUDF;
 use crate::udf::PyScalarUDF;
 use crate::udtf::PyTableFunction;
 use crate::udwf::PyWindowUDF;
-use crate::utils::{get_global_ctx, spawn_and_wait, validate_pycapsule, wait_for_future};
+use crate::utils::{get_global_ctx, get_tokio_runtime, validate_pycapsule, wait_for_future};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -66,13 +66,15 @@ use datafusion::execution::disk_manager::DiskManagerMode;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
 use datafusion::execution::options::ReadOptions;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::prelude::{
     AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
 };
 use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
 use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use pyo3::IntoPyObjectExt;
+use tokio::task::JoinHandle;

 /// Configuration options for a SessionContext
 #[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
@@ -1130,8 +1132,12 @@ impl PySessionContext {
         py: Python,
     ) -> PyDataFusionResult<PyRecordBatchStream> {
         let ctx: TaskContext = TaskContext::from(&self.ctx.state());
+        // create a Tokio runtime to run the async code
+        let rt = &get_tokio_runtime().0;
         let plan = plan.plan.clone();
-        let stream = spawn_and_wait(py, async move { plan.execute(part, Arc::new(ctx)) })?;
+        let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
+        let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
         Ok(PyRecordBatchStream::new(stream))
     }
 }
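
The Rust change above runs plan execution on the shared Tokio runtime and hands back a record batch stream. From Python this surfaces through the streaming execute APIs; a rough usage sketch, with execute_stream_partitioned() assumed from the test names in this commit rather than specified by this diff:

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # One stream per output partition; each stream yields RecordBatches lazily.
    for stream in df.execute_stream_partitioned():
        for batch in stream:
            ...  # consume each RecordBatch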
