From 1edf87cb52424174506f9fd5389ac9414e336d95 Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 26 Sep 2025 23:27:06 +0000 Subject: [PATCH 1/4] fix!: Address the series input type issue in bigframes functions --- bigframes/functions/_function_session.py | 13 +++++++- bigframes/functions/function_template.py | 18 ++++++++--- .../large/functions/test_managed_function.py | 31 ++++++++++++------- .../large/functions/test_remote_function.py | 28 +++++++++++------ .../small/functions/test_remote_function.py | 13 ++++---- 5 files changed, 68 insertions(+), 35 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 9a38ef1957..21c640f35f 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -983,7 +983,18 @@ def _convert_row_processor_sig( if len(signature.parameters) >= 1: first_param = next(iter(signature.parameters.values())) param_type = first_param.annotation - if (param_type == bf_series.Series) or (param_type == pandas.Series): + # Type hints for Series inputs should use pandas.Series because the + # underlying serialization process converts the input to a string + # representation of a pandas Series (not bigframes Series). Using + # bigframes Series will lead to TypeError when creating the function + # remotely. See more from b/445182819. 
+ if param_type == bf_series.Series: + raise bf_formatting.create_exception_with_feedback_link( + TypeError, + "Due to current UDF serialization requirements, argument type " + "hint must be Pandas Series, not BigFrames Series.", + ) + if param_type == pandas.Series: msg = bfe.format_message("input_types=Series is in preview.") warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) return signature.replace( diff --git a/bigframes/functions/function_template.py b/bigframes/functions/function_template.py index dd31de7243..a3680a7a88 100644 --- a/bigframes/functions/function_template.py +++ b/bigframes/functions/function_template.py @@ -363,8 +363,16 @@ def generate_managed_function_code( return {udf_name}(*args)""" ) - udf_code_block = textwrap.dedent( - f"{udf_code}\n{func_code}\n{bigframes_handler_code}" - ) - - return udf_code_block + udf_code_block = [] + if not capture_references and is_row_processor: + # Enable postponed evaluation of type annotations. Type hints are then + # kept as unevaluated strings rather than evaluated at definition time, + # which is needed to handle the pandas.Series annotation after the UDF + # code is serialized for remote execution. See more from b/445182819. 
+ udf_code_block.append("from __future__ import annotations") + + udf_code_block.append(udf_code) + udf_code_block.append(func_code) + udf_code_block.append(bigframes_handler_code) + + return textwrap.dedent("\n".join(udf_code_block)) diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index e74bc8579f..b87bcc4348 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -701,8 +701,19 @@ def serialize_row(row): } ) + with pytest.raises( + TypeError, + match="Due to current UDF serialization requirements, argument type hint must be Pandas Series, not BigFrames Series.", + ): + serialize_row_mf = session.udf( + input_types=bigframes.series.Series, + output_type=str, + dataset=dataset_id, + name=prefixer.create_prefix(), + )(serialize_row) + serialize_row_mf = session.udf( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, dataset=dataset_id, name=prefixer.create_prefix(), @@ -762,7 +773,7 @@ def analyze(row): ): analyze_mf = session.udf( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, dataset=dataset_id, name=prefixer.create_prefix(), @@ -876,7 +887,7 @@ def serialize_row(row): ) serialize_row_mf = session.udf( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, dataset=dataset_id, name=prefixer.create_prefix(), @@ -926,7 +937,7 @@ def test_managed_function_df_apply_axis_1_na_nan_inf(dataset_id, session): try: - def float_parser(row): + def float_parser(row: pandas.Series): import numpy as mynp import pandas as mypd @@ -937,7 +948,7 @@ def float_parser(row): return float(row["text"]) float_parser_mf = session.udf( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=float, dataset=dataset_id, name=prefixer.create_prefix(), @@ -1027,7 +1038,7 @@ def 
test_managed_function_df_apply_axis_1_series_args(session, dataset_id, scala try: - def analyze(s, x, y): + def analyze(s: pandas.Series, x: bool, y: float) -> str: value = f"value is {s['int64_col']} and {s['float64_col']}" if x: return f"{value}, x is True!" @@ -1036,8 +1047,6 @@ def analyze(s, x, y): return f"{value}, x is False, y is non-positive!" analyze_mf = session.udf( - input_types=[bigframes.series.Series, bool, float], - output_type=str, dataset=dataset_id, name=prefixer.create_prefix(), )(analyze) @@ -1151,7 +1160,7 @@ def is_sum_positive_series(s): return s["int64_col"] + s["int64_too"] > 0 is_sum_positive_series_mf = session.udf( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=bool, dataset=dataset_id, name=prefixer.create_prefix(), @@ -1217,12 +1226,10 @@ def func_for_other(x): def test_managed_function_df_where_other_issue(session, dataset_id, scalars_df_index): try: - def the_sum(s): + def the_sum(s: pandas.Series) -> int: return s["int64_col"] + s["int64_too"] the_sum_mf = session.udf( - input_types=bigframes.series.Series, - output_type=int, dataset=dataset_id, name=prefixer.create_prefix(), )(the_sum) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 55643d9a60..8b67af5fe4 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -1722,7 +1722,7 @@ def serialize_row(row): ) serialize_row_remote = session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, reuse=False, cloud_function_service_account="default", @@ -1771,7 +1771,7 @@ def analyze(row): ) analyze_remote = session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, reuse=False, cloud_function_service_account="default", @@ -1895,7 +1895,7 @@ def serialize_row(row): ) serialize_row_remote = 
session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=str, reuse=False, cloud_function_service_account="default", @@ -1944,7 +1944,7 @@ def test_df_apply_axis_1_na_nan_inf(session): try: - def float_parser(row): + def float_parser(row: pandas.Series): import numpy as mynp import pandas as mypd @@ -1955,7 +1955,6 @@ def float_parser(row): return float(row["text"]) float_parser_remote = session.remote_function( - input_types=bigframes.series.Series, output_type=float, reuse=False, cloud_function_service_account="default", @@ -2055,12 +2054,12 @@ def test_df_apply_axis_1_series_args(session, scalars_dfs): try: @session.remote_function( - input_types=[bigframes.series.Series, float, str, bool], + input_types=[pandas.Series, float, str, bool], output_type=list[str], reuse=False, cloud_function_service_account="default", ) - def foo_list(x, y0: float, y1, y2) -> list[str]: + def foo_list(x: pandas.Series, y0: float, y1, y2) -> list[str]: return ( [str(x["int64_col"]), str(y0), str(y1), str(y2)] if y2 @@ -3087,12 +3086,21 @@ def test_remote_function_df_where_mask_series(session, dataset_id, scalars_dfs): try: # The return type has to be bool type for callable where condition. 
- def is_sum_positive_series(s): + def is_sum_positive_series(s: pandas.Series) -> bool: return s["int64_col"] + s["int64_too"] > 0 + with pytest.raises( + TypeError, + match="Due to current UDF serialization requirements, argument type hint must be Pandas Series, not BigFrames Series.", + ): + session.remote_function( + input_types=bigframes.series.Series, + dataset=dataset_id, + reuse=False, + cloud_function_service_account="default", + )(is_sum_positive_series) + is_sum_positive_series_mf = session.remote_function( - input_types=bigframes.series.Series, - output_type=bool, dataset=dataset_id, reuse=False, cloud_function_service_account="default", diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 28fab19144..15070a3a29 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -20,6 +20,7 @@ import bigframes_vendored.constants as constants import google.api_core.exceptions from google.cloud import bigquery +import pandas import pandas as pd import pyarrow import pytest @@ -1166,7 +1167,7 @@ def test_df_apply_axis_1(session, scalars_dfs, dataset_id_permanent): ] scalars_df, scalars_pandas_df = scalars_dfs - def add_ints(row): + def add_ints(row: pandas.Series) -> int: return row["int64_col"] + row["int64_too"] with pytest.warns( @@ -1174,8 +1175,6 @@ def add_ints(row): match="input_types=Series is in preview.", ): add_ints_remote = session.remote_function( - input_types=bigframes.series.Series, - output_type=int, dataset=dataset_id_permanent, name=get_function_name(add_ints, is_row_processor=True), cloud_function_service_account="default", @@ -1223,11 +1222,11 @@ def test_df_apply_axis_1_ordering(session, scalars_dfs, dataset_id_permanent): ordering_columns = ["bool_col", "int64_col"] scalars_df, scalars_pandas_df = scalars_dfs - def add_ints(row): + def add_ints(row: pandas.Series) -> int: return row["int64_col"] + 
row["int64_too"] add_ints_remote = session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=int, dataset=dataset_id_permanent, name=get_function_name(add_ints, is_row_processor=True), @@ -1267,7 +1266,7 @@ def add_numbers(row): return row["x"] + row["y"] add_numbers_remote = session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=float, dataset=dataset_id_permanent, name=get_function_name(add_numbers, is_row_processor=True), @@ -1321,7 +1320,7 @@ def echo_len(row): return len(row) echo_len_remote = session.remote_function( - input_types=bigframes.series.Series, + input_types=pandas.Series, output_type=float, dataset=dataset_id_permanent, name=get_function_name(echo_len, is_row_processor=True), From 464934f231647291355d61381a364f8e2d11e05e Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 29 Sep 2025 18:33:26 +0000 Subject: [PATCH 2/4] fix the tests --- bigframes/session/__init__.py | 2 +- tests/unit/functions/test_remote_function.py | 19 +++---------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f0cec864b4..03e6517903 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -2065,7 +2065,7 @@ def read_gbq_function( parameter. >>> @bpd.remote_function(cloud_function_service_account="default") - ... def row_sum(s: bpd.Series) -> float: + ... def row_sum(s: pd.Series) -> float: ... 
return s['a'] + s['b'] + s['c'] >>> row_sum_ref = bpd.read_gbq_function( diff --git a/tests/unit/functions/test_remote_function.py b/tests/unit/functions/test_remote_function.py index ea09ac59d3..e9e0d0df67 100644 --- a/tests/unit/functions/test_remote_function.py +++ b/tests/unit/functions/test_remote_function.py @@ -17,25 +17,12 @@ import pandas import pytest +import bigframes.exceptions import bigframes.functions.function as bff -import bigframes.series from bigframes.testing import mocks -@pytest.mark.parametrize( - "series_type", - ( - pytest.param( - pandas.Series, - id="pandas.Series", - ), - pytest.param( - bigframes.series.Series, - id="bigframes.series.Series", - ), - ), -) -def test_series_input_types_to_str(series_type): +def test_series_input_types_to_str(): """Check that is_row_processor=True uses str as the input type to serialize a row.""" session = mocks.create_bigquery_session() remote_function_decorator = bff.remote_function( @@ -48,7 +35,7 @@ def test_series_input_types_to_str(series_type): ): @remote_function_decorator - def axis_1_function(myparam: series_type) -> str: # type: ignore + def axis_1_function(myparam: pandas.Series) -> str: # type: ignore return "Hello, " + myparam["str_col"] + "!" # type: ignore # Still works as a normal function. From c74df6a270bef5358930d11b4b077ded8aed6769 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 29 Sep 2025 19:51:19 +0000 Subject: [PATCH 3/4] fix doctest --- bigframes/session/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 03e6517903..df0afb4c8d 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -2064,6 +2064,7 @@ def read_gbq_function( note, row processor implies that the function has only one input parameter. + >>> import pandas as pd >>> @bpd.remote_function(cloud_function_service_account="default") ... def row_sum(s: pd.Series) -> float: ... 
return s['a'] + s['b'] + s['c'] From cb4ea23aa2cfbd2776cbb05d43566d9727db4c03 Mon Sep 17 00:00:00 2001 From: jialuo Date: Tue, 7 Oct 2025 16:43:01 +0000 Subject: [PATCH 4/4] fix error message --- bigframes/functions/_function_session.py | 3 +-- tests/system/large/functions/test_managed_function.py | 2 +- tests/system/large/functions/test_remote_function.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 21c640f35f..a456f05417 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -991,8 +991,7 @@ def _convert_row_processor_sig( if param_type == bf_series.Series: raise bf_formatting.create_exception_with_feedback_link( TypeError, - "Due to current UDF serialization requirements, argument type " - "hint must be Pandas Series, not BigFrames Series.", + "Argument type hint must be Pandas Series, not BigFrames Series.", ) if param_type == pandas.Series: msg = bfe.format_message("input_types=Series is in preview.") diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index b87bcc4348..732123ec84 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -703,7 +703,7 @@ def serialize_row(row): with pytest.raises( TypeError, - match="Due to current UDF serialization requirements, argument type hint must be Pandas Series, not BigFrames Series.", + match="Argument type hint must be Pandas Series, not BigFrames Series.", ): serialize_row_mf = session.udf( input_types=bigframes.series.Series, diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 8b67af5fe4..00b1b5f1f0 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -3091,7 +3091,7 
@@ def is_sum_positive_series(s: pandas.Series) -> bool: with pytest.raises( TypeError, - match="Due to current UDF serialization requirements, argument type hint must be Pandas Series, not BigFrames Series.", + match="Argument type hint must be Pandas Series, not BigFrames Series.", ): session.remote_function( input_types=bigframes.series.Series,