Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bigframes/bigquery/_operations/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def create_vector_index(

read_gbq_query(sql)


import bigframes.perf_inspect as perf_inspect
@perf_inspect.runtime_logger
def vector_search(
base_table: str,
column_to_search: str,
Expand Down Expand Up @@ -246,5 +247,4 @@ def vector_search(
df.index.names = index_labels
else:
df = query._session.read_gbq_query(sql, allow_large_results=allow_large_results)

return df
14 changes: 14 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
Union,
)
import warnings
import time
import bigframes.perf_inspect as perf_inspect

import bigframes_vendored.constants as constants
import google.cloud.bigquery as bigquery
Expand Down Expand Up @@ -940,6 +942,8 @@ def split(
]
return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]


@perf_inspect.runtime_logger
def _compute_dry_run(
self,
value_keys: Optional[Iterable[str]] = None,
Expand Down Expand Up @@ -1629,6 +1633,7 @@ def slice(
# Using cache to optimize for Jupyter Notebook's behavior where both '__repr__'
# and '__repr_html__' are called in a single display action, reducing redundant
# queries.
@perf_inspect.runtime_logger
@functools.cache
def retrieve_repr_request_results(
self, max_results: int
Expand All @@ -1642,26 +1647,34 @@ def retrieve_repr_request_results(

# head caches full underlying expression, so row_count will be free after
executor = self.session._executor
start_time = time.monotonic()
executor.cached(
array_value=self.expr,
config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
)
print("Time taken to cache table: {:.2f} seconds".format(time.monotonic() - start_time))
start_time = time.monotonic()
head_result = self.session._executor.execute(
self.expr.slice(start=None, stop=max_results, step=None),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=True,
),
)
print("Time taken to execute head: {:.2f} seconds".format(time.monotonic() - start_time))
start_time = time.monotonic()
row_count = self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=False,
),
).to_py_scalar()
print("Time taken to execute row_count: {:.2f} seconds".format(time.monotonic() - start_time))

start_time = time.monotonic()
head_df = head_result.to_pandas()
print("Time taken to execute to_pandas: {:.2f} seconds".format(time.monotonic() - start_time))
return self._copy_index_to_pandas(head_df), row_count, head_result.query_job

def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
Expand Down Expand Up @@ -2696,6 +2709,7 @@ def to_placeholder_table(
self._view_ref = self.session._create_temp_view(sql)
return self._view_ref

@perf_inspect.runtime_logger
def cached(self, *, force: bool = False, session_aware: bool = False) -> None:
"""Write the block to a session table."""
# use a heuristic for whether something needs to be cached
Expand Down
2 changes: 2 additions & 0 deletions bigframes/core/compile/ibis_compiler/ibis_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import bigframes.core.nodes as nodes
import bigframes.core.ordering as bf_ordering
import bigframes.core.rewrite as rewrites
import bigframes.perf_inspect as perf_inspect

if typing.TYPE_CHECKING:
import bigframes.core


@perf_inspect.runtime_logger
def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
output_names = tuple((expression.DerefOp(id), id.sql) for id in request.node.ids)
result_node = nodes.ResultNode(
Expand Down
2 changes: 2 additions & 0 deletions bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ def close_session() -> None:
bigframes._config.options.bigquery._session_started = False


import bigframes.perf_inspect as perf_inspect
@perf_inspect.runtime_logger
def get_global_session():
"""Gets the global session.

Expand Down
1 change: 1 addition & 0 deletions bigframes/core/tree_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
import itertools
from typing import Callable, Dict, Optional, Sequence
import bigframes.perf_inspect as perf_inspect

import bigframes.core.nodes as nodes

Expand Down
10 changes: 10 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import pyarrow
import tabulate

import bigframes.perf_inspect as perf_inspect

import bigframes._config.display_options as display_options
import bigframes.constants
import bigframes.core
Expand Down Expand Up @@ -116,6 +118,8 @@ class DataFrame(vendored_pandas_frame.DataFrame):
# Must be above 5000 for pandas to delegate to bigframes for binops
__pandas_priority__ = 15000

import bigframes.perf_inspect as perf_inspect
@perf_inspect.runtime_logger
def __init__(
self,
data=None,
Expand Down Expand Up @@ -725,6 +729,7 @@ def __setattr__(self, key: str, value):
else:
object.__setattr__(self, key, value)

@perf_inspect.runtime_logger
def __repr__(self) -> str:
"""Converts a DataFrame to a string. Calls to_pandas.

Expand Down Expand Up @@ -777,6 +782,7 @@ def __repr__(self) -> str:
lines.append(f"[{row_count} rows x {column_count} columns]")
return "\n".join(lines)

@perf_inspect.runtime_logger
def _repr_html_(self) -> str:
"""
Returns an html string primarily for use by notebooks for displaying
Expand Down Expand Up @@ -1703,6 +1709,9 @@ def to_pandas(
) -> pandas.Series:
...


import bigframes.perf_inspect as perf_inspect
@perf_inspect.runtime_logger
def to_pandas(
self,
max_download_size: Optional[int] = None,
Expand Down Expand Up @@ -1887,6 +1896,7 @@ def to_pandas_batches(
allow_large_results=allow_large_results,
)

@perf_inspect.runtime_logger
def _compute_dry_run(self) -> bigquery.QueryJob:
    """Return the query job produced by the block's ``_compute_dry_run``."""
    # The block-level call is unpacked as a pair; only its second element
    # (the query job) is handed back to the caller.
    _unused, dry_run_job = self._block._compute_dry_run()
    return dry_run_job
Expand Down
2 changes: 0 additions & 2 deletions bigframes/formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
table_html += "</table>"
return widgets.HTML(table_html)


def repr_query_job(query_job: Optional[bigquery.QueryJob]):
"""Return query job as a formatted string.
Args:
Expand Down Expand Up @@ -118,7 +117,6 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
res += f"""{key}: {job_val}"""
return res


def wait_for_query_job(
query_job: bigquery.QueryJob,
max_results: Optional[int] = None,
Expand Down
41 changes: 41 additions & 0 deletions bigframes/perf_inspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import time

global_counter = 0


def runtime_logger(func):
    """Decorator that prints when ``func`` starts and ends, plus its runtime.

    Every active decorated call increments the module-level
    ``global_counter``. The counter determines how many ``--`` pairs prefix
    the printed lines, so nested decorated calls are printed with a longer
    prefix than their caller.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper carrying ``func``'s metadata (via ``functools.wraps``) that
        forwards all positional and keyword arguments and returns ``func``'s
        result unchanged. Exceptions raised by ``func`` propagate to the
        caller after the end line has been printed.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global global_counter
        global_counter += 1
        # Computed once so the start and end lines share the same prefix.
        prefix = "--" * global_counter

        start_time = time.monotonic()
        print(f"|{prefix}{func.__qualname__} started at {start_time:.2f} seconds")
        try:
            return func(*args, **kwargs)
        finally:
            # Restore the nesting depth first so it is unwound even when
            # ``func`` raises; otherwise every later log line would keep the
            # extra indentation from the failed call.
            global_counter -= 1
            end_time = time.monotonic()
            print(
                f"|{prefix}{func.__qualname__} ended at {end_time:.2f} seconds. "
                f"Runtime: {end_time - start_time:.2f} seconds"
            )

    return wrapper
4 changes: 2 additions & 2 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class Session(
An object providing client library objects.
"""

import bigframes.perf_inspect as perf_inspect
@perf_inspect.runtime_logger
def __init__(
self,
context: Optional[bigquery_options.BigQueryOptions] = None,
Expand All @@ -143,7 +145,6 @@ def __init__(

if context is None:
context = bigquery_options.BigQueryOptions()

if context.location is None:
self._location = "US"
msg = bfe.format_message(
Expand Down Expand Up @@ -184,7 +185,6 @@ def __init__(
client_endpoints_override=context.client_endpoints_override,
requests_transport_adapters=context.requests_transport_adapters,
)

# TODO(shobs): Remove this logic after https://github.com/ibis-project/ibis/issues/8494
# has been fixed. The ibis client changes the default query job config
# so we are going to remember the current config and restore it after
Expand Down
2 changes: 2 additions & 0 deletions bigframes/session/_io/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import bigframes.core.sql
import bigframes.formatting_helpers as formatting_helpers
import bigframes.session.metrics
import bigframes.perf_inspect as perf_inspect

CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."

Expand Down Expand Up @@ -116,6 +117,7 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str:
return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`"


@perf_inspect.runtime_logger
def create_temp_table(
bqclient: bigquery.Client,
table_ref: bigquery.TableReference,
Expand Down
2 changes: 1 addition & 1 deletion bigframes/session/anonymous_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def create_temp_table(
self.allocate_temp_table(),
expiration,
schema=schema,
cluster_columns=list(cluster_cols),
# cluster_columns=list(cluster_cols),
kms_key=self._kms_key,
)
return bigquery.TableReference.from_string(table)
Expand Down
2 changes: 2 additions & 0 deletions bigframes/session/bigquery_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from bigframes.core.compile import googlesql
from bigframes.session import temporary_storage
import bigframes.perf_inspect as perf_inspect

KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0

Expand All @@ -49,6 +50,7 @@ def __init__(self, bqclient: bigquery.Client, location: str):
def location(self):
return self._location

@perf_inspect.runtime_logger
def create_temp_table(
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
) -> bigquery.TableReference:
Expand Down
Loading