Skip to content

Commit d03e5d1

Browse files
committed
first attempt at publisher
1 parent 1eccb3a commit d03e5d1

File tree

6 files changed

+245
-527
lines changed

6 files changed

+245
-527
lines changed

bigframes/core/events.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import google.cloud.bigquery.job.query
2525
import google.cloud.bigquery.table
2626

27+
import bigframes.formatting_helpers
28+
2729

2830
@dataclasses.dataclass(frozen=True)
2931
class Subscriber:
@@ -66,6 +68,7 @@ def send(self, event: Event):
6668

6769

6870
publisher = Publisher()
71+
publisher.subscribe(bigframes.formatting_helpers.progress_callback)
6972

7073

7174
class Event:
@@ -85,7 +88,7 @@ class ExecutionStopped(Event):
8588

8689

8790
@dataclasses.dataclass(frozen=True)
88-
class BigQuerySentEvent(ExecutionStarted):
91+
class BigQuerySentEvent(ExecutionRunning):
8992
"""Query sent to BigQuery."""
9093

9194
query: str
@@ -158,7 +161,7 @@ def from_bqclient(
158161

159162

160163
@dataclasses.dataclass(frozen=True)
161-
class BigQueryFinishedEvent(ExecutionStopped):
164+
class BigQueryFinishedEvent(ExecutionRunning):
162165
"""Query finished successfully."""
163166

164167
billing_project: Optional[str] = None

bigframes/formatting_helpers.py

Lines changed: 45 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
# limitations under the License.
1414

1515
"""Shared helper functions for formatting jobs related info."""
16-
# TODO(orrbradford): cleanup up typings and documenttion in this file
16+
17+
from __future__ import annotations
1718

1819
import datetime
1920
import random
20-
from typing import Any, Optional, Type, Union
21+
from typing import Any, Optional, Type, TYPE_CHECKING, Union
2122

2223
import bigframes_vendored.constants as constants
2324
import google.api_core.exceptions as api_core_exceptions
@@ -27,6 +28,9 @@
2728
import IPython.display as display
2829
import ipywidgets as widgets
2930

31+
if TYPE_CHECKING:
32+
import bigframes.core.events
33+
3034
GenericJob = Union[
3135
bigquery.LoadJob, bigquery.ExtractJob, bigquery.QueryJob, bigquery.CopyJob
3236
]
@@ -119,71 +123,51 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
119123
return res
120124

121125

122-
def wait_for_query_job(
123-
query_job: bigquery.QueryJob,
124-
max_results: Optional[int] = None,
125-
page_size: Optional[int] = None,
126-
progress_bar: Optional[str] = None,
127-
) -> bigquery.table.RowIterator:
128-
"""Return query results. Displays a progress bar while the query is running
129-
Args:
130-
query_job (bigquery.QueryJob, Optional):
131-
The job representing the execution of the query on the server.
132-
max_results (int, Optional):
133-
The maximum number of rows the row iterator should return.
134-
page_size (int, Optional):
135-
The number of results to return on each results page.
136-
progress_bar (str, Optional):
137-
Which progress bar to show.
138-
Returns:
139-
A row iterator over the query results.
140-
"""
126+
# Module-level state for the notebook progress output. Both values are
# (re)assigned by progress_callback whenever it creates a new display.
current_display: Optional[display.HTML] = None
current_display_id: Optional[str] = None


def progress_callback(
    event: bigframes.core.events.Event,
):
    """Show a coarse progress message for an execution event.

    The output mode is read from ``bigframes._config.options.display.progress_bar``.
    ``"auto"`` resolves to ``"notebook"`` when ``in_ipython()`` is true and to
    ``"terminal"`` otherwise. Any other value (for example ``None``) produces
    no output.

    In notebook mode a single HTML display is created and then updated in
    place; in terminal mode one line is printed per recognized event.

    Args:
        event (bigframes.core.events.Event):
            The event to report. ``ExecutionStarted``, ``ExecutionRunning``
            and ``ExecutionStopped`` instances are recognized; other events
            print nothing in terminal mode.
    """
    global current_display, current_display_id

    # Imported lazily: bigframes.core.events imports this module at import
    # time, so a top-level import here would be circular.
    import bigframes._config
    import bigframes.core.events

    progress_bar = bigframes._config.options.display.progress_bar

    if progress_bar == "auto":
        progress_bar = "notebook" if in_ipython() else "terminal"

    if progress_bar == "notebook":
        if (
            isinstance(event, bigframes.core.events.ExecutionStarted)
            or current_display is None
            or current_display_id is None
        ):
            current_display_id = str(random.random())
            current_display = display.HTML("Starting execution.")
            # The display must be registered under display_id; otherwise the
            # update_display calls below have no output to update.
            display.display(current_display, display_id=current_display_id)

        if isinstance(event, bigframes.core.events.ExecutionRunning):
            display.update_display(
                display.HTML("Execution happening."),
                display_id=current_display_id,
            )
        elif isinstance(event, bigframes.core.events.ExecutionStopped):
            display.update_display(
                display.HTML("Execution done."),
                display_id=current_display_id,
            )
    elif progress_bar == "terminal":
        if isinstance(event, bigframes.core.events.ExecutionStarted):
            print("Starting execution.")
        elif isinstance(event, bigframes.core.events.ExecutionRunning):
            print("Execution happening.")
        elif isinstance(event, bigframes.core.events.ExecutionStopped):
            print("Execution done.")
187171

188172

189173
def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):

bigframes/session/_io/bigquery/__init__.py

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import google.api_core.exceptions
3030
import google.api_core.retry
3131
import google.cloud.bigquery as bigquery
32+
import google.cloud.bigquery._job_helpers
33+
import google.cloud.bigquery.table
3234

3335
from bigframes.core import log_adapter
3436
import bigframes.core.compile.googlesql as googlesql
37+
import bigframes.core.events
3538
import bigframes.core.sql
36-
import bigframes.formatting_helpers as formatting_helpers
3739
import bigframes.session.metrics
3840

3941
CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
@@ -238,6 +240,15 @@ def add_and_trim_labels(job_config):
238240
)
239241

240242

243+
def publish_bq_event(event):
    """Convert a BigQuery client callback event and send it to the publisher.

    A ``QuerySentEvent`` from ``google.cloud.bigquery._job_helpers`` is
    converted with ``BigQuerySentEvent.from_bqclient``; any other event is
    wrapped in ``BigQueryUnknownEvent``. The result is passed to
    ``bigframes.core.events.publisher.send``.
    """
    events = bigframes.core.events
    is_query_sent = isinstance(
        event, google.cloud.bigquery._job_helpers.QuerySentEvent
    )
    translated = (
        events.BigQuerySentEvent.from_bqclient(event)
        if is_query_sent
        else events.BigQueryUnknownEvent(event)
    )
    events.publisher.send(translated)
250+
251+
241252
@overload
242253
def start_query_with_client(
243254
bq_client: bigquery.Client,
@@ -249,7 +260,7 @@ def start_query_with_client(
249260
timeout: Optional[float],
250261
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
251262
query_with_job: Literal[True],
252-
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
263+
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
253264
...
254265

255266

@@ -264,7 +275,7 @@ def start_query_with_client(
264275
timeout: Optional[float],
265276
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
266277
query_with_job: Literal[False],
267-
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
278+
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
268279
...
269280

270281

@@ -280,7 +291,7 @@ def start_query_with_client(
280291
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
281292
query_with_job: Literal[True],
282293
job_retry: google.api_core.retry.Retry,
283-
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
294+
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
284295
...
285296

286297

@@ -296,7 +307,7 @@ def start_query_with_client(
296307
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
297308
query_with_job: Literal[False],
298309
job_retry: google.api_core.retry.Retry,
299-
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
310+
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
300311
...
301312

302313

@@ -315,23 +326,25 @@ def start_query_with_client(
315326
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
316327
# version 3.36.0 or later.
317328
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
318-
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
329+
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
319330
"""
320331
Starts query job and waits for results.
321332
"""
333+
# Note: Ensure no additional labels are added to job_config after this
334+
# point, as `add_and_trim_labels` ensures the label count does not
335+
# exceed MAX_LABELS_COUNT.
336+
add_and_trim_labels(job_config)
337+
322338
try:
323-
# Note: Ensure no additional labels are added to job_config after this
324-
# point, as `add_and_trim_labels` ensures the label count does not
325-
# exceed MAX_LABELS_COUNT.
326-
add_and_trim_labels(job_config)
327339
if not query_with_job:
328-
results_iterator = bq_client.query_and_wait(
340+
results_iterator = bq_client._query_and_wait_bigframes(
329341
sql,
330342
job_config=job_config,
331343
location=location,
332344
project=project,
333345
api_timeout=timeout,
334346
job_retry=job_retry,
347+
callback=publish_bq_event,
335348
)
336349
if metrics is not None:
337350
metrics.count_job_stats(row_iterator=results_iterator)
@@ -350,14 +363,32 @@ def start_query_with_client(
350363
ex.message += CHECK_DRIVE_PERMISSIONS
351364
raise
352365

353-
opts = bigframes.options.display
354-
if opts.progress_bar is not None and not query_job.configuration.dry_run:
355-
results_iterator = formatting_helpers.wait_for_query_job(
356-
query_job,
357-
progress_bar=opts.progress_bar,
366+
if not query_job.configuration.dry_run:
367+
bigframes.core.events.publisher.send(
368+
bigframes.core.events.BigQuerySentEvent(
369+
sql,
370+
billing_project=query_job.project,
371+
location=query_job.location,
372+
job_id=query_job.job_id,
373+
request_id=None,
374+
)
375+
)
376+
results_iterator = query_job.result()
377+
if not query_job.configuration.dry_run:
378+
bigframes.core.events.publisher.send(
379+
bigframes.core.events.BigQueryFinishedEvent(
380+
billing_project=query_job.project,
381+
location=query_job.location,
382+
job_id=query_job.job_id,
383+
destination=query_job.destination,
384+
total_rows=results_iterator.total_rows,
385+
total_bytes_processed=query_job.total_bytes_processed,
386+
slot_millis=query_job.slot_millis,
387+
created=query_job.created,
388+
started=query_job.started,
389+
ended=query_job.ended,
390+
)
358391
)
359-
else:
360-
results_iterator = query_job.result()
361392

362393
if metrics is not None:
363394
metrics.count_job_stats(query_job=query_job)

0 commit comments

Comments
 (0)