Skip to content

Commit 38cc43f

Browse files
feat: Implement read_arrow with deferred loading via pandas conversion
This commit implements `bigframes.pandas.read_arrow()` for creating BigQuery DataFrames DataFrames from `pyarrow.Table` objects. This version uses a deferred loading mechanism by first converting the Arrow table to a pandas DataFrame (using `ArrowDtype` for type mapping) and then leveraging `Block.from_local()`. This approach simplifies `read_arrow` by removing the `write_engine` parameter and ensures that `Block.from_local()` retains its original behavior of only accepting pandas objects. Key changes: 1. **`bigframes.core.blocks.Block.from_local` Reverted**: * Restored `Block.from_local` to only accept `pandas.DataFrame` or `pandas.Series` as input. Direct `pyarrow.Table` support was removed from this method. 2. **`Session.read_arrow` and `Session._read_arrow` Updated**: * The `write_engine` parameter was removed from these methods in `bigframes/session/__init__.py`. * `Session._read_arrow` now converts the input `pyarrow.Table` to a `pandas.DataFrame` using `arrow_table.to_pandas(types_mapper=pd.ArrowDtype)`. * This pandas DataFrame is then passed to `blocks.Block.from_local(pandas_df, self)` to create a deferred block. 3. **Public API `bpd.read_arrow` Updated**: * The `write_engine` parameter was removed from `bigframes.pandas.read_arrow` in `bigframes/pandas/io/api.py`. * Docstrings updated to reflect deferred loading. 4. **Test Updates**: * Tests in `tests/system/small/test_read_arrow.py` were verified to be consistent with this deferred loading approach. Comparison logic uses `arrow_table.to_pandas(types_mapper=pd.ArrowDtype)` for the expected DataFrame and `check_dtype=True` in assertions.
1 parent 813ef35 commit 38cc43f

File tree

2 files changed

+42
-66
lines changed

2 files changed

+42
-66
lines changed

bigframes/core/blocks.py

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
import google.cloud.bigquery as bigquery
3737
import numpy
3838
import pandas as pd
39-
import pyarrow as pa
39+
# pyarrow is imported below where needed, but not at top-level if only used for type hints by Session
40+
# import pyarrow as pa
4041

4142
from bigframes import session
4243
from bigframes._config import sampling_options
@@ -159,7 +160,7 @@ def __init__(
159160
@classmethod
160161
def from_local(
161162
cls,
162-
data: Union[pd.DataFrame, pd.Series, pa.Table],
163+
data: Union[pd.DataFrame, pd.Series],
163164
session: bigframes.Session,
164165
*,
165166
cache_transpose: bool = True,
@@ -170,24 +171,10 @@ def from_local(
170171
index_names: typing.Sequence[typing.Optional[Label]]
171172
column_names: pd.Index
172173
managed_data: local_data.ManagedArrowTable
174+
array_value_column_ids: typing.Sequence[str]
173175

174-
if isinstance(data, pa.Table):
175-
# For a raw Arrow table, assume all columns are value columns initially.
176-
# No pre-defined index in the Arrow metadata itself that Block.from_local
177-
# would understand without further conventions or schema.pandas_metadata.
178-
# If schema.pandas_metadata exists, it could potentially inform index/column setup,
179-
# but for generic pa.Table, treat all as data.
180-
index_cols = []
181-
value_cols = list(data.column_names) # these will become the internal IDs
182-
index_names = []
183-
column_names = pd.Index(data.column_names) # Use original arrow column names as labels
184-
managed_data = local_data.ManagedArrowTable(data)
185-
# The array_value created later will use value_cols as its column_ids directly
186-
# so no separate reset_index or set_axis is needed for raw arrow table input.
187-
# The internal IDs for the ArrayValue will be the original Arrow column names.
188-
array_value_column_ids = value_cols
189-
190-
elif isinstance(data, pd.Series):
176+
177+
if isinstance(data, pd.Series):
191178
# Standardize column names to avoid collisions, eg. index named "value" and series also named "value"
192179
original_index_names = list(name if name is not None else f"level_{i}" for i, name in enumerate(data.index.names))
193180
original_series_name = data.name if data.name is not None else "value"
@@ -236,7 +223,7 @@ def from_local(
236223
array_value_column_ids = [*index_cols, *value_cols]
237224
else:
238225
raise TypeError(
239-
f"data must be pandas DataFrame, Series, or pyarrow Table. Got: {type(data)}"
226+
f"data must be pandas DataFrame or Series. Got: {type(data)}"
240227
)
241228

242229
array_value = core.ArrayValue.from_managed(managed_data, session=session, default_column_ids=array_value_column_ids)
@@ -248,9 +235,7 @@ def from_local(
248235
index_labels=index_names,
249236
)
250237

251-
# For pandas inputs, attempt to create transpose cache.
252-
# For Arrow inputs, this is skipped as data.T is not standard.
253-
if isinstance(data, (pd.DataFrame, pd.Series)) and cache_transpose:
238+
if cache_transpose:
254239
try:
255240
# this cache will help when aligning on axis=1
256241
block = block.with_transpose_cache(
@@ -3412,3 +3397,5 @@ def _pd_index_to_array_value(
34123397
rows.append(row)
34133398

34143399
return core.ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=session)
3400+
3401+
[end of bigframes/core/blocks.py]

bigframes/session/__init__.py

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -917,70 +917,38 @@ def read_pandas(
917917
f"read_pandas() expects a pandas.DataFrame, but got a {type(pandas_dataframe)}"
918918
)
919919

920+
# read_arrow method (public API)
920921
@typing.overload
921922
def read_arrow(
922923
self,
923924
arrow_table: pyarrow.Table,
924-
*,
925-
write_engine: constants.WriteEngineType = "default", # This line will be removed by the change
926925
) -> dataframe.DataFrame:
927926
...
928927

929-
# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types.
928+
# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types if needed.
930929
def read_arrow(
931930
self,
932931
arrow_table: pyarrow.Table,
933932
) -> dataframe.DataFrame:
934-
"""Loads a BigQuery DataFrames DataFrame from a ``pyarrow.Table`` object.
935-
936-
This method uses a deferred loading mechanism: the ``pyarrow.Table`` data
937-
is kept in memory locally and converted to a BigFrames DataFrame
938-
representation without immediate BigQuery table materialization.
939-
Actual computation or data transfer to BigQuery is deferred until an
940-
action requiring remote execution is triggered on the DataFrame.
941-
942-
This is the primary session-level API for reading Arrow tables and is
943-
called by :func:`bigframes.pandas.read_arrow`.
944-
945-
**Examples:**
933+
"""Loads a pyarrow.Table into a BigQuery DataFrames DataFrame using deferred execution.
946934
947-
>>> import bigframes.pandas as bpd
948-
>>> import pyarrow as pa
949-
>>> # Assume 'session' is an active BigQuery DataFrames Session
950-
951-
>>> data_dict = {
952-
... "id": pa.array([1, 2, 3], type=pa.int64()),
953-
... "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
954-
... }
955-
>>> arrow_table = pa.Table.from_pydict(data_dict)
956-
>>> bf_df = session.read_arrow(arrow_table)
957-
>>> bf_df
958-
id product_name
959-
0 1 laptop
960-
1 2 tablet
961-
2 3 phone
962-
<BLANKLINE>
963-
[3 rows x 2 columns]
935+
The Arrow table data is kept in local memory and is only processed or
936+
uploaded to BigQuery when an action requiring remote execution is called
937+
on the DataFrame.
964938
965939
Args:
966940
arrow_table (pyarrow.Table):
967-
The ``pyarrow.Table`` object to load.
941+
The pyarrow Table to load.
968942
969943
Returns:
970944
bigframes.dataframe.DataFrame:
971-
A new BigQuery DataFrames DataFrame representing the data from the
972-
input ``pyarrow.Table``.
973-
974-
Raises:
975-
ValueError:
976-
If the input object is not a ``pyarrow.Table``.
945+
A new DataFrame representing the data from the pyarrow Table.
977946
"""
978-
if isinstance(arrow_table, pyarrow.Table):
979-
return self._read_arrow(arrow_table)
980-
else:
947+
if not isinstance(arrow_table, pyarrow.Table):
981948
raise ValueError(
982949
f"read_arrow() expects a pyarrow.Table, but got a {type(arrow_table)}"
983950
)
951+
return self._read_arrow(arrow_table)
984952

985953
def _read_pandas(
986954
self,
@@ -1054,7 +1022,28 @@ def _read_arrow(
10541022
A new DataFrame representing the data from the Arrow table.
10551023
"""
10561024
import bigframes.dataframe as dataframe
1057-
block = blocks.Block.from_local(arrow_table, self)
1025+
# The Block.from_local method is now responsible for handling pyarrow.Table input.
1026+
# This may involve an internal conversion to ManagedArrowTable or similar.
1027+
block = blocks.Block.from_local(arrow_table, self) # This line was part of a previous _read_arrow, will be removed.
1028+
return dataframe.DataFrame(block) # This line was part of a previous _read_arrow, will be removed.
1029+
1030+
def _read_arrow(
1031+
self,
1032+
arrow_table: pyarrow.Table,
1033+
) -> dataframe.DataFrame:
1034+
"""Internal helper to load a pyarrow.Table via deferred mechanism.
1035+
1036+
Converts the Arrow table to a pandas DataFrame using ArrowDtype,
1037+
then creates a Block using from_local for deferred loading.
1038+
"""
1039+
# Ensure necessary imports are at the top of the file or class:
1040+
# import pandas
1041+
# import bigframes.core.blocks as blocks
1042+
# import bigframes.dataframe as dataframe
1043+
1044+
# It's good practice to import pandas as pd, but since it's already imported as pandas, we'll use that.
1045+
pandas_df = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
1046+
block = blocks.Block.from_local(pandas_df, self)
10581047
return dataframe.DataFrame(block)
10591048

10601049
def read_csv(

0 commit comments

Comments
 (0)