From b3c7ae988eb184b5f5f2e28cb34cb91401ccff4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tim=20Swe=C3=B1a?=
Date: Thu, 4 Sep 2025 19:18:46 +0000
Subject: [PATCH] chore: add total_bytes_processed to PandasBatches

Fixes internal issue b/443094245
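
This threads the bytes-processed statistic from the BigQuery row
iterator through ExecuteResult and into the PandasBatches iterator
returned by to_pandas_batches(), next to the existing total_rows.

A minimal sketch of how the new property could be consumed (the public
dataset, sort column, and page size below are illustrative choices,
not part of this change):

    import bigframes.pandas as bpd

    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    # Sorting forces an actual query job, so bytes-processed stats exist.
    batches = df.sort_values("body_mass_g").to_pandas_batches(page_size=500)
    print(batches.total_rows, batches.total_bytes_processed)
    for page in batches:
        print(len(page))  # each page is a pandas DataFrame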
---
 bigframes/core/blocks.py                           | 15 +++++++++++++--
 bigframes/session/bq_caching_executor.py           |  6 +++++-
 bigframes/session/direct_gbq_execution.py          |  1 +
 bigframes/session/executor.py                      |  1 +
 tests/system/large/test_dataframe_io.py            |  9 +++++----
 tests/system/small/session/test_read_gbq_colab.py  |  3 +++
 tests/system/small/test_dataframe_io.py            |  7 +++++++
 7 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 07d7e4c45b..597eedcf27 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -102,15 +102,24 @@ class PandasBatches(Iterator[pd.DataFrame]):
     """Interface for mutable objects with state represented by a block value object."""
 
     def __init__(
-        self, pandas_batches: Iterator[pd.DataFrame], total_rows: Optional[int] = 0
+        self,
+        pandas_batches: Iterator[pd.DataFrame],
+        total_rows: Optional[int] = 0,
+        *,
+        total_bytes_processed: Optional[int] = 0,
     ):
         self._dataframes: Iterator[pd.DataFrame] = pandas_batches
         self._total_rows: Optional[int] = total_rows
+        self._total_bytes_processed: Optional[int] = total_bytes_processed
 
     @property
     def total_rows(self) -> Optional[int]:
         return self._total_rows
 
+    @property
+    def total_bytes_processed(self) -> Optional[int]:
+        return self._total_bytes_processed
+
     def __next__(self) -> pd.DataFrame:
         return next(self._dataframes)
 
@@ -721,7 +730,9 @@ def to_pandas_batches(
         if (total_rows is not None) and (max_results is not None):
             total_rows = min(total_rows, max_results)
 
-        return PandasBatches(dfs, total_rows)
+        return PandasBatches(
+            dfs, total_rows, total_bytes_processed=execute_result.total_bytes_processed
+        )
 
     def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
         """Set the index on pandas DataFrame to match this block."""
diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index b428cd646c..b7412346bd 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -323,7 +323,10 @@ def _export_gbq(
             self.bqclient.update_table(table, ["schema"])
 
         return executor.ExecuteResult(
-            row_iter.to_arrow_iterable(), array_value.schema, query_job
+            row_iter.to_arrow_iterable(),
+            array_value.schema,
+            query_job,
+            total_bytes_processed=row_iter.total_bytes_processed,
         )
 
     def dry_run(
@@ -671,6 +674,7 @@ def _execute_plan_gbq(
             query_job=query_job,
             total_bytes=size_bytes,
             total_rows=iterator.total_rows,
+            total_bytes_processed=iterator.total_bytes_processed,
         )
 
 
diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py
index ff91747a62..7538c9300f 100644
--- a/bigframes/session/direct_gbq_execution.py
+++ b/bigframes/session/direct_gbq_execution.py
@@ -63,6 +63,7 @@ def execute(
             schema=plan.schema,
             query_job=query_job,
             total_rows=iterator.total_rows,
+            total_bytes_processed=iterator.total_bytes_processed,
         )
 
     def _run_execute_query(
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 748b10647a..d0cfe5f4f7 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -45,6 +45,7 @@ class ExecuteResult:
     query_job: Optional[bigquery.QueryJob] = None
     total_bytes: Optional[int] = None
     total_rows: Optional[int] = None
+    total_bytes_processed: Optional[int] = None
 
     @property
     def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py
index 87d2acd34b..c60940109d 100644
--- a/tests/system/large/test_dataframe_io.py
+++ b/tests/system/large/test_dataframe_io.py
@@ -48,11 +48,12 @@ def test_to_pandas_batches_override_global_option(
 ):
     with bigframes.option_context(LARGE_TABLE_OPTION, False):
         df = session.read_gbq(WIKIPEDIA_TABLE)
-        pages = list(
-            df.to_pandas_batches(
-                page_size=500, max_results=1500, allow_large_results=True
-            )
+        batches = df.sort_values("id").to_pandas_batches(
+            page_size=500, max_results=1500, allow_large_results=True
         )
+        assert batches.total_rows > 0
+        assert batches.total_bytes_processed > 0
+        pages = list(batches)
         assert all((len(page) <= 500) for page in pages)
         assert sum(len(page) for page in pages) == 1500
 
diff --git a/tests/system/small/session/test_read_gbq_colab.py b/tests/system/small/session/test_read_gbq_colab.py
index 9ace2dbed7..6d3cf6fe88 100644
--- a/tests/system/small/session/test_read_gbq_colab.py
+++ b/tests/system/small/session/test_read_gbq_colab.py
@@ -48,6 +48,9 @@ def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_sessi
     batches = df.to_pandas_batches(
         page_size=100,
     )
+    assert batches.total_rows > 0
+    assert batches.total_bytes_processed is None  # No additional query.
+
     executions_after = maybe_ordered_session._metrics.execution_count
     num_batches = 0
 
diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index 1d6ae370c5..96d7881d67 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -339,6 +339,13 @@ def test_to_arrow_override_global_option(scalars_df_index):
         assert scalars_df_index._query_job.destination.table_id == table_id
 
 
+def test_to_pandas_batches_populates_total_bytes_processed(scalars_df_default_index):
+    batches = scalars_df_default_index.sort_values(
+        "int64_col"
+    ).to_pandas_batches()  # Do a sort to force query execution.
+    assert batches.total_bytes_processed > 0
+
+
 def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
     """Verify to_pandas_batches() APIs returns the expected dtypes."""
     expected = scalars_df_default_index.dtypes
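
Note for reviewers: like total_rows, the new total_bytes_processed is
Optional. It stays None when batches are served without running an
additional query (the read_gbq_colab test above asserts exactly that),
so callers should guard for None rather than assume an int. A
hypothetical consumer:

    bytes_processed = batches.total_bytes_processed
    if bytes_processed is not None:
        print(f"query scanned {bytes_processed:,} bytes")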