From 13a30e8c71007073e8b8a805a465328fc8c2b5cc Mon Sep 17 00:00:00 2001
From: Shenyang Cai
Date: Mon, 8 Sep 2025 17:17:11 +0000
Subject: [PATCH] proof of concept

---
 bigframes/bigquery/_operations/ai.py          | 106 ++++++++++++++++++
 .../ibis_compiler/scalar_op_registry.py       |  18 ++++
 bigframes/operations/__init__.py              |   3 +
 bigframes/operations/ai_ops.py                |  49 +++++++++++
 .../ibis/backends/sql/compilers/base.py       |  10 ++-
 .../ibis/expr/operations/udf.py               |  10 +++
 6 files changed, 195 insertions(+), 1 deletion(-)
 create mode 100644 bigframes/bigquery/_operations/ai.py
 create mode 100644 bigframes/operations/ai_ops.py

diff --git a/bigframes/bigquery/_operations/ai.py b/bigframes/bigquery/_operations/ai.py
new file mode 100644
index 0000000000..ea13afb057
--- /dev/null
+++ b/bigframes/bigquery/_operations/ai.py
@@ -0,0 +1,106 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import json
+
+from typing import List, Literal, Mapping, Sequence, Tuple, Any
+
+from bigframes import series
+from bigframes.operations import ai_ops
+
+
+def ai_generate_bool(
+    prompt: series.Series | Sequence[str | series.Series],
+    *,
+    connection_id: str | None = None,
+    endpoint: str | None = None,
+    request_type: Literal["dedicated", "shared", "unspecified"] = "unspecified",
+    model_params: Mapping[Any, Any] | None = None,
+) -> series.Series:
+    """Build an ``AIGenerateBool`` operation from a prompt and apply it.
+
+    Args:
+        prompt: A single Series, or a sequence mixing literal strings and
+            Series. It must contain at least one Series.
+        connection_id: Connection to use. When omitted, the ``_bq_connection``
+            of the first Series' session is used.
+        endpoint: Optional endpoint, forwarded unchanged to the operation.
+        request_type: One of "dedicated", "shared" or "unspecified".
+        model_params: Optional parameters, passed on as a JSON string.
+
+    Returns:
+        A Series holding the operation's output: a struct with the fields
+        ``result`` (bool), ``full_response`` (string), ``status`` (string).
+
+    Raises:
+        ValueError: If ``request_type`` or the prompt type is unsupported, or
+            if the prompt contains no Series.
+    """
+    if request_type not in ("dedicated", "shared", "unspecified"):
+        raise ValueError(f"Unsupported request type: {request_type}")
+
+    if isinstance(prompt, series.Series):
+        prompt_context, series_list = _separate_context_and_series([prompt])
+    elif isinstance(prompt, Sequence):
+        prompt_context, series_list = _separate_context_and_series(prompt)
+    else:
+        raise ValueError(f"Unsupported prompt type: {type(prompt)}")
+
+    if not series_list:
+        raise ValueError("Please provide at least one Series in the prompt")
+
+    operator = ai_ops.AIGenerateBool(
+        tuple(prompt_context),
+        connection_id=connection_id or series_list[0]._session._bq_connection,
+        endpoint=endpoint,
+        request_type=request_type,
+        model_params=json.dumps(model_params) if model_params else None,
+    )
+
+    return series_list[0]._apply_nary_op(operator, series_list[1:])
+
+
+def _separate_context_and_series(
+    prompt: Sequence[str | series.Series],
+) -> Tuple[List[str | None], List[series.Series]]:
+    """Split a prompt into its literal context and its Series.
+
+    Returns two values. The first is the prompt with every Series replaced by
+    None. The second is the list of all Series in the prompt. The original
+    item order is kept.
+
+    For example:
+        Input: ("str1", series1, "str2", "str3", series2)
+        Output: ["str1", None, "str2", "str3", None], [series1, series2]
+
+    Raises:
+        ValueError: If an item is neither a str nor a Series.
+    """
+    prompt_context: List[str | None] = []
+    series_list: List[series.Series] = []
+
+    for item in prompt:
+        if isinstance(item, str):
+            prompt_context.append(item)
+        elif isinstance(item, series.Series):
+            # None marks the Series' position in the prompt; the Series itself
+            # must be collected too, otherwise callers never see any Series.
+            prompt_context.append(None)
+            series_list.append(item)
+        else:
+            raise ValueError(f"Unsupported type in prompt: {type(item)}")
+
+    return prompt_context, series_list
diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
index 044fc90306..708ad29523 100644
--- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
+++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
@@ -1228,6 +1228,13 @@ def array_reduce_op_impl(x: ibis_types.Value, op: ops.ArrayReduceOp):
             op.aggregation, typing.cast(ibis_types.Column, arr_vals)
         )
     )
+
+# AI Ops
+@scalar_op_compiler.register_nary_op(ops.AIGenerateBool, pass_op=True)
+def ai_generate_bool(*values: ibis_types.Value, op: ops.AIGenerateBool):
+    # TODO: compile to the AI.GENERATE_BOOL builtin declared at the end of this
+    # module. The def needs a body so that the module stays importable.
+    raise NotImplementedError("Compiling AIGenerateBool is not implemented yet.")
 
 
 # JSON Ops
@@ -2172,3 +2179,14 @@ def str_strip_op(  # type: ignore[empty-body]
     x: ibis_dtypes.String, to_strip: ibis_dtypes.String
 ) -> ibis_dtypes.String:
     """Remove leading and trailing characters."""
+
+
+@ibis_udf.scalar.builtin(name="AI.GENERATE_BOOL", named_args=True, ignore_none_values=True)
+def ai_generate_bool(  # type: ignore[empty-body]
+    prompt: ibis_types.Value,
+    connection_id: str,
+    endpoint=None,
+    request_type=None,
+    model_params=None,
+) -> ibis_dtypes.Value:
+    """Call AI.GENERATE_BOOL with the prompt."""
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index e5888ace00..81fb23df00 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -14,6 +14,7 @@
 
 from __future__ import annotations
 
+from bigframes.operations.ai_ops import AIGenerateBool
 from bigframes.operations.array_ops import (
     ArrayIndexOp,
     ArrayReduceOp,
@@ -408,6 +409,8 @@
     "geo_x_op",
     "geo_y_op",
     "GeoStDistanceOp",
+    # AI ops
+    "AIGenerateBool",
     # Numpy ops mapping
     "NUMPY_TO_BINOP",
     "NUMPY_TO_OP",
diff --git a/bigframes/operations/ai_ops.py b/bigframes/operations/ai_ops.py
new file mode 100644
index 0000000000..00cae40a16
--- /dev/null
+++ b/bigframes/operations/ai_ops.py
@@ -0,0 +1,49 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import dataclasses
+from typing import Tuple, Literal
+
+import pandas as pd
+import pyarrow as pa
+
+from bigframes import dtypes
+from bigframes.operations import base_ops
+
+
+@dataclasses.dataclass(frozen=True)
+class AIGenerateBool(base_ops.NaryOp):
+    """N-ary operation carrying the static arguments of an AI generate-bool call."""
+
+    # Prompt with column references replaced with "None" placeholder
+    prompt_context: Tuple[str | None, ...]
+
+    connection_id: str
+    endpoint: str | None
+    request_type: Literal["dedicated", "shared", "unspecified"]
+    model_params: str | None
+
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+        """Return the output struct type: result, full_response and status."""
+        return pd.ArrowDtype(
+            pa.struct(
+                (
+                    pa.field("result", pa.bool_()),
+                    pa.field("full_response", pa.string()),
+                    pa.field("status", pa.string()),
+                )
+            )
+        )
diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py
index cbc51e59d6..db280ec122 100644
--- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py
+++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py
@@ -1233,7 +1233,15 @@ def __sql_name__(self, op: ops.ScalarUDF | ops.AggUDF) -> str:
         )
 
     def visit_ScalarUDF(self, op, **kw):
-        return self.f[self.__sql_name__(op)](*kw.values())
+        if op.__config__.get("named_args"):
+            args = []
+            for name, value in kw.items():
+                if op.__config__.get("ignore_none_values") and isinstance(value, sge.Null):
+                    continue
+                args.append(sge.Kwarg(this=sg.to_identifier(name), expression=value))
+        else:
+            args = list(kw.values())
+        return self.f[self.__sql_name__(op)](*args)
 
     def visit_AggUDF(self, op, *, where, **kw):
         return self.agg[self.__sql_name__(op)](*kw.values(), where=where)
diff --git a/third_party/bigframes_vendored/ibis/expr/operations/udf.py b/third_party/bigframes_vendored/ibis/expr/operations/udf.py
index 91366cace8..82c85b1733 100644
--- a/third_party/bigframes_vendored/ibis/expr/operations/udf.py
+++ b/third_party/bigframes_vendored/ibis/expr/operations/udf.py
@@ -198,6 +198,8 @@ def builtin(
         database: str | None = None,
         catalog: str | None = None,
         signature: tuple[tuple[Any, ...], Any] | None = None,
+        named_args: bool = False,
+        ignore_none_values: bool = False,
         **kwargs: Any,
     ) -> Callable[[Callable], Callable[..., ir.Value]]:
         ...
@@ -212,6 +214,8 @@ def builtin(
         database=None,
         catalog=None,
         signature=None,
+        named_args=False,
+        ignore_none_values=False,
         **kwargs,
     ):
         """Construct a scalar user-defined function that is built-in to the backend.
@@ -235,6 +239,10 @@ def builtin(
             For **builtin** UDFs, only the **return type** annotation is required.
             See [the user guide](/how-to/extending/builtin.qmd#input-types) for
             more information.
+        named_args
+            Whether to compile the function with named arguments.
+        ignore_none_values
+            If true, named arguments whose value is None do not appear in the compiled SQL.
         kwargs
             Additional backend-specific configuration arguments for the UDF.
 
@@ -258,6 +266,8 @@ def builtin(
             database=database,
             catalog=catalog,
             signature=signature,
+            named_args=named_args,
+            ignore_none_values=ignore_none_values,
             **kwargs,
         )
 