From 08f516c557c527b0bdac35d3bc116531d1b530a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tim=20Swe=C3=B1a?=
Date: Fri, 17 Oct 2025 16:14:41 +0000
Subject: [PATCH] chore: create `DF._to_pandas_batches()` for better type
 checking of `PandasBatches`

---
 bigframes/core/blocks.py                           |  2 +-
 bigframes/dataframe.py                             | 13 +++++++++++++
 bigframes/display/anywidget.py                     |  9 ++++++---
 tests/benchmark/read_gbq_colab/aggregate_output.py | 10 ++++++----
 tests/benchmark/read_gbq_colab/filter_output.py    | 12 +++++++-----
 tests/benchmark/read_gbq_colab/first_page.py       |  5 +++--
 tests/benchmark/read_gbq_colab/last_page.py        |  5 +++--
 tests/benchmark/read_gbq_colab/sort_output.py      | 10 ++++++----
 8 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f9896784bb..166841dfbd 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -693,7 +693,7 @@ def to_pandas_batches(
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
         allow_large_results: Optional[bool] = None,
-    ) -> Iterator[pd.DataFrame]:
+    ) -> PandasBatches:
         """Download results one message at a time.
 
         page_size and max_results determine the size and number of batches,
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index ec458cc462..be41ec9e99 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1930,6 +1930,19 @@
                 form the original dataframe. Results stream from bigquery,
                 see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
         """
+        return self._to_pandas_batches(
+            page_size=page_size,
+            max_results=max_results,
+            allow_large_results=allow_large_results,
+        )
+
+    def _to_pandas_batches(
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        *,
+        allow_large_results: Optional[bool] = None,
+    ) -> blocks.PandasBatches:
         return self._block.to_pandas_batches(
             page_size=page_size,
             max_results=max_results,
diff --git a/bigframes/display/anywidget.py b/bigframes/display/anywidget.py
index 5a20ddcb7f..3d12a2032c 100644
--- a/bigframes/display/anywidget.py
+++ b/bigframes/display/anywidget.py
@@ -23,6 +23,7 @@
 import pandas as pd
 
 import bigframes
+import bigframes.dataframe
 import bigframes.display.html
 
 # anywidget and traitlets are optional dependencies. We don't want the import of this
@@ -73,7 +74,7 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame):
         initial_page_size = bigframes.options.display.max_rows
 
         # Initialize data fetching attributes.
-        self._batches = dataframe.to_pandas_batches(page_size=initial_page_size)
+        self._batches = dataframe._to_pandas_batches(page_size=initial_page_size)
 
         # set traitlets properties that trigger observers
         self.page_size = initial_page_size
@@ -82,7 +83,9 @@
         # SELECT COUNT(*) query. It is a must have however.
         # TODO(b/428238610): Start iterating over the result of `to_pandas_batches()`
         # before we get here so that the count might already be cached.
-        self.row_count = len(dataframe)
+        # TODO(b/452747934): Allow row_count to be None and check to see if
+        # there are multiple pages and show "page 1 of many" in this case.
+        self.row_count = self._batches.total_rows or 0
 
         # get the initial page
         self._set_table_html()
@@ -180,7 +183,7 @@ def _cached_data(self) -> pd.DataFrame:
 
     def _reset_batches_for_new_page_size(self):
         """Reset the batch iterator when page size changes."""
-        self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size)
+        self._batches = self._dataframe._to_pandas_batches(page_size=self.page_size)
         self._cached_batches = []
         self._batch_iter = None
         self._all_data_loaded = False
diff --git a/tests/benchmark/read_gbq_colab/aggregate_output.py b/tests/benchmark/read_gbq_colab/aggregate_output.py
index cd33ed2640..e5620d8e16 100644
--- a/tests/benchmark/read_gbq_colab/aggregate_output.py
+++ b/tests/benchmark/read_gbq_colab/aggregate_output.py
@@ -26,8 +26,9 @@ def aggregate_output(*, project_id, dataset_id, table_id):
     df = bpd._read_gbq_colab(f"SELECT * FROM `{project_id}`.{dataset_id}.{table_id}")
 
     # Simulate getting the first page, since we'll always do that first in the UI.
-    df.shape
-    next(iter(df.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
     # To simulate very small rows that can only fit a boolean,
     # some tables don't have an integer column. If an integer column is available,
@@ -43,8 +44,9 @@
         .sum(numeric_only=True)
     )
 
-    df_aggregated.shape
-    next(iter(df_aggregated.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df_aggregated._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
 
 if __name__ == "__main__":
diff --git a/tests/benchmark/read_gbq_colab/filter_output.py b/tests/benchmark/read_gbq_colab/filter_output.py
index b3c9181770..dc88d31366 100644
--- a/tests/benchmark/read_gbq_colab/filter_output.py
+++ b/tests/benchmark/read_gbq_colab/filter_output.py
@@ -31,17 +31,19 @@ def filter_output(
     df = bpd._read_gbq_colab(f"SELECT * FROM `{project_id}`.{dataset_id}.{table_id}")
 
     # Simulate getting the first page, since we'll always do that first in the UI.
-    df.shape
-    next(iter(df.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
     # Simulate the user filtering by a column and visualizing those results
     df_filtered = df[df["col_bool_0"]]
-    rows, _ = df_filtered.shape
+    batches = df_filtered._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    first_page = next(iter(batches))
 
     # It's possible we don't have any pages at all, since we filtered out all
     # matching rows.
-    first_page = next(iter(df_filtered.to_pandas_batches(page_size=PAGE_SIZE)))
-    assert len(first_page.index) <= rows
+    assert len(first_page.index) <= tr
 
 
 if __name__ == "__main__":
diff --git a/tests/benchmark/read_gbq_colab/first_page.py b/tests/benchmark/read_gbq_colab/first_page.py
index 7f8cdb0d51..33e2a24bd7 100644
--- a/tests/benchmark/read_gbq_colab/first_page.py
+++ b/tests/benchmark/read_gbq_colab/first_page.py
@@ -28,8 +28,9 @@ def first_page(*, project_id, dataset_id, table_id):
     )
 
     # Get number of rows (to calculate number of pages) and the first page.
-    df.shape
-    next(iter(df.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
 
 if __name__ == "__main__":
diff --git a/tests/benchmark/read_gbq_colab/last_page.py b/tests/benchmark/read_gbq_colab/last_page.py
index 7786e2f8bd..2e485a070a 100644
--- a/tests/benchmark/read_gbq_colab/last_page.py
+++ b/tests/benchmark/read_gbq_colab/last_page.py
@@ -28,8 +28,9 @@ def last_page(*, project_id, dataset_id, table_id):
     )
 
     # Get number of rows (to calculate number of pages) and then all pages.
-    df.shape
-    for _ in df.to_pandas_batches(page_size=PAGE_SIZE):
+    batches = df._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    for _ in batches:
         pass
 
 
diff --git a/tests/benchmark/read_gbq_colab/sort_output.py b/tests/benchmark/read_gbq_colab/sort_output.py
index 7933c4472e..3044e0c2a3 100644
--- a/tests/benchmark/read_gbq_colab/sort_output.py
+++ b/tests/benchmark/read_gbq_colab/sort_output.py
@@ -28,8 +28,9 @@ def sort_output(*, project_id, dataset_id, table_id):
    )
 
     # Simulate getting the first page, since we'll always do that first in the UI.
-    df.shape
-    next(iter(df.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
     # Simulate the user sorting by a column and visualizing those results
     sort_column = "col_int64_1"
@@ -37,8 +38,9 @@
         sort_column = "col_bool_0"
 
     df_sorted = df.sort_values(sort_column)
-    df_sorted.shape
-    next(iter(df_sorted.to_pandas_batches(page_size=PAGE_SIZE)))
+    batches = df_sorted._to_pandas_batches(page_size=PAGE_SIZE)
+    assert (tr := batches.total_rows) is not None and tr >= 0
+    next(iter(batches))
 
 
 if __name__ == "__main__":
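
Usage note (not part of the patch): the private `_to_pandas_batches()` exists so internal call sites, the anywidget table and the benchmarks above, see the concrete `blocks.PandasBatches` return type under type checking, presumably without touching the public `to_pandas_batches()` signature. That typed iterator's `total_rows` is what replaces the blocking `len(dataframe)` call (a `SELECT COUNT(*)` query, per the comment above). A minimal sketch of the pattern, assuming a configured BigQuery DataFrames session; `read_gbq` and the table name below are illustrative placeholders, not taken from this change:

    # Hedged sketch: read total_rows from the typed PandasBatches iterator
    # instead of issuing a separate COUNT(*) query via len(df).
    import bigframes.pandas as bpd

    # Placeholder table; any read_gbq/_read_gbq_colab result works the same way.
    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

    # Internal API under discussion: returns blocks.PandasBatches, so
    # total_rows type-checks without a cast.
    batches = df._to_pandas_batches(page_size=100)

    # total_rows may be None when the row count is not known up front
    # (see TODO b/452747934 about showing "page 1 of many" in that case).
    total = batches.total_rows
    pages = "many" if total is None else max(1, -(-total // 100))  # ceiling division
    print(f"page 1 of {pages}")

    # Iterating yields pandas DataFrames of at most page_size rows each.
    first_page = next(iter(batches))
    print(first_page.shape)

The `or 0` fallback in the widget keeps `row_count` an `int` for now; the new TODO(b/452747934) notes it could become `None` later so the UI can show "page 1 of many" when the count is unknown.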