
Commit b6cdd29

Revert "UNPICK changes to review"
This reverts commit 405d220.
1 parent 405d220 commit b6cdd29

File tree

14 files changed, +726 -70 lines changed


Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 33 additions & 8 deletions
@@ -26,17 +26,34 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
+include = [
+    "/src",
+    "/datafusion",
+    "/LICENSE.txt",
+    "build.rs",
+    "pyproject.toml",
+    "Cargo.toml",
+    "Cargo.lock",
+]
 
 [features]
 default = ["mimalloc"]
-protoc = [ "datafusion-substrait/protoc" ]
+protoc = ["datafusion-substrait/protoc"]
 substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
-tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
-pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
-pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
+tokio = { version = "1.45", features = [
+    "macros",
+    "rt",
+    "rt-multi-thread",
+    "sync",
+] }
+pyo3 = { version = "0.24", features = [
+    "extension-module",
+    "abi3",
+    "abi3-py39",
+] }
+pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
 pyo3-log = "0.12.4"
 arrow = { version = "55.1.0", features = ["pyarrow"] }
 datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
@@ -45,15 +62,23 @@ datafusion-proto = { version = "49.0.2" }
 datafusion-ffi = { version = "49.0.2" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
-mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
+mimalloc = { version = "0.1", optional = true, default-features = false, features = [
+    "local_dynamic_tls",
+] }
 async-trait = "0.1.89"
 futures = "0.3"
-object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
+cstr = "0.2"
+object_store = { version = "0.12.3", features = [
+    "aws",
+    "gcp",
+    "azure",
+    "http",
+] }
 url = "2"
 log = "0.4.27"
 
 [build-dependencies]
-prost-types = "0.13.1" # keep in line with `datafusion-substrait`
+prost-types = "0.13.1" # keep in line with `datafusion-substrait`
 pyo3-build-config = "0.24"
 
 [lib]

dev/changelog/50.0.0.md

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache DataFusion Python 50.0.0 Changelog
+
+**Other:**
+
+- remove deprecated `DataFrame.to_record_batch_stream`; use `execute_stream` instead.
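
The entry above removes `DataFrame.to_record_batch_stream` in favor of `execute_stream`. As a rough migration sketch (the query here is hypothetical and not part of this commit), existing callers can switch like so:

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS value")  # hypothetical query, for illustration only

    # Before (removed in 50.0.0): stream = df.to_record_batch_stream()
    stream = df.execute_stream()
    for batch in stream:
        print(batch.to_pyarrow())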

docs/source/user-guide/dataframe/index.rst

Lines changed: 101 additions & 1 deletion
@@ -145,10 +145,110 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()
 
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table. ``pa.table(df)`` collects the entire DataFrame eagerly into a
+PyArrow table:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.table(df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+To work with the stream directly, use
+``execute_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...
+
+Execute as Stream
+^^^^^^^^^^^^^^^^^
+
+For finer control over streaming execution, use
+:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
+:py:class:`datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...  # process each batch as it is produced
+
+.. tip::
+
+    To get a PyArrow reader instead, call
+    ``pa.RecordBatchReader.from_stream(df)``.
+
+When partition boundaries are important,
+:py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
+returns an iterable of :py:class:`datafusion.RecordBatchStream` objects, one per
+partition:
+
+.. code-block:: python
+
+    for stream in df.execute_stream_partitioned():
+        for batch in stream:
+            ...  # each stream yields RecordBatches
+
+To process partitions concurrently, first collect the streams into a list
+and then poll each one in a separate ``asyncio`` task:
+
+.. code-block:: python
+
+    import asyncio
+
+    async def consume(stream):
+        async for batch in stream:
+            ...
+
+    streams = list(df.execute_stream_partitioned())
+    await asyncio.gather(*(consume(s) for s in streams))
+
+See :doc:`../io/arrow` for additional details on the Arrow interface.
+
 HTML Rendering
 --------------
 
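
The guide above notes that each yielded batch exposes ``to_pyarrow()`` but only demonstrates the eager ``pa.table(df)`` path. A small sketch of converting batches individually while iterating lazily (assuming ``df`` is an existing DataFrame) could look like this:

    import pyarrow as pa

    # Convert each datafusion.RecordBatch to a pyarrow.RecordBatch as it arrives
    arrow_batches = [batch.to_pyarrow() for batch in df]

    # Optionally assemble the converted batches into a single pyarrow.Table
    table = pa.Table.from_batches(arrow_batches)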

python/datafusion/dataframe.py

Lines changed: 25 additions & 8 deletions
@@ -25,7 +25,9 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    AsyncIterator,
     Iterable,
+    Iterator,
     Literal,
     Optional,
     Union,
@@ -42,7 +44,7 @@
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream
 
 if TYPE_CHECKING:
     import pathlib
@@ -296,6 +298,9 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.
 
+    DataFrame objects are iterable; iterating over a DataFrame yields
+    :class:`datafusion.RecordBatch` instances lazily.
+
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """
 
@@ -312,7 +317,7 @@ def into_view(self) -> pa.Table:
         return self.df.into_view()
 
     def __getitem__(self, key: str | list[str]) -> DataFrame:
-        """Return a new :py:class`DataFrame` with the specified column or columns.
+        """Return a new :py:class:`DataFrame` with the specified column or columns.
 
         Args:
             key: Column name or list of column names to select.
@@ -1105,21 +1110,33 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
 
-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.
 
         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.
 
         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)
 
+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Return an iterator over this DataFrame's record batches."""
+        return iter(self.execute_stream())
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Return an async iterator over this DataFrame's record batches."""
+        return self.execute_stream().__aiter__()
+
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.
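
The revised ``__arrow_c_stream__`` docstring states that only straightforward projections (column selection or reordering) are honoured for ``requested_schema``. A hedged sketch of requesting such a projection from the consumer side, assuming a DataFrame ``df`` with an int64 column ``a`` and a PyArrow release whose ``RecordBatchReader.from_stream`` accepts a ``schema`` argument:

    import pyarrow as pa

    # Ask the stream to provide only column "a"; the projection is applied
    # as long as the requested type matches the DataFrame's type for "a".
    requested = pa.schema([pa.field("a", pa.int64())])
    reader = pa.RecordBatchReader.from_stream(df, schema=requested)
    for batch in reader:
        ...  # batches contain just column "a"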

python/datafusion/record_batch.py

Lines changed: 24 additions & 4 deletions
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()
 
+    def __arrow_c_array__(
+        self, requested_schema: object | None = None
+    ) -> tuple[object, object]:
+        """Export the record batch via the Arrow C Data Interface.
+
+        This allows zero-copy interchange with libraries that support the
+        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
+        CDataInterface/PyCapsuleInterface.html>`_.
+
+        Args:
+            requested_schema: Attempt to provide the record batch using this
+                schema. Only straightforward projections such as column
+                selection or reordering are applied.
+
+        Returns:
+            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
+            ``ArrowSchema``.
+        """
+        return self.record_batch.__arrow_c_array__(requested_schema)
+
 
 class RecordBatchStream:
     """This class represents a stream of record batches.
@@ -63,19 +83,19 @@ def next(self) -> RecordBatch:
         return next(self)
 
     async def __anext__(self) -> RecordBatch:
-        """Async iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
         next_batch = await self.rbs.__anext__()
         return RecordBatch(next_batch)
 
     def __next__(self) -> RecordBatch:
-        """Iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream."""
         next_batch = next(self.rbs)
         return RecordBatch(next_batch)
 
     def __aiter__(self) -> typing_extensions.Self:
-        """Async iterator function."""
+        """Return an asynchronous iterator over record batches."""
        return self
 
     def __iter__(self) -> typing_extensions.Self:
-        """Iterator function."""
+        """Return an iterator over record batches."""
         return self
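
Because ``RecordBatch`` now implements ``__arrow_c_array__``, PyArrow versions that understand the Arrow PyCapsule protocol can import it directly, without an explicit ``to_pyarrow()`` call. A minimal sketch, assuming ``batch`` is a ``datafusion.RecordBatch`` obtained from a stream:

    import pyarrow as pa

    # pa.record_batch() recognizes objects exposing __arrow_c_array__ and
    # imports the data zero-copy via the Arrow C Data Interface.
    arrow_batch = pa.record_batch(batch)
    print(arrow_batch.num_rows, arrow_batch.schema)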

python/tests/conftest.py

Lines changed: 10 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 import pyarrow as pa
 import pytest
-from datafusion import SessionContext
+from datafusion import DataFrame, SessionContext
 from pyarrow.csv import write_csv
 
 
@@ -49,3 +49,12 @@ def database(ctx, tmp_path):
         delimiter=",",
         schema_infer_max_records=10,
     )
+
+
+@pytest.fixture
+def fail_collect(monkeypatch):
+    def _fail_collect(self, *args, **kwargs):  # pragma: no cover - failure path
+        msg = "collect should not be called"
+        raise AssertionError(msg)
+
+    monkeypatch.setattr(DataFrame, "collect", _fail_collect)
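
A test can lean on the new `fail_collect` fixture to assert that streaming export never falls back to a full `collect()`. A hypothetical usage sketch (the `df` fixture and the test itself are illustrative, not part of this commit):

    import pyarrow as pa

    def test_arrow_c_stream_does_not_collect(df, fail_collect):
        # fail_collect monkeypatches DataFrame.collect to raise, so this test
        # fails if the PyCapsule stream export eagerly collects the DataFrame.
        reader = pa.RecordBatchReader.from_stream(df)
        for batch in reader:
            assert batch.num_rows >= 0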
