Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 13 additions & 67 deletions bigframes/core/compile/sqlglot/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
# limitations under the License.
from __future__ import annotations

import functools
import typing

import sqlglot.expressions as sge

from bigframes.core import expression, window_spec
from bigframes.core import expression
from bigframes.core.compile.sqlglot.aggregations import (
binary_compiler,
nullary_compiler,
ordered_unary_compiler,
unary_compiler,
)
from bigframes.core.compile.sqlglot.expressions import typed_expr
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
import bigframes.operations as ops


def compile_aggregate(
Expand All @@ -31,16 +32,18 @@ def compile_aggregate(
) -> sge.Expression:
"""Compiles BigFrames aggregation expression into SQLGlot expression."""
if isinstance(aggregate, expression.NullaryAggregation):
return compile_nullary_agg(aggregate.op)
return nullary_compiler.compile(aggregate.op)
if isinstance(aggregate, expression.UnaryAggregation):
column = typed_expr.TypedExpr(
scalar_compiler.compile_scalar_expression(aggregate.arg),
aggregate.arg.output_type,
)
if not aggregate.op.order_independent:
return compile_ordered_unary_agg(aggregate.op, column, order_by=order_by)
return ordered_unary_compiler.compile(
aggregate.op, column, order_by=order_by
)
else:
return compile_unary_agg(aggregate.op, column)
return unary_compiler.compile(aggregate.op, column)
elif isinstance(aggregate, expression.BinaryAggregation):
left = typed_expr.TypedExpr(
scalar_compiler.compile_scalar_expression(aggregate.left),
Expand All @@ -50,63 +53,6 @@ def compile_aggregate(
scalar_compiler.compile_scalar_expression(aggregate.right),
aggregate.right.output_type,
)
return compile_binary_agg(aggregate.op, left, right)
return binary_compiler.compile(aggregate.op, left, right)
else:
raise ValueError(f"Unexpected aggregation: {aggregate}")


@functools.singledispatch
def compile_nullary_agg(
op: ops.aggregations.WindowOp,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
raise ValueError(f"Can't compile unrecognized operation: {op}")


@functools.singledispatch
def compile_binary_agg(
op: ops.aggregations.WindowOp,
left: typed_expr.TypedExpr,
right: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
raise ValueError(f"Can't compile unrecognized operation: {op}")


@functools.singledispatch
def compile_unary_agg(
op: ops.aggregations.WindowOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
raise ValueError(f"Can't compile unrecognized operation: {op}")


@functools.singledispatch
def compile_ordered_unary_agg(
op: ops.aggregations.WindowOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
order_by: typing.Sequence[sge.Expression] = [],
) -> sge.Expression:
raise ValueError(f"Can't compile unrecognized operation: {op}")


@compile_unary_agg.register
def _(
op: ops.aggregations.SumOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
# Will be null if all inputs are null. Pandas defaults to zero sum though.
expr = _apply_window_if_present(sge.func("SUM", column.expr), window)
return sge.func("IFNULL", expr, ir._literal(0, column.dtype))


def _apply_window_if_present(
value: sge.Expression,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
if window is not None:
raise NotImplementedError("Can't apply window to the expression.")
return value
13 changes: 13 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
35 changes: 35 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/binary_compiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
from bigframes.operations import aggregations as agg_ops

BINARY_OP_REGISTRATION = reg.OpRegistration()


def compile(
op: agg_ops.WindowOp,
left: typed_expr.TypedExpr,
right: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
return BINARY_OP_REGISTRATION[op](op, left, right, window=window)
41 changes: 41 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
from bigframes.operations import aggregations as agg_ops

NULLARY_OP_REGISTRATION = reg.OpRegistration()


def compile(
op: agg_ops.WindowOp,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
return NULLARY_OP_REGISTRATION[op](op, window=window)


@NULLARY_OP_REGISTRATION.register(agg_ops.SizeOp)
def _(
op: agg_ops.SizeOp,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
return apply_window_if_present(sge.func("COUNT", sge.convert(1)), window)
62 changes: 62 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/op_registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

from sqlglot import expressions as sge

from bigframes.operations import aggregations as agg_ops

# We should've been more specific about input types. Unfortunately,
# MyPy doesn't support more rigorous checks.
CompilationFunc = typing.Callable[..., sge.Expression]


class OpRegistration:
def __init__(self) -> None:
self._registered_ops: dict[str, CompilationFunc] = {}

def register(
self, op: agg_ops.WindowOp | type[agg_ops.WindowOp]
) -> typing.Callable[[CompilationFunc], CompilationFunc]:
def decorator(item: CompilationFunc):
def arg_checker(*args, **kwargs):
if not isinstance(args[0], agg_ops.WindowOp):
raise ValueError(
"The first parameter must be a window operator. "
f"Got {type(args[0])}"
)
return item(*args, **kwargs)

if hasattr(op, "name"):
key = typing.cast(str, op.name)
if key in self._registered_ops:
raise ValueError(f"{key} is already registered")
else:
raise ValueError(f"The operator must have a 'name' attribute. Got {op}")
self._registered_ops[key] = item
return arg_checker

return decorator

def __getitem__(self, op: str | agg_ops.WindowOp) -> CompilationFunc:
if isinstance(op, agg_ops.WindowOp):
if not hasattr(op, "name"):
raise ValueError(f"The operator must have a 'name' attribute. Got {op}")
else:
key = typing.cast(str, op.name)
return self._registered_ops[key]
return self._registered_ops[op]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
from bigframes.operations import aggregations as agg_ops

ORDERED_UNARY_OP_REGISTRATION = reg.OpRegistration()


def compile(
op: agg_ops.WindowOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
order_by: typing.Sequence[sge.Expression] = [],
) -> sge.Expression:
return ORDERED_UNARY_OP_REGISTRATION[op](
op, column, window=window, order_by=order_by
)
56 changes: 56 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/unary_compiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
from bigframes.operations import aggregations as agg_ops

UNARY_OP_REGISTRATION = reg.OpRegistration()


def compile(
op: agg_ops.WindowOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
return UNARY_OP_REGISTRATION[op](op, column, window=window)


@UNARY_OP_REGISTRATION.register(agg_ops.SumOp)
def _(
op: agg_ops.SumOp,
column: typed_expr.TypedExpr,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
# Will be null if all inputs are null. Pandas defaults to zero sum though.
expr = apply_window_if_present(sge.func("SUM", column.expr), window)
return sge.func("IFNULL", expr, ir._literal(0, column.dtype))


@UNARY_OP_REGISTRATION.register(agg_ops.SizeUnaryOp)
def _(
op: agg_ops.SizeUnaryOp,
_,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
return apply_window_if_present(sge.func("COUNT", sge.convert(1)), window)
29 changes: 29 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import window_spec


def apply_window_if_present(
value: sge.Expression,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
if window is not None:
raise NotImplementedError("Can't apply window to the expression.")
return value
Loading