Skip to content

Commit 70d8324

Browse files
committed
report execution started/stopped in read_gbq_query
1 parent e6c3ba9 commit 70d8324

File tree

7 files changed

+558
-173
lines changed

7 files changed

+558
-173
lines changed

bigframes/core/events.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import google.cloud.bigquery.table
2626

2727
import bigframes.formatting_helpers
28+
import bigframes.session.executor
2829

2930

3031
@dataclasses.dataclass(frozen=True)
@@ -83,8 +84,9 @@ class ExecutionRunning(Event):
8384
pass
8485

8586

86-
class ExecutionStopped(Event):
87-
pass
87+
@dataclasses.dataclass(frozen=True)
88+
class ExecutionFinished(Event):
89+
result: Optional[bigframes.session.executor.ExecuteResult] = None
8890

8991

9092
@dataclasses.dataclass(frozen=True)

bigframes/formatting_helpers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
125125

126126
current_display: Optional[display.HTML] = None
127127
current_display_id: Optional[str] = None
128+
previous_message: str = ""
128129

129130

130131
def progress_callback(
@@ -149,24 +150,27 @@ def progress_callback(
149150
):
150151
current_display_id = str(random.random())
151152
current_display = display.HTML("Starting execution.")
152-
display.display(current_display)
153+
display.display(
154+
current_display,
155+
display_id=current_display_id,
156+
)
153157

154158
if isinstance(event, bigframes.core.events.ExecutionRunning):
155159
display.update_display(
156160
display.HTML("Execution happening."),
157161
display_id=current_display_id,
158162
)
159-
elif isinstance(event, bigframes.core.events.ExecutionStopped):
163+
elif isinstance(event, bigframes.core.events.ExecutionFinished):
160164
display.update_display(
161-
display.HTML("Execution done."),
165+
display.HTML(f"{previous_message} Execution done."),
162166
display_id=current_display_id,
163167
)
164168
elif progress_bar == "terminal":
165169
if isinstance(event, bigframes.core.events.ExecutionStarted):
166170
print("Starting execution.")
167171
elif isinstance(event, bigframes.core.events.ExecutionRunning):
168172
print("Execution happening.")
169-
elif isinstance(event, bigframes.core.events.ExecutionStopped):
173+
elif isinstance(event, bigframes.core.events.ExecutionFinished):
170174
print("Execution done.")
171175

172176

bigframes/session/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@
7575
import bigframes.functions.function as bff
7676
from bigframes.session import bigquery_session, bq_caching_executor, executor
7777
import bigframes.session._io.bigquery as bf_io_bigquery
78-
import bigframes.session.anonymous_dataset
7978
import bigframes.session.clients
80-
import bigframes.session.loader
81-
import bigframes.session.metrics
8279
import bigframes.session.validation
8380

8481
# Avoid circular imports.

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import bigframes_vendored.constants as constants
2727
import google.api_core.exceptions
2828
import google.cloud.bigquery as bigquery
29+
import google.cloud.bigquery.table
2930

30-
import bigframes.core.sql
3131
import bigframes.exceptions as bfe
3232
import bigframes.session._io.bigquery
3333

@@ -101,7 +101,7 @@ def get_table_metadata(
101101

102102
def is_time_travel_eligible(
103103
bqclient: bigquery.Client,
104-
table: bigquery.table.Table,
104+
table: google.cloud.bigquery.table.Table,
105105
columns: Optional[Sequence[str]],
106106
snapshot_time: datetime.datetime,
107107
filter_str: Optional[str] = None,
@@ -210,10 +210,8 @@ def is_time_travel_eligible(
210210

211211

212212
def infer_unique_columns(
213-
bqclient: bigquery.Client,
214-
table: bigquery.table.Table,
213+
table: google.cloud.bigquery.table.Table,
215214
index_cols: List[str],
216-
metadata_only: bool = False,
217215
) -> Tuple[str, ...]:
218216
"""Return a set of columns that can provide a unique row key or empty if none can be inferred.
219217
@@ -227,14 +225,34 @@ def infer_unique_columns(
227225
# Essentially, just reordering the primary key to match the index col order
228226
return tuple(index_col for index_col in index_cols if index_col in primary_keys)
229227

230-
if primary_keys or metadata_only or (not index_cols):
231-
# Sometimes not worth scanning data to check uniqueness
228+
if primary_keys:
232229
return primary_keys
230+
231+
return ()
232+
233+
234+
def check_if_index_columns_are_unique(
235+
bqclient: bigquery.Client,
236+
table: google.cloud.bigquery.table.Table,
237+
index_cols: List[str],
238+
) -> Tuple[str, ...]:
239+
import bigframes.core.sql
240+
import bigframes.session._io.bigquery
241+
233242
# TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
234243
# table_expression only selects just index_cols.
235244
is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference)
236245
job_config = bigquery.QueryJobConfig()
237-
results = bqclient.query_and_wait(is_unique_sql, job_config=job_config)
246+
results, _ = bigframes.session._io.bigquery.start_query_with_client(
247+
bq_client=bqclient,
248+
sql=is_unique_sql,
249+
job_config=job_config,
250+
timeout=None,
251+
location=None,
252+
project=None,
253+
metrics=None,
254+
query_with_job=False,
255+
)
238256
row = next(iter(results))
239257

240258
if row["total_count"] == row["distinct_count"]:
@@ -243,7 +261,7 @@ def infer_unique_columns(
243261

244262

245263
def _get_primary_keys(
246-
table: bigquery.table.Table,
264+
table: google.cloud.bigquery.table.Table,
247265
) -> List[str]:
248266
"""Get primary keys from table if they are set."""
249267

@@ -261,7 +279,7 @@ def _get_primary_keys(
261279

262280

263281
def _is_table_clustered_or_partitioned(
264-
table: bigquery.table.Table,
282+
table: google.cloud.bigquery.table.Table,
265283
) -> bool:
266284
"""Returns True if the table is clustered or partitioned."""
267285

@@ -284,7 +302,7 @@ def _is_table_clustered_or_partitioned(
284302

285303

286304
def get_index_cols(
287-
table: bigquery.table.Table,
305+
table: google.cloud.bigquery.table.Table,
288306
index_col: Iterable[str]
289307
| str
290308
| Iterable[int]

bigframes/session/bq_caching_executor.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import bigframes.core
3333
from bigframes.core import compile, local_data, rewrite
3434
import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir
35+
import bigframes.core.events
3536
import bigframes.core.guid
3637
import bigframes.core.identifiers
3738
import bigframes.core.nodes as nodes
@@ -187,6 +188,8 @@ def execute(
187188
array_value: bigframes.core.ArrayValue,
188189
execution_spec: ex_spec.ExecutionSpec,
189190
) -> executor.ExecuteResult:
191+
bigframes.core.events.publisher.send(bigframes.core.events.ExecutionStarted())
192+
190193
# TODO: Support export jobs in combination with semi executors
191194
if execution_spec.destination_spec is None:
192195
plan = self.prepare_plan(array_value.node, target="simplify")
@@ -195,6 +198,11 @@ def execute(
195198
plan, ordered=execution_spec.ordered, peek=execution_spec.peek
196199
)
197200
if maybe_result:
201+
bigframes.core.events.publisher.send(
202+
bigframes.core.events.ExecutionFinished(
203+
result=maybe_result,
204+
)
205+
)
198206
return maybe_result
199207

200208
if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec):
@@ -203,7 +211,13 @@ def execute(
203211
"Ordering and peeking not supported for gbq export"
204212
)
205213
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
206-
return self._export_gbq(array_value, execution_spec.destination_spec)
214+
result = self._export_gbq(array_value, execution_spec.destination_spec)
215+
bigframes.core.events.publisher.send(
216+
bigframes.core.events.ExecutionFinished(
217+
result=result,
218+
)
219+
)
220+
return result
207221

208222
result = self._execute_plan_gbq(
209223
array_value.node,
@@ -218,6 +232,11 @@ def execute(
218232
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
219233
self._export_result_gcs(result, execution_spec.destination_spec)
220234

235+
bigframes.core.events.publisher.send(
236+
bigframes.core.events.ExecutionFinished(
237+
result=result,
238+
)
239+
)
221240
return result
222241

223242
def _export_result_gcs(

bigframes/session/loader.py

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
5151
import bigframes.core as core
5252
import bigframes.core.blocks as blocks
53+
import bigframes.core.events
5354
import bigframes.core.schema as schemata
5455
import bigframes.dtypes
5556
import bigframes.formatting_helpers as formatting_helpers
@@ -499,6 +500,7 @@ def read_gbq_table( # type: ignore[overload-overlap]
499500
force_total_order: Optional[bool] = ...,
500501
n_rows: Optional[int] = None,
501502
index_col_in_columns: bool = False,
503+
publish_execution: bool = True,
502504
) -> dataframe.DataFrame:
503505
...
504506

@@ -522,6 +524,7 @@ def read_gbq_table(
522524
force_total_order: Optional[bool] = ...,
523525
n_rows: Optional[int] = None,
524526
index_col_in_columns: bool = False,
527+
publish_execution: bool = True,
525528
) -> pandas.Series:
526529
...
527530

@@ -544,6 +547,7 @@ def read_gbq_table(
544547
force_total_order: Optional[bool] = None,
545548
n_rows: Optional[int] = None,
546549
index_col_in_columns: bool = False,
550+
publish_execution: bool = True,
547551
) -> dataframe.DataFrame | pandas.Series:
548552
"""Read a BigQuery table into a BigQuery DataFrames DataFrame.
549553
@@ -603,8 +607,12 @@ def read_gbq_table(
603607
when the index is selected from the data columns (e.g., in a
604608
``read_csv`` scenario). The column will be used as the
605609
DataFrame's index and removed from the list of value columns.
610+
publish_execution (bool, optional):
611+
If True, sends an execution started and stopped event if this
612+
causes a query. Set to False if using read_gbq_table from
613+
another function that is reporting execution.
606614
"""
607-
import bigframes._tools.strings
615+
import bigframes.core.events
608616
import bigframes.dataframe as dataframe
609617

610618
# ---------------------------------
@@ -768,12 +776,26 @@ def read_gbq_table(
768776
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
769777
# check.
770778
primary_key = bf_read_gbq_table.infer_unique_columns(
771-
bqclient=self._bqclient,
772779
table=table,
773780
index_cols=index_cols,
774-
# If not in strict ordering mode, don't go through the overhead of scanning index column(s) to determine if unique
775-
metadata_only=not self._scan_index_uniqueness,
776781
)
782+
783+
# If not in strict ordering mode, don't go through the overhead of scanning index column(s) to determine if unique
784+
if not primary_key and self._scan_index_uniqueness and index_cols:
785+
if publish_execution:
786+
bigframes.core.events.publisher.send(
787+
bigframes.core.events.ExecutionStarted(),
788+
)
789+
primary_key = bf_read_gbq_table.check_if_index_columns_are_unique(
790+
self._bqclient,
791+
table=table,
792+
index_cols=index_cols,
793+
)
794+
if publish_execution:
795+
bigframes.core.events.publisher.send(
796+
bigframes.core.events.ExecutionFinished(),
797+
)
798+
777799
schema = schemata.ArraySchema.from_bq_table(table)
778800
if not include_all_columns:
779801
schema = schema.select(index_cols + columns)
@@ -991,6 +1013,12 @@ def read_gbq_query(
9911013
query_job, list(columns), index_cols
9921014
)
9931015

1016+
# We want to make sure we show progress when we actually do execute a
1017+
# query. Since we have got this far, we know it's not a dry run.
1018+
bigframes.core.events.publisher.send(
1019+
bigframes.core.events.ExecutionStarted(),
1020+
)
1021+
9941022
query_job_for_metrics: Optional[bigquery.QueryJob] = None
9951023
destination: Optional[bigquery.TableReference] = None
9961024

@@ -1046,20 +1074,28 @@ def read_gbq_query(
10461074
# makes sense to download the results beyond the first page, even if
10471075
# there is a job and destination table available.
10481076
if query_job_for_metrics is None and rows is not None:
1049-
return bf_read_gbq_query.create_dataframe_from_row_iterator(
1077+
df = bf_read_gbq_query.create_dataframe_from_row_iterator(
10501078
rows,
10511079
session=self._session,
10521080
index_col=index_col,
10531081
columns=columns,
10541082
)
1083+
bigframes.core.events.publisher.send(
1084+
bigframes.core.events.ExecutionFinished(),
1085+
)
1086+
return df
10551087

10561088
# We already checked rows, so if there's no destination table, then
10571089
# there are no results to return.
10581090
if destination is None:
1059-
return bf_read_gbq_query.create_dataframe_from_query_job_stats(
1091+
df = bf_read_gbq_query.create_dataframe_from_query_job_stats(
10601092
query_job_for_metrics,
10611093
session=self._session,
10621094
)
1095+
bigframes.core.events.publisher.send(
1096+
bigframes.core.events.ExecutionFinished(),
1097+
)
1098+
return df
10631099

10641100
# If the query was DDL or DML, return some job metadata. See
10651101
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
@@ -1070,10 +1106,14 @@ def read_gbq_query(
10701106
query_job_for_metrics is not None
10711107
and not bf_read_gbq_query.should_return_query_results(query_job_for_metrics)
10721108
):
1073-
return bf_read_gbq_query.create_dataframe_from_query_job_stats(
1109+
df = bf_read_gbq_query.create_dataframe_from_query_job_stats(
10741110
query_job_for_metrics,
10751111
session=self._session,
10761112
)
1113+
bigframes.core.events.publisher.send(
1114+
bigframes.core.events.ExecutionFinished(),
1115+
)
1116+
return df
10771117

10781118
# Speed up counts by getting counts from result metadata.
10791119
if rows is not None:
@@ -1083,16 +1123,21 @@ def read_gbq_query(
10831123
else:
10841124
n_rows = None
10851125

1086-
return self.read_gbq_table(
1126+
df = self.read_gbq_table(
10871127
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
10881128
index_col=index_col,
10891129
columns=columns,
10901130
use_cache=configuration["query"]["useQueryCache"],
10911131
force_total_order=force_total_order,
10921132
n_rows=n_rows,
1133+
publish_execution=False,
10931134
# max_results and filters are omitted because they are already
10941135
# handled by to_query(), above.
10951136
)
1137+
bigframes.core.events.publisher.send(
1138+
bigframes.core.events.ExecutionFinished(),
1139+
)
1140+
return df
10961141

10971142
def _query_to_destination(
10981143
self,

0 commit comments

Comments
 (0)