From e7c4eb4c77d297380189f7626907ebe7df294f9a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 12 Sep 2025 23:59:28 +0000 Subject: [PATCH 1/3] refactor: Define window column expression type --- bigframes/core/agg_expressions.py | 67 ++++- bigframes/core/blocks.py | 2 +- bigframes/core/compile/compiled.py | 235 ++++-------------- .../ibis_compiler/aggregate_compiler.py | 113 ++++++++- .../ibis_compiler/scalar_op_compiler.py | 66 ++++- bigframes/core/compile/ibis_compiler/utils.py | 0 bigframes/core/compile/polars/compiler.py | 5 +- bigframes/core/ordering.py | 11 +- bigframes/core/window/rolling.py | 4 +- bigframes/core/window_spec.py | 23 +- 10 files changed, 320 insertions(+), 206 deletions(-) create mode 100644 bigframes/core/compile/ibis_compiler/utils.py diff --git a/bigframes/core/agg_expressions.py b/bigframes/core/agg_expressions.py index f77525706b..e65718bdc4 100644 --- a/bigframes/core/agg_expressions.py +++ b/bigframes/core/agg_expressions.py @@ -22,7 +22,7 @@ from typing import Callable, Mapping, TypeVar from bigframes import dtypes -from bigframes.core import expression +from bigframes.core import expression, window_spec import bigframes.core.identifiers as ids import bigframes.operations.aggregations as agg_ops @@ -149,3 +149,68 @@ def replace_args( self, larg: expression.Expression, rarg: expression.Expression ) -> BinaryAggregation: return BinaryAggregation(self.op, larg, rarg) + + +@dataclasses.dataclass(frozen=True) +class WindowExpression(expression.Expression): + analytic_expr: Aggregation + window: window_spec.WindowSpec + + @property + def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: + return tuple( + itertools.chain.from_iterable( + map(lambda x: x.column_references, self.inputs) + ) + ) + + @functools.cached_property + def is_resolved(self) -> bool: + return all(input.is_resolved for input in self.inputs) + + @property + def output_type(self) -> dtypes.ExpressionType: + return self.analytic_expr.output_type + + 
@property + def inputs( + self, + ) -> typing.Tuple[expression.Expression, ...]: + return (self.analytic_expr, *self.window.expressions) + + @property + def free_variables(self) -> typing.Tuple[str, ...]: + return tuple( + itertools.chain.from_iterable(map(lambda x: x.free_variables, self.inputs)) + ) + + @property + def is_const(self) -> bool: + return all(child.is_const for child in self.inputs) + + def transform_children( + self: WindowExpression, + t: Callable[[expression.Expression], expression.Expression], + ) -> WindowExpression: + return WindowExpression( + self.analytic_expr.transform_children(t), + self.window.transform_exprs(t), + ) + + def bind_variables( + self: WindowExpression, + bindings: Mapping[str, expression.Expression], + allow_partial_bindings: bool = False, + ) -> WindowExpression: + return self.transform_children( + lambda x: x.bind_variables(bindings, allow_partial_bindings) + ) + + def bind_refs( + self: WindowExpression, + bindings: Mapping[ids.ColumnId, expression.Expression], + allow_partial_bindings: bool = False, + ) -> WindowExpression: + return self.transform_children( + lambda x: x.bind_refs(bindings, allow_partial_bindings) + ) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index b2d9d10107..695f5e1df3 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1174,7 +1174,7 @@ def apply_analytic( block = self if skip_null_groups: for key in window.grouping_keys: - block = block.filter(ops.notnull_op.as_expr(key.id.name)) + block = block.filter(ops.notnull_op.as_expr(key)) expr, result_id = block._expr.project_window_expr( agg_expr, window, diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index b28880d498..03b8f6cf87 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -21,15 +21,13 @@ import bigframes_vendored.ibis import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery import 
bigframes_vendored.ibis.common.deferred as ibis_deferred # type: ignore -from bigframes_vendored.ibis.expr import builders as ibis_expr_builders import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -from bigframes_vendored.ibis.expr.operations import window as ibis_expr_window import bigframes_vendored.ibis.expr.operations as ibis_ops import bigframes_vendored.ibis.expr.types as ibis_types from google.cloud import bigquery import pyarrow as pa -from bigframes.core import utils +from bigframes.core import agg_expressions import bigframes.core.agg_expressions as ex_types import bigframes.core.compile.googlesql import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compiler @@ -38,8 +36,9 @@ import bigframes.core.expression as ex from bigframes.core.ordering import OrderingExpression import bigframes.core.sql -from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec +from bigframes.core.window_spec import WindowSpec import bigframes.dtypes +import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops op_compiler = op_compilers.scalar_op_compiler @@ -237,7 +236,9 @@ def aggregate( col_out: agg_compiler.compile_aggregate( aggregate, bindings, - order_by=_convert_row_ordering_to_table_values(table, order_by), + order_by=op_compiler._convert_row_ordering_to_table_values( + table, order_by + ), ) for aggregate, col_out in aggregations } @@ -442,113 +443,63 @@ def project_window_op( if expression.op.order_independent and window_spec.is_unbounded: # notably percentile_cont does not support ordering clause window_spec = window_spec.without_order() - window = self._ibis_window_from_spec(window_spec) - bindings = {col: self._get_ibis_column(col) for col in self.column_ids} - - window_op = agg_compiler.compile_analytic( - expression, - window, - bindings=bindings, - ) - inputs = tuple( - typing.cast(ibis_types.Column, self._compile_expression(ex.DerefOp(column))) - for column in 
expression.column_references + # TODO: Turn this logic into a true rewriter + result_expr: ex.Expression = agg_expressions.WindowExpression( + expression, window_spec ) - clauses = [] + clauses: list[tuple[ex.Expression, ex.Expression]] = [] if expression.op.skips_nulls and not never_skip_nulls: - for column in inputs: - clauses.append((column.isnull(), ibis_types.null())) - if window_spec.min_periods and len(inputs) > 0: + for input in expression.inputs: + clauses.append((ops.isnull_op.as_expr(input), ex.const(None))) + if window_spec.min_periods and len(expression.inputs) > 0: if not expression.op.nulls_count_for_min_values: + is_observation = ops.notnull_op.as_expr() + # Most operations do not count NULL values towards min_periods - per_col_does_count = (column.notnull() for column in inputs) + per_col_does_count = ( + ops.notnull_op.as_expr(input) for input in expression.inputs + ) # All inputs must be non-null for observation to count is_observation = functools.reduce( - lambda x, y: x & y, per_col_does_count - ).cast(int) - observation_count = agg_compiler.compile_analytic( - ex_types.UnaryAggregation( - agg_ops.sum_op, ex.deref("_observation_count") - ), - window, - bindings={"_observation_count": is_observation}, + lambda x, y: ops.and_op.as_expr(x, y), per_col_does_count + ) + observation_sentinel = ops.AsTypeOp(bigframes.dtypes.INT_DTYPE).as_expr( + is_observation + ) + observation_count_expr = agg_expressions.WindowExpression( + ex_types.UnaryAggregation(agg_ops.sum_op, observation_sentinel), + window_spec, ) else: # Operations like count treat even NULLs as valid observations for the sake of min_periods # notnull is just used to convert null values to non-null (FALSE) values to be counted - is_observation = inputs[0].notnull() - observation_count = agg_compiler.compile_analytic( - ex_types.UnaryAggregation( - agg_ops.count_op, ex.deref("_observation_count") - ), - window, - bindings={"_observation_count": is_observation}, + observation_count_expr = 
agg_expressions.WindowExpression( + agg_ops.size_op.as_expr(), + window_spec, ) clauses.append( ( - observation_count < ibis_types.literal(window_spec.min_periods), - ibis_types.null(), + ops.lt_op.as_expr( + observation_count_expr, ex.const(window_spec.min_periods) + ), + ex.const(None), ) ) if clauses: - case_statement = bigframes_vendored.ibis.case() - for clause in clauses: - case_statement = case_statement.when(clause[0], clause[1]) - case_statement = case_statement.else_(window_op).end() # type: ignore - window_op = case_statement # type: ignore - - return UnorderedIR(self._table, (*self.columns, window_op.name(output_name))) - - def _compile_expression(self, expr: ex.Expression): - return op_compiler.compile_expression(expr, self._ibis_bindings) - - def _ibis_window_from_spec(self, window_spec: WindowSpec): - group_by: typing.List[ibis_types.Value] = ( - [ - typing.cast( - ibis_types.Column, _as_groupable(self._compile_expression(column)) - ) - for column in window_spec.grouping_keys + case_inputs = [ + *itertools.chain.from_iterable(clauses), + ex.const(True), + result_expr, ] - if window_spec.grouping_keys - else [] - ) + result_expr = ops.CaseWhenOp().as_expr(*case_inputs) - # Construct ordering. There are basically 3 main cases - # 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed - # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed - # 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. - if window_spec.is_row_bounded: - if not window_spec.ordering: - # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. 
- raise ValueError("No ordering provided for ordered analytic function") - order_by = _convert_row_ordering_to_table_values( - self._column_names, - window_spec.ordering, - ) + ibis_expr = op_compiler.compile_expression(result_expr, self._ibis_bindings) - elif window_spec.is_range_bounded: - order_by = [ - _convert_range_ordering_to_table_value( - self._column_names, - window_spec.ordering[0], - ) - ] - # The rest if branches are for unbounded windows - elif window_spec.ordering: - # Unbound grouping window. Suitable for aggregations but not for analytic function application. - order_by = _convert_row_ordering_to_table_values( - self._column_names, - window_spec.ordering, - ) - else: - order_by = None + return UnorderedIR(self._table, (*self.columns, ibis_expr.name(output_name))) - window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) - if window_spec.bounds is not None: - return _add_boundary(window_spec.bounds, window) - return window + def _compile_expression(self, expr: ex.Expression): + return op_compiler.compile_expression(expr, self._ibis_bindings) def is_literal(column: ibis_types.Value) -> bool: @@ -567,58 +518,6 @@ def is_window(column: ibis_types.Value) -> bool: return any(isinstance(op, ibis_ops.WindowFunction) for op in matches) -def _convert_row_ordering_to_table_values( - value_lookup: typing.Mapping[str, ibis_types.Value], - ordering_columns: typing.Sequence[OrderingExpression], -) -> typing.Sequence[ibis_types.Value]: - column_refs = ordering_columns - ordering_values = [] - for ordering_col in column_refs: - expr = op_compiler.compile_expression( - ordering_col.scalar_expression, value_lookup - ) - ordering_value = ( - bigframes_vendored.ibis.asc(expr) # type: ignore - if ordering_col.direction.is_ascending - else bigframes_vendored.ibis.desc(expr) # type: ignore - ) - # Bigquery SQL considers NULLS to be "smallest" values, but we need to override in these cases. 
- if (not ordering_col.na_last) and (not ordering_col.direction.is_ascending): - # Force nulls to be first - is_null_val = typing.cast(ibis_types.Column, expr.isnull()) - ordering_values.append(bigframes_vendored.ibis.desc(is_null_val)) - elif (ordering_col.na_last) and (ordering_col.direction.is_ascending): - # Force nulls to be last - is_null_val = typing.cast(ibis_types.Column, expr.isnull()) - ordering_values.append(bigframes_vendored.ibis.asc(is_null_val)) - ordering_values.append(ordering_value) - return ordering_values - - -def _convert_range_ordering_to_table_value( - value_lookup: typing.Mapping[str, ibis_types.Value], - ordering_column: OrderingExpression, -) -> ibis_types.Value: - """Converts the ordering for range windows to Ibis references. - - Note that this method is different from `_convert_row_ordering_to_table_values` in - that it does not arrange null values. There are two reasons: - 1. Manipulating null positions requires more than one ordering key, which is forbidden - by SQL window syntax for range rolling. - 2. Pandas does not allow range rolling on timeseries with nulls. - - Therefore, we opt for the simplest approach here: generate the simplest SQL and follow - the BigQuery engine behavior. 
- """ - expr = op_compiler.compile_expression( - ordering_column.scalar_expression, value_lookup - ) - - if ordering_column.direction.is_ascending: - return bigframes_vendored.ibis.asc(expr) # type: ignore - return bigframes_vendored.ibis.desc(expr) # type: ignore - - def _string_cast_join_cond( lvalue: ibis_types.Column, rvalue: ibis_types.Column ) -> ibis_types.BooleanColumn: @@ -678,53 +577,3 @@ def _join_condition( else: return _string_cast_join_cond(lvalue, rvalue) return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue) - - -def _as_groupable(value: ibis_types.Value): - from bigframes.core.compile.ibis_compiler import scalar_op_registry - - # Some types need to be converted to another type to enable groupby - if value.type().is_float64(): - return value.cast(ibis_dtypes.str) - elif value.type().is_geospatial(): - return typing.cast(ibis_types.GeoSpatialColumn, value).as_binary() - elif value.type().is_json(): - return scalar_op_registry.to_json_string(value) - else: - return value - - -def _to_ibis_boundary( - boundary: Optional[int], -) -> Optional[ibis_expr_window.WindowBoundary]: - if boundary is None: - return None - return ibis_expr_window.WindowBoundary( - abs(boundary), preceding=boundary <= 0 # type:ignore - ) - - -def _add_boundary( - bounds: typing.Union[RowsWindowBounds, RangeWindowBounds], - ibis_window: ibis_expr_builders.LegacyWindowBuilder, -) -> ibis_expr_builders.LegacyWindowBuilder: - if isinstance(bounds, RangeWindowBounds): - return ibis_window.range( - start=_to_ibis_boundary( - None - if bounds.start is None - else utils.timedelta_to_micros(bounds.start) - ), - end=_to_ibis_boundary( - None if bounds.end is None else utils.timedelta_to_micros(bounds.end) - ), - ) - if isinstance(bounds, RowsWindowBounds): - if bounds.start is not None or bounds.end is not None: - return ibis_window.rows( - start=_to_ibis_boundary(bounds.start), - end=_to_ibis_boundary(bounds.end), - ) - return ibis_window - else: - raise ValueError(f"unrecognized 
window bounds {bounds}") diff --git a/bigframes/core/compile/ibis_compiler/aggregate_compiler.py b/bigframes/core/compile/ibis_compiler/aggregate_compiler.py index 1907078690..b101f4e09f 100644 --- a/bigframes/core/compile/ibis_compiler/aggregate_compiler.py +++ b/bigframes/core/compile/ibis_compiler/aggregate_compiler.py @@ -19,8 +19,11 @@ from typing import cast, List, Optional import bigframes_vendored.constants as constants +import bigframes_vendored.ibis +from bigframes_vendored.ibis.expr import builders as ibis_expr_builders import bigframes_vendored.ibis.expr.api as ibis_api import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes +from bigframes_vendored.ibis.expr.operations import window as ibis_expr_window import bigframes_vendored.ibis.expr.operations as ibis_ops import bigframes_vendored.ibis.expr.operations.udf as ibis_udf import bigframes_vendored.ibis.expr.types as ibis_types @@ -30,6 +33,8 @@ from bigframes.core.compile import constants as compiler_constants import bigframes.core.compile.ibis_compiler.scalar_op_compiler as scalar_compilers import bigframes.core.compile.ibis_types as compile_ibis_types +import bigframes.core.utils +from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec import bigframes.core.window_spec as window_spec import bigframes.operations.aggregations as agg_ops @@ -73,11 +78,12 @@ def compile_analytic( window: window_spec.WindowSpec, bindings: typing.Dict[str, ibis_types.Value], ) -> ibis_types.Value: + ibis_window = _ibis_window_from_spec(window, bindings=bindings) if isinstance(aggregate, agg_expressions.NullaryAggregation): - return compile_nullary_agg(aggregate.op, window) + return compile_nullary_agg(aggregate.op, ibis_window) elif isinstance(aggregate, agg_expressions.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg(aggregate.op, input, window) # type: ignore + return compile_unary_agg(aggregate.op, input, 
ibis_window) # type: ignore elif isinstance(aggregate, agg_expressions.BinaryAggregation): raise NotImplementedError("binary analytic operations not yet supported") else: @@ -729,6 +735,109 @@ def _apply_window_if_present(value: ibis_types.Value, window): return value.over(window) if (window is not None) else value +def _ibis_window_from_spec( + window_spec: WindowSpec, bindings: typing.Dict[str, ibis_types.Value] +): + group_by: typing.List[ibis_types.Value] = ( + [ + typing.cast( + ibis_types.Column, + _as_groupable(scalar_compiler.compile_expression(column, bindings)), + ) + for column in window_spec.grouping_keys + ] + if window_spec.grouping_keys + else [] + ) + + # Construct ordering. There are basically 3 main cases + # 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed + # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed + # 3. Order-dependent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. + if window_spec.is_row_bounded: + if not window_spec.ordering: + # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. + raise ValueError("No ordering provided for ordered analytic function") + order_by = scalar_compiler._convert_row_ordering_to_table_values( + bindings, + window_spec.ordering, + ) + + elif window_spec.is_range_bounded: + order_by = [ + scalar_compiler._convert_range_ordering_to_table_value( + bindings, + window_spec.ordering[0], + ) + ] + # The remaining branches are for unbounded windows + elif window_spec.ordering: + # Unbound grouping window. Suitable for aggregations but not for analytic function application. 
+ order_by = scalar_compiler._convert_row_ordering_to_table_values( + bindings, + window_spec.ordering, + ) + else: + order_by = None + + window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) + if window_spec.bounds is not None: + return _add_boundary(window_spec.bounds, window) + return window + + +def _as_groupable(value: ibis_types.Value): + from bigframes.core.compile.ibis_compiler import scalar_op_registry + + # Some types need to be converted to another type to enable groupby + if value.type().is_float64(): + return value.cast(ibis_dtypes.str) + elif value.type().is_geospatial(): + return typing.cast(ibis_types.GeoSpatialColumn, value).as_binary() + elif value.type().is_json(): + return scalar_op_registry.to_json_string(value) + else: + return value + + +def _to_ibis_boundary( + boundary: Optional[int], +) -> Optional[ibis_expr_window.WindowBoundary]: + if boundary is None: + return None + return ibis_expr_window.WindowBoundary( + abs(boundary), preceding=boundary <= 0 # type:ignore + ) + + +def _add_boundary( + bounds: typing.Union[RowsWindowBounds, RangeWindowBounds], + ibis_window: ibis_expr_builders.LegacyWindowBuilder, +) -> ibis_expr_builders.LegacyWindowBuilder: + if isinstance(bounds, RangeWindowBounds): + return ibis_window.range( + start=_to_ibis_boundary( + None + if bounds.start is None + else bigframes.core.utils.timedelta_to_micros(bounds.start) + ), + end=_to_ibis_boundary( + None + if bounds.end is None + else bigframes.core.utils.timedelta_to_micros(bounds.end) + ), + ) + if isinstance(bounds, RowsWindowBounds): + if bounds.start is not None or bounds.end is not None: + return ibis_window.rows( + start=_to_ibis_boundary(bounds.start), + end=_to_ibis_boundary(bounds.end), + ) + return ibis_window + else: + raise ValueError(f"unrecognized window bounds {bounds}") + + def _map_to_literal( original: ibis_types.Value, literal: ibis_types.Scalar ) -> ibis_types.Column: diff --git 
a/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py b/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py index d5f3e15d34..1197f6b9da 100644 --- a/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py +++ b/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py @@ -20,8 +20,10 @@ import typing from typing import TYPE_CHECKING +import bigframes_vendored.ibis import bigframes_vendored.ibis.expr.types as ibis_types +from bigframes.core import agg_expressions, ordering import bigframes.core.compile.ibis_types import bigframes.core.expression as ex @@ -29,7 +31,7 @@ import bigframes.operations as ops -class ScalarOpCompiler: +class ExpressionCompiler: # Mapping of operation name to implemenations _registry: dict[ str, @@ -67,6 +69,18 @@ def _( else: return bindings[expression.id.sql] + @compile_expression.register + def _( + self, + expression: agg_expressions.WindowExpression, + bindings: typing.Dict[str, ibis_types.Value], + ) -> ibis_types.Value: + import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compile + + return agg_compile.compile_analytic( + expression.analytic_expr, expression.window, bindings + ) + @compile_expression.register def _( self, @@ -202,6 +216,54 @@ def _register( raise ValueError(f"Operation name {op_name} already registered") self._registry[op_name] = impl + def _convert_row_ordering_to_table_values( + self, + value_lookup: typing.Mapping[str, ibis_types.Value], + ordering_columns: typing.Sequence[ordering.OrderingExpression], + ) -> typing.Sequence[ibis_types.Value]: + column_refs = ordering_columns + ordering_values = [] + for ordering_col in column_refs: + expr = self.compile_expression(ordering_col.scalar_expression, value_lookup) + ordering_value = ( + bigframes_vendored.ibis.asc(expr) # type: ignore + if ordering_col.direction.is_ascending + else bigframes_vendored.ibis.desc(expr) # type: ignore + ) + # Bigquery SQL considers NULLS to be "smallest" values, but we need to override in these cases. 
+ if (not ordering_col.na_last) and (not ordering_col.direction.is_ascending): + # Force nulls to be first + is_null_val = typing.cast(ibis_types.Column, expr.isnull()) + ordering_values.append(bigframes_vendored.ibis.desc(is_null_val)) + elif (ordering_col.na_last) and (ordering_col.direction.is_ascending): + # Force nulls to be last + is_null_val = typing.cast(ibis_types.Column, expr.isnull()) + ordering_values.append(bigframes_vendored.ibis.asc(is_null_val)) + ordering_values.append(ordering_value) + return ordering_values + + def _convert_range_ordering_to_table_value( + self, + value_lookup: typing.Mapping[str, ibis_types.Value], + ordering_column: ordering.OrderingExpression, + ) -> ibis_types.Value: + """Converts the ordering for range windows to Ibis references. + + Note that this method is different from `_convert_row_ordering_to_table_values` in + that it does not arrange null values. There are two reasons: + 1. Manipulating null positions requires more than one ordering key, which is forbidden + by SQL window syntax for range rolling. + 2. Pandas does not allow range rolling on timeseries with nulls. + + Therefore, we opt for the simplest approach here: generate the simplest SQL and follow + the BigQuery engine behavior. 
+ """ + expr = self.compile_expression(ordering_column.scalar_expression, value_lookup) + + if ordering_column.direction.is_ascending: + return bigframes_vendored.ibis.asc(expr) # type: ignore + return bigframes_vendored.ibis.desc(expr) # type: ignore + # Singleton compiler -scalar_op_compiler = ScalarOpCompiler() +scalar_op_compiler = ExpressionCompiler() diff --git a/bigframes/core/compile/ibis_compiler/utils.py b/bigframes/core/compile/ibis_compiler/utils.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index df84f08852..f7c742e852 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -828,7 +828,10 @@ def compile_window(self, node: nodes.WindowOpNode): # polars will automatically broadcast the aggregate to the matching input rows agg_pl = self.agg_compiler.compile_agg_expr(node.expression) if window.grouping_keys: - agg_pl = agg_pl.over(id.id.sql for id in window.grouping_keys) + agg_pl = agg_pl.over( + self.expr_compiler.compile_expression(key) + for key in window.grouping_keys + ) result = df.with_columns(agg_pl.alias(node.output_name.sql)) else: # row-bounded window window_result = self._calc_row_analytic_func( diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index 2fc7573b21..50b3cee8aa 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -17,7 +17,7 @@ from dataclasses import dataclass, field from enum import Enum import typing -from typing import Mapping, Optional, Sequence, Set, Union +from typing import Callable, Mapping, Optional, Sequence, Set, Union import bigframes.core.expression as expression import bigframes.core.identifiers as ids @@ -82,6 +82,15 @@ def with_reverse(self) -> OrderingExpression: self.scalar_expression, self.direction.reverse(), not self.na_last ) + def transform_exprs( + self, t: Callable[[expression.Expression], expression.Expression] 
+ ) -> OrderingExpression: + return OrderingExpression( + t(self.scalar_expression), + self.direction, + self.na_last, + ) + # Encoding classes specify additional properties for some ordering representations @dataclass(frozen=True) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index a9c6dfdfa7..1f3466874f 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -108,8 +108,10 @@ def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block: if self._window_spec.grouping_keys: original_index_ids = block.index_columns block = block.reset_index(drop=False) + # grouping keys will always be direct column references, but we should probably + # refactor this class to enforce this statically index_ids = ( - *[col.id.name for col in self._window_spec.grouping_keys], + *[col.id.name for col in self._window_spec.grouping_keys], # type: ignore *original_index_ids, ) block = block.set_index(col_ids=index_ids) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index bef5fbea7c..9e4ee17103 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, replace import datetime import itertools -from typing import Literal, Mapping, Optional, Sequence, Set, Tuple, Union +from typing import Callable, Literal, Mapping, Optional, Sequence, Set, Tuple, Union import numpy as np import pandas as pd @@ -215,13 +215,13 @@ class WindowSpec: Specifies a window over which aggregate and analytic function may be applied. Attributes: - grouping_keys: A set of column ids to group on + grouping_keys: A set of columns to group on bounds: The window boundaries ordering: A list of columns ids and ordering direction to override base ordering min_periods: The minimum number of observations in window required to have a value """ - grouping_keys: Tuple[ex.DerefOp, ...] = tuple() + grouping_keys: Tuple[ex.Expression, ...] 
= tuple() ordering: Tuple[orderings.OrderingExpression, ...] = tuple() bounds: Union[RowsWindowBounds, RangeWindowBounds, None] = None min_periods: int = 0 @@ -273,7 +273,10 @@ def all_referenced_columns(self) -> Set[ids.ColumnId]: ordering_vars = itertools.chain.from_iterable( item.scalar_expression.column_references for item in self.ordering ) - return set(itertools.chain((i.id for i in self.grouping_keys), ordering_vars)) + grouping_vars = itertools.chain.from_iterable( + item.column_references for item in self.grouping_keys + ) + return set(itertools.chain(grouping_vars, ordering_vars)) def without_order(self, force: bool = False) -> WindowSpec: """Removes ordering clause if ordering isn't required to define bounds.""" @@ -298,3 +301,15 @@ def remap_column_refs( bounds=self.bounds, min_periods=self.min_periods, ) + + def transform_exprs( + self: WindowSpec, t: Callable[[ex.Expression], ex.Expression] + ) -> WindowSpec: + return WindowSpec( + grouping_keys=tuple(t(key) for key in self.grouping_keys), + ordering=tuple( + order_part.transform_exprs(t) for order_part in self.ordering + ), + bounds=self.bounds, + min_periods=self.min_periods, + ) From aa519e6953ce05d3ab48e0775ed1edee62ef3f6f Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 13 Sep 2025 01:13:01 +0000 Subject: [PATCH 2/3] hack deferred value fix --- bigframes/core/compile/compiled.py | 3 ++- third_party/bigframes_vendored/ibis/expr/types/generic.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 03b8f6cf87..0afd5480cf 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -474,8 +474,9 @@ def project_window_op( else: # Operations like count treat even NULLs as valid observations for the sake of min_periods # notnull is just used to convert null values to non-null (FALSE) values to be counted + is_observation = ops.notnull_op.as_expr(expression.inputs[0]) 
observation_count_expr = agg_expressions.WindowExpression( - agg_ops.size_op.as_expr(), + agg_ops.count_op.as_expr(is_observation), window_spec, ) clauses.append( diff --git a/third_party/bigframes_vendored/ibis/expr/types/generic.py b/third_party/bigframes_vendored/ibis/expr/types/generic.py index 7de357b138..68a85c2312 100644 --- a/third_party/bigframes_vendored/ibis/expr/types/generic.py +++ b/third_party/bigframes_vendored/ibis/expr/types/generic.py @@ -773,7 +773,9 @@ def over( @deferrable def bind(table): - winfunc = rewrite_window_input(node, window.bind(table)) + winfunc = rewrite_window_input( + node, window.bind(table) if table else window + ) if winfunc == node: raise com.IbisTypeError( "No reduction or analytic function found to construct a window expression" @@ -782,7 +784,7 @@ def bind(table): try: return bind(table) - except com.IbisInputError: + except com.IbisInputError as e: return bind(_) def isnull(self) -> ir.BooleanValue: From e1e038daff92dfd3a84b46744a19ea858abf9c56 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 15 Sep 2025 17:19:43 +0000 Subject: [PATCH 3/3] check table nullity properly --- bigframes/core/compile/compiled.py | 12 ------------ bigframes/core/compile/ibis_compiler/utils.py | 0 .../bigframes_vendored/ibis/expr/types/generic.py | 4 ++-- 3 files changed, 2 insertions(+), 14 deletions(-) delete mode 100644 bigframes/core/compile/ibis_compiler/utils.py diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 0afd5480cf..91d72d96b2 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -166,18 +166,6 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(ibis_type), ) - def row_count(self, name: str) -> UnorderedIR: - original_table = self._to_ibis_expr() - ibis_table = original_table.agg( - [ - original_table.count().name(name), - ] - ) - return UnorderedIR( - ibis_table, - 
(ibis_table[name],), - ) - def _to_ibis_expr( self, *, diff --git a/bigframes/core/compile/ibis_compiler/utils.py b/bigframes/core/compile/ibis_compiler/utils.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/third_party/bigframes_vendored/ibis/expr/types/generic.py b/third_party/bigframes_vendored/ibis/expr/types/generic.py index 68a85c2312..596d3134f6 100644 --- a/third_party/bigframes_vendored/ibis/expr/types/generic.py +++ b/third_party/bigframes_vendored/ibis/expr/types/generic.py @@ -774,7 +774,7 @@ def over( @deferrable def bind(table): winfunc = rewrite_window_input( - node, window.bind(table) if table else window + node, window.bind(table) if (table is not None) else window ) if winfunc == node: raise com.IbisTypeError( @@ -784,7 +784,7 @@ def bind(table): try: return bind(table) - except com.IbisInputError as e: + except com.IbisInputError: return bind(_) def isnull(self) -> ir.BooleanValue: