
Commit da2bf08

split method instead of using overloads
1 parent 7cd7371 commit da2bf08

1 file changed: +55 -58 lines changed


bigframes/session/loader.py

Lines changed: 55 additions & 58 deletions
@@ -851,8 +851,9 @@ def read_gbq_query(

         # TODO(b/420984164): We may want to set a page_size here to limit
         # the number of results in the first jobs.query response.
-        rows, _ = self._start_query(
-            query, job_config=job_config, query_with_job=False
+        rows = self._start_query_with_job_optional(
+            query,
+            job_config=job_config,
         )

         # If there is a query job, fetch it so that we can get the
@@ -918,11 +919,12 @@ def _query_to_destination(
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
         dry_run_config = bigquery.QueryJobConfig()
         dry_run_config.dry_run = True
-        _, dry_run_job = self._start_query(
-            query, job_config=dry_run_config, query_with_job=True
+        dry_run_job = self._start_query_with_job(
+            query,
+            job_config=dry_run_config,
         )
         if dry_run_job.statement_type != "SELECT":
-            _, query_job = self._start_query(query, query_with_job=True)
+            query_job = self._start_query_with_job(query)
             return query_job.destination, query_job

         # Create a table to workaround BigQuery 10 GB query results limit. See:
@@ -956,88 +958,83 @@ def _query_to_destination(
             # Write to temp table to workaround BigQuery 10 GB query results
             # limit. See: internal issue 303057336.
             job_config.labels["error_caught"] = "true"
-            _, query_job = self._start_query(
+            query_job = self._start_query_with_job(
                 query,
                 job_config=job_config,
                 timeout=timeout,
-                query_with_job=True,
             )
             return query_job.destination, query_job
         except google.api_core.exceptions.BadRequest:
             # Some SELECT statements still aren't compatible with cluster
             # tables as the destination. For example, if the query has a
             # top-level ORDER BY, this conflicts with our ability to cluster
             # the table by the index column(s).
-            _, query_job = self._start_query(
-                query, timeout=timeout, query_with_job=True
-            )
+            query_job = self._start_query_with_job(query, timeout=timeout)
             return query_job.destination, query_job

-    @overload
-    def _start_query(
+    def _prepare_job_config(
         self,
-        sql: str,
         job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
-        timeout: Optional[float] = None,
-        *,
-        query_with_job: Literal[True],
-    ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
-        ...
+    ) -> google.cloud.bigquery.QueryJobConfig:
+        job_config = bigquery.QueryJobConfig() if job_config is None else job_config

-    @overload
-    def _start_query(
+        if bigframes.options.compute.maximum_bytes_billed is not None:
+            # Maybe this should be pushed down into start_query_with_client
+            job_config.maximum_bytes_billed = (
+                bigframes.options.compute.maximum_bytes_billed
+            )
+
+        return job_config
+
+    def _start_query_with_job_optional(
         self,
         sql: str,
+        *,
         job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
         timeout: Optional[float] = None,
-        *,
-        query_with_job: Literal[False],
-    ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
-        ...
+    ) -> google.cloud.bigquery.table.RowIterator:
+        """
+        Starts BigQuery query with job optional and waits for results.
+
+        Do not execute dataframe through this API, instead use the executor.
+        """
+        job_config = self._prepare_job_config(job_config)
+        rows, _ = bf_io_bigquery.start_query_with_client(
+            self._bqclient,
+            sql,
+            job_config=job_config,
+            timeout=timeout,
+            location=None,
+            project=None,
+            metrics=None,
+            query_with_job=False,
+        )
+        return rows

-    def _start_query(
+    def _start_query_with_job(
         self,
         sql: str,
+        *,
         job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
         timeout: Optional[float] = None,
-        *,
-        query_with_job: bool = True,
-    ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+    ) -> bigquery.QueryJob:
         """
         Starts BigQuery query job and waits for results.

         Do not execute dataframe through this API, instead use the executor.
         """
-        job_config = bigquery.QueryJobConfig() if job_config is None else job_config
-        if bigframes.options.compute.maximum_bytes_billed is not None:
-            # Maybe this should be pushed down into start_query_with_client
-            job_config.maximum_bytes_billed = (
-                bigframes.options.compute.maximum_bytes_billed
-            )
-
-        # Trick the type checker into thinking we're using literals.
-        if query_with_job:
-            return bf_io_bigquery.start_query_with_client(
-                self._bqclient,
-                sql,
-                job_config=job_config,
-                timeout=timeout,
-                location=None,
-                project=None,
-                metrics=None,
-                query_with_job=True,
-            )
-        else:
-            return bf_io_bigquery.start_query_with_client(
-                self._bqclient,
-                sql,
-                job_config=job_config,
-                timeout=timeout,
-                location=None,
-                project=None,
-                metrics=None,
-                query_with_job=False,
-            )
+        job_config = self._prepare_job_config(job_config)
+        _, query_job = bf_io_bigquery.start_query_with_client(
+            self._bqclient,
+            sql,
+            job_config=job_config,
+            timeout=timeout,
+            location=None,
+            project=None,
+            metrics=None,
+            query_with_job=True,
+        )
+        return query_job


 def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
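
For readers skimming the diff, here is a minimal self-contained sketch of the pattern being replaced and the pattern adopted. The names (Rows, Job, start_query, start_query_with_job, start_query_with_job_optional) are hypothetical stand-ins, not the bigframes or google-cloud-bigquery APIs: a single entry point with Literal-typed query_with_job overloads needs a duplicated if/else purely to hand the type checker a literal flag, while splitting into two methods gives each call site an exact return type directly.

    from typing import Literal, Tuple, overload


    class Rows:
        """Hypothetical stand-in for google.cloud.bigquery.table.RowIterator."""


    class Job:
        """Hypothetical stand-in for google.cloud.bigquery.QueryJob."""


    # Before: one entry point whose Literal-typed overloads give callers a
    # precise return type, at the cost of an if/else whose only purpose is to
    # pass a literal True/False down to the lower-level client.
    @overload
    def start_query(sql: str, *, query_with_job: Literal[True]) -> Tuple[Rows, Job]:
        ...


    @overload
    def start_query(sql: str, *, query_with_job: Literal[False]) -> Tuple[Rows, None]:
        ...


    def start_query(sql: str, *, query_with_job: bool = True):
        if query_with_job:
            return Rows(), Job()  # branch duplicated only for the type checker
        return Rows(), None


    # After: two functions with exact return types and no flag; shared setup
    # (the role _prepare_job_config plays in the diff) lives in one helper
    # that both of them call.
    def start_query_with_job(sql: str) -> Job:
        return Job()


    def start_query_with_job_optional(sql: str) -> Rows:
        return Rows()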
