|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | from dataclasses import dataclass |
| 16 | +import datetime |
16 | 17 | import inspect |
17 | 18 | from typing import Callable, Iterable, Union |
| 19 | +import warnings |
18 | 20 |
|
19 | 21 | import google.cloud.bigquery as bigquery |
20 | 22 |
|
| 23 | +import bigframes.exceptions as bfe |
21 | 24 | import bigframes.session |
22 | 25 | import bigframes.session._io.bigquery as bf_io_bigquery |
23 | 26 |
|
24 | 27 | _PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} |
| 28 | +_UDF_CLEANUP_THRESHOLD_DAYS = 3 |
25 | 29 |
|
26 | 30 |
|
27 | 31 | @dataclass(frozen=True) |
@@ -66,12 +70,39 @@ def _output_bq_type(self): |
66 | 70 | sig = inspect.signature(self._func) |
67 | 71 | return _PYTHON_TO_BQ_TYPES[sig.return_annotation] |
68 | 72 |
|
def _cleanup_old_udfs(self):
    """Delete stale scalar routines from the session's anonymous dataset.

    Lists the routines in ``self._session._anon_dataset_manager.dataset`` and
    deletes every one whose routine type is ``SCALAR_FUNCTION`` and whose
    creation time is more than ``_UDF_CLEANUP_THRESHOLD_DAYS`` days before
    the current UTC time.

    Raises:
        Exception: Errors raised by the BigQuery client while listing or
            deleting routines propagate to the caller, except "not found"
            on delete, which is ignored.
    """
    bqclient = self._session.bqclient
    dataset = self._session._anon_dataset_manager.dataset

    # Timezone-aware cutoff; anything created before it is eligible.
    cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
        days=_UDF_CLEANUP_THRESHOLD_DAYS
    )

    # Materialize the listing before issuing any delete so the sweep works
    # from a fixed snapshot of the dataset.
    for routine in list(bqclient.list_routines(dataset)):
        created = routine.created
        # A routine without a creation timestamp cannot be aged; leave it.
        if created is None or created >= cutoff:
            continue
        # `type_` is the public accessor for the "routineType" field and
        # yields None (rather than raising) when the field is absent.
        if routine.type_ != "SCALAR_FUNCTION":
            continue
        # The routine may already have been removed since it was listed
        # (e.g. by another cleanup pass); that must not abort the sweep.
        bqclient.delete_routine(routine.reference, not_found_ok=True)
| 89 | + |
69 | 90 | def _create_udf(self): |
70 | 91 | """Create Python UDF in BQ. Return name of the UDF.""" |
71 | 92 | udf_name = str( |
72 | 93 | self._session._anon_dataset_manager.generate_unique_resource_id() |
73 | 94 | ) |
74 | 95 |
|
| 96 | + # Try to clean up the old Python UDFs in the anonymous dataset. Do not |
| 97 | + # raise an error when it fails for this step. |
| 98 | + try: |
| 99 | + self._cleanup_old_udfs() |
| 100 | + except Exception as e: |
| 101 | + msg = bfe.format_message( |
| 102 | + f"Failed to clean up the old Python UDFs before creating {udf_name}: {e}" |
| 103 | + ) |
| 104 | + warnings.warn(msg, category=bfe.CleanupFailedWarning) |
| 105 | + |
75 | 106 | func_body = inspect.getsource(self._func) |
76 | 107 | func_name = self._func.__name__ |
77 | 108 | packages = str(list(self._requirements)) |
|
0 commit comments