From 59c0ec56adba883e386e75c0b27927204aa0d2b2 Mon Sep 17 00:00:00 2001 From: jialuo Date: Wed, 15 Oct 2025 18:13:47 +0000 Subject: [PATCH 01/11] chore: Add cleanup step for old UDFs in anonymous dataset --- bigframes/blob/_functions.py | 31 +++++++++++++++++++++++++++++++ bigframes/exceptions.py | 2 +- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 8dd9328fb8..fe92659762 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -13,15 +13,19 @@ # limitations under the License. from dataclasses import dataclass +import datetime import inspect from typing import Callable, Iterable, Union +import warnings import google.cloud.bigquery as bigquery +import bigframes.exceptions as bfe import bigframes.session import bigframes.session._io.bigquery as bf_io_bigquery _PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} +_UDF_CLEANUP_THRESHOLD_DAYS = 3 @dataclass(frozen=True) @@ -66,12 +70,39 @@ def _output_bq_type(self): sig = inspect.signature(self._func) return _PYTHON_TO_BQ_TYPES[sig.return_annotation] + def _cleanup_old_udfs(self): + """Clean up old UDFs in the anonymous dataset.""" + dataset = self._session._anon_dataset_manager.dataset + routines = list(self._session.bqclient.list_routines(dataset)) + seven_days_ago = datetime.datetime.now( + datetime.timezone.utc + ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) + + cleaned_up_routines = 0 + for routine in routines: + if ( + routine.created < seven_days_ago + and routine._properties["routineType"] == "SCALAR_FUNCTION" + ): + self._session.bqclient.delete_routine(routine.reference) + cleaned_up_routines += 1 + def _create_udf(self): """Create Python UDF in BQ. Return name of the UDF.""" udf_name = str( self._session._anon_dataset_manager.generate_unique_resource_id() ) + # Try to clean up the old Python UDFs in the anonymous dataset. Do not + # raise an error when it fails for this step. + try: + self._cleanup_old_udfs() + except Exception as e: + msg = bfe.format_message( + f"Failed to clean up the old Python UDFs before creating {udf_name}: {e}" + ) + warnings.warn(msg, category=bfe.CleanupFailedWarning) + func_body = inspect.getsource(self._func) func_name = self._func.__name__ packages = str(list(self._requirements)) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index ef51f96575..1fb86d7bd6 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -30,7 +30,7 @@ class UnknownLocationWarning(Warning): class CleanupFailedWarning(Warning): - """Bigframes failed to clean up a table resource.""" + """Bigframes failed to clean up a table or function resource.""" class DefaultIndexWarning(Warning): From e333a9bf5ab8a1ce2e183a2dc55d003887792f28 Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 16 Oct 2025 17:52:13 +0000 Subject: [PATCH 02/11] fix --- bigframes/blob/_functions.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index fe92659762..442d581154 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -78,14 +78,15 @@ def _cleanup_old_udfs(self): datetime.timezone.utc ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) - cleaned_up_routines = 0 for routine in routines: if ( routine.created < seven_days_ago and routine._properties["routineType"] == "SCALAR_FUNCTION" ): - self._session.bqclient.delete_routine(routine.reference) - cleaned_up_routines += 1 + try: + self._session.bqclient.delete_routine(routine.reference) + except Exception: + pass def _create_udf(self): """Create Python UDF in BQ. Return name of the UDF.""" @@ -93,9 +94,13 @@ def _create_udf(self): self._session._anon_dataset_manager.generate_unique_resource_id() ) - # Try to clean up the old Python UDFs in the anonymous dataset. Do not - # raise an error when it fails for this step. + # Try to clean up the old Python UDFs in the anonymous dataset. Failure + # to clean up is logged as a warning but does not halt execution. try: + # Before creating a new UDF, attempt to clean up any uncollected, + # old Python UDFs residing in the anonymous dataset. These UDFs + # accumulate over time and can eventually exceed resource limits. + # See more from b/450913424. self._cleanup_old_udfs() except Exception as e: msg = bfe.format_message( From badddcb51a427c2c6fe1cb620f142bc5c690687e Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 16 Oct 2025 20:15:09 +0000 Subject: [PATCH 03/11] fix --- bigframes/blob/_functions.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 25ab6fe888..38d85be19f 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -31,6 +31,8 @@ bytes: "BYTES", bool: "BOOL", } +# UDFs older than this many days are considered stale and will be deleted +# from the anonymous dataset before creating a new UDF. _UDF_CLEANUP_THRESHOLD_DAYS = 3 @@ -80,13 +82,13 @@ def _cleanup_old_udfs(self): """Clean up old UDFs in the anonymous dataset.""" dataset = self._session._anon_dataset_manager.dataset routines = list(self._session.bqclient.list_routines(dataset)) - seven_days_ago = datetime.datetime.now( + cleanup_cutoff_time = datetime.datetime.now( datetime.timezone.utc ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) for routine in routines: if ( - routine.created < seven_days_ago + routine.created < cleanup_cutoff_time and routine._properties["routineType"] == "SCALAR_FUNCTION" ): try: @@ -100,8 +102,6 @@ def _create_udf(self): self._session._anon_dataset_manager.generate_unique_resource_id() ) - # Try to clean up the old Python UDFs in the anonymous dataset. Failure - # to clean up is logged as a warning but does not halt execution. try: # Before creating a new UDF, attempt to clean up any uncollected, # old Python UDFs residing in the anonymous dataset. These UDFs @@ -109,6 +109,7 @@ def _create_udf(self): # See more from b/450913424. self._cleanup_old_udfs() except Exception as e: + # Log a warning on the failure, do not interrupt the workflow. msg = bfe.format_message( f"Failed to clean up the old Python UDFs before creating {udf_name}: {e}" ) From d619add6b876fb348a86e7dbde76ae63bf8735ab Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 24 Oct 2025 22:24:53 +0000 Subject: [PATCH 04/11] fix with some test code --- bigframes/blob/_functions.py | 37 ------------------------- bigframes/session/anonymous_dataset.py | 38 ++++++++++++++++++++++++++ tests/system/large/test_session.py | 33 ++++++++++++++++++++++ 3 files changed, 71 insertions(+), 37 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 38d85be19f..2a11974b8d 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -13,14 +13,11 @@ # limitations under the License. from dataclasses import dataclass -import datetime import inspect from typing import Callable, Iterable, Union -import warnings import google.cloud.bigquery as bigquery -import bigframes.exceptions as bfe import bigframes.session import bigframes.session._io.bigquery as bf_io_bigquery @@ -31,9 +28,6 @@ bytes: "BYTES", bool: "BOOL", } -# UDFs older than this many days are considered stale and will be deleted -# from the anonymous dataset before creating a new UDF. -_UDF_CLEANUP_THRESHOLD_DAYS = 3 @dataclass(frozen=True) @@ -78,43 +72,12 @@ def _output_bq_type(self): sig = inspect.signature(self._func) return _PYTHON_TO_BQ_TYPES[sig.return_annotation] - def _cleanup_old_udfs(self): - """Clean up old UDFs in the anonymous dataset.""" - dataset = self._session._anon_dataset_manager.dataset - routines = list(self._session.bqclient.list_routines(dataset)) - cleanup_cutoff_time = datetime.datetime.now( - datetime.timezone.utc - ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) - - for routine in routines: - if ( - routine.created < cleanup_cutoff_time - and routine._properties["routineType"] == "SCALAR_FUNCTION" - ): - try: - self._session.bqclient.delete_routine(routine.reference) - except Exception: - pass - def _create_udf(self): """Create Python UDF in BQ. Return name of the UDF.""" udf_name = str( self._session._anon_dataset_manager.generate_unique_resource_id() ) - try: - # Before creating a new UDF, attempt to clean up any uncollected, - # old Python UDFs residing in the anonymous dataset. These UDFs - # accumulate over time and can eventually exceed resource limits. - # See more from b/450913424. - self._cleanup_old_udfs() - except Exception as e: - # Log a warning on the failure, do not interrupt the workflow. - msg = bfe.format_message( - f"Failed to clean up the old Python UDFs before creating {udf_name}: {e}" - ) - warnings.warn(msg, category=bfe.CleanupFailedWarning) - func_body = inspect.getsource(self._func) func_name = self._func.__name__ packages = str(list(self._requirements)) diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index 3c1757806b..d6faf3fe39 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -16,15 +16,20 @@ import threading from typing import List, Optional, Sequence import uuid +import warnings import google.cloud.bigquery as bigquery from bigframes import constants import bigframes.core.events +import bigframes.exceptions as bfe from bigframes.session import temporary_storage import bigframes.session._io.bigquery as bf_io_bigquery _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}" +# UDFs older than this many days are considered stale and will be deleted +# from the anonymous dataset before creating a new UDF. +_UDF_CLEANUP_THRESHOLD_DAYS = 30 class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager): @@ -137,8 +142,41 @@ def generate_unique_resource_id(self) -> bigquery.TableReference: ) return self.dataset.table(table_id) + def _cleanup_old_udfs(self): + """Clean up old UDFs in the anonymous dataset.""" + dataset = self.dataset + routines = list(self.bqclient.list_routines(dataset)) + cleanup_cutoff_time = datetime.datetime.now( + datetime.timezone.utc + ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) + + for routine in routines: + print("Routine: ", routine.routine_id, routine.created) + if ( + routine.created < cleanup_cutoff_time + and routine._properties["routineType"] == "SCALAR_FUNCTION" + ): + try: + self.bqclient.delete_routine(routine.reference) + print(">>>> This routine gets deleted!") + except Exception: + pass + def close(self): """Delete tables that were created with this session's session_id.""" for table_ref in self._table_ids: self.bqclient.delete_table(table_ref, not_found_ok=True) self._table_ids.clear() + + try: + # Before closing the session, attempt to clean up any uncollected, + # old Python UDFs residing in the anonymous dataset. These UDFs + # accumulate over time and can eventually exceed resource limits. + # See more from b/450913424. + self._cleanup_old_udfs() + except Exception as e: + # Log a warning on the failure, do not interrupt the workflow. + msg = bfe.format_message( + f"Failed to clean up the old Python UDFs before closing the session: {e}" + ) + warnings.warn(msg, category=bfe.CleanupFailedWarning) diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py index d28146498d..a525defe59 100644 --- a/tests/system/large/test_session.py +++ b/tests/system/large/test_session.py @@ -13,6 +13,7 @@ # limitations under the License. import datetime +from unittest import mock import google.cloud.bigquery as bigquery import google.cloud.exceptions @@ -138,3 +139,35 @@ def test_clean_up_via_context_manager(session_creator): bqclient.delete_table(full_id_1) with pytest.raises(google.cloud.exceptions.NotFound): bqclient.delete_table(full_id_2) + + +def test_cleanup_old_udfs(session: bigframes.Session): + routine_ref = session._anon_dataset_manager.dataset.routine("test_routine_cleanup") + + # Create a dummy function to be deleted. + create_function_sql = f""" +CREATE OR REPLACE FUNCTION `{routine_ref.project}.{routine_ref.dataset_id}.{routine_ref.routine_id}`(x INT64) +RETURNS INT64 LANGUAGE python +OPTIONS (entry_point='dummy_func', runtime_version='python-3.11') +AS r''' +def dummy_func(x): + return x + 1 +''' + """ + session.bqclient.query(create_function_sql).result() + + assert session.bqclient.get_routine(routine_ref) is not None + + mock_routine = mock.MagicMock(spec=bigquery.Routine) + mock_routine.created = datetime.datetime.now( + datetime.timezone.utc + ) - datetime.timedelta(days=100) + mock_routine.reference = routine_ref + mock_routine._properties = {"routineType": "SCALAR_FUNCTION"} + routines = [mock_routine] + + with mock.patch.object(session.bqclient, "list_routines", return_value=routines): + session._anon_dataset_manager._cleanup_old_udfs() + + with pytest.raises(google.cloud.exceptions.NotFound): + session.bqclient.get_routine(routine_ref) From 6d487f3432fcf1f29dd98a0e8e931544aa6dcce1 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 27 Oct 2025 23:35:56 +0000 Subject: [PATCH 05/11] fix --- bigframes/session/anonymous_dataset.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index d6faf3fe39..f24a615d9e 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -29,7 +29,7 @@ _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}" # UDFs older than this many days are considered stale and will be deleted # from the anonymous dataset before creating a new UDF. -_UDF_CLEANUP_THRESHOLD_DAYS = 30 +_UDF_CLEANUP_THRESHOLD_DAYS = 3 class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager): @@ -151,15 +151,13 @@ def _cleanup_old_udfs(self): ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS) for routine in routines: - print("Routine: ", routine.routine_id, routine.created) if ( routine.created < cleanup_cutoff_time and routine._properties["routineType"] == "SCALAR_FUNCTION" ): try: - self.bqclient.delete_routine(routine.reference) - print(">>>> This routine gets deleted!") - except Exception: + self.bqclient.delete_routine(routine.reference, not_found_ok=True) + except Exception as e: pass def close(self): From 93b873fa1992d5def43caabb067abf20183c8fc6 Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 31 Oct 2025 17:30:43 +0000 Subject: [PATCH 06/11] no retry --- bigframes/session/anonymous_dataset.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index f24a615d9e..c6a94f8457 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -156,7 +156,9 @@ def _cleanup_old_udfs(self): and routine._properties["routineType"] == "SCALAR_FUNCTION" ): try: - self.bqclient.delete_routine(routine.reference, not_found_ok=True) + self.bqclient.delete_routine( + routine.reference, not_found_ok=True, retry=None + ) except Exception as e: pass From 8daee373351fe7f5cf12f0e88e4de0369c429617 Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 31 Oct 2025 18:13:57 +0000 Subject: [PATCH 07/11] fix retry --- bigframes/session/anonymous_dataset.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index c6a94f8457..26978fd11b 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -18,6 +18,7 @@ import uuid import warnings +from google.api_core import retry as api_core_retry import google.cloud.bigquery as bigquery from bigframes import constants @@ -157,9 +158,11 @@ def _cleanup_old_udfs(self): ): try: self.bqclient.delete_routine( - routine.reference, not_found_ok=True, retry=None + routine.reference, + not_found_ok=True, + retry=api_core_retry.Retry(timeout=0), ) - except Exception as e: + except Exception: pass def close(self): From d10e74ac2c8b89f0ad9291b4e4d4330b88fa55f2 Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 31 Oct 2025 20:02:45 +0000 Subject: [PATCH 08/11] disable doctest --- third_party/bigframes_vendored/pandas/core/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index b434b51fb3..21bbbc2409 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -419,7 +419,7 @@ def to_gbq( >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) >>> destination = df.to_gbq(ordering_id="ordering_id") >>> # The table created can be read outside of the current session. - >>> bpd.close_session() # Optional, to demonstrate a new session. + >>> bpd.close_session() # Optional, to demonstrate a new session. # doctest: +SKIP >>> bpd.read_gbq(destination, index_col="ordering_id") col1 col2 ordering_id From 5dab3be249402bb8632b1cbbb07cf5d903104907 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 3 Nov 2025 20:01:25 +0000 Subject: [PATCH 09/11] testing - increase timeout --- scripts/run_and_publish_benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_and_publish_benchmark.py b/scripts/run_and_publish_benchmark.py index 859d68e60e..7e6ca1a7d7 100644 --- a/scripts/run_and_publish_benchmark.py +++ b/scripts/run_and_publish_benchmark.py @@ -374,7 +374,7 @@ def run_notebook_benchmark(benchmark_file: str, region: str): pytest_command = [ "py.test", "--nbmake", - "--nbmake-timeout=900", # 15 minutes + "--nbmake-timeout=3600", # 1 hour within this PR (will revert later). "--durations=0", "--color=yes", ] From d6e55e24d55791bc22b86325d8535df5edd5e858 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 3 Nov 2025 21:45:35 +0000 Subject: [PATCH 10/11] revert timout --- scripts/run_and_publish_benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_and_publish_benchmark.py b/scripts/run_and_publish_benchmark.py index 7e6ca1a7d7..859d68e60e 100644 --- a/scripts/run_and_publish_benchmark.py +++ b/scripts/run_and_publish_benchmark.py @@ -374,7 +374,7 @@ def run_notebook_benchmark(benchmark_file: str, region: str): pytest_command = [ "py.test", "--nbmake", - "--nbmake-timeout=3600", # 1 hour within this PR (will revert later). + "--nbmake-timeout=900", # 15 minutes "--durations=0", "--color=yes", ] From 5a347a9e7f9ffae3b1a010b84d2cf99bcc1e1a34 Mon Sep 17 00:00:00 2001 From: jialuo Date: Tue, 4 Nov 2025 01:35:35 +0000 Subject: [PATCH 11/11] add warning --- bigframes/session/anonymous_dataset.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index 26978fd11b..bdc6e7f59c 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -162,8 +162,11 @@ def _cleanup_old_udfs(self): not_found_ok=True, retry=api_core_retry.Retry(timeout=0), ) - except Exception: - pass + except Exception as e: + msg = bfe.format_message( + f"Unable to clean this old UDF '{routine.reference}': {e}" + ) + warnings.warn(msg, category=bfe.CleanupFailedWarning) def close(self): """Delete tables that were created with this session's session_id."""