Skip to content

Commit 1eccb3a

Browse files
committed
fix: show progress even in job optional queries
1 parent 8fc098a commit 1eccb3a

File tree

3 files changed

+207
-2
lines changed

3 files changed

+207
-2
lines changed

bigframes/core/events.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import dataclasses
18+
import datetime
19+
import threading
20+
from typing import List, Optional
21+
import weakref
22+
23+
import google.cloud.bigquery._job_helpers
24+
import google.cloud.bigquery.job.query
25+
import google.cloud.bigquery.table
26+
27+
28+
@dataclasses.dataclass(frozen=True)
29+
class Subscriber:
30+
callback_ref: weakref.ref
31+
# TODO(tswast): Add block_id to allow filter in context managers.
32+
33+
34+
class Publisher:
35+
def __init__(self):
36+
self._subscribers: List[Subscriber] = []
37+
self._subscribers_lock = threading.Lock()
38+
39+
def subscribe(self, callback):
40+
subscriber = Subscriber(callback_ref=weakref.ref(callback))
41+
42+
with self._subscribers_lock:
43+
# TODO(tswast): Add block_id to allow filter in context managers.
44+
self._subscribers.append(subscriber)
45+
46+
def send(self, event: Event):
47+
to_delete = []
48+
to_call = []
49+
50+
with self._subscribers_lock:
51+
for sid, subscriber in enumerate(self._subscribers):
52+
callback = subscriber.callback_ref()
53+
54+
if callback is None:
55+
to_delete.append(sid)
56+
else:
57+
# TODO(tswast): Add if statement for block_id to allow filter
58+
# in context managers.
59+
to_call.append(callback)
60+
61+
for sid in reversed(to_delete):
62+
del self._subscribers[sid]
63+
64+
for callback in to_call:
65+
callback(event)
66+
67+
68+
publisher = Publisher()
69+
70+
71+
class Event:
72+
pass
73+
74+
75+
class ExecutionStarted(Event):
76+
pass
77+
78+
79+
class ExecutionRunning(Event):
80+
pass
81+
82+
83+
class ExecutionStopped(Event):
84+
pass
85+
86+
87+
@dataclasses.dataclass(frozen=True)
88+
class BigQuerySentEvent(ExecutionStarted):
89+
"""Query sent to BigQuery."""
90+
91+
query: str
92+
billing_project: Optional[str] = None
93+
location: Optional[str] = None
94+
job_id: Optional[str] = None
95+
request_id: Optional[str] = None
96+
97+
@classmethod
98+
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
99+
return cls(
100+
query=event.query,
101+
billing_project=event.billing_project,
102+
location=event.location,
103+
job_id=event.job_id,
104+
request_id=event.request_id,
105+
)
106+
107+
108+
@dataclasses.dataclass(frozen=True)
109+
class BigQueryRetryEvent(ExecutionRunning):
110+
"""Query sent another time because the previous attempt failed."""
111+
112+
query: str
113+
billing_project: Optional[str] = None
114+
location: Optional[str] = None
115+
job_id: Optional[str] = None
116+
request_id: Optional[str] = None
117+
118+
@classmethod
119+
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
120+
return cls(
121+
query=event.query,
122+
billing_project=event.billing_project,
123+
location=event.location,
124+
job_id=event.job_id,
125+
request_id=event.request_id,
126+
)
127+
128+
129+
@dataclasses.dataclass(frozen=True)
130+
class BigQueryReceivedEvent(ExecutionRunning):
131+
"""Query received and acknowledged by the BigQuery API."""
132+
133+
billing_project: Optional[str] = None
134+
location: Optional[str] = None
135+
job_id: Optional[str] = None
136+
statement_type: Optional[str] = None
137+
state: Optional[str] = None
138+
query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
139+
created: Optional[datetime.datetime] = None
140+
started: Optional[datetime.datetime] = None
141+
ended: Optional[datetime.datetime] = None
142+
143+
@classmethod
144+
def from_bqclient(
145+
cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
146+
):
147+
return cls(
148+
billing_project=event.billing_project,
149+
location=event.location,
150+
job_id=event.job_id,
151+
statement_type=event.statement_type,
152+
state=event.state,
153+
query_plan=event.query_plan,
154+
created=event.created,
155+
started=event.started,
156+
ended=event.ended,
157+
)
158+
159+
160+
@dataclasses.dataclass(frozen=True)
161+
class BigQueryFinishedEvent(ExecutionStopped):
162+
"""Query finished successfully."""
163+
164+
billing_project: Optional[str] = None
165+
location: Optional[str] = None
166+
query_id: Optional[str] = None
167+
job_id: Optional[str] = None
168+
destination: Optional[google.cloud.bigquery.table.TableReference] = None
169+
total_rows: Optional[int] = None
170+
total_bytes_processed: Optional[int] = None
171+
slot_millis: Optional[int] = None
172+
created: Optional[datetime.datetime] = None
173+
started: Optional[datetime.datetime] = None
174+
ended: Optional[datetime.datetime] = None
175+
176+
@classmethod
177+
def from_bqclient(
178+
cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
179+
):
180+
return cls(
181+
billing_project=event.billing_project,
182+
location=event.location,
183+
query_id=event.query_id,
184+
job_id=event.job_id,
185+
destination=event.destination,
186+
total_rows=event.total_rows,
187+
total_bytes_processed=event.total_bytes_processed,
188+
slot_millis=event.slot_millis,
189+
created=event.created,
190+
started=event.started,
191+
ended=event.ended,
192+
)
193+
194+
195+
@dataclasses.dataclass(frozen=True)
196+
class BigQueryUnknownEvent(ExecutionRunning):
197+
"""Got unknown event from the BigQuery client library."""
198+
199+
# TODO: should we just skip sending unknown events?
200+
201+
event: object
202+
203+
@classmethod
204+
def from_bqclient(cls, event):
205+
return cls(event)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
"gcsfs >=2023.3.0, !=2025.5.0",
4040
"geopandas >=0.12.2",
4141
"google-auth >=2.15.0,<3.0",
42-
"google-cloud-bigquery[bqstorage,pandas] >=3.31.0",
42+
"google-cloud-bigquery[bqstorage,pandas] >=3.38.0",
4343
# 2.30 needed for arrow support.
4444
"google-cloud-bigquery-storage >= 2.30.0, < 3.0.0",
4545
"google-cloud-functions >=1.12.0",

testing/constraints-3.9.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ geopandas==0.12.2
66
google-auth==2.15.0
77
google-cloud-bigtable==2.24.0
88
google-cloud-pubsub==2.21.4
9-
google-cloud-bigquery==3.31.0
9+
google-cloud-bigquery==3.38.0
1010
google-cloud-functions==1.12.0
1111
google-cloud-bigquery-connection==1.12.0
1212
google-cloud-iam==2.12.1

0 commit comments

Comments
 (0)