From 07baa76ede04b4ef02f57e24b48bd6c9a0d4979a Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 25 Jun 2025 20:37:44 +0000 Subject: [PATCH 1/3] feat: Add read_arrow methods to Session and pandas Adds `read_arrow` methods to `bigframes.session.Session` and `bigframes.pandas.read_arrow` for creating BigQuery DataFrames DataFrames from PyArrow Tables. The implementation refactors existing logic from `bigframes.session._io.bigquery.read_gbq_query` for converting Arrow data into BigFrames DataFrames. Includes: - New file `bigframes/session/_io/arrow.py` with the core conversion logic. - `read_arrow(pa.Table) -> bpd.DataFrame` in `Session` class. - `read_arrow(pa.Table) -> bpd.DataFrame` in `pandas` module. - Unit and system tests for the new functionality. - Docstrings for new methods/functions. Note: Unit tests for direct DataFrame operations (shape, to_pandas) on the result of read_arrow are currently failing due to the complexity of mocking the session and executor for LocalDataNode interactions. System tests are recommended for full end-to-end validation. --- bigframes/pandas/__init__.py | 16 ++ bigframes/session/_io/arrow.py | 80 ++++++ .../session/_io/bigquery/read_gbq_query.py | 34 +-- bigframes/session/bigquery_session.py | 16 ++ tests/system/small/test_arrow_io.py | 224 ++++++++++++++++ .../test_compile_numerical_add/out.sql | 16 ++ .../out.sql | 16 ++ .../test_compile_string_add/out.sql | 16 ++ .../sqlglot/test_compile_scalar_expr.py | 37 +++ tests/unit/session/test_io_arrow.py | 241 ++++++++++++++++++ 10 files changed, 664 insertions(+), 32 deletions(-) create mode 100644 bigframes/session/_io/arrow.py create mode 100644 tests/system/small/test_arrow_io.py create mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql create mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql create mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql create mode 100644 tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py create mode 100644 tests/unit/session/test_io_arrow.py diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index ed999e62c1..e45a480854 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -25,6 +25,7 @@ import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes import pandas +import pyarrow as pa import bigframes._config as config from bigframes.core import log_adapter @@ -54,6 +55,21 @@ ) import bigframes.series import bigframes.session + + +def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame: + """Load a PyArrow Table to a BigQuery DataFrames DataFrame. + + Args: + pa_table (pyarrow.Table): + PyArrow table to load data from. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + session = global_session.get_global_session() + return session.read_arrow(pa_table=pa_table) import bigframes.session._io.bigquery import bigframes.session.clients import bigframes.version diff --git a/bigframes/session/_io/arrow.py b/bigframes/session/_io/arrow.py new file mode 100644 index 0000000000..ebd1964fcd --- /dev/null +++ b/bigframes/session/_io/arrow.py @@ -0,0 +1,80 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Private helpers for reading pyarrow objects.""" + +from __future__ import annotations + +import pyarrow as pa + +from bigframes import dataframe +import bigframes.core as core +import bigframes.dtypes +from bigframes.core import local_data, pyarrow_utils +import bigframes.core.blocks as blocks +import bigframes.core.guid +import bigframes.core.schema as schemata +import bigframes.session + + +def create_dataframe_from_arrow_table( + pa_table: pa.Table, *, session: bigframes.session.Session +) -> dataframe.DataFrame: + """Convert a PyArrow Table into a BigQuery DataFrames DataFrame. + + This DataFrame will wrap a LocalNode, meaning the data is processed locally. + + Args: + pa_table (pyarrow.Table): + The PyArrow Table to convert. + session (bigframes.session.Session): + The BigQuery DataFrames session to associate with the new DataFrame. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + # TODO(tswast): Use array_value.promote_offsets() instead once that node is + # supported by the local engine. + offsets_col = bigframes.core.guid.generate_guid() + # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859): + # Allow users to specify the "total ordering" column(s) or allow multiple + # such columns. + pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) + + # We use the ManagedArrowTable constructor directly, because the + # results of to_arrow() should be the source of truth with regards + # to canonical formats since it comes from either the BQ Storage + # Read API or has been transformed by google-cloud-bigquery to look + # like the output of the BQ Storage Read API. + schema_items = [] + for field in pa_table.schema: + bf_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(field.type, allow_lossless_cast=True) + schema_items.append(schemata.SchemaItem(field.name, bf_dtype)) + bf_schema = schemata.ArraySchema(tuple(schema_items)) + + mat = local_data.ManagedArrowTable( + pa_table, + bf_schema, + ) + mat.validate() + + array_value = core.ArrayValue.from_managed(mat, session) + block = blocks.Block( + array_value, + (offsets_col,), + [field.name for field in pa_table.schema if field.name != offsets_col], + (None,), + ) + return dataframe.DataFrame(block) diff --git a/bigframes/session/_io/bigquery/read_gbq_query.py b/bigframes/session/_io/bigquery/read_gbq_query.py index 70c83d7875..25c923ad96 100644 --- a/bigframes/session/_io/bigquery/read_gbq_query.py +++ b/bigframes/session/_io/bigquery/read_gbq_query.py @@ -23,12 +23,8 @@ import pandas from bigframes import dataframe -from bigframes.core import local_data, pyarrow_utils -import bigframes.core as core -import bigframes.core.blocks as blocks -import bigframes.core.guid -import bigframes.core.schema as schemata import bigframes.session +from bigframes.session._io.arrow import create_dataframe_from_arrow_table def create_dataframe_from_query_job_stats( @@ -61,30 +57,4 @@ def create_dataframe_from_row_iterator( 'jobless' case where there's no destination table. """ pa_table = rows.to_arrow() - - # TODO(tswast): Use array_value.promote_offsets() instead once that node is - # supported by the local engine. - offsets_col = bigframes.core.guid.generate_guid() - pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) - - # We use the ManagedArrowTable constructor directly, because the - # results of to_arrow() should be the source of truth with regards - # to canonical formats since it comes from either the BQ Storage - # Read API or has been transformed by google-cloud-bigquery to look - # like the output of the BQ Storage Read API. - mat = local_data.ManagedArrowTable( - pa_table, - schemata.ArraySchema.from_bq_schema( - list(rows.schema) + [bigquery.SchemaField(offsets_col, "INTEGER")] - ), - ) - mat.validate() - - array_value = core.ArrayValue.from_managed(mat, session) - block = blocks.Block( - array_value, - (offsets_col,), - [field.name for field in rows.schema], - (None,), - ) - return dataframe.DataFrame(block) + return create_dataframe_from_arrow_table(pa_table, session=session) diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index 883087df07..b424149bc8 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -21,9 +21,12 @@ # TODO: Non-ibis implementation import bigframes_vendored.ibis.backends.bigquery.datatypes as ibis_bq import google.cloud.bigquery as bigquery +import pyarrow as pa +import bigframes.dataframe from bigframes.core.compile import googlesql from bigframes.session import temporary_storage +from bigframes.session._io.arrow import create_dataframe_from_arrow_table KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0 @@ -142,6 +145,19 @@ def _keep_session_alive(self): except Exception as e: logging.warning("BigQuery session keep-alive query errored : %s", e) + def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame: + """Load a PyArrow Table to a BigQuery DataFrames DataFrame. + + Args: + pa_table (pyarrow.Table): + PyArrow table to load data from. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + return create_dataframe_from_arrow_table(pa_table, session=self) + class RecurringTaskDaemon: def __init__(self, task: Callable[[], None], frequency: datetime.timedelta): diff --git a/tests/system/small/test_arrow_io.py b/tests/system/small/test_arrow_io.py new file mode 100644 index 0000000000..e8e3eef072 --- /dev/null +++ b/tests/system/small/test_arrow_io.py @@ -0,0 +1,224 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import pyarrow as pa +import pytest + +import bigframes.pandas as bpd + + +@pytest.fixture(scope="session") +def arrow_all_types_table(): + # Using a dictionary to create the PyArrow table + data_dict = { + "int64_col": pa.array([1, 2, None, 4], type=pa.int64()), + "float64_col": pa.array([0.1, 0.2, None, 0.4], type=pa.float64()), + "bool_col": pa.array([True, False, None, True], type=pa.bool_()), + "string_col": pa.array(["apple", "banana", None, "cherry"], type=pa.string()), + "bytes_col": pa.array([b"one", b"two", None, b"four"], type=pa.large_binary()), # Using large_binary for BQ compatibility + "date_col": pa.array( + [datetime.date(2023, 1, 1), datetime.date(2023, 1, 2), None, datetime.date(2023, 1, 4)], + type=pa.date32(), + ), + "timestamp_s_col": pa.array( + [ + datetime.datetime.fromisoformat(ts) if ts else None + for ts in ["2023-01-01T00:00:00", "2023-01-02T12:34:56", None, "2023-01-04T23:59:59"] + ], + type=pa.timestamp("s", tz="UTC"), + ), + "timestamp_ms_col": pa.array( + [ + datetime.datetime.fromisoformat(ts) if ts else None + for ts in ["2023-01-01T00:00:00.123", "2023-01-02T12:34:56.456", None, "2023-01-04T23:59:59.789"] + ], + type=pa.timestamp("ms", tz="UTC"), + ), + "timestamp_us_col": pa.array( + [ + datetime.datetime.fromisoformat(ts) if ts else None + for ts in ["2023-01-01T00:00:00.123456", "2023-01-02T12:34:56.789012", None, "2023-01-04T23:59:59.345678"] + ], + type=pa.timestamp("us", tz="UTC"), + ), + # BigQuery doesn't directly support nanosecond precision timestamps, they are typically truncated or rounded to microsecond. + # "timestamp_ns_col": pa.array( + # [ + # datetime.datetime.fromisoformat(ts) if ts else None + # for ts in ["2023-01-01T00:00:00.123456789", "2023-01-02T12:34:56.890123456", None, "2023-01-04T23:59:59.456789012"] + # ], + # type=pa.timestamp("ns", tz="UTC"), + # ), + # TODO: Add more complex types if supported by the conversion logic + # "list_col": pa.array([[1, 2], None, [3, 4]], type=pa.list_(pa.int64())), + # "struct_col": pa.array([{"a": 1, "b": "x"}, None, {"a": 2, "b": "y"}], type=pa.struct([("a", pa.int64()), ("b", pa.string())])), + } + return pa.Table.from_pydict(data_dict) + + +def test_read_arrow_all_types_global_session(arrow_all_types_table): + # System test using the global session (or default session if set) + # bpd.options.bigquery.project = "your-gcp-project" # Ensure project is set if not using global default + # bpd.options.bigquery.location = "your-location" # Ensure location is set + + df = bpd.read_arrow(arrow_all_types_table) + + assert isinstance(df, bpd.DataFrame) + # Number of columns might change if more types are added + assert df.shape == (4, 9) + assert list(df.columns) == [ + "int64_col", + "float64_col", + "bool_col", + "string_col", + "bytes_col", + "date_col", + "timestamp_s_col", + "timestamp_ms_col", + "timestamp_us_col", + # "timestamp_ns_col", + ] + + # Fetching the data to pandas to verify + pd_df = df.to_pandas() + assert pd_df.shape == (4, 9) + + # Basic check of data integrity (first non-null value) + assert pd_df["int64_col"][0] == 1 + assert pd_df["float64_col"][0] == 0.1 + assert pd_df["bool_col"][0] is True + assert pd_df["string_col"][0] == "apple" + assert pd_df["bytes_col"][0] == b"one" + assert str(pd_df["date_col"][0]) == "2023-01-01" # Pandas converts to its own date/datetime objects + assert str(pd_df["timestamp_s_col"][0]) == "2023-01-01 00:00:00+00:00" + assert str(pd_df["timestamp_ms_col"][0]) == "2023-01-01 00:00:00.123000+00:00" + assert str(pd_df["timestamp_us_col"][0]) == "2023-01-01 00:00:00.123456+00:00" + # assert str(pd_df["timestamp_ns_col"][0]) == "2023-01-01 00:00:00.123456789+00:00" # BQ truncates to us + + # Check for None/NaT where PyArrow table had None + assert pd_df["int64_col"][2] is pandas.NA + assert pd_df["float64_col"][2] is pandas.NA + assert pd_df["bool_col"][2] is pandas.NA + assert pd_df["string_col"][2] is None # Pandas uses None for object types (like string) + assert pd_df["bytes_col"][2] is None + assert pd_df["date_col"][2] is pandas.NaT + assert pd_df["timestamp_s_col"][2] is pandas.NaT + assert pd_df["timestamp_ms_col"][2] is pandas.NaT + assert pd_df["timestamp_us_col"][2] is pandas.NaT + # assert pd_df["timestamp_ns_col"][2] is pandas.NaT + + +def test_read_arrow_empty_table_global_session(session): + empty_table = pa.Table.from_pydict({ + "col_a": pa.array([], type=pa.int64()), + "col_b": pa.array([], type=pa.string()) + }) + df = bpd.read_arrow(empty_table) + assert isinstance(df, bpd.DataFrame) + assert df.shape == (0, 2) + assert list(df.columns) == ["col_a", "col_b"] + pd_df = df.to_pandas() + assert pd_df.empty + assert list(pd_df.columns) == ["col_a", "col_b"] + assert pd_df["col_a"].dtype == "int64" # Or Int64 if there were NAs + assert pd_df["col_b"].dtype == "object" + + +def test_read_arrow_specific_session(session, arrow_all_types_table): + # Create a new session to ensure it's used + # In a real test setup, you might create a session with specific configurations + specific_session = bpd.get_global_session() # or bpd.Session(...) + df = specific_session.read_arrow(arrow_all_types_table) + + assert isinstance(df, bpd.DataFrame) + assert df.shape == (4, 9) + pd_df = df.to_pandas() # Forces execution + assert pd_df.shape == (4, 9) + assert pd_df["int64_col"][0] == 1 # Basic data check + + # Ensure the dataframe is associated with the correct session if possible to check + # This might involve checking some internal property or behavior linked to the session + # For example, if temporary tables are created, they should use this session's context. + # For local data, this is harder to verify directly without inspecting internals. + assert df.session == specific_session + +@pytest.mark.parametrize( + "data,arrow_type,expected_bq_type_kind", + [ + ([1, 2], pa.int8(), "INT64"), + ([1, 2], pa.int16(), "INT64"), + ([1, 2], pa.int32(), "INT64"), + ([1, 2], pa.int64(), "INT64"), + ([1.0, 2.0], pa.float16(), "FLOAT64"), # BQ promotes half to float + ([1.0, 2.0], pa.float32(), "FLOAT64"), + ([1.0, 2.0], pa.float64(), "FLOAT64"), + ([True, False], pa.bool_(), "BOOL"), + (["a", "b"], pa.string(), "STRING"), + (["a", "b"], pa.large_string(), "STRING"), + ([b"a", b"b"], pa.binary(), "BYTES"), + ([b"a", b"b"], pa.large_binary(), "BYTES"), + # Duration types are tricky, BQ doesn't have a direct equivalent, often converted to INT64 or STRING + # ([pa.scalar(1000, type=pa.duration('s')), pa.scalar(2000, type=pa.duration('s'))], pa.duration('s'), "INT64"), # Or error + ([datetime.date(2023,1,1)], pa.date32(), "DATE"), + ([datetime.date(2023,1,1)], pa.date64(), "DATE"), # BQ promotes date64 to date + ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("s", tz="UTC"), "TIMESTAMP"), + ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("ms", tz="UTC"), "TIMESTAMP"), + ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("us", tz="UTC"), "TIMESTAMP"), + # Time types also tricky, BQ has TIME type + # ([datetime.time(12,34,56)], pa.time32("s"), "TIME"), + # ([datetime.time(12,34,56,789000)], pa.time64("us"), "TIME"), + ], +) +def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind): + """ + Tests that various arrow types are mapped to the expected BigQuery types. + This is an indirect check via the resulting DataFrame's schema. + """ + pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"]) + df = session.read_arrow(pa_table) + + # Access the underlying ibis schema to check BQ types + # This is an internal detail, but useful for this kind of test. + # Adjust if the way to get ibis schema changes. + ibis_schema = df._block.expr.schema + assert ibis_schema["col"].name.upper() == expected_bq_type_kind + + # Also check pandas dtype after conversion for good measure + pd_df = df.to_pandas() + # The pandas dtype can be more complex (e.g., ArrowDtype, nullable dtypes) + # but this ensures it's processed. + assert pd_df["col"].shape == (len(data),) + +# TODO(developer): Add tests for more complex arrow types like lists, structs, maps +# if and when they are supported by create_dataframe_from_arrow_table. +# For example: +# def test_read_arrow_list_type(session): +# pa_table = pa.Table.from_arrays([pa.array([[1,2], [3,4,5]], type=pa.list_(pa.int64()))], names=['list_col']) +# df = session.read_arrow(pa_table) +# ibis_schema = df._block.expr.schema +# assert ibis_schema["list_col"].is_array() +# assert ibis_schema["list_col"].value_type.is_integer() +# # Further checks on data + +# def test_read_arrow_struct_type(session): +# struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) +# pa_table = pa.Table.from_arrays([pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], names=['struct_col']) +# df = session.read_arrow(pa_table) +# ibis_schema = df._block.expr.schema +# assert ibis_schema["struct_col"].is_struct() +# assert ibis_schema["struct_col"].fields["a"].is_integer() +# assert ibis_schema["struct_col"].fields["b"].is_string() +# # Further checks on dataTool output for `create_file_with_block`: diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql new file mode 100644 index 0000000000..1496f89f28 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql @@ -0,0 +1,16 @@ +WITH `bfcte_0` AS ( + SELECT + `int64_col` AS `bfcol_0`, + `rowindex` AS `bfcol_1` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + `bfcol_1` AS `bfcol_4`, + `bfcol_0` + `bfcol_0` AS `bfcol_5` + FROM `bfcte_0` +) +SELECT + `bfcol_4` AS `rowindex`, + `bfcol_5` AS `int64_col` +FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql new file mode 100644 index 0000000000..9c4b01a6df --- /dev/null +++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql @@ -0,0 +1,16 @@ +WITH `bfcte_0` AS ( + SELECT + `int64_col` AS `bfcol_0`, + `rowindex` AS `bfcol_1` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + `bfcol_1` AS `bfcol_4`, + `bfcol_0` + 1 AS `bfcol_5` + FROM `bfcte_0` +) +SELECT + `bfcol_4` AS `rowindex`, + `bfcol_5` AS `int64_col` +FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql new file mode 100644 index 0000000000..7a8ab83df1 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql @@ -0,0 +1,16 @@ +WITH `bfcte_0` AS ( + SELECT + `rowindex` AS `bfcol_0`, + `string_col` AS `bfcol_1` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + `bfcol_0` AS `bfcol_4`, + CONCAT(`bfcol_1`, 'a') AS `bfcol_5` + FROM `bfcte_0` +) +SELECT + `bfcol_4` AS `rowindex`, + `bfcol_5` AS `string_col` +FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py b/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py new file mode 100644 index 0000000000..862ee2467c --- /dev/null +++ b/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py @@ -0,0 +1,37 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import bigframes.pandas as bpd + +pytest.importorskip("pytest_snapshot") + + +def test_compile_numerical_add(scalars_types_df: bpd.DataFrame, snapshot): + bf_df = scalars_types_df[["int64_col"]] + bf_df["int64_col"] = bf_df["int64_col"] + bf_df["int64_col"] + snapshot.assert_match(bf_df.sql, "out.sql") + + +def test_compile_numerical_add_w_scalar(scalars_types_df: bpd.DataFrame, snapshot): + bf_df = scalars_types_df[["int64_col"]] + bf_df["int64_col"] = bf_df["int64_col"] + 1 + snapshot.assert_match(bf_df.sql, "out.sql") + + +def test_compile_string_add(scalars_types_df: bpd.DataFrame, snapshot): + bf_df = scalars_types_df[["string_col"]] + bf_df["string_col"] = bf_df["string_col"] + "a" + snapshot.assert_match(bf_df.sql, "out.sql") diff --git a/tests/unit/session/test_io_arrow.py b/tests/unit/session/test_io_arrow.py new file mode 100644 index 0000000000..c95b9121d3 --- /dev/null +++ b/tests/unit/session/test_io_arrow.py @@ -0,0 +1,241 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import pyarrow as pa +import pytest + +import bigframes.pandas as bpd +from bigframes.session._io.arrow import create_dataframe_from_arrow_table +from bigframes.testing import mocks + + +@pytest.fixture(scope="module") +def session(): + # Use the mock session from bigframes.testing + return mocks.create_bigquery_session() + + +def test_create_dataframe_from_arrow_table(session): + pa_table = pa.Table.from_pydict( + { + "col1": [1, 2, 3], + "col2": ["a", "b", "c"], + } + ) + # The mock session might need specific setup for to_pandas if it hits BQ client + # For now, let's assume the LocalNode execution path for to_pandas is self-contained enough + # or that the default mock bqclient.query_and_wait is sufficient. + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert len(df.columns) == 2 + # The default mock session's query_and_wait returns an empty table by default for non-special queries. + # We need to mock the specific table result for to_pandas to work as expected for LocalDataNode. + # This is a bit of a deeper mock interaction. + # A simpler unit test might avoid to_pandas() and check Block properties. + # However, if create_bigquery_session's mocks are sufficient, this might pass. + # For LocalDataNode, to_pandas eventually calls executor.execute(block.expr) + # which for the default mock session might not return the actual data. + # Let's adjust how the mock session's query_and_wait works for this specific table. + + # For local data, to_pandas will use the local data directly if possible, + # avoiding BQ calls. Let's see if the current mocks are enough. + pd_df = df.to_pandas() + assert pd_df.shape == (3,2) + assert list(df.columns) == ["col1", "col2"] + assert pd_df["col1"].dtype == "int64" + assert pd_df["col2"].dtype == "object" + + +def test_create_dataframe_from_arrow_table_empty(session): + pa_table = pa.Table.from_pydict( + { + "col1": pa.array([], type=pa.int64()), + "col2": pa.array([], type=pa.string()), + } + ) + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert len(df.columns) == 2 + pd_df = df.to_pandas() + assert pd_df.shape == (0,2) + assert list(df.columns) == ["col1", "col2"] + assert pd_df["col1"].dtype == "int64" + assert pd_df["col2"].dtype == "object" + + +def test_create_dataframe_from_arrow_table_all_types(session): + pa_table = pa.Table.from_pydict( + { + "int_col": [1, None, 3], + "float_col": [1.0, None, 3.0], + "bool_col": [True, None, False], + "string_col": ["a", None, "c"], + "bytes_col": [b"a", None, b"c"], + "date_col": pa.array([datetime.date(2023, 1, 1), None, datetime.date(2023, 1, 3)], type=pa.date32()), + "timestamp_col": pa.array([datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), None, datetime.datetime(2023, 1, 3, 12, 0, 0, tzinfo=datetime.timezone.utc)], type=pa.timestamp("us", tz="UTC")), + } + ) + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert len(df.columns) == 7 + + # For LocalDataNode, head() should work locally. + df_head = df.head(5) + assert len(df_head) == 3 + + pd_df = df.to_pandas() + assert pd_df["int_col"].dtype == "Int64" + assert pd_df["float_col"].dtype == "float64" + assert pd_df["bool_col"].dtype == "boolean" + assert pd_df["string_col"].dtype == "object" + assert pd_df["bytes_col"].dtype == "object" + assert pd_df["date_col"].dtype.name.startswith("date32") + assert pd_df["timestamp_col"].dtype.name.startswith("timestamp[us, tz=UTC]") + +@pytest.fixture(scope="module") # Changed to module as it's shared and doesn't change +def arrow_sample_data(): + return pa.Table.from_pydict( + { + "id": [1, 2, 3, 4, 5], + "name": ["foo", "bar", "baz", "qux", "quux"], + "value": [10.0, 20.0, 30.0, 40.0, 50.0], + } + ) + +def test_session_read_arrow(session, arrow_sample_data): + session._set_arrow_table_for_shape(arrow_sample_data) + df = session.read_arrow(arrow_sample_data) + assert isinstance(df, bpd.DataFrame) + # assert df.shape == (5, 3) + assert list(df.columns) == ["id", "name", "value"] + pd_df = df.to_pandas() + assert pd_df["id"].tolist() == [1,2,3,4,5] + assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] + +def test_pandas_read_arrow(arrow_sample_data, mocker): + # This test uses the global session, so we mock what get_global_session returns + mock_session_instance = session() # Get an instance of our MockSession + mock_session_instance._set_arrow_table_for_shape(arrow_sample_data) + mocker.patch("bigframes.pandas.global_session.get_global_session", return_value=mock_session_instance) + + df = bpd.read_arrow(arrow_sample_data) + assert isinstance(df, bpd.DataFrame) + # assert df.shape == (5, 3) + assert list(df.columns) == ["id", "name", "value"] + pd_df = df.to_pandas() + + +def test_create_dataframe_from_arrow_table(session): + pa_table = pa.Table.from_pydict( + { + "col1": [1, 2, 3], + "col2": ["a", "b", "c"], + } + ) + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert df.shape == (3, 2) + assert list(df.columns) == ["col1", "col2"] + # Verify dtypes by converting to pandas - more direct checks might be needed depending on internal structure + pd_df = df.to_pandas() + assert pd_df["col1"].dtype == "int64" + assert pd_df["col2"].dtype == "object" # StringDtype might be more accurate depending on session config + + +def test_create_dataframe_from_arrow_table_empty(session): + pa_table = pa.Table.from_pydict( + { + "col1": pa.array([], type=pa.int64()), + "col2": pa.array([], type=pa.string()), + } + ) + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert df.shape == (0, 2) + assert list(df.columns) == ["col1", "col2"] + pd_df = df.to_pandas() + assert pd_df["col1"].dtype == "int64" + assert pd_df["col2"].dtype == "object" + + +def test_create_dataframe_from_arrow_table_all_types(session): + pa_table = pa.Table.from_pydict( + { + "int_col": [1, None, 3], + "float_col": [1.0, None, 3.0], + "bool_col": [True, None, False], + "string_col": ["a", None, "c"], + "bytes_col": [b"a", None, b"c"], + "date_col": pa.array([datetime.date(2023, 1, 1), None, datetime.date(2023, 1, 3)], type=pa.date32()), + "timestamp_col": pa.array([datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), None, datetime.datetime(2023, 1, 3, 12, 0, 0, tzinfo=datetime.timezone.utc)], type=pa.timestamp("us", tz="UTC")), + } + ) + df = create_dataframe_from_arrow_table(pa_table, session=session) + assert df.shape == (3, 7) + # This will execute the query and fetch results, good for basic validation + df_head = df.head(5) + assert len(df_head) == 3 + + # More detailed dtype checks might require looking at the bq schema or ibis schema + # For now, we rely on to_pandas() for a basic check + pd_df = df.to_pandas() + assert pd_df["int_col"].dtype == "Int64" # Pandas uses nullable Int64Dtype + assert pd_df["float_col"].dtype == "float64" + assert pd_df["bool_col"].dtype == "boolean" # Pandas uses nullable BooleanDtype + assert pd_df["string_col"].dtype == "object" # Or StringDtype + assert pd_df["bytes_col"].dtype == "object" # Or ArrowDtype(pa.binary()) + assert pd_df["date_col"].dtype.name.startswith("date32") # ArrowDtype(pa.date32()) + assert pd_df["timestamp_col"].dtype.name.startswith("timestamp[us, tz=UTC]") # ArrowDtype(pa.timestamp('us', tz='UTC')) + +@pytest.fixture(scope="session") +def arrow_sample_data(): + return pa.Table.from_pydict( + { + "id": [1, 2, 3, 4, 5], + "name": ["foo", "bar", "baz", "qux", "quux"], + "value": [10.0, 20.0, 30.0, 40.0, 50.0], + } + ) + +def test_session_read_arrow(session, arrow_sample_data): + df = session.read_arrow(arrow_sample_data) + assert isinstance(df, bpd.DataFrame) + assert df.shape == (5, 3) + assert list(df.columns) == ["id", "name", "value"] + pd_df = df.to_pandas() + assert pd_df["id"].tolist() == [1,2,3,4,5] + assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] + + def test_pandas_read_arrow(arrow_sample_data, mocker): + # This test uses the global session, so we mock what get_global_session returns + mock_session_instance = session() # Get an instance of our MockSession + mocker.patch("bigframes.pandas.global_session.get_global_session", return_value=mock_session_instance) + + df = bpd.read_arrow(arrow_sample_data) + assert isinstance(df, bpd.DataFrame) + assert df.shape == (5, 3) + assert list(df.columns) == ["id", "name", "value"] + pd_df = df.to_pandas() + assert pd_df["id"].tolist() == [1,2,3,4,5] + assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] + +def test_read_arrow_with_index_col_error(session, arrow_sample_data): + # read_arrow doesn't support index_col, ensure it's not accidentally passed or used + # For now, there's no index_col parameter. If added, this test would need adjustment. + # This test is more of a placeholder to remember this aspect. + # If create_dataframe_from_arrow_table were to accept index_col, this would be relevant. + with pytest.raises(TypeError) as excinfo: + session.read_arrow(arrow_sample_data, index_col="id") # type: ignore + assert "got an unexpected keyword argument 'index_col'" in str(excinfo.value) + + with pytest.raises(TypeError) as excinfo: + bpd.read_arrow(arrow_sample_data, index_col="id") # type: ignore + assert "got an unexpected keyword argument 'index_col'" in str(excinfo.value) From 7988c16fcb115f7f42257bdf2397e3b74f340613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Wed, 25 Jun 2025 16:58:59 -0500 Subject: [PATCH 2/3] rearrange --- bigframes/core/blocks.py | 31 +++ bigframes/pandas/__init__.py | 18 +- bigframes/pandas/io/api.py | 16 ++ bigframes/session/__init__.py | 18 ++ bigframes/session/_io/arrow.py | 80 ------ .../session/_io/bigquery/read_gbq_query.py | 34 ++- bigframes/session/bigquery_session.py | 16 -- tests/system/small/test_arrow_io.py | 231 ++++++------------ 8 files changed, 168 insertions(+), 276 deletions(-) delete mode 100644 bigframes/session/_io/arrow.py diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 675e8c8b7a..94aa51ec78 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -50,6 +50,7 @@ import bigframes.core.identifiers import bigframes.core.join_def as join_defs import bigframes.core.ordering as ordering +import bigframes.core.pyarrow_utils as pyarrow_utils import bigframes.core.schema as bf_schema import bigframes.core.sql as sql import bigframes.core.utils as utils @@ -156,6 +157,36 @@ def __init__( self._view_ref: Optional[bigquery.TableReference] = None self._view_ref_dry_run: Optional[bigquery.TableReference] = None + @classmethod + def from_pyarrow( + cls, + data: pa.Table, + session: bigframes.Session, + ) -> Block: + column_labels = data.column_names + + # TODO(tswast): Use array_value.promote_offsets() instead once that node is + # supported by the local engine. + offsets_col = bigframes.core.guid.generate_guid() + index_ids = [offsets_col] + index_labels = [None] + + # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859): + # Allow users to specify the "total ordering" column(s) or allow multiple + # such columns. + data = pyarrow_utils.append_offsets(data, offsets_col=offsets_col) + + # from_pyarrow will normalize the types for us. + managed_data = local_data.ManagedArrowTable.from_pyarrow(data) + array_value = core.ArrayValue.from_managed(managed_data, session=session) + block = cls( + array_value, + column_labels=column_labels, + index_columns=index_ids, + index_labels=index_labels, + ) + return block + @classmethod def from_local( cls, diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index e45a480854..f163d25757 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -25,7 +25,6 @@ import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes import pandas -import pyarrow as pa import bigframes._config as config from bigframes.core import log_adapter @@ -41,6 +40,7 @@ from bigframes.pandas.io.api import ( _read_gbq_colab, from_glob_path, + read_arrow, read_csv, read_gbq, read_gbq_function, @@ -55,21 +55,6 @@ ) import bigframes.series import bigframes.session - - -def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame: - """Load a PyArrow Table to a BigQuery DataFrames DataFrame. - - Args: - pa_table (pyarrow.Table): - PyArrow table to load data from. - - Returns: - bigframes.dataframe.DataFrame: - A new DataFrame representing the data from the PyArrow table. - """ - session = global_session.get_global_session() - return session.read_arrow(pa_table=pa_table) import bigframes.session._io.bigquery import bigframes.session.clients import bigframes.version @@ -383,6 +368,7 @@ def reset_session(): merge, qcut, read_csv, + read_arrow, read_gbq, _read_gbq_colab, read_gbq_function, diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 608eaf5a82..2aebf59ccb 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -44,6 +44,7 @@ ReadPickleBuffer, StorageOptions, ) +import pyarrow as pa import bigframes._config as config import bigframes.core.global_session as global_session @@ -72,6 +73,21 @@ # method and its arguments. +def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame: + """Load a PyArrow Table to a BigQuery DataFrames DataFrame. + + Args: + pa_table (pyarrow.Table): + PyArrow table to load data from. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + session = global_session.get_global_session() + return session.read_arrow(pa_table=pa_table) + + def read_csv( filepath_or_buffer: str | IO["bytes"], *, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 13db6823c1..dbea42b111 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -55,12 +55,14 @@ ReadPickleBuffer, StorageOptions, ) +import pyarrow as pa from bigframes import exceptions as bfe from bigframes import version import bigframes._config.bigquery_options as bigquery_options import bigframes.clients import bigframes.constants +import bigframes.core from bigframes.core import blocks, log_adapter, utils import bigframes.core.pyformat @@ -966,6 +968,22 @@ def _read_pandas_inline( local_block = blocks.Block.from_local(pandas_dataframe, self) return dataframe.DataFrame(local_block) + def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame: + """Load a PyArrow Table to a BigQuery DataFrames DataFrame. + + Args: + pa_table (pyarrow.Table): + PyArrow table to load data from. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + import bigframes.dataframe as dataframe + + local_block = blocks.Block.from_pyarrow(pa_table, self) + return dataframe.DataFrame(local_block) + def read_csv( self, filepath_or_buffer: str | IO["bytes"], diff --git a/bigframes/session/_io/arrow.py b/bigframes/session/_io/arrow.py deleted file mode 100644 index ebd1964fcd..0000000000 --- a/bigframes/session/_io/arrow.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Private helpers for reading pyarrow objects.""" - -from __future__ import annotations - -import pyarrow as pa - -from bigframes import dataframe -import bigframes.core as core -import bigframes.dtypes -from bigframes.core import local_data, pyarrow_utils -import bigframes.core.blocks as blocks -import bigframes.core.guid -import bigframes.core.schema as schemata -import bigframes.session - - -def create_dataframe_from_arrow_table( - pa_table: pa.Table, *, session: bigframes.session.Session -) -> dataframe.DataFrame: - """Convert a PyArrow Table into a BigQuery DataFrames DataFrame. - - This DataFrame will wrap a LocalNode, meaning the data is processed locally. - - Args: - pa_table (pyarrow.Table): - The PyArrow Table to convert. - session (bigframes.session.Session): - The BigQuery DataFrames session to associate with the new DataFrame. - - Returns: - bigframes.dataframe.DataFrame: - A new DataFrame representing the data from the PyArrow table. - """ - # TODO(tswast): Use array_value.promote_offsets() instead once that node is - # supported by the local engine. - offsets_col = bigframes.core.guid.generate_guid() - # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859): - # Allow users to specify the "total ordering" column(s) or allow multiple - # such columns. - pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) - - # We use the ManagedArrowTable constructor directly, because the - # results of to_arrow() should be the source of truth with regards - # to canonical formats since it comes from either the BQ Storage - # Read API or has been transformed by google-cloud-bigquery to look - # like the output of the BQ Storage Read API. - schema_items = [] - for field in pa_table.schema: - bf_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(field.type, allow_lossless_cast=True) - schema_items.append(schemata.SchemaItem(field.name, bf_dtype)) - bf_schema = schemata.ArraySchema(tuple(schema_items)) - - mat = local_data.ManagedArrowTable( - pa_table, - bf_schema, - ) - mat.validate() - - array_value = core.ArrayValue.from_managed(mat, session) - block = blocks.Block( - array_value, - (offsets_col,), - [field.name for field in pa_table.schema if field.name != offsets_col], - (None,), - ) - return dataframe.DataFrame(block) diff --git a/bigframes/session/_io/bigquery/read_gbq_query.py b/bigframes/session/_io/bigquery/read_gbq_query.py index 25c923ad96..70c83d7875 100644 --- a/bigframes/session/_io/bigquery/read_gbq_query.py +++ b/bigframes/session/_io/bigquery/read_gbq_query.py @@ -23,8 +23,12 @@ import pandas from bigframes import dataframe +from bigframes.core import local_data, pyarrow_utils +import bigframes.core as core +import bigframes.core.blocks as blocks +import bigframes.core.guid +import bigframes.core.schema as schemata import bigframes.session -from bigframes.session._io.arrow import create_dataframe_from_arrow_table def create_dataframe_from_query_job_stats( @@ -57,4 +61,30 @@ def create_dataframe_from_row_iterator( 'jobless' case where there's no destination table. """ pa_table = rows.to_arrow() - return create_dataframe_from_arrow_table(pa_table, session=session) + + # TODO(tswast): Use array_value.promote_offsets() instead once that node is + # supported by the local engine. + offsets_col = bigframes.core.guid.generate_guid() + pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) + + # We use the ManagedArrowTable constructor directly, because the + # results of to_arrow() should be the source of truth with regards + # to canonical formats since it comes from either the BQ Storage + # Read API or has been transformed by google-cloud-bigquery to look + # like the output of the BQ Storage Read API. + mat = local_data.ManagedArrowTable( + pa_table, + schemata.ArraySchema.from_bq_schema( + list(rows.schema) + [bigquery.SchemaField(offsets_col, "INTEGER")] + ), + ) + mat.validate() + + array_value = core.ArrayValue.from_managed(mat, session) + block = blocks.Block( + array_value, + (offsets_col,), + [field.name for field in rows.schema], + (None,), + ) + return dataframe.DataFrame(block) diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index b424149bc8..883087df07 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -21,12 +21,9 @@ # TODO: Non-ibis implementation import bigframes_vendored.ibis.backends.bigquery.datatypes as ibis_bq import google.cloud.bigquery as bigquery -import pyarrow as pa -import bigframes.dataframe from bigframes.core.compile import googlesql from bigframes.session import temporary_storage -from bigframes.session._io.arrow import create_dataframe_from_arrow_table KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0 @@ -145,19 +142,6 @@ def _keep_session_alive(self): except Exception as e: logging.warning("BigQuery session keep-alive query errored : %s", e) - def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame: - """Load a PyArrow Table to a BigQuery DataFrames DataFrame. - - Args: - pa_table (pyarrow.Table): - PyArrow table to load data from. - - Returns: - bigframes.dataframe.DataFrame: - A new DataFrame representing the data from the PyArrow table. - """ - return create_dataframe_from_arrow_table(pa_table, session=self) - class RecurringTaskDaemon: def __init__(self, task: Callable[[], None], frequency: datetime.timedelta): diff --git a/tests/system/small/test_arrow_io.py b/tests/system/small/test_arrow_io.py index e8e3eef072..d49445adba 100644 --- a/tests/system/small/test_arrow_io.py +++ b/tests/system/small/test_arrow_io.py @@ -20,140 +20,23 @@ import bigframes.pandas as bpd -@pytest.fixture(scope="session") -def arrow_all_types_table(): - # Using a dictionary to create the PyArrow table - data_dict = { - "int64_col": pa.array([1, 2, None, 4], type=pa.int64()), - "float64_col": pa.array([0.1, 0.2, None, 0.4], type=pa.float64()), - "bool_col": pa.array([True, False, None, True], type=pa.bool_()), - "string_col": pa.array(["apple", "banana", None, "cherry"], type=pa.string()), - "bytes_col": pa.array([b"one", b"two", None, b"four"], type=pa.large_binary()), # Using large_binary for BQ compatibility - "date_col": pa.array( - [datetime.date(2023, 1, 1), datetime.date(2023, 1, 2), None, datetime.date(2023, 1, 4)], - type=pa.date32(), - ), - "timestamp_s_col": pa.array( - [ - datetime.datetime.fromisoformat(ts) if ts else None - for ts in ["2023-01-01T00:00:00", "2023-01-02T12:34:56", None, "2023-01-04T23:59:59"] - ], - type=pa.timestamp("s", tz="UTC"), - ), - "timestamp_ms_col": pa.array( - [ - datetime.datetime.fromisoformat(ts) if ts else None - for ts in ["2023-01-01T00:00:00.123", "2023-01-02T12:34:56.456", None, "2023-01-04T23:59:59.789"] - ], - type=pa.timestamp("ms", tz="UTC"), - ), - "timestamp_us_col": pa.array( - [ - datetime.datetime.fromisoformat(ts) if ts else None - for ts in ["2023-01-01T00:00:00.123456", "2023-01-02T12:34:56.789012", None, "2023-01-04T23:59:59.345678"] - ], - type=pa.timestamp("us", tz="UTC"), - ), - # BigQuery doesn't directly support nanosecond precision timestamps, they are typically truncated or rounded to microsecond. - # "timestamp_ns_col": pa.array( - # [ - # datetime.datetime.fromisoformat(ts) if ts else None - # for ts in ["2023-01-01T00:00:00.123456789", "2023-01-02T12:34:56.890123456", None, "2023-01-04T23:59:59.456789012"] - # ], - # type=pa.timestamp("ns", tz="UTC"), - # ), - # TODO: Add more complex types if supported by the conversion logic - # "list_col": pa.array([[1, 2], None, [3, 4]], type=pa.list_(pa.int64())), - # "struct_col": pa.array([{"a": 1, "b": "x"}, None, {"a": 2, "b": "y"}], type=pa.struct([("a", pa.int64()), ("b", pa.string())])), - } - return pa.Table.from_pydict(data_dict) - - -def test_read_arrow_all_types_global_session(arrow_all_types_table): - # System test using the global session (or default session if set) - # bpd.options.bigquery.project = "your-gcp-project" # Ensure project is set if not using global default - # bpd.options.bigquery.location = "your-location" # Ensure location is set - - df = bpd.read_arrow(arrow_all_types_table) - - assert isinstance(df, bpd.DataFrame) - # Number of columns might change if more types are added - assert df.shape == (4, 9) - assert list(df.columns) == [ - "int64_col", - "float64_col", - "bool_col", - "string_col", - "bytes_col", - "date_col", - "timestamp_s_col", - "timestamp_ms_col", - "timestamp_us_col", - # "timestamp_ns_col", - ] - - # Fetching the data to pandas to verify - pd_df = df.to_pandas() - assert pd_df.shape == (4, 9) - - # Basic check of data integrity (first non-null value) - assert pd_df["int64_col"][0] == 1 - assert pd_df["float64_col"][0] == 0.1 - assert pd_df["bool_col"][0] is True - assert pd_df["string_col"][0] == "apple" - assert pd_df["bytes_col"][0] == b"one" - assert str(pd_df["date_col"][0]) == "2023-01-01" # Pandas converts to its own date/datetime objects - assert str(pd_df["timestamp_s_col"][0]) == "2023-01-01 00:00:00+00:00" - assert str(pd_df["timestamp_ms_col"][0]) == "2023-01-01 00:00:00.123000+00:00" - assert str(pd_df["timestamp_us_col"][0]) == "2023-01-01 00:00:00.123456+00:00" - # assert str(pd_df["timestamp_ns_col"][0]) == "2023-01-01 00:00:00.123456789+00:00" # BQ truncates to us - - # Check for None/NaT where PyArrow table had None - assert pd_df["int64_col"][2] is pandas.NA - assert pd_df["float64_col"][2] is pandas.NA - assert pd_df["bool_col"][2] is pandas.NA - assert pd_df["string_col"][2] is None # Pandas uses None for object types (like string) - assert pd_df["bytes_col"][2] is None - assert pd_df["date_col"][2] is pandas.NaT - assert pd_df["timestamp_s_col"][2] is pandas.NaT - assert pd_df["timestamp_ms_col"][2] is pandas.NaT - assert pd_df["timestamp_us_col"][2] is pandas.NaT - # assert pd_df["timestamp_ns_col"][2] is pandas.NaT - - -def test_read_arrow_empty_table_global_session(session): - empty_table = pa.Table.from_pydict({ - "col_a": pa.array([], type=pa.int64()), - "col_b": pa.array([], type=pa.string()) - }) - df = bpd.read_arrow(empty_table) +def test_read_arrow_empty_table(session): + empty_table = pa.Table.from_pydict( + { + "col_a": pa.array([], type=pa.int64()), + "col_b": pa.array([], type=pa.string()), + } + ) + df = session.read_arrow(empty_table) assert isinstance(df, bpd.DataFrame) assert df.shape == (0, 2) assert list(df.columns) == ["col_a", "col_b"] pd_df = df.to_pandas() assert pd_df.empty assert list(pd_df.columns) == ["col_a", "col_b"] - assert pd_df["col_a"].dtype == "int64" # Or Int64 if there were NAs - assert pd_df["col_b"].dtype == "object" - + assert pd_df["col_a"].dtype == "Int64" + assert pd_df["col_b"].dtype == "string[pyarrow]" -def test_read_arrow_specific_session(session, arrow_all_types_table): - # Create a new session to ensure it's used - # In a real test setup, you might create a session with specific configurations - specific_session = bpd.get_global_session() # or bpd.Session(...) - df = specific_session.read_arrow(arrow_all_types_table) - - assert isinstance(df, bpd.DataFrame) - assert df.shape == (4, 9) - pd_df = df.to_pandas() # Forces execution - assert pd_df.shape == (4, 9) - assert pd_df["int64_col"][0] == 1 # Basic data check - - # Ensure the dataframe is associated with the correct session if possible to check - # This might involve checking some internal property or behavior linked to the session - # For example, if temporary tables are created, they should use this session's context. - # For local data, this is harder to verify directly without inspecting internals. - assert df.session == specific_session @pytest.mark.parametrize( "data,arrow_type,expected_bq_type_kind", @@ -162,7 +45,7 @@ def test_read_arrow_specific_session(session, arrow_all_types_table): ([1, 2], pa.int16(), "INT64"), ([1, 2], pa.int32(), "INT64"), ([1, 2], pa.int64(), "INT64"), - ([1.0, 2.0], pa.float16(), "FLOAT64"), # BQ promotes half to float + ([1.0, 2.0], pa.float16(), "FLOAT64"), ([1.0, 2.0], pa.float32(), "FLOAT64"), ([1.0, 2.0], pa.float64(), "FLOAT64"), ([True, False], pa.bool_(), "BOOL"), @@ -170,16 +53,33 @@ def test_read_arrow_specific_session(session, arrow_all_types_table): (["a", "b"], pa.large_string(), "STRING"), ([b"a", b"b"], pa.binary(), "BYTES"), ([b"a", b"b"], pa.large_binary(), "BYTES"), - # Duration types are tricky, BQ doesn't have a direct equivalent, often converted to INT64 or STRING - # ([pa.scalar(1000, type=pa.duration('s')), pa.scalar(2000, type=pa.duration('s'))], pa.duration('s'), "INT64"), # Or error - ([datetime.date(2023,1,1)], pa.date32(), "DATE"), - ([datetime.date(2023,1,1)], pa.date64(), "DATE"), # BQ promotes date64 to date - ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("s", tz="UTC"), "TIMESTAMP"), - ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("ms", tz="UTC"), "TIMESTAMP"), - ([datetime.datetime(2023,1,1,12,0,0, tzinfo=datetime.timezone.utc)], pa.timestamp("us", tz="UTC"), "TIMESTAMP"), - # Time types also tricky, BQ has TIME type - # ([datetime.time(12,34,56)], pa.time32("s"), "TIME"), - # ([datetime.time(12,34,56,789000)], pa.time64("us"), "TIME"), + ( + [ + pa.scalar(1000, type=pa.duration("s")), + pa.scalar(2000, type=pa.duration("s")), + ], + pa.duration("s"), + "INT64", + ), + ([datetime.date(2023, 1, 1)], pa.date32(), "DATE"), + ([datetime.date(2023, 1, 1)], pa.date64(), "DATE"), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("s", tz="UTC"), + "TIMESTAMP", + ), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("ms", tz="UTC"), + "TIMESTAMP", + ), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("us", tz="UTC"), + "TIMESTAMP", + ), + ([datetime.time(12, 34, 56)], pa.time32("s"), "TIME"), + ([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"), ], ) def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind): @@ -190,11 +90,10 @@ def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_ki pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"]) df = session.read_arrow(pa_table) - # Access the underlying ibis schema to check BQ types - # This is an internal detail, but useful for this kind of test. - # Adjust if the way to get ibis schema changes. - ibis_schema = df._block.expr.schema - assert ibis_schema["col"].name.upper() == expected_bq_type_kind + col_id = df._block.label_to_col_id["col"] + bigquery_schema = df._block.expr.schema.to_bigquery() + field = [field for field in bigquery_schema if field.name == col_id][0] + assert field.field_type.upper() == expected_bq_type_kind # Also check pandas dtype after conversion for good measure pd_df = df.to_pandas() @@ -202,23 +101,31 @@ def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_ki # but this ensures it's processed. assert pd_df["col"].shape == (len(data),) -# TODO(developer): Add tests for more complex arrow types like lists, structs, maps -# if and when they are supported by create_dataframe_from_arrow_table. -# For example: -# def test_read_arrow_list_type(session): -# pa_table = pa.Table.from_arrays([pa.array([[1,2], [3,4,5]], type=pa.list_(pa.int64()))], names=['list_col']) -# df = session.read_arrow(pa_table) -# ibis_schema = df._block.expr.schema -# assert ibis_schema["list_col"].is_array() -# assert ibis_schema["list_col"].value_type.is_integer() -# # Further checks on data -# def test_read_arrow_struct_type(session): -# struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) -# pa_table = pa.Table.from_arrays([pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], names=['struct_col']) -# df = session.read_arrow(pa_table) -# ibis_schema = df._block.expr.schema -# assert ibis_schema["struct_col"].is_struct() -# assert ibis_schema["struct_col"].fields["a"].is_integer() -# assert ibis_schema["struct_col"].fields["b"].is_string() -# # Further checks on dataTool output for `create_file_with_block`: +def test_read_arrow_list_type(session): + pa_table = pa.Table.from_arrays( + [pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"] + ) + df = session.read_arrow(pa_table) + + col_id = df._block.label_to_col_id["list_col"] + bigquery_schema = df._block.expr.schema.to_bigquery() + field = [field for field in bigquery_schema if field.name == col_id][0] + assert field.mode.upper() == "REPEATED" + assert field.field_type.upper() == "INTEGER" + + +def test_read_arrow_struct_type(session): + struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) + pa_table = pa.Table.from_arrays( + [pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], + names=["struct_col"], + ) + df = session.read_arrow(pa_table) + + col_id = df._block.label_to_col_id["struct_col"] + bigquery_schema = df._block.expr.schema.to_bigquery() + field = [field for field in bigquery_schema if field.name == col_id][0] + assert field.field_type.upper() == "RECORD" + assert field.fields[0].name == "a" + assert field.fields[1].name == "b" From 3f141bdd9b2bdde2df58b8b4d61bb800e1aea53d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 26 Jun 2025 10:34:01 -0500 Subject: [PATCH 3/3] fix unit tests --- tests/system/small/test_arrow_io.py | 131 -------- .../test_compile_numerical_add/out.sql | 16 - .../out.sql | 16 - .../test_compile_string_add/out.sql | 16 - .../sqlglot/test_compile_scalar_expr.py | 37 --- tests/unit/session/test_io_arrow.py | 290 ++++++------------ 6 files changed, 91 insertions(+), 415 deletions(-) delete mode 100644 tests/system/small/test_arrow_io.py delete mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql delete mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql delete mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql delete mode 100644 tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py diff --git a/tests/system/small/test_arrow_io.py b/tests/system/small/test_arrow_io.py deleted file mode 100644 index d49445adba..0000000000 --- a/tests/system/small/test_arrow_io.py +++ /dev/null @@ -1,131 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import datetime - -import pyarrow as pa -import pytest - -import bigframes.pandas as bpd - - -def test_read_arrow_empty_table(session): - empty_table = pa.Table.from_pydict( - { - "col_a": pa.array([], type=pa.int64()), - "col_b": pa.array([], type=pa.string()), - } - ) - df = session.read_arrow(empty_table) - assert isinstance(df, bpd.DataFrame) - assert df.shape == (0, 2) - assert list(df.columns) == ["col_a", "col_b"] - pd_df = df.to_pandas() - assert pd_df.empty - assert list(pd_df.columns) == ["col_a", "col_b"] - assert pd_df["col_a"].dtype == "Int64" - assert pd_df["col_b"].dtype == "string[pyarrow]" - - -@pytest.mark.parametrize( - "data,arrow_type,expected_bq_type_kind", - [ - ([1, 2], pa.int8(), "INT64"), - ([1, 2], pa.int16(), "INT64"), - ([1, 2], pa.int32(), "INT64"), - ([1, 2], pa.int64(), "INT64"), - ([1.0, 2.0], pa.float16(), "FLOAT64"), - ([1.0, 2.0], pa.float32(), "FLOAT64"), - ([1.0, 2.0], pa.float64(), "FLOAT64"), - ([True, False], pa.bool_(), "BOOL"), - (["a", "b"], pa.string(), "STRING"), - (["a", "b"], pa.large_string(), "STRING"), - ([b"a", b"b"], pa.binary(), "BYTES"), - ([b"a", b"b"], pa.large_binary(), "BYTES"), - ( - [ - pa.scalar(1000, type=pa.duration("s")), - pa.scalar(2000, type=pa.duration("s")), - ], - pa.duration("s"), - "INT64", - ), - ([datetime.date(2023, 1, 1)], pa.date32(), "DATE"), - ([datetime.date(2023, 1, 1)], pa.date64(), "DATE"), - ( - [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], - pa.timestamp("s", tz="UTC"), - "TIMESTAMP", - ), - ( - [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], - pa.timestamp("ms", tz="UTC"), - "TIMESTAMP", - ), - ( - [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], - pa.timestamp("us", tz="UTC"), - "TIMESTAMP", - ), - ([datetime.time(12, 34, 56)], pa.time32("s"), "TIME"), - ([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"), - ], -) -def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind): - """ - Tests that various arrow types are mapped to the expected BigQuery types. - This is an indirect check via the resulting DataFrame's schema. - """ - pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"]) - df = session.read_arrow(pa_table) - - col_id = df._block.label_to_col_id["col"] - bigquery_schema = df._block.expr.schema.to_bigquery() - field = [field for field in bigquery_schema if field.name == col_id][0] - assert field.field_type.upper() == expected_bq_type_kind - - # Also check pandas dtype after conversion for good measure - pd_df = df.to_pandas() - # The pandas dtype can be more complex (e.g., ArrowDtype, nullable dtypes) - # but this ensures it's processed. - assert pd_df["col"].shape == (len(data),) - - -def test_read_arrow_list_type(session): - pa_table = pa.Table.from_arrays( - [pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"] - ) - df = session.read_arrow(pa_table) - - col_id = df._block.label_to_col_id["list_col"] - bigquery_schema = df._block.expr.schema.to_bigquery() - field = [field for field in bigquery_schema if field.name == col_id][0] - assert field.mode.upper() == "REPEATED" - assert field.field_type.upper() == "INTEGER" - - -def test_read_arrow_struct_type(session): - struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) - pa_table = pa.Table.from_arrays( - [pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], - names=["struct_col"], - ) - df = session.read_arrow(pa_table) - - col_id = df._block.label_to_col_id["struct_col"] - bigquery_schema = df._block.expr.schema.to_bigquery() - field = [field for field in bigquery_schema if field.name == col_id][0] - assert field.field_type.upper() == "RECORD" - assert field.fields[0].name == "a" - assert field.fields[1].name == "b" diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql deleted file mode 100644 index 1496f89f28..0000000000 --- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add/out.sql +++ /dev/null @@ -1,16 +0,0 @@ -WITH `bfcte_0` AS ( - SELECT - `int64_col` AS `bfcol_0`, - `rowindex` AS `bfcol_1` - FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` -), `bfcte_1` AS ( - SELECT - *, - `bfcol_1` AS `bfcol_4`, - `bfcol_0` + `bfcol_0` AS `bfcol_5` - FROM `bfcte_0` -) -SELECT - `bfcol_4` AS `rowindex`, - `bfcol_5` AS `int64_col` -FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql deleted file mode 100644 index 9c4b01a6df..0000000000 --- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_numerical_add_w_scalar/out.sql +++ /dev/null @@ -1,16 +0,0 @@ -WITH `bfcte_0` AS ( - SELECT - `int64_col` AS `bfcol_0`, - `rowindex` AS `bfcol_1` - FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` -), `bfcte_1` AS ( - SELECT - *, - `bfcol_1` AS `bfcol_4`, - `bfcol_0` + 1 AS `bfcol_5` - FROM `bfcte_0` -) -SELECT - `bfcol_4` AS `rowindex`, - `bfcol_5` AS `int64_col` -FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql deleted file mode 100644 index 7a8ab83df1..0000000000 --- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_scalar_expr/test_compile_string_add/out.sql +++ /dev/null @@ -1,16 +0,0 @@ -WITH `bfcte_0` AS ( - SELECT - `rowindex` AS `bfcol_0`, - `string_col` AS `bfcol_1` - FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` -), `bfcte_1` AS ( - SELECT - *, - `bfcol_0` AS `bfcol_4`, - CONCAT(`bfcol_1`, 'a') AS `bfcol_5` - FROM `bfcte_0` -) -SELECT - `bfcol_4` AS `rowindex`, - `bfcol_5` AS `string_col` -FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py b/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py deleted file mode 100644 index 862ee2467c..0000000000 --- a/tests/unit/core/compile/sqlglot/test_compile_scalar_expr.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -import bigframes.pandas as bpd - -pytest.importorskip("pytest_snapshot") - - -def test_compile_numerical_add(scalars_types_df: bpd.DataFrame, snapshot): - bf_df = scalars_types_df[["int64_col"]] - bf_df["int64_col"] = bf_df["int64_col"] + bf_df["int64_col"] - snapshot.assert_match(bf_df.sql, "out.sql") - - -def test_compile_numerical_add_w_scalar(scalars_types_df: bpd.DataFrame, snapshot): - bf_df = scalars_types_df[["int64_col"]] - bf_df["int64_col"] = bf_df["int64_col"] + 1 - snapshot.assert_match(bf_df.sql, "out.sql") - - -def test_compile_string_add(scalars_types_df: bpd.DataFrame, snapshot): - bf_df = scalars_types_df[["string_col"]] - bf_df["string_col"] = bf_df["string_col"] + "a" - snapshot.assert_match(bf_df.sql, "out.sql") diff --git a/tests/unit/session/test_io_arrow.py b/tests/unit/session/test_io_arrow.py index c95b9121d3..d5266220d9 100644 --- a/tests/unit/session/test_io_arrow.py +++ b/tests/unit/session/test_io_arrow.py @@ -18,7 +18,6 @@ import pytest import bigframes.pandas as bpd -from bigframes.session._io.arrow import create_dataframe_from_arrow_table from bigframes.testing import mocks @@ -28,214 +27,107 @@ def session(): return mocks.create_bigquery_session() -def test_create_dataframe_from_arrow_table(session): - pa_table = pa.Table.from_pydict( +def test_read_arrow_empty_table(session): + empty_table = pa.Table.from_pydict( { - "col1": [1, 2, 3], - "col2": ["a", "b", "c"], + "col_a": pa.array([], type=pa.int64()), + "col_b": pa.array([], type=pa.string()), } ) - # The mock session might need specific setup for to_pandas if it hits BQ client - # For now, let's assume the LocalNode execution path for to_pandas is self-contained enough - # or that the default mock bqclient.query_and_wait is sufficient. - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert len(df.columns) == 2 - # The default mock session's query_and_wait returns an empty table by default for non-special queries. - # We need to mock the specific table result for to_pandas to work as expected for LocalDataNode. - # This is a bit of a deeper mock interaction. - # A simpler unit test might avoid to_pandas() and check Block properties. - # However, if create_bigquery_session's mocks are sufficient, this might pass. - # For LocalDataNode, to_pandas eventually calls executor.execute(block.expr) - # which for the default mock session might not return the actual data. - # Let's adjust how the mock session's query_and_wait works for this specific table. - - # For local data, to_pandas will use the local data directly if possible, - # avoiding BQ calls. Let's see if the current mocks are enough. - pd_df = df.to_pandas() - assert pd_df.shape == (3,2) - assert list(df.columns) == ["col1", "col2"] - assert pd_df["col1"].dtype == "int64" - assert pd_df["col2"].dtype == "object" - - -def test_create_dataframe_from_arrow_table_empty(session): - pa_table = pa.Table.from_pydict( - { - "col1": pa.array([], type=pa.int64()), - "col2": pa.array([], type=pa.string()), - } - ) - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert len(df.columns) == 2 - pd_df = df.to_pandas() - assert pd_df.shape == (0,2) - assert list(df.columns) == ["col1", "col2"] - assert pd_df["col1"].dtype == "int64" - assert pd_df["col2"].dtype == "object" - - -def test_create_dataframe_from_arrow_table_all_types(session): - pa_table = pa.Table.from_pydict( - { - "int_col": [1, None, 3], - "float_col": [1.0, None, 3.0], - "bool_col": [True, None, False], - "string_col": ["a", None, "c"], - "bytes_col": [b"a", None, b"c"], - "date_col": pa.array([datetime.date(2023, 1, 1), None, datetime.date(2023, 1, 3)], type=pa.date32()), - "timestamp_col": pa.array([datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), None, datetime.datetime(2023, 1, 3, 12, 0, 0, tzinfo=datetime.timezone.utc)], type=pa.timestamp("us", tz="UTC")), - } - ) - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert len(df.columns) == 7 - - # For LocalDataNode, head() should work locally. - df_head = df.head(5) - assert len(df_head) == 3 - - pd_df = df.to_pandas() - assert pd_df["int_col"].dtype == "Int64" - assert pd_df["float_col"].dtype == "float64" - assert pd_df["bool_col"].dtype == "boolean" - assert pd_df["string_col"].dtype == "object" - assert pd_df["bytes_col"].dtype == "object" - assert pd_df["date_col"].dtype.name.startswith("date32") - assert pd_df["timestamp_col"].dtype.name.startswith("timestamp[us, tz=UTC]") - -@pytest.fixture(scope="module") # Changed to module as it's shared and doesn't change -def arrow_sample_data(): - return pa.Table.from_pydict( - { - "id": [1, 2, 3, 4, 5], - "name": ["foo", "bar", "baz", "qux", "quux"], - "value": [10.0, 20.0, 30.0, 40.0, 50.0], - } - ) - -def test_session_read_arrow(session, arrow_sample_data): - session._set_arrow_table_for_shape(arrow_sample_data) - df = session.read_arrow(arrow_sample_data) + df = session.read_arrow(empty_table) assert isinstance(df, bpd.DataFrame) - # assert df.shape == (5, 3) - assert list(df.columns) == ["id", "name", "value"] + assert df.shape == (0, 2) + assert list(df.columns) == ["col_a", "col_b"] pd_df = df.to_pandas() - assert pd_df["id"].tolist() == [1,2,3,4,5] - assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] - -def test_pandas_read_arrow(arrow_sample_data, mocker): - # This test uses the global session, so we mock what get_global_session returns - mock_session_instance = session() # Get an instance of our MockSession - mock_session_instance._set_arrow_table_for_shape(arrow_sample_data) - mocker.patch("bigframes.pandas.global_session.get_global_session", return_value=mock_session_instance) - - df = bpd.read_arrow(arrow_sample_data) - assert isinstance(df, bpd.DataFrame) - # assert df.shape == (5, 3) - assert list(df.columns) == ["id", "name", "value"] + assert pd_df.empty + assert list(pd_df.columns) == ["col_a", "col_b"] + assert pd_df["col_a"].dtype == "Int64" + assert pd_df["col_b"].dtype == "string[pyarrow]" + + +@pytest.mark.parametrize( + "data,arrow_type,expected_bq_type_kind", + [ + ([1, 2], pa.int8(), "INTEGER"), + ([1, 2], pa.int16(), "INTEGER"), + ([1, 2], pa.int32(), "INTEGER"), + ([1, 2], pa.int64(), "INTEGER"), + ([1.0, 2.0], pa.float32(), "FLOAT"), + ([1.0, 2.0], pa.float64(), "FLOAT"), + ([True, False], pa.bool_(), "BOOLEAN"), + (["a", "b"], pa.string(), "STRING"), + (["a", "b"], pa.large_string(), "STRING"), + ([b"a", b"b"], pa.binary(), "BYTES"), + ([b"a", b"b"], pa.large_binary(), "BYTES"), + ( + [ + pa.scalar(1000, type=pa.duration("s")), + pa.scalar(2000, type=pa.duration("s")), + ], + pa.duration("s"), + "INTEGER", + ), + ([datetime.date(2023, 1, 1)], pa.date32(), "DATE"), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("s", tz="UTC"), + "TIMESTAMP", + ), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("ms", tz="UTC"), + "TIMESTAMP", + ), + ( + [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)], + pa.timestamp("us", tz="UTC"), + "TIMESTAMP", + ), + ([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"), + ], +) +def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind): + """ + Tests that various arrow types are mapped to the expected BigQuery types. + This is an indirect check via the resulting DataFrame's schema. + """ + pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"]) + df = session.read_arrow(pa_table) + + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.field_type.upper() == expected_bq_type_kind + + # Also check pandas dtype after conversion for good measure pd_df = df.to_pandas() + assert pd_df["col"].shape == (len(data),) -def test_create_dataframe_from_arrow_table(session): - pa_table = pa.Table.from_pydict( - { - "col1": [1, 2, 3], - "col2": ["a", "b", "c"], - } +def test_read_arrow_list_type(session): + pa_table = pa.Table.from_arrays( + [pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"] ) - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert df.shape == (3, 2) - assert list(df.columns) == ["col1", "col2"] - # Verify dtypes by converting to pandas - more direct checks might be needed depending on internal structure - pd_df = df.to_pandas() - assert pd_df["col1"].dtype == "int64" - assert pd_df["col2"].dtype == "object" # StringDtype might be more accurate depending on session config + df = session.read_arrow(pa_table) - -def test_create_dataframe_from_arrow_table_empty(session): - pa_table = pa.Table.from_pydict( - { - "col1": pa.array([], type=pa.int64()), - "col2": pa.array([], type=pa.string()), - } - ) - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert df.shape == (0, 2) - assert list(df.columns) == ["col1", "col2"] - pd_df = df.to_pandas() - assert pd_df["col1"].dtype == "int64" - assert pd_df["col2"].dtype == "object" + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.mode.upper() == "REPEATED" + assert field.field_type.upper() == "INTEGER" -def test_create_dataframe_from_arrow_table_all_types(session): - pa_table = pa.Table.from_pydict( - { - "int_col": [1, None, 3], - "float_col": [1.0, None, 3.0], - "bool_col": [True, None, False], - "string_col": ["a", None, "c"], - "bytes_col": [b"a", None, b"c"], - "date_col": pa.array([datetime.date(2023, 1, 1), None, datetime.date(2023, 1, 3)], type=pa.date32()), - "timestamp_col": pa.array([datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), None, datetime.datetime(2023, 1, 3, 12, 0, 0, tzinfo=datetime.timezone.utc)], type=pa.timestamp("us", tz="UTC")), - } +def test_read_arrow_struct_type(session): + struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) + pa_table = pa.Table.from_arrays( + [pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], + names=["struct_col"], ) - df = create_dataframe_from_arrow_table(pa_table, session=session) - assert df.shape == (3, 7) - # This will execute the query and fetch results, good for basic validation - df_head = df.head(5) - assert len(df_head) == 3 - - # More detailed dtype checks might require looking at the bq schema or ibis schema - # For now, we rely on to_pandas() for a basic check - pd_df = df.to_pandas() - assert pd_df["int_col"].dtype == "Int64" # Pandas uses nullable Int64Dtype - assert pd_df["float_col"].dtype == "float64" - assert pd_df["bool_col"].dtype == "boolean" # Pandas uses nullable BooleanDtype - assert pd_df["string_col"].dtype == "object" # Or StringDtype - assert pd_df["bytes_col"].dtype == "object" # Or ArrowDtype(pa.binary()) - assert pd_df["date_col"].dtype.name.startswith("date32") # ArrowDtype(pa.date32()) - assert pd_df["timestamp_col"].dtype.name.startswith("timestamp[us, tz=UTC]") # ArrowDtype(pa.timestamp('us', tz='UTC')) - -@pytest.fixture(scope="session") -def arrow_sample_data(): - return pa.Table.from_pydict( - { - "id": [1, 2, 3, 4, 5], - "name": ["foo", "bar", "baz", "qux", "quux"], - "value": [10.0, 20.0, 30.0, 40.0, 50.0], - } - ) - -def test_session_read_arrow(session, arrow_sample_data): - df = session.read_arrow(arrow_sample_data) - assert isinstance(df, bpd.DataFrame) - assert df.shape == (5, 3) - assert list(df.columns) == ["id", "name", "value"] - pd_df = df.to_pandas() - assert pd_df["id"].tolist() == [1,2,3,4,5] - assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] - - def test_pandas_read_arrow(arrow_sample_data, mocker): - # This test uses the global session, so we mock what get_global_session returns - mock_session_instance = session() # Get an instance of our MockSession - mocker.patch("bigframes.pandas.global_session.get_global_session", return_value=mock_session_instance) - - df = bpd.read_arrow(arrow_sample_data) - assert isinstance(df, bpd.DataFrame) - assert df.shape == (5, 3) - assert list(df.columns) == ["id", "name", "value"] - pd_df = df.to_pandas() - assert pd_df["id"].tolist() == [1,2,3,4,5] - assert pd_df["name"].tolist() == ["foo", "bar", "baz", "qux", "quux"] - -def test_read_arrow_with_index_col_error(session, arrow_sample_data): - # read_arrow doesn't support index_col, ensure it's not accidentally passed or used - # For now, there's no index_col parameter. If added, this test would need adjustment. - # This test is more of a placeholder to remember this aspect. - # If create_dataframe_from_arrow_table were to accept index_col, this would be relevant. - with pytest.raises(TypeError) as excinfo: - session.read_arrow(arrow_sample_data, index_col="id") # type: ignore - assert "got an unexpected keyword argument 'index_col'" in str(excinfo.value) - - with pytest.raises(TypeError) as excinfo: - bpd.read_arrow(arrow_sample_data, index_col="id") # type: ignore - assert "got an unexpected keyword argument 'index_col'" in str(excinfo.value) + df = session.read_arrow(pa_table) + + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.field_type.upper() == "RECORD" + assert field.fields[0].name == "a" + assert field.fields[1].name == "b"