@@ -21,6 +21,7 @@
 import logging
 import os
 import re
+import secrets
 import typing
 from typing import (
     Any,
@@ -37,6 +38,7 @@
     Tuple,
     Union,
 )
+import uuid
 import warnings
 
 # Even though the ibis.backends.bigquery import is unused, it's needed
@@ -100,6 +102,8 @@
 
 _BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"
 
+_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
+
 _MAX_CLUSTER_COLUMNS = 4
 
 # TODO(swast): Need to connect to regional endpoints when performing remote
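
For illustration, the new format string stitches the three parts together like this (hypothetical values; uuid4().hex, used below for random_id, is 32 hex characters):

    >>> _TEMP_TABLE_ID_FORMAT.format(
    ...     date="20240313", session_id="session1f8b3c", random_id="a" * 32
    ... )
    'bqdf20240313_session1f8b3c_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
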
@@ -203,7 +207,11 @@ def __init__(
             bq_kms_key_name=self._bq_kms_key_name,
         )
 
-        self._create_bq_datasets()
+        self._anonymous_dataset = (
+            bigframes.session._io.bigquery.create_bq_dataset_reference(
+                self.bqclient, location=self._location
+            )
+        )
 
         # TODO(shobs): Remove this logic after https://github.com/ibis-project/ibis/issues/8494
         # has been fixed. The ibis client changes the default query job config
@@ -233,6 +241,13 @@ def __init__(
             bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
         ] = {}
 
+        # Unique session identifier; short enough to be human readable. Only
+        # needs to be unique among sessions created by the same user at the
+        # same time in the same region.
+        self._session_id: str = "session" + secrets.token_hex(3)
+        # Store created table IDs so they can be deleted when the session closes.
+        self._table_ids: List[str] = []
+
     @property
     def bqclient(self):
         return self._clients_provider.bqclient
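
A quick sketch of what that identifier looks like (output is random; secrets.token_hex(3) renders three random bytes as six hex characters):

    import secrets

    session_id = "session" + secrets.token_hex(3)
    print(session_id)  # e.g. "session1f8b3c"; only needs to be unique per user, time, and region
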
@@ -263,6 +278,10 @@ def bqconnectionmanager(self):
         )
         return self._bq_connection_manager
 
+    @property
+    def session_id(self):
+        return self._session_id
+
     @property
     def _project(self):
         return self.bqclient.project
@@ -271,24 +290,15 @@ def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
 
-    def _create_bq_datasets(self):
-        """Create and identify dataset(s) for temporary BQ resources."""
-        query_job = self.bqclient.query("SELECT 1", location=self._location)
-        query_job.result()  # blocks until finished
-
-        # The anonymous dataset is used by BigQuery to write query results and
-        # session tables. BigQuery DataFrames also writes temp tables directly
-        # to the dataset, no BigQuery Session required. Note: there is a
-        # different anonymous dataset per location. See:
-        # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
-        query_destination = query_job.destination
-        self._anonymous_dataset = bigquery.DatasetReference(
-            query_destination.project,
-            query_destination.dataset_id,
-        )
-
     def close(self):
-        """No-op. Temporary resources are deleted after 7 days."""
+        """Delete tables that were created with this session's session_id."""
+        client = self.bqclient
+        project_id = self._anonymous_dataset.project
+        dataset_id = self._anonymous_dataset.dataset_id
+
+        for table_id in self._table_ids:
+            full_id = ".".join([project_id, dataset_id, table_id])
+            client.delete_table(full_id, not_found_ok=True)
 
     def read_gbq(
         self,
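
The create_bq_dataset_reference helper that replaces _create_bq_datasets is defined outside this diff; judging from the removed body above, it presumably extracts the same logic into bigframes.session._io.bigquery, roughly like this sketch (name and signature taken from the call site; the body is an assumption):

    from google.cloud import bigquery

    def create_bq_dataset_reference(
        bqclient: bigquery.Client, location=None
    ) -> bigquery.DatasetReference:
        # Any query forces BigQuery to materialize a destination table in the
        # per-user, per-location anonymous dataset.
        query_job = bqclient.query("SELECT 1", location=location)
        query_job.result()  # blocks until finished

        # Keep only the dataset portion of the destination table reference.
        destination = query_job.destination
        return bigquery.DatasetReference(destination.project, destination.dataset_id)
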
@@ -1063,7 +1073,7 @@ def _read_pandas_load_job(
 
         job_config.labels = {"bigframes-api": api_name}
 
-        load_table_destination = bigframes_io.random_table(self._anonymous_dataset)
+        load_table_destination = self._random_table()
         load_job = self.bqclient.load_table_from_dataframe(
             pandas_dataframe_copy,
             load_table_destination,
@@ -1145,7 +1155,7 @@ def read_csv(
         encoding: Optional[str] = None,
         **kwargs,
     ) -> dataframe.DataFrame:
-        table = bigframes_io.random_table(self._anonymous_dataset)
+        table = self._random_table()
 
         if engine is not None and engine == "bigquery":
             if any(param is not None for param in (dtype, names)):
@@ -1282,7 +1292,7 @@ def read_parquet(
         *,
         engine: str = "auto",
     ) -> dataframe.DataFrame:
-        table = bigframes_io.random_table(self._anonymous_dataset)
+        table = self._random_table()
 
         if engine == "bigquery":
             job_config = self._prepare_load_job_config()
@@ -1319,7 +1329,7 @@ def read_json(
         engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
         **kwargs,
     ) -> dataframe.DataFrame:
-        table = bigframes_io.random_table(self._anonymous_dataset)
+        table = self._random_table()
 
         if engine == "bigquery":
 
@@ -1416,14 +1426,12 @@ def _create_empty_temp_table(
     ) -> bigquery.TableReference:
         # Can't set a table in _SESSION as destination via query job API, so we
         # run DDL, instead.
-        dataset = self._anonymous_dataset
         expiration = (
             datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
         )
 
         table = bigframes_io.create_temp_table(
-            self.bqclient,
-            dataset,
+            self,
             expiration,
             schema=schema,
             cluster_columns=cluster_cols,
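
The corresponding change to bigframes_io.create_temp_table is outside this hunk; presumably the helper now takes the session in place of the client/dataset pair and calls session._random_table() so the new table is registered for cleanup. A minimal sketch, assuming that division of labor (the body is illustrative, not the actual helper):

    from google.cloud import bigquery

    def create_temp_table(session, expiration, schema=None, cluster_columns=None):
        # _random_table() records the generated ID for cleanup in session.close().
        table_ref = session._random_table()
        table = bigquery.Table(table_ref, schema=schema)
        table.expires = expiration
        if cluster_columns:
            table.clustering_fields = cluster_columns
        session.bqclient.create_table(table)
        return table_ref
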
@@ -1939,6 +1947,32 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
         else:
             job.result()
 
+    def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
+        """Generate a random table ID with BigQuery DataFrames prefix.
+
+        The generated ID will be stored and checked for deletion when the
+        session is closed, unless skip_cleanup is True.
+
+        Args:
+            skip_cleanup (bool, default False):
+                If True, do not add the generated ID to the list of tables
+                to clean up when the session is closed.
+
+        Returns:
+            google.cloud.bigquery.TableReference:
+                Fully qualified table ID of a table that doesn't exist.
+        """
+        dataset = self._anonymous_dataset
+        session_id = self.session_id
+        now = datetime.datetime.now(datetime.timezone.utc)
+        random_id = uuid.uuid4().hex
+        table_id = _TEMP_TABLE_ID_FORMAT.format(
+            date=now.strftime("%Y%m%d"), session_id=session_id, random_id=random_id
+        )
+        if not skip_cleanup:
+            self._table_ids.append(table_id)
+        return dataset.table(table_id)
+
 
 def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Session:
     return Session(context)
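
Putting the pieces together, the temp-table lifecycle after this change looks roughly like the following usage sketch (table IDs are illustrative):

    import pandas as pd

    session = connect()  # the helper defined above

    # read_pandas loads into a destination from self._random_table(), e.g.
    # "bqdf20240313_session1f8b3c_<32-hex-uuid4>", and records the ID.
    df = session.read_pandas(pd.DataFrame({"x": [1, 2, 3]}))

    # close() now deletes every recorded table from the anonymous dataset;
    # IDs generated with skip_cleanup=True are left to expire on their own.
    session.close()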