Skip to content

Commit f8a3f64

Browse files
committed
feat: add custom progress reporting to anywidget display mode
1 parent 6370d3b commit f8a3f64

22 files changed

+398
-346
lines changed

bigframes/core/blocks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import textwrap
3131
import typing
3232
from typing import (
33+
Callable,
3334
Iterable,
3435
Iterator,
3536
List,
@@ -679,6 +680,7 @@ def to_pandas_batches(
679680
page_size: Optional[int] = None,
680681
max_results: Optional[int] = None,
681682
allow_large_results: Optional[bool] = None,
683+
callback: Callable = lambda _: None,
682684
) -> Iterator[pd.DataFrame]:
683685
"""Download results one message at a time.
684686
@@ -696,6 +698,7 @@ def to_pandas_batches(
696698
promise_under_10gb=under_10gb,
697699
ordered=True,
698700
),
701+
callback=callback,
699702
)
700703

701704
# To reduce the number of edge cases to consider when working with the

bigframes/dataframe.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,6 +1885,21 @@ def to_pandas_batches(
18851885
allow_large_results=allow_large_results,
18861886
)
18871887

1888+
def _to_pandas_batches_colab(
    self,
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
    *,
    allow_large_results: Optional[bool] = None,
    callback: Callable = lambda _: None,
) -> Iterable[pandas.DataFrame]:
    """Stream results as pandas DataFrames, forwarding a progress callback.

    Thin pass-through to ``self._block.to_pandas_batches``; every argument
    is handed over unchanged.

    Args:
        page_size: Forwarded as ``page_size``.
        max_results: Forwarded as ``max_results``.
        allow_large_results: Forwarded as ``allow_large_results``.
        callback: Forwarded as ``callback``. The default is a no-op that
            accepts a single argument.

    Returns:
        Whatever ``self._block.to_pandas_batches`` returns.
    """
    fetch_batches = self._block.to_pandas_batches
    return fetch_batches(
        callback=callback,
        allow_large_results=allow_large_results,
        max_results=max_results,
        page_size=page_size,
    )
1902+
18881903
def _compute_dry_run(self) -> bigquery.QueryJob:
18891904
_, query_job = self._block._compute_dry_run()
18901905
return query_job

bigframes/display/anywidget.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import pandas as pd
2424

2525
import bigframes
26+
import bigframes.dataframe
2627
import bigframes.display.html
2728

2829
# anywidget and traitlets are optional dependencies. We don't want the import of this
@@ -73,7 +74,7 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame):
7374
initial_page_size = bigframes.options.display.max_rows
7475

7576
# Initialize data fetching attributes.
76-
self._batches = dataframe.to_pandas_batches(page_size=initial_page_size)
77+
# self._batches = dataframe._to_pandas_batches_colab(page_size=initial_page_size, callback=self._update_progress)
7778

7879
# set traitlets properties that trigger observers
7980
self.page_size = initial_page_size
@@ -100,6 +101,7 @@ def _css(self):
100101
page = traitlets.Int(0).tag(sync=True)
101102
page_size = traitlets.Int(25).tag(sync=True)
102103
row_count = traitlets.Int(0).tag(sync=True)
104+
progress_html = traitlets.Unicode().tag(sync=True)
103105
table_html = traitlets.Unicode().tag(sync=True)
104106

105107
@traitlets.validate("page")
@@ -145,6 +147,10 @@ def _validate_page_size(self, proposal: Dict[str, Any]) -> int:
145147
max_page_size = 1000
146148
return min(value, max_page_size)
147149

150+
def _update_progress(self, event):
    """Render a progress event into the synced ``progress_html`` trait.

    Args:
        event: Progress event object handed to the callback. Only its
            ``repr()`` is used.
    """
    import html  # Local import: stdlib escaping helper, only needed here.

    # TODO: use formatting helpers here.
    # Escape the repr so characters such as "<" or "&" are displayed
    # literally instead of being parsed as markup, and close the <code>
    # element so it does not swallow whatever is rendered after it.
    self.progress_html = f"<code>{html.escape(repr(event))}</code>"
153+
148154
def _get_next_batch(self) -> bool:
149155
"""
150156
Gets the next batch of data from the generator and appends to cache.
@@ -180,7 +186,9 @@ def _cached_data(self) -> pd.DataFrame:
180186

181187
def _reset_batches_for_new_page_size(self):
182188
"""Reset the batch iterator when page size changes."""
183-
self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size)
189+
self._batches = self._dataframe._to_pandas_batches_colab(
190+
page_size=self.page_size, callback=self._update_progress
191+
)
184192
self._cached_batches = []
185193
self._batch_iter = None
186194
self._all_data_loaded = False

bigframes/display/table_widget.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ const ModelProperty = {
1919
PAGE_SIZE: "page_size",
2020
ROW_COUNT: "row_count",
2121
TABLE_HTML: "table_html",
22+
PROGRESS_HTML: "progress_html",
2223
};
2324

2425
const Event = {
2526
CHANGE: "change",
2627
CHANGE_TABLE_HTML: `change:${ModelProperty.TABLE_HTML}`,
28+
CHANGE_PROGRESS_HTML: `change:${ModelProperty.PROGRESS_HTML}`,
2729
CLICK: "click",
2830
};
2931

@@ -39,6 +41,7 @@ function render({ model, el }) {
3941
el.classList.add("bigframes-widget");
4042

4143
// Structure
44+
const progressContainer = document.createElement("div");
4245
const tableContainer = document.createElement("div");
4346
const footer = document.createElement("div");
4447

@@ -119,6 +122,13 @@ function render({ model, el }) {
119122
}
120123
}
121124

125+
/**
 * Updates the HTML in the progress container.
 *
 * Deliberately named differently from handleTableHTMLChange: two function
 * declarations with the same name in one scope collapse into the later one,
 * which would leave this progress updater unreachable.
 *
 * NOTE(review): no listener registration for Event.CHANGE_PROGRESS_HTML is
 * visible in this hunk — confirm this handler is wired to the model.
 */
function handleProgressHTMLChange() {
  // NOTE(review): innerHTML parses the value as markup, so this relies on the
  // backend supplying well-formed, already-escaped HTML in progress_html —
  // confirm against the Python side.
  progressContainer.innerHTML = model.get(ModelProperty.PROGRESS_HTML);
}
131+
122132
/** Updates the HTML in the table container and refreshes button states. */
123133
function handleTableHTMLChange() {
124134
// Note: Using innerHTML is safe here because the content is generated

bigframes/formatting_helpers.py

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313
# limitations under the License.
1414

1515
"""Shared helper functions for formatting jobs related info."""
16-
# TODO(orrbradford): cleanup up typings and documenttion in this file
16+
17+
from __future__ import annotations
1718

1819
import datetime
1920
import random
20-
from typing import Any, Optional, Type, Union
21+
from typing import Any, Callable, Optional, Type, Union
2122

2223
import bigframes_vendored.constants as constants
2324
import google.api_core.exceptions as api_core_exceptions
2425
import google.cloud.bigquery as bigquery
26+
import google.cloud.bigquery._job_helpers
2527
import humanize
2628
import IPython
2729
import IPython.display as display
@@ -124,6 +126,7 @@ def wait_for_query_job(
124126
max_results: Optional[int] = None,
125127
page_size: Optional[int] = None,
126128
progress_bar: Optional[str] = None,
129+
callback: Callable = lambda _: None,
127130
) -> bigquery.table.RowIterator:
128131
"""Return query results. Displays a progress bar while the query is running
129132
Args:
@@ -141,35 +144,67 @@ def wait_for_query_job(
141144
if progress_bar == "auto":
142145
progress_bar = "notebook" if in_ipython() else "terminal"
143146

144-
try:
145-
if progress_bar == "notebook":
146-
display_id = str(random.random())
147-
loading_bar = display.HTML(get_query_job_loading_html(query_job))
148-
display.display(loading_bar, display_id=display_id)
149-
query_result = query_job.result(
150-
max_results=max_results, page_size=page_size
151-
)
152-
query_job.reload()
147+
if progress_bar == "notebook":
148+
loading_bar = display.HTML(get_query_job_loading_html(query_job))
149+
display_id = str(random.random())
150+
display.display(loading_bar, display_id=display_id)
151+
152+
def extended_callback(event):
153+
callback(event)
153154
display.update_display(
154155
display.HTML(get_query_job_loading_html(query_job)),
155156
display_id=display_id,
156157
)
157-
elif progress_bar == "terminal":
158-
initial_loading_bar = get_query_job_loading_string(query_job)
159-
print(initial_loading_bar)
160-
query_result = query_job.result(
161-
max_results=max_results, page_size=page_size
162-
)
163-
query_job.reload()
158+
159+
elif progress_bar == "terminal":
160+
initial_loading_bar = get_query_job_loading_string(query_job)
161+
print(initial_loading_bar)
162+
163+
def extended_callback(event):
164+
callback(event)
165+
164166
if initial_loading_bar != get_query_job_loading_string(query_job):
165167
print(get_query_job_loading_string(query_job))
166-
else:
167-
# No progress bar.
168-
query_result = query_job.result(
169-
max_results=max_results, page_size=page_size
168+
169+
else:
170+
extended_callback = callback
171+
172+
try:
173+
extended_callback(
174+
# DONOTSUBMIT: we should create our own events.
175+
google.cloud.bigquery._job_helpers.QueryReceivedEvent(
176+
billing_project=query_job.project,
177+
location=query_job.location,
178+
job_id=query_job.job_id,
179+
statement_type=query_job.statement_type,
180+
state=query_job.state,
181+
query_plan=query_job.query_plan,
182+
created=query_job.created,
183+
started=query_job.started,
184+
ended=query_job.ended,
170185
)
171-
query_job.reload()
172-
return query_result
186+
)
187+
query_results = query_job.result(
188+
page_size=page_size,
189+
max_results=max_results,
190+
)
191+
extended_callback(
192+
# DONOTSUBMIT: we should create our own events.
193+
google.cloud.bigquery._job_helpers.QueryFinishedEvent(
194+
billing_project=query_job.project,
195+
location=query_results.location,
196+
query_id=query_results.query_id,
197+
job_id=query_results.job_id,
198+
total_rows=query_results.total_rows,
199+
total_bytes_processed=query_results.total_bytes_processed,
200+
slot_millis=query_results.slot_millis,
201+
destination=query_job.destination,
202+
created=query_job.created,
203+
started=query_job.started,
204+
ended=query_job.ended,
205+
)
206+
)
207+
return query_results
173208
except api_core_exceptions.RetryError as exc:
174209
add_feedback_link(exc)
175210
raise

bigframes/pandas/io/api.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ def _try_read_gbq_colab_sessionless_dry_run(
273273
with _default_location_lock:
274274
if not config.options.bigquery._session_started:
275275
return _run_read_gbq_colab_sessionless_dry_run(
276-
query, pyformat_args=pyformat_args
276+
query,
277+
pyformat_args=pyformat_args,
277278
)
278279

279280
# Explicitly return None to indicate that we didn't run the dry run query.
@@ -305,6 +306,7 @@ def _read_gbq_colab(
305306
*,
306307
pyformat_args: Optional[Dict[str, Any]] = None,
307308
dry_run: bool = False,
309+
callback: Callable = lambda _: None,
308310
) -> bigframes.dataframe.DataFrame | pandas.Series:
309311
"""A Colab-specific version of read_gbq.
310312
@@ -319,6 +321,8 @@ def _read_gbq_colab(
319321
dry_run (bool):
320322
If True, estimates the query results size without returning data.
321323
The return will be a pandas Series with query metadata.
324+
callback (Callable):
325+
A callback function used by bigframes to report query progress.
322326
323327
Returns:
324328
Union[bigframes.dataframe.DataFrame, pandas.Series]:
@@ -364,6 +368,7 @@ def _read_gbq_colab(
364368
query_or_table,
365369
pyformat_args=pyformat_args,
366370
dry_run=dry_run,
371+
callback=callback,
367372
)
368373

369374

bigframes/session/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ def _read_gbq_colab(
512512
*,
513513
pyformat_args: Optional[Dict[str, Any]] = None,
514514
dry_run: bool = False,
515+
callback: Callable = lambda _: None,
515516
) -> Union[dataframe.DataFrame, pandas.Series]:
516517
"""A version of read_gbq that has the necessary default values for use in colab integrations.
517518
@@ -528,6 +529,11 @@ def _read_gbq_colab(
528529
instead. Note: unlike read_gbq / read_gbq_query, even if set to
529530
None, this function always assumes {var} refers to a variable
530531
that is supposed to be supplied in this dictionary.
532+
dry_run (bool):
533+
If True, estimates the query results size without returning data.
534+
The return will be a pandas Series with query metadata.
535+
callback (Callable):
536+
A callback function used by bigframes to report query progress.
531537
"""
532538
if pyformat_args is None:
533539
pyformat_args = {}
@@ -547,6 +553,7 @@ def _read_gbq_colab(
547553
force_total_order=False,
548554
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
549555
allow_large_results=allow_large_results,
556+
callback=callback,
550557
)
551558

552559
@overload

0 commit comments

Comments
 (0)