From ba90746da8fb7dd488b592eefb426f7273ed1b76 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tim=20Swe=C3=B1a?=
Date: Wed, 30 Jul 2025 13:01:44 -0500
Subject: [PATCH] chore: always write all metrics

write_stats_to_disk now takes all stats as required keyword arguments
and writes the slot-millisecond and execution-time files
unconditionally, instead of skipping them when those values were None.
count_job_stats supplies explicit zeros on code paths that have no
such stats, and the pandas-level _dry_run helper logs metrics even
when no Session is available (b/435183833).
---
 bigframes/pandas/io/api.py             |  7 ++
 bigframes/session/metrics.py           | 78 ++++++++++++-------
 .../read_gbq_colab/aggregate_output.py |  3 +-
 3 files changed, 58 insertions(+), 30 deletions(-)

diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py
index 5ec3626c7a..a88cc7a011 100644
--- a/bigframes/pandas/io/api.py
+++ b/bigframes/pandas/io/api.py
@@ -16,6 +16,7 @@

 import functools
 import inspect
+import os
 import threading
 import typing
 from typing import (
@@ -56,6 +57,7 @@
 from bigframes.session import dry_runs
 import bigframes.session._io.bigquery
 import bigframes.session.clients
+import bigframes.session.metrics

 # Note: the following methods are duplicated from Session. This duplication
 # enables the following:
@@ -625,6 +627,11 @@ def _get_bqclient() -> bigquery.Client:

 def _dry_run(query, bqclient) -> bigquery.QueryJob:
     job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
+
+    # Fix for b/435183833. Log metrics even if a Session isn't available.
+    if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ:
+        metrics = bigframes.session.metrics.ExecutionMetrics()
+        metrics.count_job_stats(job)
     return job


diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 75f247b028..36e48ee9ec 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -40,32 +40,54 @@ def count_job_stats(
     ):
         if query_job is None:
             assert row_iterator is not None
-            total_bytes_processed = getattr(row_iterator, "total_bytes_processed", None)
-            query = getattr(row_iterator, "query", None)
-            if total_bytes_processed is None or query is None:
-                return
+
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
+            query_char_count = len(getattr(row_iterator, "query", ""))
+            slot_millis = getattr(row_iterator, "slot_millis", 0)
+            exec_seconds = 0.0
             self.execution_count += 1
-            self.query_char_count += len(query)
-            self.bytes_processed += total_bytes_processed
-            write_stats_to_disk(len(query), total_bytes_processed)
-            return
+            self.query_char_count += query_char_count
+            self.bytes_processed += bytes_processed
+            self.slot_millis += slot_millis
+
+        elif query_job.configuration.dry_run:
+            query_char_count = len(query_job.query)

-        if query_job.configuration.dry_run:
-            write_stats_to_disk(len(query_job.query), 0, 0, 0)
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = 0
+            slot_millis = 0
+            exec_seconds = 0.0

-        stats = get_performance_stats(query_job)
-        if stats is not None:
-            query_char_count, bytes_processed, slot_millis, execution_secs = stats
+        elif (stats := get_performance_stats(query_job)) is not None:
+            query_char_count, bytes_processed, slot_millis, exec_seconds = stats
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
-            self.execution_secs += execution_secs
+            self.execution_secs += exec_seconds

             write_stats_to_disk(
-                query_char_count, bytes_processed, slot_millis, execution_secs
+                query_char_count=query_char_count,
+                bytes_processed=bytes_processed,
+                slot_millis=slot_millis,
+                exec_seconds=exec_seconds,
             )
+        else:
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = 0
+            query_char_count = 0
+            slot_millis = 0
+            exec_seconds = 0
+
+            write_stats_to_disk(
+                query_char_count=query_char_count,
+                bytes_processed=bytes_processed,
+                slot_millis=slot_millis,
+                exec_seconds=exec_seconds,
+            )
+


 def get_performance_stats(
     query_job: bigquery.QueryJob,
@@ -103,10 +125,11 @@ def get_performance_stats(


 def write_stats_to_disk(
+    *,
     query_char_count: int,
     bytes_processed: int,
-    slot_millis: Optional[int] = None,
-    exec_seconds: Optional[float] = None,
+    slot_millis: int,
+    exec_seconds: float,
 ):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
@@ -118,18 +141,17 @@ def write_stats_to_disk(
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()

-    if (slot_millis is not None) and (exec_seconds is not None):
-        # store slot milliseconds
-        slot_file = os.path.join(current_directory, test_name + ".slotmillis")
-        with open(slot_file, "a") as f:
-            f.write(str(slot_millis) + "\n")
+    # store slot milliseconds
+    slot_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(slot_file, "a") as f:
+        f.write(str(slot_millis) + "\n")

-        # store execution time seconds
-        exec_time_file = os.path.join(
-            current_directory, test_name + ".bq_exec_time_seconds"
-        )
-        with open(exec_time_file, "a") as f:
-            f.write(str(exec_seconds) + "\n")
+    # store execution time seconds
+    exec_time_file = os.path.join(
+        current_directory, test_name + ".bq_exec_time_seconds"
+    )
+    with open(exec_time_file, "a") as f:
+        f.write(str(exec_seconds) + "\n")

     # store length of query
     query_char_count_file = os.path.join(
diff --git a/tests/benchmark/read_gbq_colab/aggregate_output.py b/tests/benchmark/read_gbq_colab/aggregate_output.py
index 3df6054d64..cd33ed2640 100644
--- a/tests/benchmark/read_gbq_colab/aggregate_output.py
+++ b/tests/benchmark/read_gbq_colab/aggregate_output.py
@@ -48,7 +48,7 @@ def aggregate_output(*, project_id, dataset_id, table_id):


 if __name__ == "__main__":
-    config = utils.get_configuration(include_table_id=True)
+    config = utils.get_configuration(include_table_id=True, start_session=False)
    current_path = pathlib.Path(__file__).absolute()

     utils.get_execution_time(
@@ -58,5 +58,4 @@
         project_id=config.project_id,
         dataset_id=config.dataset_id,
         table_id=config.table_id,
-        session=config.session,
     )
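
Note (not part of the patch): a minimal sketch of the new write_stats_to_disk
contract introduced above. It assumes a pytest-style run where the logging
environment variable is set; the test name "example_test" is a hypothetical
stand-in, and only the two file suffixes visible in this diff are checked.

    # Sketch only: exercises the keyword-only signature from this patch.
    import os

    import bigframes.session.metrics as metrics

    # write_stats_to_disk reads the test name from this env var.
    os.environ[metrics.LOGGING_NAME_ENV_VAR] = "example_test"

    # Explicit zeros are now required instead of None; the function
    # appends to the metric files unconditionally rather than skipping
    # the slot-millis and execution-time files when stats are missing.
    metrics.write_stats_to_disk(
        query_char_count=42,
        bytes_processed=0,
        slot_millis=0,
        exec_seconds=0.0,
    )

    # The two file suffixes shown in this diff:
    for suffix in (".slotmillis", ".bq_exec_time_seconds"):
        with open("example_test" + suffix) as f:
            print(suffix, "->", f.read().strip())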