From e65e8a67d956daf39bedeed487324be16842f0d9 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 19 Aug 2025 19:35:05 +0000 Subject: [PATCH 1/5] refactor: Unify bigquery execution paths --- bigframes/core/blocks.py | 66 +++- bigframes/core/indexes/base.py | 11 +- bigframes/dataframe.py | 56 ++-- bigframes/session/bq_caching_executor.py | 382 +++++++++++------------ bigframes/session/execution_spec.py | 53 ++++ bigframes/session/executor.py | 47 +-- bigframes/testing/compiler_session.py | 7 + bigframes/testing/polars_session.py | 37 +-- tests/system/small/test_session.py | 7 +- 9 files changed, 355 insertions(+), 311 deletions(-) create mode 100644 bigframes/session/execution_spec.py diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d2662da509..fa35a0a8e6 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -69,7 +69,7 @@ import bigframes.exceptions as bfe import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops -from bigframes.session import dry_runs +from bigframes.session import dry_runs, execution_spec from bigframes.session import executor as executors # Type constraint for wherever column labels are used @@ -257,7 +257,10 @@ def shape(self) -> typing.Tuple[int, int]: except Exception: pass - row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar() + row_count = self.session._executor.execute( + self.expr.row_count(), + execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False), + ).to_py_scalar() return (row_count, len(self.value_columns)) @property @@ -539,8 +542,17 @@ def to_arrow( allow_large_results: Optional[bool] = None, ) -> Tuple[pa.Table, Optional[bigquery.QueryJob]]: """Run query and download results as a pyarrow Table.""" + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else bigframes.options._allow_large_results + ) execute_result = self.session._executor.execute( - self.expr, ordered=ordered, use_explicit_destination=allow_large_results + self.expr, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=ordered, + ), ) pa_table = execute_result.to_arrow_table() @@ -629,8 +641,15 @@ def try_peek( self, n: int = 20, force: bool = False, allow_large_results=None ) -> typing.Optional[pd.DataFrame]: if force or self.expr.supports_fast_peek: - result = self.session._executor.peek( - self.expr, n, use_explicit_destination=allow_large_results + # really, we should just block insane peek values and always assume <10gb + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else bigframes.options._allow_large_results + ) + result = self.session._executor.execute( + self.expr, + execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n), ) df = result.to_pandas() return self._copy_index_to_pandas(df) @@ -647,10 +666,18 @@ def to_pandas_batches( page_size and max_results determine the size and number of batches, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result""" + + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else bigframes.options._allow_large_results + ) execute_result = self.session._executor.execute( self.expr, - ordered=True, - use_explicit_destination=allow_large_results, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=True, + ), ) # To reduce the number of edge cases to consider when working with the @@ -696,10 
+723,17 @@ def _materialize_local( ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. + under_10gb = ( + (not materialize_options.allow_large_results) + if (materialize_options.allow_large_results is not None) + else bigframes.options._allow_large_results + ) execute_result = self.session._executor.execute( self.expr, - ordered=materialize_options.ordered, - use_explicit_destination=materialize_options.allow_large_results, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=True, + ), ) sample_config = materialize_options.downsampling if execute_result.total_bytes is not None: @@ -1616,9 +1650,19 @@ def retrieve_repr_request_results( config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"), ) head_result = self.session._executor.execute( - self.expr.slice(start=None, stop=max_results, step=None) + self.expr.slice(start=None, stop=max_results, step=None), + execution_spec.ExecutionSpec( + promise_under_10gb=True, + ordered=True, + ), ) - row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar() + row_count = self.session._executor.execute( + self.expr.row_count(), + execution_spec.ExecutionSpec( + promise_under_10gb=True, + ordered=False, + ), + ).to_py_scalar() head_df = head_result.to_pandas() return self._copy_index_to_pandas(head_df), row_count, head_result.query_job diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index e022b3f151..f8ec38621d 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -38,6 +38,7 @@ import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops import bigframes.series +import bigframes.session.execution_spec as ex_spec if typing.TYPE_CHECKING: import bigframes.dataframe @@ -283,8 +284,9 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]: # Check if key exists at all by counting count_agg = ex.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id)) count_result = filtered_block._expr.aggregate([(count_agg, "count")]) + count_scalar = self._block.session._executor.execute( - count_result + count_result, ex_spec.ExecutionSpec(promise_under_10gb=True) ).to_py_scalar() if count_scalar == 0: @@ -295,7 +297,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]: min_agg = ex.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id)) position_result = filtered_block._expr.aggregate([(min_agg, "position")]) position_scalar = self._block.session._executor.execute( - position_result + position_result, ex_spec.ExecutionSpec(promise_under_10gb=True) ).to_py_scalar() return int(position_scalar) @@ -326,7 +328,10 @@ def _get_monotonic_slice(self, filtered_block, offsets_id: str) -> slice: combined_result = filtered_block._expr.aggregate(min_max_aggs) # Execute query and extract positions - result_df = self._block.session._executor.execute(combined_result).to_pandas() + result_df = self._block.session._executor.execute( + combined_result, + execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True), + ).to_pandas() min_pos = int(result_df["min_pos"].iloc[0]) max_pos = int(result_df["max_pos"].iloc[0]) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index bcad00830d..a1e672e730 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -84,6 +84,7 @@ import bigframes.operations.structs import 
bigframes.series import bigframes.session._io.bigquery +import bigframes.session.execution_spec as ex_spec if typing.TYPE_CHECKING: from _typeshed import SupportsRichComparison @@ -4184,17 +4185,19 @@ def to_csv( index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - options = { + options: dict[str, Union[bool, str]] = { "field_delimiter": sep, "header": header, } - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path_or_buf, - format="csv", - export_options=options, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec( + uri=path_or_buf, format="csv", export_options=tuple(options.items()) + ) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_json( @@ -4237,13 +4240,13 @@ def to_json( index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path_or_buf, - format="json", - export_options={}, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=()) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_gbq( @@ -4316,16 +4319,21 @@ def to_gbq( ) ) - query_job = self._session._executor.export_gbq( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - destination=destination, - cluster_cols=clustering_fields, - if_exists=if_exists, + ex_spec.ExecutionSpec( + ex_spec.TableOutputSpec( + destination, + cluster_cols=tuple(clustering_fields), + if_exists=if_exists, + ) + ), ) - self._set_internal_query_job(query_job) + assert result.query_job is not None + self._set_internal_query_job(result.query_job) # The query job should have finished, so there should be always be a result table. 
- result_table = query_job.destination + result_table = result.query_job.destination assert result_table is not None if temp_table_ref: @@ -4393,13 +4401,17 @@ def to_parquet( index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path, - format="parquet", - export_options=export_options, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec( + uri=path, + format="parquet", + export_options=tuple(export_options.items()), + ) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_dict( diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index a970e75a0f..aeaf895d42 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -14,11 +14,10 @@ from __future__ import annotations -import dataclasses import math import os import threading -from typing import cast, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import Literal, Mapping, Optional, Sequence, Tuple import warnings import weakref @@ -35,6 +34,7 @@ from bigframes.core import compile, local_data, rewrite import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir import bigframes.core.guid +import bigframes.core.identifiers import bigframes.core.nodes as nodes import bigframes.core.ordering as order import bigframes.core.schema as schemata @@ -49,6 +49,7 @@ semi_executor, ) import bigframes.session._io.bigquery as bq_io +import bigframes.session.execution_spec as ex_spec import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage @@ -61,21 +62,6 @@ MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024 # 10G -@dataclasses.dataclass -class OutputSpec: - require_bq_table: bool - cluster_cols: tuple[str, ...] 
- - def with_require_table(self, value: bool) -> OutputSpec: - return dataclasses.replace(self, require_bq_table=value) - - -def _get_default_output_spec() -> OutputSpec: - return OutputSpec( - require_bq_table=bigframes.options._allow_large_results, cluster_cols=() - ) - - SourceIdMapping = Mapping[str, str] @@ -189,7 +175,11 @@ def to_sql( ) -> str: if offset_column: array_value, _ = array_value.promote_offsets() - node = self.logical_plan(array_value.node) if enable_cache else array_value.node + node = ( + self.prepare_plan(array_value.node, locality="original") + if enable_cache + else array_value.node + ) node = self._substitute_large_local_sources(node) compiled = compile.compile_sql(compile.CompileRequest(node, sort_rows=ordered)) return compiled.sql @@ -197,86 +187,108 @@ def to_sql( def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = None, + execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: - if bigframes.options.compute.enable_multi_query_execution: - self._simplify_with_caching(array_value) - - output_spec = _get_default_output_spec() - if use_explicit_destination is not None: - output_spec = output_spec.with_require_table(use_explicit_destination) + # TODO: Support export jobs in combination with semi executors + if execution_spec.destination_spec is None: + plan = self.prepare_plan(array_value.node, locality="original") + for exec in self._semi_executors: + maybe_result = exec.execute( + plan, ordered=execution_spec.ordered, peek=execution_spec.peek + ) + if maybe_result: + return maybe_result - plan = self.logical_plan(array_value.node) - return self._execute_plan( - plan, - ordered=ordered, - output_spec=output_spec, + # next: prepare output spec + if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): + if execution_spec.peek or execution_spec.ordered: + raise NotImplementedError( + "Ordering and peeking not supported for gbq export" + ) + # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml + return self._export_gbq(array_value, execution_spec.destination_spec) + + result = self._execute_plan_gbq( + array_value.node, + ordered=execution_spec.ordered, + peek=execution_spec.peek, + cache_spec=execution_spec.destination_spec + if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) + else None, + must_create_table=not execution_spec.promise_under_10gb, ) + # post steps: export + if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): + self._export_result_gcs(result, execution_spec.destination_spec) - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = None, - ) -> executor.ExecuteResult: - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. 
- """ - plan = self.logical_plan(array_value.node) - if not tree_properties.can_fast_peek(plan): - msg = bfe.format_message("Peeking this value cannot be done efficiently.") - warnings.warn(msg) - - output_spec = _get_default_output_spec() - if use_explicit_destination is not None: - output_spec = output_spec.with_require_table(use_explicit_destination) + return result - return self._execute_plan( - plan, ordered=False, output_spec=output_spec, peek=n_rows - ) - - def export_gbq( - self, - array_value: bigframes.core.ArrayValue, - destination: bigquery.TableReference, - if_exists: Literal["fail", "replace", "append"] = "fail", - cluster_cols: Sequence[str] = [], + def _export_result_gcs( + self, result: executor.ExecuteResult, gcs_export_spec: ex_spec.GcsOutputSpec ): - """ - Export the ArrayValue to an existing BigQuery table. - """ - if bigframes.options.compute.enable_multi_query_execution: - self._simplify_with_caching(array_value) + query_job = result.query_job + assert query_job is not None + result_table = query_job.destination + assert result_table is not None + export_data_statement = bq_io.create_export_data_statement( + f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", + uri=gcs_export_spec.uri, + format=gcs_export_spec.format, + export_options=dict(gcs_export_spec.export_options), + ) + bq_io.start_query_with_client( + self.bqclient, + export_data_statement, + job_config=bigquery.QueryJobConfig(), + metrics=self.metrics, + project=None, + location=None, + timeout=None, + query_with_job=True, + ) - table_exists = True + def _maybe_find_existing_table( + self, spec: ex_spec.TableOutputSpec + ) -> Optional[bigquery.Table]: + # validate destination table try: - table = self.bqclient.get_table(destination) - if if_exists == "fail": - raise ValueError(f"Table already exists: {destination.__str__()}") - except google.api_core.exceptions.NotFound: - table_exists = False + table = self.bqclient.get_table(spec.table) + if spec.if_exists == "fail": + raise ValueError(f"Table already exists: {spec.table.__str__()}") - if len(cluster_cols) != 0: - if table_exists and table.clustering_fields != cluster_cols: + if table.clustering_fields != spec.cluster_cols: raise ValueError( "Table clustering fields cannot be changed after the table has " f"been created. Existing clustering fields: {table.clustering_fields}" ) + return table + except google.api_core.exceptions.NotFound: + return None + + def _export_gbq( + self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec + ) -> executor.ExecuteResult: + """ + Export the ArrayValue to an existing BigQuery table. 
+ """ + + # validate destination table + existing_table = self._maybe_find_existing_table(spec) sql = self.to_sql(array_value, ordered=False) - if table_exists and _if_schema_match(table.schema, array_value.schema): + if (existing_table is not None) and _if_schema_match( + existing_table.schema, array_value.schema + ): # b/409086472: Uses DML for table appends and replacements to avoid # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: # https://cloud.google.com/bigquery/quotas#standard_tables job_config = bigquery.QueryJobConfig() ir = sqlglot_ir.SQLGlotIR.from_query_string(sql) - if if_exists == "append": - sql = ir.insert(destination) + if spec.if_exists == "append": + sql = ir.insert(spec.table) else: # for "replace" - assert if_exists == "replace" - sql = ir.replace(destination) + assert spec.if_exists == "replace" + sql = ir.replace(spec.table) else: dispositions = { "fail": bigquery.WriteDisposition.WRITE_EMPTY, @@ -284,14 +296,14 @@ def export_gbq( "append": bigquery.WriteDisposition.WRITE_APPEND, } job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[if_exists], - destination=destination, - clustering_fields=cluster_cols if cluster_cols else None, + write_disposition=dispositions[spec.if_exists], + destination=spec.table, + clustering_fields=spec.cluster_cols if spec.cluster_cols else None, ) # TODO(swast): plumb through the api_name of the user-facing api that # caused this query. - _, query_job = self._run_execute_query( + row_iter, query_job = self._run_execute_query( sql=sql, job_config=job_config, ) @@ -300,48 +312,16 @@ def export_gbq( t == bigframes.dtypes.TIMEDELTA_DTYPE for t in array_value.schema.dtypes ) - if if_exists != "append" and has_timedelta_col: + if spec.if_exists != "append" and has_timedelta_col: # Only update schema if this is not modifying an existing table, and the # new table contains timedelta columns. - table = self.bqclient.get_table(destination) + table = self.bqclient.get_table(spec.table) table.schema = array_value.schema.to_bigquery() self.bqclient.update_table(table, ["schema"]) - return query_job - - def export_gcs( - self, - array_value: bigframes.core.ArrayValue, - uri: str, - format: Literal["json", "csv", "parquet"], - export_options: Mapping[str, Union[bool, str]], - ): - query_job = self.execute( - array_value, - ordered=False, - use_explicit_destination=True, - ).query_job - assert query_job is not None - result_table = query_job.destination - assert result_table is not None - export_data_statement = bq_io.create_export_data_statement( - f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", - uri=uri, - format=format, - export_options=dict(export_options), - ) - - bq_io.start_query_with_client( - self.bqclient, - export_data_statement, - job_config=bigquery.QueryJobConfig(), - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, + return executor.ExecuteResult( + row_iter.to_arrow_iterable(), array_value.schema, query_job ) - return query_job def dry_run( self, array_value: bigframes.core.ArrayValue, ordered: bool = True @@ -446,59 +426,46 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): # Once rewriting is available, will want to rewrite before # evaluating execution cost. 
return tree_properties.is_trivially_executable( - self.logical_plan(array_value.node) + self.prepare_plan(array_value.node) ) - def logical_plan(self, root: nodes.BigFrameNode) -> nodes.BigFrameNode: + def prepare_plan( + self, + root: nodes.BigFrameNode, + locality: Literal["bq_compat", "original"] = "bq_compat", + ) -> nodes.BigFrameNode: """ Apply universal logical simplifications that are helpful regardless of engine. """ plan = self.replace_cached_subtrees(root) plan = rewrite.column_pruning(plan) plan = plan.top_down(rewrite.fold_row_counts) + + if locality == "bq_compat": + plan = self._substitute_large_local_sources(plan) + return plan def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" - plan = self.logical_plan(array_value.node) - plan = self._substitute_large_local_sources(plan) - compiled = compile.compile_sql( - compile.CompileRequest( - plan, sort_rows=False, materialize_all_order_keys=True - ) + execution_spec = ex_spec.ExecutionSpec( + destination_spec=ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols)) ) - tmp_table_ref, num_rows = self._sql_as_cached_temp_table( - compiled.sql, - compiled.sql_schema, - cluster_cols=bq_io.select_cluster_cols(compiled.sql_schema, cluster_cols), - ) - tmp_table = self.bqclient.get_table(tmp_table_ref) - assert compiled.row_order is not None - self.cache.cache_results_table( - array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + self.execute( + array_value, + execution_spec=execution_spec, ) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" - offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") - w_offsets, offset_column = array_value.promote_offsets() - compiled = compile.compile_sql( - compile.CompileRequest( - self.logical_plan(self._substitute_large_local_sources(w_offsets.node)), - sort_rows=False, - ) + execution_spec = ex_spec.ExecutionSpec( + destination_spec=ex_spec.CacheSpec(cluster_cols=tuple()) ) - tmp_table_ref, num_rows = self._sql_as_cached_temp_table( - compiled.sql, - compiled.sql_schema, - cluster_cols=[offset_column], - ) - tmp_table = self.bqclient.get_table(tmp_table_ref) - assert compiled.row_order is not None - self.cache.cache_results_table( - array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + self.execute( + array_value, + execution_spec=execution_spec, ) def _cache_with_session_awareness( @@ -520,17 +487,14 @@ def _cache_with_session_awareness( else: self._cache_with_cluster_cols(bigframes.core.ArrayValue(target), []) - def _simplify_with_caching(self, array_value: bigframes.core.ArrayValue): + def _simplify_with_caching(self, plan: nodes.BigFrameNode): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first for _ in range(MAX_SUBTREE_FACTORINGS): - if ( - self.logical_plan(array_value.node).planning_complexity - < QUERY_COMPLEXITY_LIMIT - ): + if self.prepare_plan(plan).planning_complexity < QUERY_COMPLEXITY_LIMIT: return - did_cache = self._cache_most_complex_subtree(array_value.node) + did_cache = self._cache_most_complex_subtree(plan) if not did_cache: return @@ -552,29 +516,6 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: self._cache_with_cluster_cols(bigframes.core.ArrayValue(selection), []) return True - def 
_sql_as_cached_temp_table( - self, - sql: str, - schema: Sequence[bigquery.SchemaField], - cluster_cols: Sequence[str], - ) -> tuple[bigquery.TableReference, Optional[int]]: - assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS - temp_table = self.storage_manager.create_temp_table(schema, cluster_cols) - - # TODO: Get default job config settings - job_config = cast( - bigquery.QueryJobConfig, - bigquery.QueryJobConfig.from_api_repr({}), - ) - job_config.destination = temp_table - _, query_job = self._run_execute_query( - sql, - job_config=job_config, - ) - assert query_job is not None - iter = query_job.result() - return query_job.destination, iter.total_rows - def _validate_result_schema( self, array_value: bigframes.core.ArrayValue, @@ -582,7 +523,7 @@ def _validate_result_schema( ): actual_schema = _sanitize(tuple(bq_schema)) ibis_schema = compile.test_only_ibis_inferred_schema( - self.logical_plan(array_value.node) + self.prepare_plan(array_value.node, locality="original") ).to_bigquery() internal_schema = _sanitize(array_value.schema.to_bigquery()) if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable: @@ -646,52 +587,83 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable): ) self.cache.cache_remote_replacement(local_table, uploaded) - def _execute_plan( + def _execute_plan_gbq( self, plan: nodes.BigFrameNode, ordered: bool, - output_spec: OutputSpec, peek: Optional[int] = None, + cache_spec: Optional[ex_spec.CacheSpec] = None, + must_create_table: bool = True, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # First try to execute fast-paths - if not output_spec.require_bq_table: - for exec in self._semi_executors: - maybe_result = exec.execute(plan, ordered=ordered, peek=peek) - if maybe_result: - return maybe_result + # TODO(swast): plumb through the api_name of the user-facing api that + # caused this query. + + og_plan = plan + og_schema = plan.schema - # Use explicit destination to avoid 10GB limit of temporary table - destination_table = ( - self.storage_manager.create_temp_table( - plan.schema.to_bigquery(), cluster_cols=output_spec.cluster_cols + plan = self.prepare_plan(plan, locality="bq_compat") + if bigframes.options.compute.enable_multi_query_execution: + self._simplify_with_caching(plan) + + create_table = must_create_table + cluster_cols: Sequence[str] = [] + if cache_spec is not None: + if peek is not None: + raise ValueError("peek is not compatible with caching.") + + create_table = True + if not cache_spec.cluster_cols: + assert len(cache_spec.cluster_cols) <= _MAX_CLUSTER_COLUMNS + offsets_id = bigframes.core.identifiers.ColumnId( + bigframes.core.guid.generate_guid() + ) + plan = nodes.PromoteOffsetsNode(plan, offsets_id) + cluster_cols = [offsets_id.sql] + else: + cluster_cols = cache_spec.cluster_cols + + compiled = compile.compile_sql( + compile.CompileRequest( + plan, + sort_rows=ordered, + peek_count=peek, + materialize_all_order_keys=(cache_spec is not None), ) - if output_spec.require_bq_table - else None ) + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema + + destination_table: Optional[bigquery.TableReference] = None - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. 
job_config = bigquery.QueryJobConfig() - # Use explicit destination to avoid 10GB limit of temporary table - if destination_table is not None: + if create_table: + destination_table = self.storage_manager.create_temp_table( + compiled_schema, cluster_cols + ) job_config.destination = destination_table - plan = self._substitute_large_local_sources(plan) - compiled = compile.compile_sql( - compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) - ) iterator, query_job = self._run_execute_query( sql=compiled.sql, job_config=job_config, query_with_job=(destination_table is not None), ) - if query_job: - size_bytes = self.bqclient.get_table(query_job.destination).num_bytes + table_info: Optional[bigquery.Table] = None + if query_job and query_job.destination: + table_info = self.bqclient.get_table(query_job.destination) + size_bytes = table_info.num_bytes else: size_bytes = None + # we could actually cache even when caching is not explicitly requested, but being conservative for now + if cache_spec is not None: + assert table_info is not None + assert compiled.row_order is not None + self.cache.cache_results_table( + og_plan, table_info, compiled.row_order, num_rows=table_info.num_rows + ) + if size_bytes is not None and size_bytes >= MAX_SMALL_RESULT_BYTES: msg = bfe.format_message( "The query result size has exceeded 10 GB. In BigFrames 2.0 and " @@ -711,7 +683,7 @@ def _execute_plan( _arrow_batches=iterator.to_arrow_iterable( bqstorage_client=self.bqstoragereadclient ), - schema=plan.schema, + schema=og_schema, query_job=query_job, total_bytes=size_bytes, total_rows=iterator.total_rows, diff --git a/bigframes/session/execution_spec.py b/bigframes/session/execution_spec.py new file mode 100644 index 0000000000..c9431dbd11 --- /dev/null +++ b/bigframes/session/execution_spec.py @@ -0,0 +1,53 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import dataclasses +from typing import Literal, Optional, Union + +from google.cloud import bigquery + + +@dataclasses.dataclass(frozen=True) +class ExecutionSpec: + destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None + peek: Optional[int] = None + ordered: bool = ( + False # ordered and promise_under_10gb must both be together for bq execution + ) + # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur + promise_under_10gb: bool = False + + +# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes +# that will be cached only when a super-tree is executed +@dataclasses.dataclass(frozen=True) +class CacheSpec: + cluster_cols: tuple[str, ...] + + +@dataclasses.dataclass(frozen=True) +class TableOutputSpec: + table: bigquery.TableReference + cluster_cols: tuple[str, ...] 
+ if_exists: Literal["fail", "replace", "append"] = "fail" + + +@dataclasses.dataclass(frozen=True) +class GcsOutputSpec: + uri: str + format: Literal["json", "csv", "parquet"] + # sequence of (option, value) pairs + export_options: tuple[tuple[str, Union[bool, str]], ...] diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index cc8f086f9f..748b10647a 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -18,7 +18,7 @@ import dataclasses import functools import itertools -from typing import Iterator, Literal, Mapping, Optional, Sequence, Union +from typing import Iterator, Literal, Optional, Union from google.cloud import bigquery import pandas as pd @@ -29,6 +29,7 @@ from bigframes.core import pyarrow_utils import bigframes.core.schema import bigframes.session._io.pandas as io_pandas +import bigframes.session.execution_spec as ex_spec _ROW_LIMIT_EXCEEDED_TEMPLATE = ( "Execution has downloaded {result_rows} rows so far, which exceeds the " @@ -147,41 +148,16 @@ def to_sql( """ raise NotImplementedError("to_sql not implemented for this executor") + @abc.abstractmethod def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = False, + execution_spec: ex_spec.ExecutionSpec, ) -> ExecuteResult: """ - Execute the ArrayValue, storing the result to a temporary session-owned table. - """ - raise NotImplementedError("execute not implemented for this executor") - - def export_gbq( - self, - array_value: bigframes.core.ArrayValue, - destination: bigquery.TableReference, - if_exists: Literal["fail", "replace", "append"] = "fail", - cluster_cols: Sequence[str] = [], - ) -> bigquery.QueryJob: - """ - Export the ArrayValue to an existing BigQuery table. + Execute the ArrayValue. """ - raise NotImplementedError("export_gbq not implemented for this executor") - - def export_gcs( - self, - array_value: bigframes.core.ArrayValue, - uri: str, - format: Literal["json", "csv", "parquet"], - export_options: Mapping[str, Union[bool, str]], - ) -> bigquery.QueryJob: - """ - Export the ArrayValue to gcs. - """ - raise NotImplementedError("export_gcs not implemented for this executor") + ... def dry_run( self, array_value: bigframes.core.ArrayValue, ordered: bool = True @@ -193,17 +169,6 @@ def dry_run( """ raise NotImplementedError("dry_run not implemented for this executor") - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = False, - ) -> ExecuteResult: - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. - """ - raise NotImplementedError("peek not implemented for this executor") - def cached( self, array_value: bigframes.core.ArrayValue, diff --git a/bigframes/testing/compiler_session.py b/bigframes/testing/compiler_session.py index 35114d95d0..289b2600fd 100644 --- a/bigframes/testing/compiler_session.py +++ b/bigframes/testing/compiler_session.py @@ -41,3 +41,10 @@ def to_sql( return self.compiler.SQLGlotCompiler().compile( array_value.node, ordered=ordered ) + + def execute( + self, + array_value, + execution_spec, + ): + raise NotImplementedError("SQLCompilerExecutor.execute not implemented") diff --git a/bigframes/testing/polars_session.py b/bigframes/testing/polars_session.py index 3710c40eae..29eae20b7a 100644 --- a/bigframes/testing/polars_session.py +++ b/bigframes/testing/polars_session.py @@ -13,7 +13,7 @@ # limitations under the License. 
import dataclasses -from typing import Optional, Union +from typing import Union import weakref import pandas @@ -23,48 +23,31 @@ import bigframes.core.blocks import bigframes.core.compile.polars import bigframes.dataframe +import bigframes.session.execution_spec import bigframes.session.executor import bigframes.session.metrics -# Does not support to_sql, export_gbq, export_gcs, dry_run, peek, head, get_row_count, cached +# Does not support to_sql, dry_run, peek, cached @dataclasses.dataclass class TestExecutor(bigframes.session.executor.Executor): compiler = bigframes.core.compile.polars.PolarsCompiler() - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = False, - ): - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. - """ - lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) - pa_table = lazy_frame.collect().limit(n_rows).to_arrow() - # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. - # Nullability may be different, and might use large versions of list, string datatypes. - return bigframes.session.executor.ExecuteResult( - _arrow_batches=pa_table.to_batches(), - schema=array_value.schema, - total_bytes=pa_table.nbytes, - total_rows=pa_table.num_rows, - ) - def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = False, - page_size: Optional[int] = None, - max_results: Optional[int] = None, + execution_spec: bigframes.session.execution_spec.ExecutionSpec, ): """ Execute the ArrayValue, storing the result to a temporary session-owned table. """ + if execution_spec.destination_spec is not None: + raise ValueError( + f"TestExecutor does not support destination spec: {execution_spec.destination_spec}" + ) lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) + if execution_spec.peek is not None: + lazy_frame = lazy_frame.limit(execution_spec.peek) pa_table = lazy_frame.collect().to_arrow() # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. # Nullability may be different, and might use large versions of list, string datatypes. 
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index a04da64af0..3e65136350 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -36,6 +36,7 @@ import bigframes.dataframe import bigframes.dtypes import bigframes.ml.linear_model +import bigframes.session.execution_spec from bigframes.testing import utils all_write_engines = pytest.mark.parametrize( @@ -113,7 +114,8 @@ def test_read_gbq_tokyo( # use_explicit_destination=True, otherwise might use path with no query_job exec_result = session_tokyo._executor.execute( - df._block.expr, use_explicit_destination=True + df._block.expr, + bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False), ) assert exec_result.query_job is not None assert exec_result.query_job.location == tokyo_location @@ -896,7 +898,8 @@ def test_read_pandas_tokyo( expected = scalars_pandas_df_index result = session_tokyo._executor.execute( - df._block.expr, use_explicit_destination=True + df._block.expr, + bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False), ) assert result.query_job is not None assert result.query_job.location == tokyo_location From 1c20901a1cc37c6e90ac048fba98f9e66313993d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 20 Aug 2025 00:10:23 +0000 Subject: [PATCH 2/5] fix usage of options._allow_large_results --- bigframes/core/blocks.py | 10 +++++----- bigframes/session/bq_caching_executor.py | 4 +++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index fa35a0a8e6..105d98558d 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -545,7 +545,7 @@ def to_arrow( under_10gb = ( (not allow_large_results) if (allow_large_results is not None) - else bigframes.options._allow_large_results + else not bigframes.options._allow_large_results ) execute_result = self.session._executor.execute( self.expr, @@ -645,7 +645,7 @@ def try_peek( under_10gb = ( (not allow_large_results) if (allow_large_results is not None) - else bigframes.options._allow_large_results + else not bigframes.options._allow_large_results ) result = self.session._executor.execute( self.expr, @@ -670,7 +670,7 @@ def to_pandas_batches( under_10gb = ( (not allow_large_results) if (allow_large_results is not None) - else bigframes.options._allow_large_results + else not bigframes.options._allow_large_results ) execute_result = self.session._executor.execute( self.expr, @@ -726,13 +726,13 @@ def _materialize_local( under_10gb = ( (not materialize_options.allow_large_results) if (materialize_options.allow_large_results is not None) - else bigframes.options._allow_large_results + else (not bigframes.options._allow_large_results) ) execute_result = self.session._executor.execute( self.expr, execution_spec.ExecutionSpec( promise_under_10gb=under_10gb, - ordered=True, + ordered=materialize_options.ordered, ), ) sample_config = materialize_options.downsampling diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index aeaf895d42..457419ba7f 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -256,7 +256,9 @@ def _maybe_find_existing_table( if spec.if_exists == "fail": raise ValueError(f"Table already exists: {spec.table.__str__()}") - if table.clustering_fields != spec.cluster_cols: + if (len(spec.cluster_cols) != 0) and ( + table.clustering_fields != spec.cluster_cols + ): raise ValueError( "Table clustering 
fields cannot be changed after the table has " f"been created. Existing clustering fields: {table.clustering_fields}" From ced361821bca7716fa3fb18f77b5f43020b08fde Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 20 Aug 2025 22:58:01 +0000 Subject: [PATCH 3/5] fix tokyo tests, improve prepare_plan helper --- bigframes/session/bq_caching_executor.py | 87 +++++++----------------- tests/system/small/test_session.py | 8 ++- 2 files changed, 32 insertions(+), 63 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 457419ba7f..2c72c73682 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -15,7 +15,6 @@ from __future__ import annotations import math -import os import threading from typing import Literal, Mapping, Optional, Sequence, Tuple import warnings @@ -40,7 +39,6 @@ import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes -import bigframes.features from bigframes.session import ( executor, loader, @@ -176,7 +174,7 @@ def to_sql( if offset_column: array_value, _ = array_value.promote_offsets() node = ( - self.prepare_plan(array_value.node, locality="original") + self.prepare_plan(array_value.node, target="simplify") if enable_cache else array_value.node ) @@ -191,7 +189,7 @@ def execute( ) -> executor.ExecuteResult: # TODO: Support export jobs in combination with semi executors if execution_spec.destination_spec is None: - plan = self.prepare_plan(array_value.node, locality="original") + plan = self.prepare_plan(array_value.node, target="simplify") for exec in self._semi_executors: maybe_result = exec.execute( plan, ordered=execution_spec.ordered, peek=execution_spec.peek @@ -199,7 +197,6 @@ def execute( if maybe_result: return maybe_result - # next: prepare output spec if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): if execution_spec.peek or execution_spec.ordered: raise NotImplementedError( @@ -273,11 +270,14 @@ def _export_gbq( """ Export the ArrayValue to an existing BigQuery table. """ + plan = self.prepare_plan(array_value.node, target="bq_execution") # validate destination table existing_table = self._maybe_find_existing_table(spec) - sql = self.to_sql(array_value, ordered=False) + compiled = compile.compile_sql(compile.CompileRequest(plan, sort_rows=False)) + sql = compiled.sql + if (existing_table is not None) and _if_schema_match( existing_table.schema, array_value.schema ): @@ -433,17 +433,27 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): def prepare_plan( self, - root: nodes.BigFrameNode, - locality: Literal["bq_compat", "original"] = "bq_compat", + plan: nodes.BigFrameNode, + target: Literal["simplify", "bq_execution"] = "simplify", ) -> nodes.BigFrameNode: """ - Apply universal logical simplifications that are helpful regardless of engine. + Prepare the plan by simplifying it with caches, removing unused operators. Has modes for different contexts. + + "simplify" removes unused operations and subsitutes subtrees with their previously cached equivalents + "bq_execution" is the most heavy option, preparing the plan for bq execution by also caching subtrees, uploading large local sources """ - plan = self.replace_cached_subtrees(root) + # TODO: We should model plan decomposition and data uploading as work steps rather than as plan preparation. 
+ if ( + target == "bq_execution" + and bigframes.options.compute.enable_multi_query_execution + ): + self._simplify_with_caching(plan) + + plan = self.replace_cached_subtrees(plan) plan = rewrite.column_pruning(plan) plan = plan.top_down(rewrite.fold_row_counts) - if locality == "bq_compat": + if target == "bq_execution": plan = self._substitute_large_local_sources(plan) return plan @@ -493,7 +503,10 @@ def _simplify_with_caching(self, plan: nodes.BigFrameNode): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first for _ in range(MAX_SUBTREE_FACTORINGS): - if self.prepare_plan(plan).planning_complexity < QUERY_COMPLEXITY_LIMIT: + if ( + self.prepare_plan(plan, "simplify").planning_complexity + < QUERY_COMPLEXITY_LIMIT + ): return did_cache = self._cache_most_complex_subtree(plan) @@ -518,29 +531,6 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: self._cache_with_cluster_cols(bigframes.core.ArrayValue(selection), []) return True - def _validate_result_schema( - self, - array_value: bigframes.core.ArrayValue, - bq_schema: list[bigquery.SchemaField], - ): - actual_schema = _sanitize(tuple(bq_schema)) - ibis_schema = compile.test_only_ibis_inferred_schema( - self.prepare_plan(array_value.node, locality="original") - ).to_bigquery() - internal_schema = _sanitize(array_value.schema.to_bigquery()) - if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable: - return - - if internal_schema != actual_schema: - raise ValueError( - f"This error should only occur while testing. BigFrames internal schema: {internal_schema} does not match actual schema: {actual_schema}" - ) - - if ibis_schema != actual_schema: - raise ValueError( - f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}" - ) - def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): """ Replace large local sources with the uploaded version of those datasources. @@ -604,10 +594,7 @@ def _execute_plan_gbq( og_plan = plan og_schema = plan.schema - plan = self.prepare_plan(plan, locality="bq_compat") - if bigframes.options.compute.enable_multi_query_execution: - self._simplify_with_caching(plan) - + plan = self.prepare_plan(plan, target="bq_execution") create_table = must_create_table cluster_cols: Sequence[str] = [] if cache_spec is not None: @@ -674,12 +661,6 @@ def _execute_plan_gbq( "`bigframes.options.compute.allow_large_results=True`." ) warnings.warn(msg, FutureWarning) - # Runs strict validations to ensure internal type predictions and ibis are completely in sync - # Do not execute these validations outside of testing suite. - if "PYTEST_CURRENT_TEST" in os.environ: - self._validate_result_schema( - bigframes.core.ArrayValue(plan), iterator.schema - ) return executor.ExecuteResult( _arrow_batches=iterator.to_arrow_iterable( @@ -705,19 +686,3 @@ def _if_schema_match( ): return False return True - - -def _sanitize( - schema: Tuple[bigquery.SchemaField, ...] -) -> Tuple[bigquery.SchemaField, ...]: - # Schema inferred from SQL strings and Ibis expressions contain only names, types and modes, - # so we disregard other fields (e.g timedelta description for timedelta columns) for validations. 
- return tuple( - bigquery.SchemaField( - f.name, - f.field_type, - f.mode, # type:ignore - fields=_sanitize(f.fields), - ) - for f in schema - ) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 3e65136350..3340e0cf47 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -115,7 +115,9 @@ def test_read_gbq_tokyo( # use_explicit_destination=True, otherwise might use path with no query_job exec_result = session_tokyo._executor.execute( df._block.expr, - bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False), + bigframes.session.execution_spec.ExecutionSpec( + bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + ), ) assert exec_result.query_job is not None assert exec_result.query_job.location == tokyo_location @@ -899,7 +901,9 @@ def test_read_pandas_tokyo( result = session_tokyo._executor.execute( df._block.expr, - bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False), + bigframes.session.execution_spec.ExecutionSpec( + bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + ), ) assert result.query_job is not None assert result.query_job.location == tokyo_location From 03249a1c7cec0a84c206f8c1359d31c5075c7c59 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 21 Aug 2025 19:49:50 +0000 Subject: [PATCH 4/5] fix cluster col comparison --- bigframes/session/bq_caching_executor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 2c72c73682..e49c2d9e32 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -253,12 +253,14 @@ def _maybe_find_existing_table( if spec.if_exists == "fail": raise ValueError(f"Table already exists: {spec.table.__str__()}") - if (len(spec.cluster_cols) != 0) and ( - table.clustering_fields != spec.cluster_cols + if ( + (len(spec.cluster_cols) != 0) + and (table.clustering_fields is not None) + and (tuple(table.clustering_fields) != spec.cluster_cols) ): raise ValueError( "Table clustering fields cannot be changed after the table has " - f"been created. Existing clustering fields: {table.clustering_fields}" + f"been created. Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" ) return table except google.api_core.exceptions.NotFound: From f8da6f894c0baad6ea3c6a4e14001d8f45e59cfb Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 27 Aug 2025 01:25:37 +0000 Subject: [PATCH 5/5] fix incompatible clustering check --- bigframes/session/bq_caching_executor.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index e49c2d9e32..b428cd646c 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -253,15 +253,14 @@ def _maybe_find_existing_table( if spec.if_exists == "fail": raise ValueError(f"Table already exists: {spec.table.__str__()}") - if ( - (len(spec.cluster_cols) != 0) - and (table.clustering_fields is not None) - and (tuple(table.clustering_fields) != spec.cluster_cols) - ): - raise ValueError( - "Table clustering fields cannot be changed after the table has " - f"been created. 
Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" - ) + if len(spec.cluster_cols) != 0: + if (table.clustering_fields is None) or ( + tuple(table.clustering_fields) != spec.cluster_cols + ): + raise ValueError( + "Table clustering fields cannot be changed after the table has " + f"been created. Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" + ) return table except google.api_core.exceptions.NotFound: return None