
Commit 5e9c2bf

refactor: Simplify read_arrow to always use deferred loading
This commit refactors `bigframes.pandas.read_arrow()` and its underlying Session methods to always use a deferred loading mechanism, removing the `write_engine` parameter and its associated complexity. The original `read_pandas` implementation has been restored, and it retains its `write_engine` functionality.

Key changes:

1. **`Session.read_arrow` and `Session._read_arrow` Simplification**:
   * Removed the `write_engine` parameter from these methods in `bigframes/session/__init__.py`.
   * `Session._read_arrow` now directly converts the input `pyarrow.Table` to a `pandas.DataFrame` (using `types_mapper=pd.ArrowDtype`) and then to a `bigframes.core.blocks.Block` using `Block.from_local()`. This effectively implements a deferred load.
2. **Public API `bpd.read_arrow` Simplification**:
   * Removed the `write_engine` parameter from `bigframes.pandas.read_arrow` in `bigframes/pandas/io/api.py`.
   * Updated docstrings to reflect the removal of `write_engine` and the deferred loading behavior.
3. **`GbqDataLoader.read_arrow` Removal**:
   * Deleted `GbqDataLoader.read_arrow` from `bigframes/session/loader.py`, as it is no longer needed.
4. **`read_pandas` Restoration**:
   * `Session._read_pandas` in `bigframes/session/__init__.py` has been reverted to its original implementation, preserving its `write_engine` options and behavior.
   * The associated helpers `_get_loader_details_for_engine` and `_read_arrow_inline` (from a previous unsubmitted refactoring) have been removed.
5. **Test Updates**:
   * Tests in `tests/system/small/test_read_arrow.py` have been updated to remove `write_engine`-specific scenarios.
   * Existing tests are verified against the new deferred loading mechanism, with pandas comparison DataFrames created using `types_mapper=pd.ArrowDtype` for consistency.
1 parent 9f30c18 commit 5e9c2bf
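
For illustration, here is a minimal sketch of how the simplified API is expected to be used after this change. The sample data mirrors the docstring example in the diff below; the `print` call and the implied session/project setup are assumptions for demonstration, not code from this commit.

```python
import pyarrow as pa

import bigframes.pandas as bpd

# Sample table mirroring the docstring example in the diff below.
arrow_table = pa.Table.from_pydict(
    {
        "id": pa.array([1, 2, 3], type=pa.int64()),
        "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
    }
)

# After this commit there is no write_engine argument: the table is converted
# locally (via pandas with ArrowDtype) into a deferred BigFrames DataFrame.
bf_df = bpd.read_arrow(arrow_table)

# BigQuery work is deferred until an action requires it, e.g. materializing
# the result back to pandas.
print(bf_df.to_pandas())
```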


4 files changed: +89 -344 lines changed

bigframes/pandas/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,6 @@
from bigframes.pandas.core.api import to_timedelta
from bigframes.pandas.io.api import (
    from_glob_path,
-    read_arrow,
    read_csv,
    read_gbq,
    read_gbq_function,
@@ -51,6 +50,7 @@
    read_pandas,
    read_parquet,
    read_pickle,
+    read_arrow,
)
import bigframes.series
import bigframes.session

bigframes/pandas/io/api.py

Lines changed: 0 additions & 3 deletions
@@ -516,13 +516,10 @@ def read_pandas(
# would likely be converted to a Table first or handled by a different dedicated function.
def read_arrow(
    arrow_table: pyarrow.Table,
-    *,
-    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    return global_session.with_default_session(
        bigframes.session.Session.read_arrow,
        arrow_table,
-        write_engine=write_engine,
    )

bigframes/session/__init__.py

Lines changed: 49 additions & 149 deletions
@@ -930,40 +930,31 @@ def read_arrow(
    def read_arrow(
        self,
        arrow_table: pyarrow.Table,
-        *,
-        write_engine: constants.WriteEngineType = "default",
    ) -> dataframe.DataFrame:
        """Loads a BigQuery DataFrames DataFrame from a ``pyarrow.Table`` object.

-        This method persists the ``pyarrow.Table`` data into a temporary BigQuery
-        table, which is automatically cleaned up when the session is closed.
+        This method uses a deferred loading mechanism: the ``pyarrow.Table`` data
+        is kept in memory locally and converted to a BigFrames DataFrame
+        representation without immediate BigQuery table materialization.
+        Actual computation or data transfer to BigQuery is deferred until an
+        action requiring remote execution is triggered on the DataFrame.
+
        This is the primary session-level API for reading Arrow tables and is
        called by :func:`bigframes.pandas.read_arrow`.

-        .. note::
-            The method of persistence (and associated BigQuery costs/quotas)
-            depends on the ``write_engine`` parameter and the table's size.
-            If the input ``pyarrow.Table`` is small (determined by its in-memory
-            size, roughly <= 5MB using ``pyarrow.Table.nbytes``), its data might
-            be inlined directly into a SQL query when ``write_engine`` is
-            ``"default"`` or ``"bigquery_inline"``. For larger tables, or when
-            ``write_engine`` is ``"bigquery_load"``, ``"bigquery_streaming"``,
-            or ``"bigquery_write"``, a BigQuery load job or streaming API is used.
-
        **Examples:**

            >>> import bigframes.pandas as bpd
            >>> import pyarrow as pa
            >>> # Assume 'session' is an active BigQuery DataFrames Session
-            >>> # bpd.options.display.progress_bar = None # Optional: to silence progress bar

            >>> data_dict = {
            ...     "id": pa.array([1, 2, 3], type=pa.int64()),
            ...     "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
            ... }
            >>> arrow_table = pa.Table.from_pydict(data_dict)
-            >>> df = session.read_arrow(arrow_table)
-            >>> df
+            >>> bf_df = session.read_arrow(arrow_table)
+            >>> bf_df
               id product_name
            0   1       laptop
            1   2       tablet
@@ -973,25 +964,7 @@ def read_arrow(

        Args:
            arrow_table (pyarrow.Table):
-                The ``pyarrow.Table`` object to load into BigQuery DataFrames.
-            write_engine (str, default "default"):
-                Specifies the mechanism for writing data to BigQuery.
-                Supported values:
-
-                * ``"default"``: (Recommended) Automatically selects the most
-                  appropriate write mechanism. If the table's estimated
-                  in-memory size (via ``arrow_table.nbytes``) is less than
-                  or equal to :data:`bigframes.constants.MAX_INLINE_BYTES`
-                  (currently 5000 bytes), ``"bigquery_inline"`` is used.
-                  Otherwise, ``"bigquery_load"`` is used.
-                * ``"bigquery_inline"``: Embeds the table data directly into a
-                  BigQuery SQL query. Suitable only for very small tables.
-                * ``"bigquery_load"``: Uses a BigQuery load job to ingest the
-                  data. Preferred for larger datasets.
-                * ``"bigquery_streaming"``: Employs the BigQuery Storage Write
-                  API in streaming mode (older JSON-based API).
-                * ``"bigquery_write"``: [Preview] Leverages the BigQuery Storage
-                  Write API (Arrow-based). This feature is in public preview.
+                The ``pyarrow.Table`` object to load.

        Returns:
            bigframes.dataframe.DataFrame:
@@ -1000,11 +973,10 @@ def read_arrow(

        Raises:
            ValueError:
-                If the input object is not a ``pyarrow.Table`` or if an
-                unsupported ``write_engine`` is specified.
+                If the input object is not a ``pyarrow.Table``.
        """
        if isinstance(arrow_table, pyarrow.Table):
-            return self._read_arrow(arrow_table, write_engine=write_engine)
+            return self._read_arrow(arrow_table)
        else:
            raise ValueError(
                f"read_arrow() expects a pyarrow.Table, but got a {type(arrow_table)}"
@@ -1024,33 +996,34 @@ def _read_pandas(
                "bigframes.pandas.DataFrame."
            )

-        final_engine, is_inline, loader_method = self._get_loader_details_for_engine(
-            write_engine, pandas_dataframe.memory_usage(deep=True).sum()
-        )
+        mem_usage = pandas_dataframe.memory_usage(deep=True).sum()
+        if write_engine == "default":
+            write_engine = (
+                "bigquery_load"
+                if mem_usage > bigframes.constants.MAX_INLINE_BYTES
+                else "bigquery_inline"
+            )

-        if is_inline:
-            if final_engine == "bigquery_inline":
-                # Ensure inline data isn't too large if specified directly
-                if (
-                    pandas_dataframe.memory_usage(deep=True).sum()
-                    > bigframes.constants.MAX_INLINE_BYTES
-                ):
-                    raise ValueError(
-                        f"DataFrame size ({pandas_dataframe.memory_usage(deep=True).sum()} bytes) "
-                        f"exceeds the maximum allowed for inline data "
-                        f"({bigframes.constants.MAX_INLINE_BYTES} bytes) when "
-                        f"write_engine='bigquery_inline'."
-                    )
-                return self._read_pandas_inline(pandas_dataframe)
-            elif final_engine == "_deferred":
-                return dataframe.DataFrame(
-                    blocks.Block.from_local(pandas_dataframe, self)
+        if write_engine == "bigquery_inline":
+            if mem_usage > bigframes.constants.MAX_INLINE_BYTES:
+                raise ValueError(
+                    f"DataFrame size ({mem_usage} bytes) exceeds the maximum allowed "
+                    f"for inline data ({bigframes.constants.MAX_INLINE_BYTES} bytes)."
                )
-            else:
-                # Should not happen if _get_loader_details_for_engine is correct
-                raise ValueError(f"Unexpected inline engine: {final_engine}")
+            return self._read_pandas_inline(pandas_dataframe)
+        elif write_engine == "bigquery_load":
+            return self._loader.read_pandas(pandas_dataframe, method="load")
+        elif write_engine == "bigquery_streaming":
+            return self._loader.read_pandas(pandas_dataframe, method="stream")
+        elif write_engine == "bigquery_write":
+            return self._loader.read_pandas(pandas_dataframe, method="write")
+        elif write_engine == "_deferred":
+            # Must import here to avoid circular dependency from blocks.py
+            import bigframes.dataframe as dataframe
+
+            return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))
        else:
-            return self._loader.read_pandas(pandas_dataframe, method=loader_method)
+            raise ValueError(f"Got unexpected write_engine '{write_engine}'")

    def _read_pandas_inline(
        self, pandas_dataframe: pandas.DataFrame
@@ -1061,107 +1034,32 @@ def _read_pandas_inline(
        local_block = blocks.Block.from_local(pandas_dataframe, self)
        return dataframe.DataFrame(local_block)

-    def _read_arrow_inline(self, arrow_table: pyarrow.Table) -> dataframe.DataFrame:
-        """Creates a BigFrames DataFrame from an in-memory pyarrow Table by inlining data."""
-        import bigframes.dataframe as dataframe
-
-        # Assuming Block.from_local can handle pandas DataFrame.
-        # If Block.from_local is enhanced to take pyarrow.Table directly,
-        # this conversion can be removed.
-        pandas_df = arrow_table.to_pandas()
-        local_block = blocks.Block.from_local(pandas_df, self)
-        return dataframe.DataFrame(local_block)
-
-    def _get_loader_details_for_engine(
-        self, write_engine: str, in_memory_size: int
-    ) -> Tuple[str, bool, str]:
-        """
-        Determines the final write engine, if it's an inline operation, and the loader method name.
-
-        Args:
-            write_engine (str):
-                The user-provided or default write engine.
-            in_memory_size (int):
-                The size of the data in bytes.
-
-        Returns:
-            Tuple[str, bool, str]:
-                A tuple containing:
-                - final_write_engine (str): The resolved engine.
-                - is_inline (bool): True if the engine is "bigquery_inline" or "_deferred".
-                - loader_method_name (str): The method name for GbqDataLoader
-                  (e.g., "load", "stream", "write"), or an empty string if inline.
-        """
-        final_write_engine = write_engine
-        if write_engine == "default":
-            if in_memory_size > bigframes.constants.MAX_INLINE_BYTES:
-                final_write_engine = "bigquery_load"
-            else:
-                final_write_engine = "bigquery_inline"
-
-        if final_write_engine == "bigquery_inline":
-            return "bigquery_inline", True, ""
-        elif final_write_engine == "bigquery_load":
-            return "bigquery_load", False, "load"
-        elif final_write_engine == "bigquery_streaming":
-            return "bigquery_streaming", False, "stream"
-        elif final_write_engine == "bigquery_write":
-            return "bigquery_write", False, "write"
-        elif final_write_engine == "_deferred":  # Specific to _read_pandas
-            return "_deferred", True, ""
-        else:
-            raise ValueError(f"Got unexpected write_engine '{final_write_engine}'")
-
    def _read_arrow(
        self,
        arrow_table: pyarrow.Table,
-        *,
-        write_engine: constants.WriteEngineType = "default",
    ) -> dataframe.DataFrame:
-        """Internal helper to load a ``pyarrow.Table`` into a BigQuery DataFrames DataFrame.
+        """Internal helper to load a ``pyarrow.Table`` using a deferred mechanism.

-        This method orchestrates the data loading process based on the specified
-        ``write_engine``. It determines whether to inline the data, use a load
-        job, or employ streaming based on the engine and table properties.
+        Converts the Arrow table to a pandas DataFrame with ArrowDTypes,
+        then creates a BigFrames block from this local pandas DataFrame.
+        The data remains in memory until an operation triggers execution.
        Called by the public :meth:`~Session.read_arrow`.

        Args:
            arrow_table (pyarrow.Table):
                The ``pyarrow.Table`` to load.
-            write_engine (str):
-                The write engine determining the loading mechanism.
-                If ``"default"``, the engine is chosen based on the table's
-                estimated size (``arrow_table.nbytes``). See
-                :meth:`~Session.read_arrow` for detailed descriptions of options.

        Returns:
            bigframes.dataframe.DataFrame:
                A new DataFrame representing the data from the Arrow table.
-
-        Raises:
-            ValueError: If an unsupported ``write_engine`` is specified.
        """
-        final_engine, is_inline, loader_method = self._get_loader_details_for_engine(
-            write_engine, arrow_table.nbytes
-        )
-
-        if is_inline:
-            if final_engine == "bigquery_inline":
-                # Ensure inline data isn't too large if specified directly
-                if arrow_table.nbytes > bigframes.constants.MAX_INLINE_BYTES:
-                    raise ValueError(
-                        f"Arrow Table size ({arrow_table.nbytes} bytes) "
-                        f"exceeds the maximum allowed for inline data "
-                        f"({bigframes.constants.MAX_INLINE_BYTES} bytes) when "
-                        f"write_engine='bigquery_inline'."
-                    )
-                return self._read_arrow_inline(arrow_table)
-            # No "_deferred" case for Arrow currently
-            else:
-                # Should not happen
-                raise ValueError(f"Unexpected inline engine for Arrow: {final_engine}")
-        else:
-            return self._loader.read_arrow(arrow_table, method=loader_method)
+        import bigframes.dataframe as dataframe
+        # It's important to use types_mapper=pd.ArrowDtype to preserve Arrow types
+        # as much as possible when converting to pandas, especially for types
+        # that might otherwise lose precision or be converted to NumPy types.
+        pandas_df = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
+        block = blocks.Block.from_local(pandas_df, self)
+        return dataframe.DataFrame(block)

    def read_csv(
        self,
@@ -2273,3 +2171,5 @@ def _warn_if_bf_version_is_obsolete():
    if today - release_date > datetime.timedelta(days=365):
        msg = f"Your BigFrames version {version.__version__} is more than 1 year old. Please update to the lastest version."
        warnings.warn(msg, bfe.ObsoleteVersionWarning)
+
+
[end of bigframes/session/__init__.py]
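
The updated tests in `tests/system/small/test_read_arrow.py` are not shown in this excerpt. As a rough sketch of the comparison style described in item 5 of the commit message (pandas baselines built with `types_mapper=pd.ArrowDtype`), a test of the following shape could apply; the `session` fixture name and the assertion options are assumptions, not the repository's actual test code.

```python
import pandas as pd
import pyarrow as pa


def test_read_arrow_matches_arrow_backed_pandas(session):
    # `session` is assumed to be a fixture yielding a BigQuery DataFrames Session.
    arrow_table = pa.Table.from_pydict(
        {"id": pa.array([1, 2, 3]), "name": pa.array(["a", "b", "c"])}
    )

    bf_df = session.read_arrow(arrow_table)

    # Build the pandas baseline the same way the deferred path converts the
    # table, so both sides use ArrowDtype-backed columns.
    expected = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
    result = bf_df.to_pandas()

    pd.testing.assert_frame_equal(
        result.reset_index(drop=True), expected, check_dtype=False
    )
```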
