diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py index e60bef1819..28ab421905 100644 --- a/bigframes/session/polars_executor.py +++ b/bigframes/session/polars_executor.py @@ -47,7 +47,15 @@ bigframes.operations.ge_op, bigframes.operations.le_op, ) -_COMPATIBLE_AGG_OPS = (agg_ops.SizeOp, agg_ops.SizeUnaryOp) +_COMPATIBLE_AGG_OPS = ( + agg_ops.SizeOp, + agg_ops.SizeUnaryOp, + agg_ops.MinOp, + agg_ops.MaxOp, + agg_ops.SumOp, + agg_ops.MeanOp, + agg_ops.CountOp, +) def _get_expr_ops(expr: expression.Expression) -> set[bigframes.operations.ScalarOp]: diff --git a/bigframes/testing/engine_utils.py b/bigframes/testing/engine_utils.py index f58e5951a1..8aa52cf51a 100644 --- a/bigframes/testing/engine_utils.py +++ b/bigframes/testing/engine_utils.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pandas.testing + from bigframes.core import nodes from bigframes.session import semi_executor @@ -25,7 +27,8 @@ def assert_equivalence_execution( e2_result = engine2.execute(node, ordered=True) assert e1_result is not None assert e2_result is not None - # Schemas might have extra nullity markers, normalize to node expected schema, which should be looser - e1_table = e1_result.to_arrow_table().cast(node.schema.to_pyarrow()) - e2_table = e2_result.to_arrow_table().cast(node.schema.to_pyarrow()) - assert e1_table.equals(e2_table), f"{e1_table} is not equal to {e2_table}" + # Convert to pandas, as pandas has better comparison utils than arrow + assert e1_result.schema == e2_result.schema + e1_table = e1_result.to_pandas() + e2_table = e2_result.to_pandas() + pandas.testing.assert_frame_equal(e1_table, e2_table, rtol=1e-10) diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py index 2c323a5f28..8530a6fefa 100644 --- a/tests/system/small/engines/test_aggregation.py +++ 
b/tests/system/small/engines/test_aggregation.py @@ -25,6 +25,28 @@ REFERENCE_ENGINE = polars_executor.PolarsExecutor() +def apply_agg_to_all_valid( + array: array_value.ArrayValue, op: agg_ops.UnaryAggregateOp, excluded_cols=() +) -> array_value.ArrayValue: + """ + Apply the aggregation to every column in the array that has a compatible datatype. + """ + exprs_by_name = [] + for arg in array.column_ids: + if arg in excluded_cols: + continue + try: + _ = op.output_type(array.get_column_type(arg)) + expr = expression.UnaryAggregation(op, expression.deref(arg)) + name = f"{arg}-{op.name}" + exprs_by_name.append((expr, name)) + except TypeError: + continue + assert len(exprs_by_name) > 0 + new_arr = array.aggregate(exprs_by_name) + return new_arr + + @pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) def test_engines_aggregate_size( scalars_array_value: array_value.ArrayValue, @@ -48,6 +70,20 @@ def test_engines_aggregate_size( assert_equivalence_execution(node, REFERENCE_ENGINE, engine) +@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize( + "op", + [agg_ops.min_op, agg_ops.max_op, agg_ops.mean_op, agg_ops.sum_op, agg_ops.count_op], +) +def test_engines_unary_aggregates( + scalars_array_value: array_value.ArrayValue, + engine, + op, +): + node = apply_agg_to_all_valid(scalars_array_value, op).node + assert_equivalence_execution(node, REFERENCE_ENGINE, engine) + + @pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) @pytest.mark.parametrize( + "grouping_cols",