Skip to content

Commit d619add

Browse files
committed
fix with some test code
1 parent badddcb commit d619add

File tree

3 files changed

+71
-37
lines changed

3 files changed

+71
-37
lines changed

bigframes/blob/_functions.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,11 @@
1313
# limitations under the License.
1414

1515
from dataclasses import dataclass
16-
import datetime
1716
import inspect
1817
from typing import Callable, Iterable, Union
19-
import warnings
2018

2119
import google.cloud.bigquery as bigquery
2220

23-
import bigframes.exceptions as bfe
2421
import bigframes.session
2522
import bigframes.session._io.bigquery as bf_io_bigquery
2623

@@ -31,9 +28,6 @@
3128
bytes: "BYTES",
3229
bool: "BOOL",
3330
}
34-
# UDFs older than this many days are considered stale and will be deleted
35-
# from the anonymous dataset before creating a new UDF.
36-
_UDF_CLEANUP_THRESHOLD_DAYS = 3
3731

3832

3933
@dataclass(frozen=True)
@@ -78,43 +72,12 @@ def _output_bq_type(self):
7872
sig = inspect.signature(self._func)
7973
return _PYTHON_TO_BQ_TYPES[sig.return_annotation]
8074

81-
def _cleanup_old_udfs(self):
82-
"""Clean up old UDFs in the anonymous dataset."""
83-
dataset = self._session._anon_dataset_manager.dataset
84-
routines = list(self._session.bqclient.list_routines(dataset))
85-
cleanup_cutoff_time = datetime.datetime.now(
86-
datetime.timezone.utc
87-
) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS)
88-
89-
for routine in routines:
90-
if (
91-
routine.created < cleanup_cutoff_time
92-
and routine._properties["routineType"] == "SCALAR_FUNCTION"
93-
):
94-
try:
95-
self._session.bqclient.delete_routine(routine.reference)
96-
except Exception:
97-
pass
98-
9975
def _create_udf(self):
10076
"""Create Python UDF in BQ. Return name of the UDF."""
10177
udf_name = str(
10278
self._session._anon_dataset_manager.generate_unique_resource_id()
10379
)
10480

105-
try:
106-
# Before creating a new UDF, attempt to clean up any uncollected,
107-
# old Python UDFs residing in the anonymous dataset. These UDFs
108-
# accumulate over time and can eventually exceed resource limits.
109-
# See more from b/450913424.
110-
self._cleanup_old_udfs()
111-
except Exception as e:
112-
# Log a warning on the failure, do not interrupt the workflow.
113-
msg = bfe.format_message(
114-
f"Failed to clean up the old Python UDFs before creating {udf_name}: {e}"
115-
)
116-
warnings.warn(msg, category=bfe.CleanupFailedWarning)
117-
11881
func_body = inspect.getsource(self._func)
11982
func_name = self._func.__name__
12083
packages = str(list(self._requirements))

bigframes/session/anonymous_dataset.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@
1616
import threading
1717
from typing import List, Optional, Sequence
1818
import uuid
19+
import warnings
1920

2021
import google.cloud.bigquery as bigquery
2122

2223
from bigframes import constants
2324
import bigframes.core.events
25+
import bigframes.exceptions as bfe
2426
from bigframes.session import temporary_storage
2527
import bigframes.session._io.bigquery as bf_io_bigquery
2628

2729
_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
30+
# UDFs older than this many days are considered stale and will be deleted
31+
# from the anonymous dataset before creating a new UDF.
32+
_UDF_CLEANUP_THRESHOLD_DAYS = 30
2833

2934

3035
class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
@@ -137,8 +142,41 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
137142
)
138143
return self.dataset.table(table_id)
139144

145+
def _cleanup_old_udfs(self):
    """Best-effort deletion of stale Python UDFs in the anonymous dataset.

    Routines accumulate in the anonymous dataset over time and can
    eventually exceed resource limits, so any SCALAR_FUNCTION routine
    older than ``_UDF_CLEANUP_THRESHOLD_DAYS`` days is deleted. Each
    deletion is individually best-effort: one undeletable routine must
    not abort the sweep (the caller additionally wraps this whole method
    in its own try/except).
    """
    # NOTE(review): this method uses ``datetime``, which does not appear in
    # the import hunk added to this module in this commit -- confirm that
    # ``import datetime`` exists at the top of the file.
    routines = list(self.bqclient.list_routines(self.dataset))
    cleanup_cutoff_time = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS)

    for routine in routines:
        if (
            routine.created < cleanup_cutoff_time
            and routine._properties["routineType"] == "SCALAR_FUNCTION"
        ):
            try:
                self.bqclient.delete_routine(routine.reference)
            except Exception:
                # Best-effort: skip routines that cannot be deleted
                # (e.g. already gone, or a permissions issue).
                pass
164+
140165
def close(self):
    """Delete tables that were created with this session's session_id."""
    for stale_table in self._table_ids:
        self.bqclient.delete_table(stale_table, not_found_ok=True)
    self._table_ids.clear()

    try:
        # Opportunistically sweep any uncollected, old Python UDFs left in
        # the anonymous dataset before the session goes away; these pile
        # up over time and can eventually exceed resource limits.
        # See more from b/450913424.
        self._cleanup_old_udfs()
    except Exception as e:
        # A failed sweep is only worth a warning -- never interrupt close().
        warning_msg = bfe.format_message(
            f"Failed to clean up the old Python UDFs before closing the session: {e}"
        )
        warnings.warn(warning_msg, category=bfe.CleanupFailedWarning)

tests/system/large/test_session.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import datetime
16+
from unittest import mock
1617

1718
import google.cloud.bigquery as bigquery
1819
import google.cloud.exceptions
@@ -138,3 +139,35 @@ def test_clean_up_via_context_manager(session_creator):
138139
bqclient.delete_table(full_id_1)
139140
with pytest.raises(google.cloud.exceptions.NotFound):
140141
bqclient.delete_table(full_id_2)
142+
143+
144+
def test_cleanup_old_udfs(session: bigframes.Session):
    """``_cleanup_old_udfs`` deletes routines reported as older than the cutoff."""
    # Target a fixed routine id inside this session's anonymous dataset.
    routine_ref = session._anon_dataset_manager.dataset.routine("test_routine_cleanup")

    # Create a dummy function to be deleted.
    create_function_sql = f"""
    CREATE OR REPLACE FUNCTION `{routine_ref.project}.{routine_ref.dataset_id}.{routine_ref.routine_id}`(x INT64)
    RETURNS INT64 LANGUAGE python
    OPTIONS (entry_point='dummy_func', runtime_version='python-3.11')
    AS r'''
def dummy_func(x):
    return x + 1
'''
    """
    session.bqclient.query(create_function_sql).result()

    # Sanity check: the routine exists in BigQuery before the sweep runs.
    assert session.bqclient.get_routine(routine_ref) is not None

    # Fake the listing so the real routine appears 100 days old (well past
    # the cleanup threshold) and typed as a scalar function -- the two
    # conditions _cleanup_old_udfs checks before deleting.
    mock_routine = mock.MagicMock(spec=bigquery.Routine)
    mock_routine.created = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.timedelta(days=100)
    mock_routine.reference = routine_ref
    mock_routine._properties = {"routineType": "SCALAR_FUNCTION"}
    routines = [mock_routine]

    # Only the listing is mocked; delete_routine issues a real API call.
    with mock.patch.object(session.bqclient, "list_routines", return_value=routines):
        session._anon_dataset_manager._cleanup_old_udfs()

    # The real routine should have been deleted by the sweep.
    with pytest.raises(google.cloud.exceptions.NotFound):
        session.bqclient.get_routine(routine_ref)

0 commit comments

Comments (0)