Skip to content

Commit 628363b

Browse files
refactor query metadata
1 parent 1f85658 commit 628363b

File tree

3 files changed

+48
-37
lines changed

3 files changed

+48
-37
lines changed

bigframes/session/bq_caching_executor.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -320,7 +320,7 @@ def _export_gbq(
320320

321321
# TODO(swast): plumb through the api_name of the user-facing api that
322322
# caused this query.
323-
_, query_job = self._run_execute_query(
323+
iterator, _ = self._run_execute_query(
324324
sql=sql,
325325
job_config=job_config,
326326
)
@@ -338,7 +338,7 @@ def _export_gbq(
338338

339339
return executor.EmptyExecuteResult(
340340
bf_schema=array_value.schema,
341-
query_job=query_job,
341+
execution_metadata=executor.ExecutionMetadata.from_iterator(iterator),
342342
)
343343

344344
def dry_run(
@@ -681,18 +681,20 @@ def _execute_plan_gbq(
681681
assert compiled.row_order is not None
682682
self.cache.cache_results_table(og_plan, result_bq_data)
683683

684+
execution_metadata = executor.ExecutionMetadata.from_iterator(iterator)
684685
if result_bq_data is not None:
685686
return executor.BQTableExecuteResult(
686687
data=result_bq_data,
687688
project_id=self.bqclient.project,
688689
storage_client=self.bqstoragereadclient,
689-
query_job=query_job,
690+
execution_metadata=execution_metadata,
690691
selected_fields=tuple((col, col) for col in og_schema.names),
691692
)
692693
else:
693694
return executor.LocalExecuteResult(
694695
data=iterator.to_arrow().select(og_schema.names),
695696
bf_schema=plan.schema,
697+
execution_metadata=execution_metadata,
696698
)
697699

698700

bigframes/session/direct_gbq_execution.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ def execute(
6868
return executor.LocalExecuteResult(
6969
data=iterator.to_arrow(),
7070
bf_schema=plan.schema,
71+
execution_metadata=executor.ExecutionMetadata.from_iterator(iterator),
7172
)
7273

7374
def _run_execute_query(

bigframes/session/executor.py

Lines changed: 42 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
from typing import Iterator, Literal, Optional, Sequence, Union
2222

2323
from google.cloud import bigquery, bigquery_storage_v1
24+
import google.cloud.bigquery.table as bq_table
2425
import pandas as pd
2526
import pyarrow
2627
import pyarrow as pa
@@ -40,7 +41,9 @@
4041

4142

4243
class ResultsIterator(Iterator[pa.RecordBatch]):
43-
"""Interface for mutable objects with state represented by a block value object."""
44+
"""
45+
Iterator for query results, with some extra metadata attached.
46+
"""
4447

4548
def __init__(
4649
self,
@@ -141,12 +144,7 @@ def to_py_scalar(self):
141144
class ExecuteResult(abc.ABC):
142145
@property
143146
@abc.abstractmethod
144-
def query_job(self) -> Optional[bigquery.QueryJob]:
145-
...
146-
147-
@property
148-
@abc.abstractmethod
149-
def total_bytes_processed(self) -> Optional[int]:
147+
def execution_metadata(self) -> ExecutionMetadata:
150148
...
151149

152150
@property
@@ -158,18 +156,40 @@ def schema(self) -> bigframes.core.schema.ArraySchema:
158156
def batches(self) -> ResultsIterator:
159157
...
160158

161-
162-
class LocalExecuteResult(ExecuteResult):
163-
def __init__(self, data: pa.Table, bf_schema: bigframes.core.schema.ArraySchema):
164-
self._data = local_data.ManagedArrowTable.from_pyarrow(data, bf_schema)
165-
166159
@property
167160
def query_job(self) -> Optional[bigquery.QueryJob]:
168-
return None
161+
return self.execution_metadata.query_job
169162

170163
@property
171164
def total_bytes_processed(self) -> Optional[int]:
172-
return None
165+
return self.execution_metadata.bytes_processed
166+
167+
168+
@dataclasses.dataclass(frozen=True)
169+
class ExecutionMetadata:
170+
query_job: Optional[bigquery.QueryJob] = None
171+
bytes_processed: Optional[int] = None
172+
173+
@classmethod
174+
def from_iterator(cls, iterator: bq_table.RowIterator) -> ExecutionMetadata:
175+
return cls(
176+
query_job=iterator.query, bytes_processed=iterator.total_bytes_processed
177+
)
178+
179+
180+
class LocalExecuteResult(ExecuteResult):
181+
def __init__(
182+
self,
183+
data: pa.Table,
184+
bf_schema: bigframes.core.schema.ArraySchema,
185+
execution_metadata: ExecutionMetadata = ExecutionMetadata(),
186+
):
187+
self._data = local_data.ManagedArrowTable.from_pyarrow(data, bf_schema)
188+
self._execution_metadata = execution_metadata
189+
190+
@property
191+
def execution_metadata(self) -> ExecutionMetadata:
192+
return self._execution_metadata
173193

174194
@property
175195
def schema(self) -> bigframes.core.schema.ArraySchema:
@@ -188,20 +208,14 @@ class EmptyExecuteResult(ExecuteResult):
188208
def __init__(
189209
self,
190210
bf_schema: bigframes.core.schema.ArraySchema,
191-
query_job: Optional[bigquery.QueryJob] = None,
211+
execution_metadata: ExecutionMetadata = ExecutionMetadata(),
192212
):
193213
self._schema = bf_schema
194-
self._query_job = query_job
214+
self._execution_metadata = execution_metadata
195215

196216
@property
197-
def query_job(self) -> Optional[bigquery.QueryJob]:
198-
return self._query_job
199-
200-
@property
201-
def total_bytes_processed(self) -> Optional[int]:
202-
if self.query_job:
203-
return self.query_job.total_bytes_processed
204-
return None
217+
def execution_metadata(self) -> ExecutionMetadata:
218+
return self._execution_metadata
205219

206220
@property
207221
def schema(self) -> bigframes.core.schema.ArraySchema:
@@ -218,28 +232,22 @@ def __init__(
218232
storage_client: bigquery_storage_v1.BigQueryReadClient,
219233
project_id: str,
220234
*,
221-
query_job: Optional[bigquery.QueryJob] = None,
235+
execution_metadata: ExecutionMetadata = ExecutionMetadata(),
222236
limit: Optional[int] = None,
223237
selected_fields: Optional[Sequence[tuple[str, str]]] = None,
224238
):
225239
self._data = data
226240
self._project_id = project_id
227-
self._query_job = query_job
241+
self._execution_metadata = execution_metadata
228242
self._storage_client = storage_client
229243
self._limit = limit
230244
self._selected_fields = selected_fields or [
231245
(name, name) for name in data.schema.names
232246
]
233247

234248
@property
235-
def query_job(self) -> Optional[bigquery.QueryJob]:
236-
return self._query_job
237-
238-
@property
239-
def total_bytes_processed(self) -> Optional[int]:
240-
if self.query_job:
241-
return self.query_job.total_bytes_processed
242-
return None
249+
def execution_metadata(self) -> ExecutionMetadata:
250+
return self._execution_metadata
243251

244252
@property
245253
@functools.cache

0 commit comments

Comments
 (0)