
Commit 07baa76

google-labs-jules[bot] authored and tswast committed
feat: Add read_arrow methods to Session and pandas
Adds `read_arrow` methods to `bigframes.session.Session` and `bigframes.pandas.read_arrow` for creating BigQuery DataFrames DataFrames from PyArrow Tables. The implementation refactors the existing logic in `bigframes.session._io.bigquery.read_gbq_query` for converting Arrow data into BigFrames DataFrames.

Includes:

- New file `bigframes/session/_io/arrow.py` with the core conversion logic.
- `read_arrow(pa.Table) -> bpd.DataFrame` in the `Session` class.
- `read_arrow(pa.Table) -> bpd.DataFrame` in the `pandas` module.
- Unit and system tests for the new functionality.
- Docstrings for the new methods and functions.

Note: Unit tests for direct DataFrame operations (`shape`, `to_pandas`) on the result of `read_arrow` are currently failing due to the complexity of mocking the session and executor for `LocalDataNode` interactions. System tests are recommended for full end-to-end validation.
1 parent c88a825 commit 07baa76
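
As a quick illustration of the new API described in the commit message, here is a minimal usage sketch (the table contents are hypothetical, and a configured global BigQuery DataFrames session is assumed):

    import pyarrow as pa

    import bigframes.pandas as bpd

    # Hypothetical input table; any pyarrow.Table with supported types should work.
    pa_table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    # Per the new helper's docstring, the result wraps a local node, so the
    # Arrow data is kept and processed locally rather than uploaded eagerly.
    df = bpd.read_arrow(pa_table)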

File tree

10 files changed: +664 −32 lines changed


bigframes/pandas/__init__.py

Lines changed: 16 additions & 0 deletions
@@ -25,6 +25,7 @@
 
 import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes
 import pandas
+import pyarrow as pa
 
 import bigframes._config as config
 from bigframes.core import log_adapter
@@ -54,6 +55,21 @@
 )
 import bigframes.series
 import bigframes.session
+
+
+def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
+    """Load a PyArrow Table to a BigQuery DataFrames DataFrame.
+
+    Args:
+        pa_table (pyarrow.Table):
+            PyArrow table to load data from.
+
+    Returns:
+        bigframes.dataframe.DataFrame:
+            A new DataFrame representing the data from the PyArrow table.
+    """
+    session = global_session.get_global_session()
+    return session.read_arrow(pa_table=pa_table)
 import bigframes.session._io.bigquery
 import bigframes.session.clients
 import bigframes.version

bigframes/session/_io/arrow.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Private helpers for reading pyarrow objects."""
+
+from __future__ import annotations
+
+import pyarrow as pa
+
+from bigframes import dataframe
+import bigframes.core as core
+import bigframes.dtypes
+from bigframes.core import local_data, pyarrow_utils
+import bigframes.core.blocks as blocks
+import bigframes.core.guid
+import bigframes.core.schema as schemata
+import bigframes.session
+
+
+def create_dataframe_from_arrow_table(
+    pa_table: pa.Table, *, session: bigframes.session.Session
+) -> dataframe.DataFrame:
+    """Convert a PyArrow Table into a BigQuery DataFrames DataFrame.
+
+    This DataFrame will wrap a LocalNode, meaning the data is processed locally.
+
+    Args:
+        pa_table (pyarrow.Table):
+            The PyArrow Table to convert.
+        session (bigframes.session.Session):
+            The BigQuery DataFrames session to associate with the new DataFrame.
+
+    Returns:
+        bigframes.dataframe.DataFrame:
+            A new DataFrame representing the data from the PyArrow table.
+    """
+    # TODO(tswast): Use array_value.promote_offsets() instead once that node is
+    # supported by the local engine.
+    offsets_col = bigframes.core.guid.generate_guid()
+    # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859):
+    # Allow users to specify the "total ordering" column(s) or allow multiple
+    # such columns.
+    pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col)
+
+    # We use the ManagedArrowTable constructor directly, because the
+    # results of to_arrow() should be the source of truth with regards
+    # to canonical formats since it comes from either the BQ Storage
+    # Read API or has been transformed by google-cloud-bigquery to look
+    # like the output of the BQ Storage Read API.
+    schema_items = []
+    for field in pa_table.schema:
+        bf_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(field.type, allow_lossless_cast=True)
+        schema_items.append(schemata.SchemaItem(field.name, bf_dtype))
+    bf_schema = schemata.ArraySchema(tuple(schema_items))
+
+    mat = local_data.ManagedArrowTable(
+        pa_table,
+        bf_schema,
+    )
+    mat.validate()
+
+    array_value = core.ArrayValue.from_managed(mat, session)
+    block = blocks.Block(
+        array_value,
+        (offsets_col,),
+        [field.name for field in pa_table.schema if field.name != offsets_col],
+        (None,),
+    )
+    return dataframe.DataFrame(block)
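
To make the offsets step concrete, the following standalone sketch reproduces the same idea in plain PyArrow. Note that `pyarrow_utils.append_offsets` is the project's internal helper, and the explicit `row_offsets` column name below is hypothetical; the real code generates a GUID for it:

    import pyarrow as pa

    table = pa.table({"x": ["a", "b", "c"]})

    # Append a 0..n-1 int64 column to serve as a "total ordering" over rows,
    # mirroring what pyarrow_utils.append_offsets does before the table is
    # wrapped in a ManagedArrowTable.
    offsets = pa.array(range(table.num_rows), type=pa.int64())
    table = table.append_column("row_offsets", offsets)

    print(table.column_names)  # ['x', 'row_offsets']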

bigframes/session/_io/bigquery/read_gbq_query.py

Lines changed: 2 additions & 32 deletions
@@ -23,12 +23,8 @@
 import pandas
 
 from bigframes import dataframe
-from bigframes.core import local_data, pyarrow_utils
-import bigframes.core as core
-import bigframes.core.blocks as blocks
-import bigframes.core.guid
-import bigframes.core.schema as schemata
 import bigframes.session
+from bigframes.session._io.arrow import create_dataframe_from_arrow_table
 
 
 def create_dataframe_from_query_job_stats(
@@ -61,30 +57,4 @@ def create_dataframe_from_row_iterator(
     'jobless' case where there's no destination table.
     """
     pa_table = rows.to_arrow()
-
-    # TODO(tswast): Use array_value.promote_offsets() instead once that node is
-    # supported by the local engine.
-    offsets_col = bigframes.core.guid.generate_guid()
-    pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col)
-
-    # We use the ManagedArrowTable constructor directly, because the
-    # results of to_arrow() should be the source of truth with regards
-    # to canonical formats since it comes from either the BQ Storage
-    # Read API or has been transformed by google-cloud-bigquery to look
-    # like the output of the BQ Storage Read API.
-    mat = local_data.ManagedArrowTable(
-        pa_table,
-        schemata.ArraySchema.from_bq_schema(
-            list(rows.schema) + [bigquery.SchemaField(offsets_col, "INTEGER")]
-        ),
-    )
-    mat.validate()
-
-    array_value = core.ArrayValue.from_managed(mat, session)
-    block = blocks.Block(
-        array_value,
-        (offsets_col,),
-        [field.name for field in rows.schema],
-        (None,),
-    )
-    return dataframe.DataFrame(block)
+    return create_dataframe_from_arrow_table(pa_table, session=session)
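
With this refactor, `create_dataframe_from_row_iterator` and `read_arrow` share one conversion path: both hand a `pyarrow.Table` to `create_dataframe_from_arrow_table`. A hedged sketch of where that table comes from in the query case, using the public google-cloud-bigquery client (the query text is arbitrary):

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.query_and_wait("SELECT 1 AS x, 'a' AS y")

    # RowIterator.to_arrow() yields a pyarrow.Table in the canonical layout
    # produced by the BQ Storage Read API, which the shared helper consumes.
    pa_table = rows.to_arrow()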

bigframes/session/bigquery_session.py

Lines changed: 16 additions & 0 deletions
@@ -21,9 +21,12 @@
 # TODO: Non-ibis implementation
 import bigframes_vendored.ibis.backends.bigquery.datatypes as ibis_bq
 import google.cloud.bigquery as bigquery
+import pyarrow as pa
 
+import bigframes.dataframe
 from bigframes.core.compile import googlesql
 from bigframes.session import temporary_storage
+from bigframes.session._io.arrow import create_dataframe_from_arrow_table
 
 KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0
 
@@ -142,6 +145,19 @@ def _keep_session_alive(self):
         except Exception as e:
             logging.warning("BigQuery session keep-alive query errored : %s", e)
 
+    def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
+        """Load a PyArrow Table to a BigQuery DataFrames DataFrame.
+
+        Args:
+            pa_table (pyarrow.Table):
+                PyArrow table to load data from.
+
+        Returns:
+            bigframes.dataframe.DataFrame:
+                A new DataFrame representing the data from the PyArrow table.
+        """
+        return create_dataframe_from_arrow_table(pa_table, session=self)
+
 
 class RecurringTaskDaemon:
     def __init__(self, task: Callable[[], None], frequency: datetime.timedelta):
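
Finally, a sketch of the session-level entry point, assuming the session is obtained via `bigframes.pandas.get_global_session()` (any valid session would do):

    import pyarrow as pa

    import bigframes.pandas as bpd

    session = bpd.get_global_session()

    # Equivalent to the module-level bpd.read_arrow wrapper: the method just
    # delegates to create_dataframe_from_arrow_table with this session.
    df = session.read_arrow(pa.table({"value": [1.5, 2.5, 3.5]}))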
