7 changes: 7 additions & 0 deletions bigframes/pandas/io/api.py
@@ -16,6 +16,7 @@
 
 import functools
 import inspect
+import os
 import threading
 import typing
 from typing import (
@@ -56,6 +57,7 @@
 from bigframes.session import dry_runs
 import bigframes.session._io.bigquery
 import bigframes.session.clients
+import bigframes.session.metrics
 
 # Note: the following methods are duplicated from Session. This duplication
 # enables the following:
@@ -625,6 +627,11 @@ def _get_bqclient() -> bigquery.Client:
 
 def _dry_run(query, bqclient) -> bigquery.QueryJob:
     job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
+
+    # Fix for b/435183833. Log metrics even if a Session isn't available.
+    if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ:
+        metrics = bigframes.session.metrics.ExecutionMetrics()
+        metrics.count_job_stats(job)
     return job
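A minimal sketch (not part of this PR) of how the new gate can be exercised from pytest. It assumes application default credentials are available and that the _get_bqclient helper visible in the hunk above returns a usable client; the test name and assertion are illustrative:

    import bigframes.pandas.io.api as api
    import bigframes.session.metrics

    def test_dry_run_logs_metrics_without_session(monkeypatch, tmp_path):
        # Stats files are appended to the current working directory.
        monkeypatch.chdir(tmp_path)
        # The new gate in _dry_run only fires when this env var names a test.
        monkeypatch.setenv(
            bigframes.session.metrics.LOGGING_NAME_ENV_VAR, "dry_run_example"
        )

        api._dry_run("SELECT 1", api._get_bqclient())

        # count_job_stats -> write_stats_to_disk appends one value per query.
        assert (tmp_path / "dry_run_example.slotmillis").exists()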
78 changes: 50 additions & 28 deletions bigframes/session/metrics.py
@@ -40,32 +40,54 @@ def count_job_stats(
 ):
     if query_job is None:
         assert row_iterator is not None
-        total_bytes_processed = getattr(row_iterator, "total_bytes_processed", None)
-        query = getattr(row_iterator, "query", None)
-        if total_bytes_processed is None or query is None:
-            return
+
+        # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+        bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
+        query_char_count = len(getattr(row_iterator, "query", ""))
+        slot_millis = getattr(row_iterator, "slot_millis", 0)
+        exec_seconds = 0.0
 
         self.execution_count += 1
-        self.query_char_count += len(query)
-        self.bytes_processed += total_bytes_processed
-        write_stats_to_disk(len(query), total_bytes_processed)
-        return
+        self.query_char_count += query_char_count
+        self.bytes_processed += bytes_processed
+        self.slot_millis += slot_millis
 
-    if query_job.configuration.dry_run:
-        write_stats_to_disk(len(query_job.query), 0, 0, 0)
+    elif query_job.configuration.dry_run:
+        query_char_count = len(query_job.query)
+
+        # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+        bytes_processed = 0
+        slot_millis = 0
+        exec_seconds = 0.0
 
-    stats = get_performance_stats(query_job)
-    if stats is not None:
-        query_char_count, bytes_processed, slot_millis, execution_secs = stats
+    elif (stats := get_performance_stats(query_job)) is not None:
+        query_char_count, bytes_processed, slot_millis, exec_seconds = stats
         self.execution_count += 1
         self.query_char_count += query_char_count
         self.bytes_processed += bytes_processed
         self.slot_millis += slot_millis
-        self.execution_secs += execution_secs
-        write_stats_to_disk(
-            query_char_count, bytes_processed, slot_millis, execution_secs
-        )
+        self.execution_secs += exec_seconds
+
+    else:
+        # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+        bytes_processed = 0
+        query_char_count = 0
+        slot_millis = 0
+        exec_seconds = 0
+
+    write_stats_to_disk(
+        query_char_count=query_char_count,
+        bytes_processed=bytes_processed,
+        slot_millis=slot_millis,
+        exec_seconds=exec_seconds,
+    )
 
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
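The row_iterator branch now substitutes zero-valued defaults instead of returning early when stats attributes are missing. A small illustration of the getattr pattern, with SimpleNamespace standing in for a partially populated RowIterator:

    from types import SimpleNamespace

    # Stand-in for a RowIterator lacking total_bytes_processed and slot_millis.
    partial_iterator = SimpleNamespace(query="SELECT 1")

    # Old behavior: total_bytes_processed is None, so nothing was recorded.
    # New behavior: defaults keep the execution counted with zeroed stats.
    bytes_processed = getattr(partial_iterator, "total_bytes_processed", 0)  # 0
    query_char_count = len(getattr(partial_iterator, "query", ""))  # 8
    slot_millis = getattr(partial_iterator, "slot_millis", 0)  # 0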
@@ -103,10 +125,11 @@
 
 
 def write_stats_to_disk(
+    *,
     query_char_count: int,
     bytes_processed: int,
-    slot_millis: Optional[int] = None,
-    exec_seconds: Optional[float] = None,
+    slot_millis: int,
+    exec_seconds: float,
 ):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
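Adding the bare * makes all four parameters keyword-only, so stale positional call sites fail loudly instead of silently binding a stat to the wrong parameter. The values below are illustrative:

    # The old positional style, write_stats_to_disk(9, 1024, 0, 0.0),
    # now raises TypeError; every call must spell out the keywords.
    write_stats_to_disk(
        query_char_count=9,
        bytes_processed=1024,
        slot_millis=0,
        exec_seconds=0.0,
    )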
@@ -118,18 +141,17 @@ def write_stats_to_disk(
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
 
-    if (slot_millis is not None) and (exec_seconds is not None):
-        # store slot milliseconds
-        slot_file = os.path.join(current_directory, test_name + ".slotmillis")
-        with open(slot_file, "a") as f:
-            f.write(str(slot_millis) + "\n")
+    # store slot milliseconds
+    slot_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(slot_file, "a") as f:
+        f.write(str(slot_millis) + "\n")
 
-        # store execution time seconds
-        exec_time_file = os.path.join(
-            current_directory, test_name + ".bq_exec_time_seconds"
-        )
-        with open(exec_time_file, "a") as f:
-            f.write(str(exec_seconds) + "\n")
+    # store execution time seconds
+    exec_time_file = os.path.join(
+        current_directory, test_name + ".bq_exec_time_seconds"
+    )
+    with open(exec_time_file, "a") as f:
+        f.write(str(exec_seconds) + "\n")
 
     # store length of query
     query_char_count_file = os.path.join(
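With slot_millis and exec_seconds required, the two writes above run unconditionally, so every logged query contributes one line to each stats file. A hypothetical reader (not in this PR) showing the resulting on-disk format:

    import pathlib

    def total_slot_millis(test_name: str, directory: str = ".") -> int:
        # Sum the one-value-per-line entries appended by write_stats_to_disk.
        path = pathlib.Path(directory) / (test_name + ".slotmillis")
        return sum(int(line) for line in path.read_text().splitlines() if line)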
3 changes: 1 addition & 2 deletions tests/benchmark/read_gbq_colab/aggregate_output.py
@@ -48,7 +48,7 @@ def aggregate_output(*, project_id, dataset_id, table_id):
 
 
 if __name__ == "__main__":
-    config = utils.get_configuration(include_table_id=True)
+    config = utils.get_configuration(include_table_id=True, start_session=False)
     current_path = pathlib.Path(__file__).absolute()
 
     utils.get_execution_time(
@@ -58,5 +58,4 @@ def aggregate_output(*, project_id, dataset_id, table_id):
         project_id=config.project_id,
         dataset_id=config.dataset_id,
         table_id=config.table_id,
-        session=config.session,
     )
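With start_session=False the benchmark never constructs a Session, and the session argument to utils.get_execution_time is gone, so its dry-run queries are only counted via the new env-var path in api.py. A sketch of that assumption (the runner's exact behavior is outside this diff):

    import os

    import bigframes.session.metrics as metrics

    # Assumed: the benchmark harness exports the test name before the script
    # runs, so _dry_run records stats even though no Session exists.
    os.environ[metrics.LOGGING_NAME_ENV_VAR] = "aggregate_output"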