Skip to content
Merged
2 changes: 1 addition & 1 deletion bigframes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class UnknownLocationWarning(Warning):


class CleanupFailedWarning(Warning):
"""Bigframes failed to clean up a table resource."""
"""Bigframes failed to clean up a table or function resource."""


class DefaultIndexWarning(Warning):
Expand Down
44 changes: 44 additions & 0 deletions bigframes/session/anonymous_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@
import threading
from typing import List, Optional, Sequence
import uuid
import warnings

from google.api_core import retry as api_core_retry
import google.cloud.bigquery as bigquery

from bigframes import constants
import bigframes.core.events
import bigframes.exceptions as bfe
from bigframes.session import temporary_storage
import bigframes.session._io.bigquery as bf_io_bigquery

_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
# UDFs older than this many days are considered stale and will be deleted
# from the anonymous dataset before creating a new UDF.
_UDF_CLEANUP_THRESHOLD_DAYS = 3


class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
Expand Down Expand Up @@ -137,8 +143,46 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
)
return self.dataset.table(table_id)

def _cleanup_old_udfs(self):
    """Best-effort sweep of stale Python UDFs in the anonymous dataset.

    Scalar-function routines older than ``_UDF_CLEANUP_THRESHOLD_DAYS``
    days are deleted so that leaked UDFs do not accumulate and eventually
    exceed resource limits (see b/450913424). A failure to delete an
    individual routine is reported as a ``CleanupFailedWarning`` and does
    not stop the sweep.
    """
    cleanup_cutoff_time = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS)

    # Iterate the pager lazily; no need to materialize the full list.
    for routine in self.bqclient.list_routines(self.dataset):
        # `created` may be unset on partially-populated listings; such
        # routines are skipped rather than crashing the whole sweep.
        if routine.created is None or routine.created >= cleanup_cutoff_time:
            continue
        # Only scalar functions (the routine type BigFrames creates for
        # Python UDFs) are swept. Use .get() so a response without the
        # "routineType" field is skipped instead of raising KeyError.
        if routine._properties.get("routineType") != "SCALAR_FUNCTION":
            continue
        try:
            self.bqclient.delete_routine(
                routine.reference,
                not_found_ok=True,
                # timeout=0 disables retries: this cleanup is best-effort
                # and should not delay session shutdown.
                retry=api_core_retry.Retry(timeout=0),
            )
        except Exception as e:
            msg = bfe.format_message(
                f"Unable to clean this old UDF '{routine.reference}': {e}"
            )
            warnings.warn(msg, category=bfe.CleanupFailedWarning)

def close(self):
    """Delete tables that were created with this session's session_id."""
    # Drop every table this session registered in the anonymous dataset;
    # tables that are already gone are silently skipped.
    for ref in self._table_ids:
        self.bqclient.delete_table(ref, not_found_ok=True)
    self._table_ids.clear()

    # Python UDFs leaked into the anonymous dataset accumulate over time
    # and can eventually exceed resource limits (see b/450913424), so
    # attempt to sweep the stale ones on session close. This is strictly
    # best-effort: any failure becomes a warning, never an exception.
    try:
        self._cleanup_old_udfs()
    except Exception as e:
        msg = bfe.format_message(
            f"Failed to clean up the old Python UDFs before closing the session: {e}"
        )
        warnings.warn(msg, category=bfe.CleanupFailedWarning)
Comment on lines +183 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A warning is only emitted if the list_routines call itself fails; if the individual deletions fail, we won't warn at all — should each failed deletion also raise a warning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Thanks.

33 changes: 33 additions & 0 deletions tests/system/large/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import datetime
from unittest import mock

import google.cloud.bigquery as bigquery
import google.cloud.exceptions
Expand Down Expand Up @@ -138,3 +139,35 @@ def test_clean_up_via_context_manager(session_creator):
bqclient.delete_table(full_id_1)
with pytest.raises(google.cloud.exceptions.NotFound):
bqclient.delete_table(full_id_2)


def test_cleanup_old_udfs(session: bigframes.Session):
    """A stale scalar-function routine is deleted by _cleanup_old_udfs."""
    manager = session._anon_dataset_manager
    routine_ref = manager.dataset.routine("test_routine_cleanup")

    # Create a dummy function to be deleted.
    create_function_sql = f"""
CREATE OR REPLACE FUNCTION `{routine_ref.project}.{routine_ref.dataset_id}.{routine_ref.routine_id}`(x INT64)
RETURNS INT64 LANGUAGE python
OPTIONS (entry_point='dummy_func', runtime_version='python-3.11')
AS r'''
def dummy_func(x):
    return x + 1
'''
"""
    session.bqclient.query(create_function_sql).result()

    # The routine must exist before the sweep (get_routine raises
    # NotFound otherwise).
    assert session.bqclient.get_routine(routine_ref) is not None

    # Patch list_routines so the routine appears 100 days old — well past
    # the cleanup threshold — while still pointing at the real reference.
    stale_routine = mock.MagicMock(spec=bigquery.Routine)
    stale_routine.created = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.timedelta(days=100)
    stale_routine.reference = routine_ref
    stale_routine._properties = {"routineType": "SCALAR_FUNCTION"}

    with mock.patch.object(
        session.bqclient, "list_routines", return_value=[stale_routine]
    ):
        manager._cleanup_old_udfs()

    # The sweep should have deleted the real routine from the service.
    with pytest.raises(google.cloud.exceptions.NotFound):
        session.bqclient.get_routine(routine_ref)
2 changes: 1 addition & 1 deletion third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ def to_gbq(
>>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
>>> destination = df.to_gbq(ordering_id="ordering_id")
>>> # The table created can be read outside of the current session.
>>> bpd.close_session() # Optional, to demonstrate a new session.
>>> bpd.close_session() # Optional, to demonstrate a new session. # doctest: +SKIP
>>> bpd.read_gbq(destination, index_col="ordering_id")
col1 col2
ordering_id
Expand Down