From b0f14bd048c06fa7d36bed066a7a5467ab9f6a0b Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 17 Sep 2025 23:59:12 +0000 Subject: [PATCH] refactor: add agg_ops.MeanOp for sqlglot compiler --- .../sqlglot/aggregations/unary_compiler.py | 20 ++++++++++++++ .../system/small/engines/test_aggregation.py | 2 +- .../test_unary_compiler/test_mean/out.sql | 27 +++++++++++++++++++ .../aggregations/test_unary_compiler.py | 27 +++++++++++++++++++ 4 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_mean/out.sql diff --git a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py index 4cb0000894..8ed5510ec2 100644 --- a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py +++ b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py @@ -56,6 +56,26 @@ def _( return apply_window_if_present(sge.func("MAX", column.expr), window) +@UNARY_OP_REGISTRATION.register(agg_ops.MeanOp) +def _( + op: agg_ops.MeanOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + expr = column.expr + if column.dtype == dtypes.BOOL_DTYPE: + expr = sge.Cast(this=expr, to="INT64") + + expr = sge.func("AVG", expr) + + should_floor_result = ( + op.should_floor_result or column.dtype == dtypes.TIMEDELTA_DTYPE + ) + if should_floor_result: + expr = sge.Cast(this=sge.func("FLOOR", expr), to="INT64") + return apply_window_if_present(expr, window) + + @UNARY_OP_REGISTRATION.register(agg_ops.MedianOp) def _( op: agg_ops.MedianOp, diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py index 98d5cd4ac8..9b4efe8cbe 100644 --- a/tests/system/small/engines/test_aggregation.py +++ b/tests/system/small/engines/test_aggregation.py @@ -71,7 +71,7 @@ def test_engines_aggregate_size( assert_equivalence_execution(node, REFERENCE_ENGINE, engine) -@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) @pytest.mark.parametrize( "op", [agg_ops.min_op, agg_ops.max_op, agg_ops.mean_op, agg_ops.sum_op, agg_ops.count_op], diff --git a/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_mean/out.sql b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_mean/out.sql new file mode 100644 index 0000000000..6d4bb6f89a --- /dev/null +++ b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_mean/out.sql @@ -0,0 +1,27 @@ +WITH `bfcte_0` AS ( + SELECT + `bool_col` AS `bfcol_0`, + `int64_col` AS `bfcol_1`, + `duration_col` AS `bfcol_2` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + `bfcol_1` AS `bfcol_6`, + `bfcol_0` AS `bfcol_7`, + `bfcol_2` AS `bfcol_8` + FROM `bfcte_0` +), `bfcte_2` AS ( + SELECT + AVG(`bfcol_6`) AS `bfcol_12`, + AVG(CAST(`bfcol_7` AS INT64)) AS `bfcol_13`, + CAST(FLOOR(AVG(`bfcol_8`)) AS INT64) AS `bfcol_14`, + CAST(FLOOR(AVG(`bfcol_6`)) AS INT64) AS `bfcol_15` + FROM `bfcte_1` +) +SELECT + `bfcol_12` AS `int64_col`, + `bfcol_13` AS `bool_col`, + `bfcol_14` AS `duration_col`, + `bfcol_15` AS `int64_col_w_floor` +FROM `bfcte_2` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py index 4f0016a6e7..a5ffda0e65 100644 --- a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py +++ b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py @@ -56,6 +56,33 @@ def test_max(scalar_types_df: bpd.DataFrame, snapshot): snapshot.assert_match(sql, "out.sql") +def test_mean(scalar_types_df: bpd.DataFrame, snapshot): + col_names = ["int64_col", "bool_col", "duration_col"] + bf_df = scalar_types_df[col_names] + bf_df["duration_col"] = bpd.to_timedelta(bf_df["duration_col"], unit="us") + + # The `to_timedelta` creates a new mapping for the column id. + col_names.insert(0, "rowindex") + name2id = { + col_name: col_id + for col_name, col_id in zip(col_names, bf_df._block.expr.column_ids) + } + + agg_ops_map = { + "int64_col": agg_ops.MeanOp().as_expr(name2id["int64_col"]), + "bool_col": agg_ops.MeanOp().as_expr(name2id["bool_col"]), + "duration_col": agg_ops.MeanOp().as_expr(name2id["duration_col"]), + "int64_col_w_floor": agg_ops.MeanOp(should_floor_result=True).as_expr( + name2id["int64_col"] + ), + } + sql = _apply_unary_agg_ops( + bf_df, list(agg_ops_map.values()), list(agg_ops_map.keys()) + ) + + snapshot.assert_match(sql, "out.sql") + + def test_median(scalar_types_df: bpd.DataFrame, snapshot): bf_df = scalar_types_df ops_map = {