From c0f986fac93c97884c25b979dd5b8347d215a501 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 30 Jun 2025 20:23:29 +0000
Subject: [PATCH 1/2] feat: Add simple stats support to hybrid local pushdown

Add MinOp, MaxOp, SumOp, MeanOp and CountOp to _COMPATIBLE_AGG_OPS in the
polars executor.

assert_equivalence_execution now asserts that both result schemas are equal
and compares the results as pandas DataFrames with
pandas.testing.assert_frame_equal(rtol=1e-10) instead of exact arrow table
equality.

Add test_engines_unary_aggregates, which applies each op to every column
whose type the op accepts (apply_agg_to_all_valid). The helper's
excluded_cols default is an immutable tuple so that no state can be shared
between calls.
---
 bigframes/session/polars_executor.py          | 10 +++++-
 bigframes/testing/engine_utils.py             |  9 +++--
 .../system/small/engines/test_aggregation.py  | 33 +++++++++++++++++++
 3 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py
index ec00e38606..9f0a950635 100644
--- a/bigframes/session/polars_executor.py
+++ b/bigframes/session/polars_executor.py
@@ -46,7 +46,15 @@
     bigframes.operations.ge_op,
     bigframes.operations.le_op,
 )
-_COMPATIBLE_AGG_OPS = (agg_ops.SizeOp, agg_ops.SizeUnaryOp)
+_COMPATIBLE_AGG_OPS = (
+    agg_ops.SizeOp,
+    agg_ops.SizeUnaryOp,
+    agg_ops.MinOp,
+    agg_ops.MaxOp,
+    agg_ops.SumOp,
+    agg_ops.MeanOp,
+    agg_ops.CountOp,
+)
 
 
 def _get_expr_ops(expr: expression.Expression) -> set[bigframes.operations.ScalarOp]:
diff --git a/bigframes/testing/engine_utils.py b/bigframes/testing/engine_utils.py
index f58e5951a1..2ef4aa70ca 100644
--- a/bigframes/testing/engine_utils.py
+++ b/bigframes/testing/engine_utils.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pandas.testing
+
 from bigframes.core import nodes
 from bigframes.session import semi_executor
 
@@ -26,6 +28,7 @@ def assert_equivalence_execution(
     assert e1_result is not None
     assert e2_result is not None
     # Schemas might have extra nullity markers, normalize to node expected schema, which should be looser
-    e1_table = e1_result.to_arrow_table().cast(node.schema.to_pyarrow())
-    e2_table = e2_result.to_arrow_table().cast(node.schema.to_pyarrow())
-    assert e1_table.equals(e2_table), f"{e1_table} is not equal to {e2_table}"
+    assert e1_result.schema == e2_result.schema
+    e1_table = e1_result.to_pandas()
+    e2_table = e2_result.to_pandas()
+    pandas.testing.assert_frame_equal(e1_table, e2_table, rtol=1e-10)
diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py
index 2c323a5f28..11fda441d5 100644
--- a/tests/system/small/engines/test_aggregation.py
+++ b/tests/system/small/engines/test_aggregation.py
@@ -25,6 +25,25 @@
 REFERENCE_ENGINE = polars_executor.PolarsExecutor()
 
 
+def apply_agg_to_all_valid(
+    array: array_value.ArrayValue, op: agg_ops.UnaryAggregateOp, excluded_cols=()
+) -> array_value.ArrayValue:
+    exprs_by_name = []
+    for arg in array.column_ids:
+        if arg in excluded_cols:
+            continue
+        try:
+            _ = op.output_type(array.get_column_type(arg))
+            expr = expression.UnaryAggregation(op, expression.deref(arg))
+            name = f"{arg}-{op.name}"
+            exprs_by_name.append((expr, name))
+        except TypeError:
+            continue
+    assert len(exprs_by_name) > 0
+    new_arr = array.aggregate(exprs_by_name)
+    return new_arr
+
+
 @pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
 def test_engines_aggregate_size(
     scalars_array_value: array_value.ArrayValue,
@@ -48,6 +67,20 @@ def test_engines_aggregate_size(
     assert_equivalence_execution(node, REFERENCE_ENGINE, engine)
 
 
+@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+@pytest.mark.parametrize(
+    "op",
+    [agg_ops.min_op, agg_ops.max_op, agg_ops.mean_op, agg_ops.sum_op, agg_ops.count_op],
+)
+def test_engines_unary_aggregates(
+    scalars_array_value: array_value.ArrayValue,
+    engine,
+    op,
+):
+    node = apply_agg_to_all_valid(scalars_array_value, op).node
+    assert_equivalence_execution(node, REFERENCE_ENGINE, engine)
+
+
 @pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
 @pytest.mark.parametrize(
     "grouping_cols",
From bc6be57695937829d1164818fd0b7565df5ed0c7 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 1 Jul 2025 20:49:57 +0000
Subject: [PATCH 2/2] comments

Replace the outdated schema-normalization comment in
assert_equivalence_execution with one explaining why results are converted
to pandas, and add a docstring to apply_agg_to_all_valid.
---
 bigframes/testing/engine_utils.py              | 2 +-
 tests/system/small/engines/test_aggregation.py | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/bigframes/testing/engine_utils.py b/bigframes/testing/engine_utils.py
index 2ef4aa70ca..8aa52cf51a 100644
--- a/bigframes/testing/engine_utils.py
+++ b/bigframes/testing/engine_utils.py
@@ -27,7 +27,7 @@ def assert_equivalence_execution(
     e2_result = engine2.execute(node, ordered=True)
     assert e1_result is not None
     assert e2_result is not None
-    # Schemas might have extra nullity markers, normalize to node expected schema, which should be looser
+    # Convert to pandas, as pandas has better comparison utils than arrow
     assert e1_result.schema == e2_result.schema
     e1_table = e1_result.to_pandas()
     e2_table = e2_result.to_pandas()
diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py
index 11fda441d5..8530a6fefa 100644
--- a/tests/system/small/engines/test_aggregation.py
+++ b/tests/system/small/engines/test_aggregation.py
@@ -28,6 +28,9 @@
 def apply_agg_to_all_valid(
     array: array_value.ArrayValue, op: agg_ops.UnaryAggregateOp, excluded_cols=()
 ) -> array_value.ArrayValue:
+    """
+    Apply the aggregation to every column in the array that has a compatible datatype.
+    """
     exprs_by_name = []
     for arg in array.column_ids:
         if arg in excluded_cols: