@@ -152,24 +152,28 @@ def validate_table(
         return False


-def are_index_cols_unique(
+def infer_unique_columns(
     bqclient: bigquery.Client,
     table: bigquery.table.Table,
     index_cols: List[str],
     api_name: str,
     metadata_only: bool = False,
-) -> bool:
-    if len(index_cols) == 0:
-        return False
+) -> Tuple[str, ...]:
+    """Return a tuple of columns that can provide a unique row key, or an empty tuple if none can be inferred.
+
+    Note: primary keys are not enforced, but these are assumed to be unique
+    by the query engine, so we make the same assumption here.
+    """
     # If index_cols contain the primary_keys, the query engine assumes they
     # provide a unique index.
-    primary_keys = frozenset(_get_primary_keys(table))
-    if (len(primary_keys) > 0) and primary_keys <= frozenset(index_cols):
-        return True
+    primary_keys = tuple(_get_primary_keys(table))
+    if (len(primary_keys) > 0) and frozenset(primary_keys) <= frozenset(index_cols):
+        # Essentially, just reordering the primary key to match the index col order.
+        return tuple(index_col for index_col in index_cols if index_col in primary_keys)

-    if metadata_only:
+    if primary_keys or metadata_only or (not index_cols):
         # Sometimes not worth scanning data to check uniqueness
-        return False
+        return primary_keys
     # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
     # table_expression selects only index_cols.
     is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference)
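This first hunk changes the contract from a yes/no answer to returning the usable key columns themselves: a covering primary key is reordered to follow `index_cols` rather than merely confirmed. A minimal sketch of that reordering step in isolation (the helper name and sample data are illustrative, not part of the change):

```python
from typing import List, Tuple

def _reorder_to_match_index(
    index_cols: List[str], primary_keys: Tuple[str, ...]
) -> Tuple[str, ...]:
    # Keep only the primary key columns, preserving the order requested in
    # index_cols (mirrors the generator expression in the new code above).
    return tuple(col for col in index_cols if col in primary_keys)

# The primary key (a, b) covers the requested index, so the index order wins:
assert _reorder_to_match_index(["b", "a"], ("a", "b")) == ("b", "a")
```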
@@ -178,7 +182,9 @@ def are_index_cols_unique(
     results = bqclient.query_and_wait(is_unique_sql, job_config=job_config)
     row = next(iter(results))

-    return row["total_count"] == row["distinct_count"]
+    if row["total_count"] == row["distinct_count"]:
+        return tuple(index_cols)
+    return ()


 def _get_primary_keys(
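When no primary key settles the question, the function falls back to a data scan via `bigframes.core.sql.is_distinct_sql`, comparing the table's total row count against the count of distinct `index_cols` combinations. A hedged sketch of the kind of query that contract implies — the real generator may emit different SQL; only the `total_count`/`distinct_count` output columns are fixed by the caller above:

```python
def sketch_is_distinct_sql(columns, table_id):
    # Illustrative only: count all rows and distinct combinations of the
    # candidate key columns in one pass over the table.
    col_list = ", ".join(f"`{col}`" for col in columns)
    return (
        "SELECT\n"
        "    COUNT(*) AS total_count,\n"
        f"    COUNT(DISTINCT TO_JSON_STRING(STRUCT({col_list}))) AS distinct_count\n"
        f"FROM `{table_id}`"
    )

print(sketch_is_distinct_sql(["user_id", "event_ts"], "my-project.my_dataset.events"))
```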
@@ -279,54 +285,3 @@ def get_index_cols(
         index_cols = primary_keys

     return index_cols
-
-
-def get_time_travel_datetime_and_table_metadata(
-    bqclient: bigquery.Client,
-    table_ref: bigquery.TableReference,
-    *,
-    api_name: str,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
-    use_cache: bool = True,
-) -> Tuple[datetime.datetime, bigquery.Table]:
-    cached_table = cache.get(table_ref)
-    if use_cache and cached_table is not None:
-        snapshot_timestamp, _ = cached_table
-
-        # Cache hit could be unexpected. See internal issue 329545805.
-        # Raise a warning with more information about how to avoid the
-        # problems with the cache.
-        msg = (
-            f"Reading cached table from {snapshot_timestamp} to avoid "
-            "incompatibilities with previous reads of this table. To read "
-            "the latest version, set `use_cache=False` or close the "
-            "current session with Session.close() or "
-            "bigframes.pandas.close_session()."
-        )
-        # There are many layers before we get to (possibly) the user's code:
-        # pandas.read_gbq_table
-        # -> with_default_session
-        # -> Session.read_gbq_table
-        # -> _read_gbq_table
-        # -> _get_snapshot_sql_and_primary_key
-        # -> get_snapshot_datetime_and_table_metadata
-        warnings.warn(msg, stacklevel=7)
-        return cached_table
-
-    # TODO(swast): It's possible that the table metadata is changed between now
-    # and when we run the CURRENT_TIMESTAMP() query to see when we can time
-    # travel to. Find a way to fetch the table metadata and BQ's current time
-    # atomically.
-    table = bqclient.get_table(table_ref)
-
-    job_config = bigquery.QueryJobConfig()
-    job_config.labels["bigframes-api"] = api_name
-    snapshot_timestamp = list(
-        bqclient.query(
-            "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-            job_config=job_config,
-        ).result()
-    )[0][0]
-    cached_table = (snapshot_timestamp, table)
-    cache[table_ref] = cached_table
-    return cached_table
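One consequence of the rename is that call sites no longer need a separate boolean check plus the column list: the returned tuple is truthy exactly when a unique key was inferred. A hypothetical caller-side pattern (the function below is illustrative, not from this commit):

```python
from typing import Tuple

def describe_index(unique_cols: Tuple[str, ...]) -> str:
    # An empty tuple means no unique row key was inferred (or the scan was
    # skipped), so a caller would fall back to a default sequential index.
    if unique_cols:
        return f"unique index on {list(unique_cols)}"
    return "default sequential index"

assert describe_index(()) == "default sequential index"
assert describe_index(("id",)) == "unique index on ['id']"
```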