|
74 | 74 | import bigframes.operations.datetimes as dt |
75 | 75 | import bigframes.operations.lists as lists |
76 | 76 | import bigframes.operations.plotting as plotting |
| 77 | +import bigframes.operations.python_op_maps as python_ops |
77 | 78 | import bigframes.operations.strings as strings |
78 | 79 | import bigframes.operations.structs as structs |
79 | 80 | import bigframes.session |
@@ -2030,88 +2031,97 @@ def apply( |
2030 | 2031 | if by_row not in ["compat", False]: |
2031 | 2032 | raise ValueError("Param by_row must be one of 'compat' or False") |
2032 | 2033 |
|
2033 | | - if not callable(func): |
| 2034 | + if not callable(func) and not isinstance(func, numpy.ufunc): |
2034 | 2035 | raise ValueError( |
2035 | 2036 | "Only a ufunc (a function that applies to the entire Series) or" |
2036 | 2037 | " a BigFrames BigQuery function that only works on single values" |
2037 | 2038 | " are supported." |
2038 | 2039 | ) |
2039 | 2040 |
|
2040 | | - if not isinstance(func, bigframes.functions.BigqueryCallableRoutine): |
2041 | | - # It is neither a remote function nor a managed function. |
2042 | | - # Then it must be a vectorized function that applies to the Series |
2043 | | - # as a whole. |
2044 | | - if by_row: |
2045 | | - raise ValueError( |
2046 | | - "You have passed a function as-is. If your intention is to " |
2047 | | - "apply this function in a vectorized way (i.e. to the " |
2048 | | - "entire Series as a whole, and you are sure that it " |
2049 | | - "performs only the operations that are implemented for a " |
2050 | | - "Series (e.g. a chain of arithmetic/logical operations, " |
2051 | | - "such as `def foo(s): return s % 2 == 1`), please also " |
2052 | | - "specify `by_row=False`. If your function contains " |
2053 | | - "arbitrary code, it can only be applied to every element " |
2054 | | - "in the Series individually, in which case you must " |
2055 | | - "convert it to a BigFrames BigQuery function using " |
2056 | | - "`bigframes.pandas.udf`, " |
2057 | | - "or `bigframes.pandas.remote_function` before passing." |
| 2041 | + if isinstance(func, bigframes.functions.BigqueryCallableRoutine): |
| 2042 | + # We are working with bigquery function at this point |
| 2043 | + if args: |
| 2044 | + result_series = self._apply_nary_op( |
| 2045 | + ops.NaryRemoteFunctionOp(function_def=func.udf_def), args |
| 2046 | + ) |
| 2047 | + # TODO(jialuo): Investigate why `_apply_nary_op` drops the series |
| 2048 | + # `name`. Manually reassigning it here as a temporary fix. |
| 2049 | + result_series.name = self.name |
| 2050 | + else: |
| 2051 | + result_series = self._apply_unary_op( |
| 2052 | + ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) |
2058 | 2053 | ) |
| 2054 | + result_series = func._post_process_series(result_series) |
2059 | 2055 |
|
2060 | | - try: |
2061 | | - return func(self) |
2062 | | - except Exception as ex: |
2063 | | - # This could happen if any of the operators in func is not |
2064 | | - # supported on a Series. Let's guide the customer to use a |
2065 | | - # bigquery function instead |
2066 | | - if hasattr(ex, "message"): |
2067 | | - ex.message += f"\n{_bigquery_function_recommendation_message}" |
2068 | | - raise |
2069 | | - |
2070 | | - # We are working with bigquery function at this point |
2071 | | - if args: |
2072 | | - result_series = self._apply_nary_op( |
2073 | | - ops.NaryRemoteFunctionOp(function_def=func.udf_def), args |
2074 | | - ) |
2075 | | - # TODO(jialuo): Investigate why `_apply_nary_op` drops the series |
2076 | | - # `name`. Manually reassigning it here as a temporary fix. |
2077 | | - result_series.name = self.name |
2078 | | - else: |
2079 | | - result_series = self._apply_unary_op( |
2080 | | - ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) |
| 2056 | + return result_series |
| 2057 | + |
| 2058 | + bf_op = python_ops.python_callable_to_op(func) |
| 2059 | + if bf_op and isinstance(bf_op, ops.UnaryOp): |
| 2060 | + return self._apply_unary_op(bf_op) |
| 2061 | + |
| 2062 | + # It is neither a remote function nor a managed function. |
| 2063 | + # Then it must be a vectorized function that applies to the Series |
| 2064 | + # as a whole. |
| 2065 | + if by_row: |
| 2066 | + raise ValueError( |
| 2067 | + "You have passed a function as-is. If your intention is to " |
| 2068 | + "apply this function in a vectorized way (i.e. to the " |
| 2069 | + "entire Series as a whole, and you are sure that it " |
| 2070 | + "performs only the operations that are implemented for a " |
| 2071 | + "Series (e.g. a chain of arithmetic/logical operations, " |
| 2072 | + "such as `def foo(s): return s % 2 == 1`), please also " |
| 2073 | + "specify `by_row=False`. If your function contains " |
| 2074 | + "arbitrary code, it can only be applied to every element " |
| 2075 | + "in the Series individually, in which case you must " |
| 2076 | + "convert it to a BigFrames BigQuery function using " |
| 2077 | + "`bigframes.pandas.udf`, " |
| 2078 | + "or `bigframes.pandas.remote_function` before passing." |
2081 | 2079 | ) |
2082 | | - result_series = func._post_process_series(result_series) |
2083 | 2080 |
|
2084 | | - return result_series |
| 2081 | + try: |
| 2082 | + return func(self) # type: ignore |
| 2083 | + except Exception as ex: |
| 2084 | + # This could happen if any of the operators in func is not |
| 2085 | + # supported on a Series. Let's guide the customer to use a |
| 2086 | + # bigquery function instead |
| 2087 | + if hasattr(ex, "message"): |
| 2088 | + ex.message += f"\n{_bigquery_function_recommendation_message}" |
| 2089 | + raise |
2085 | 2090 |
|
2086 | 2091 | def combine( |
2087 | 2092 | self, |
2088 | 2093 | other, |
2089 | 2094 | func, |
2090 | 2095 | ) -> Series: |
2091 | | - if not callable(func): |
| 2096 | + if not callable(func) and not isinstance(func, numpy.ufunc): |
2092 | 2097 | raise ValueError( |
2093 | 2098 | "Only a ufunc (a function that applies to the entire Series) or" |
2094 | 2099 | " a BigFrames BigQuery function that only works on single values" |
2095 | 2100 | " are supported." |
2096 | 2101 | ) |
2097 | 2102 |
|
2098 | | - if not isinstance(func, bigframes.functions.BigqueryCallableRoutine): |
2099 | | - # Keep this in sync with .apply |
2100 | | - try: |
2101 | | - return func(self, other) |
2102 | | - except Exception as ex: |
2103 | | - # This could happen if any of the operators in func is not |
2104 | | - # supported on a Series. Let's guide the customer to use a |
2105 | | - # bigquery function instead |
2106 | | - if hasattr(ex, "message"): |
2107 | | - ex.message += f"\n{_bigquery_function_recommendation_message}" |
2108 | | - raise |
2109 | | - |
2110 | | - result_series = self._apply_binary_op( |
2111 | | - other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) |
2112 | | - ) |
2113 | | - result_series = func._post_process_series(result_series) |
2114 | | - return result_series |
| 2103 | + if isinstance(func, bigframes.functions.BigqueryCallableRoutine): |
| 2104 | + result_series = self._apply_binary_op( |
| 2105 | + other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) |
| 2106 | + ) |
| 2107 | + result_series = func._post_process_series(result_series) |
| 2108 | + return result_series |
| 2109 | + |
| 2110 | + bf_op = python_ops.python_callable_to_op(func) |
| 2111 | + if bf_op and isinstance(bf_op, ops.BinaryOp): |
| 2112 | + result_series = self._apply_binary_op(other, bf_op) |
| 2113 | + return result_series |
| 2114 | + |
| 2115 | + # Keep this in sync with .apply |
| 2116 | + try: |
| 2117 | + return func(self, other) |
| 2118 | + except Exception as ex: |
| 2119 | + # This could happen if any of the operators in func is not |
| 2120 | + # supported on a Series. Let's guide the customer to use a |
| 2121 | + # bigquery function instead |
| 2122 | + if hasattr(ex, "message"): |
| 2123 | + ex.message += f"\n{_bigquery_function_recommendation_message}" |
| 2124 | + raise |
2115 | 2125 |
|
2116 | 2126 | @validations.requires_index |
2117 | 2127 | def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series: |
|
0 commit comments