diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index ef51f96575..1fb86d7bd6 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -30,7 +30,7 @@ class UnknownLocationWarning(Warning):
 
 
 class CleanupFailedWarning(Warning):
-    """Bigframes failed to clean up a table resource."""
+    """Bigframes failed to clean up a table or function resource."""
 
 
 class DefaultIndexWarning(Warning):
diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py
index 3c1757806b..bdc6e7f59c 100644
--- a/bigframes/session/anonymous_dataset.py
+++ b/bigframes/session/anonymous_dataset.py
@@ -16,15 +16,21 @@
 import threading
 from typing import List, Optional, Sequence
 import uuid
+import warnings
 
+from google.api_core import retry as api_core_retry
 import google.cloud.bigquery as bigquery
 
 from bigframes import constants
 import bigframes.core.events
+import bigframes.exceptions as bfe
 from bigframes.session import temporary_storage
 import bigframes.session._io.bigquery as bf_io_bigquery
 
 _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
+# UDFs older than this many days are considered stale and are deleted
+# from the anonymous dataset when the session closes.
+_UDF_CLEANUP_THRESHOLD_DAYS = 3
 
 
 class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
@@ -137,8 +143,48 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
         )
         return self.dataset.table(table_id)
 
+    def _cleanup_old_udfs(self):
+        """Clean up old UDFs in the anonymous dataset."""
+        dataset = self.dataset
+        routines = list(self.bqclient.list_routines(dataset))
+        cleanup_cutoff_time = datetime.datetime.now(
+            datetime.timezone.utc
+        ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS)
+
+        for routine in routines:
+            if (
+                routine.created < cleanup_cutoff_time
+                and routine.type_ == "SCALAR_FUNCTION"
+            ):
+                try:
+                    # A zero retry timeout makes this a best-effort single
+                    # attempt, so cleanup never stalls session shutdown.
+                    self.bqclient.delete_routine(
+                        routine.reference,
+                        not_found_ok=True,
+                        retry=api_core_retry.Retry(timeout=0),
+                    )
+                except Exception as e:
+                    msg = bfe.format_message(
+                        f"Unable to clean up old UDF '{routine.reference}': {e}"
+                    )
+                    warnings.warn(msg, category=bfe.CleanupFailedWarning)
+
     def close(self):
-        """Delete tables that were created with this session's session_id."""
+        """Delete this session's temporary tables and clean up stale UDFs."""
         for table_ref in self._table_ids:
             self.bqclient.delete_table(table_ref, not_found_ok=True)
         self._table_ids.clear()
+
+        try:
+            # Before closing the session, attempt to clean up any uncollected,
+            # old Python UDFs residing in the anonymous dataset. These UDFs
+            # accumulate over time and can eventually exceed resource limits.
+            # See b/450913424 for details.
+            self._cleanup_old_udfs()
+        except Exception as e:
+            # Log a warning on failure; do not interrupt the workflow.
+            msg = bfe.format_message(
+                f"Failed to clean up old Python UDFs before closing the session: {e}"
+            )
+            warnings.warn(msg, category=bfe.CleanupFailedWarning)
diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py
index d28146498d..a525defe59 100644
--- a/tests/system/large/test_session.py
+++ b/tests/system/large/test_session.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import datetime
+from unittest import mock
 
 import google.cloud.bigquery as bigquery
 import google.cloud.exceptions
@@ -138,3 +139,37 @@ def test_clean_up_via_context_manager(session_creator):
         bqclient.delete_table(full_id_1)
     with pytest.raises(google.cloud.exceptions.NotFound):
         bqclient.delete_table(full_id_2)
+
+
+def test_cleanup_old_udfs(session: bigframes.Session):
+    routine_ref = session._anon_dataset_manager.dataset.routine("test_routine_cleanup")
+
+    # Create a dummy function to be deleted.
+    create_function_sql = f"""
+CREATE OR REPLACE FUNCTION `{routine_ref.project}.{routine_ref.dataset_id}.{routine_ref.routine_id}`(x INT64)
+RETURNS INT64 LANGUAGE python
+OPTIONS (entry_point='dummy_func', runtime_version='python-3.11')
+AS r'''
+def dummy_func(x):
+    return x + 1
+'''
+    """
+    session.bqclient.query(create_function_sql).result()
+
+    assert session.bqclient.get_routine(routine_ref) is not None
+
+    # The routine just created is too new to be swept, so present a mocked
+    # listing that reports it as 100 days old.
+    mock_routine = mock.MagicMock(spec=bigquery.Routine)
+    mock_routine.created = datetime.datetime.now(
+        datetime.timezone.utc
+    ) - datetime.timedelta(days=100)
+    mock_routine.reference = routine_ref
+    mock_routine.type_ = "SCALAR_FUNCTION"
+    routines = [mock_routine]
+
+    with mock.patch.object(session.bqclient, "list_routines", return_value=routines):
+        session._anon_dataset_manager._cleanup_old_udfs()
+
+    with pytest.raises(google.cloud.exceptions.NotFound):
+        session.bqclient.get_routine(routine_ref)
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index b434b51fb3..21bbbc2409 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -419,7 +419,7 @@ def to_gbq(
         >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
         >>> destination = df.to_gbq(ordering_id="ordering_id")
         >>> # The table created can be read outside of the current session.
-        >>> bpd.close_session()  # Optional, to demonstrate a new session.
+        >>> bpd.close_session()  # Optional, to demonstrate a new session.  # doctest: +SKIP
         >>> bpd.read_gbq(destination, index_col="ordering_id")
                      col1  col2
         ordering_id
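
Usage sketch (illustrative, not part of the patch): with this change, closing a session also sweeps stale Python UDFs from the anonymous dataset, and any cleanup failure is reported as a CleanupFailedWarning rather than raised. The snippet below assumes a configured BigQuery project for bigframes; it uses only names that appear in the diff (bigframes.pandas, bigframes.exceptions.CleanupFailedWarning, close_session).

import warnings

import bigframes.exceptions as bfe
import bigframes.pandas as bpd

# Normal work; temporary tables and Python UDFs may accumulate in the
# session's anonymous dataset.
df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})

# close_session() now also deletes scalar Python UDFs older than
# _UDF_CLEANUP_THRESHOLD_DAYS (3 days). Failures surface as warnings so
# that shutdown is never interrupted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    bpd.close_session()

for w in caught:
    if issubclass(w.category, bfe.CleanupFailedWarning):
        print(f"cleanup warning: {w.message}")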