From 3c22589e84f8735a132b68b4628caf4fd2ce1964 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Thu, 9 Oct 2025 21:06:52 +0000 Subject: [PATCH 1/2] refactor: add agg_ops.TimeSeriesDiffOp --- .../sqlglot/aggregations/unary_compiler.py | 17 +++++++++++++++++ .../test_time_series_diff/out.sql | 17 +++++++++++++++++ .../sqlglot/aggregations/test_unary_compiler.py | 11 +++++++++++ 3 files changed, 45 insertions(+) create mode 100644 tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_time_series_diff/out.sql diff --git a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py index cfa27909c6..dbd1bec62d 100644 --- a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py +++ b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py @@ -293,3 +293,20 @@ def _( # Will be null if all inputs are null. Pandas defaults to zero sum though. zero = pd.to_timedelta(0) if column.dtype == dtypes.TIMEDELTA_DTYPE else 0 return sge.func("IFNULL", expr, ir._literal(zero, column.dtype)) + + +@UNARY_OP_REGISTRATION.register(agg_ops.TimeSeriesDiffOp) +def _( + op: agg_ops.TimeSeriesDiffOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + if column.dtype != dtypes.TIMESTAMP_DTYPE: + raise TypeError(f"Cannot perform time series diff on type {column.dtype}") + shift_op_impl = UNARY_OP_REGISTRATION[agg_ops.ShiftOp(0)] + shifted = shift_op_impl(agg_ops.ShiftOp(op.periods), column, window) + return sge.TimestampDiff( + this=column.expr, + expression=shifted, + unit=sge.Identifier(this="MICROSECOND"), + ) diff --git a/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_time_series_diff/out.sql b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_time_series_diff/out.sql new file mode 100644 index 0000000000..8ed95b3c07 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_time_series_diff/out.sql @@ -0,0 +1,17 @@ +WITH `bfcte_0` AS ( + SELECT + `timestamp_col` AS `bfcol_0` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + TIMESTAMP_DIFF( + `bfcol_0`, + LAG(`bfcol_0`, 1) OVER (ORDER BY `bfcol_0` IS NULL ASC NULLS LAST, `bfcol_0` ASC NULLS LAST), + MICROSECOND + ) AS `bfcol_1` + FROM `bfcte_0` +) +SELECT + `bfcol_1` AS `diff_time` +FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py index a83a494e55..e9941920be 100644 --- a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py +++ b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py @@ -331,3 +331,14 @@ def test_sum(scalar_types_df: bpd.DataFrame, snapshot): ) snapshot.assert_match(sql, "out.sql") + + +def test_time_series_diff(scalar_types_df: bpd.DataFrame, snapshot): + col_name = "timestamp_col" + bf_df = scalar_types_df[[col_name]] + window = window_spec.WindowSpec(ordering=(ordering.ascending_over(col_name),)) + op = agg_exprs.UnaryAggregation( + agg_ops.TimeSeriesDiffOp(periods=1), expression.deref(col_name) + ) + sql = _apply_unary_window_op(bf_df, op, window, "diff_time") + snapshot.assert_match(sql, "out.sql") From 625bde6c65295df67c8ccb2526d6b6147eeccfd8 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Thu, 9 Oct 2025 21:17:39 +0000 Subject: [PATCH 2/2] refactor: add agg_ops.DateSeriesDiffOp --- .../sqlglot/aggregations/unary_compiler.py | 29 ++++++++++++++++--- .../test_date_series_diff/out.sql | 17 +++++++++++ .../aggregations/test_unary_compiler.py | 11 +++++++ 3 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_date_series_diff/out.sql diff --git a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py index dbd1bec62d..d157f07df2 100644 --- a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py +++ b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py @@ -98,6 +98,27 @@ def _( return apply_window_if_present(sge.func("COUNT", column.expr), window) +@UNARY_OP_REGISTRATION.register(agg_ops.DateSeriesDiffOp) +def _( + op: agg_ops.DateSeriesDiffOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + if column.dtype != dtypes.DATE_DTYPE: + raise TypeError(f"Cannot perform date series diff on type {column.dtype}") + shift_op_impl = UNARY_OP_REGISTRATION[agg_ops.ShiftOp(0)] + shifted = shift_op_impl(agg_ops.ShiftOp(op.periods), column, window) + # Conversion factor from days to microseconds + conversion_factor = 24 * 60 * 60 * 1_000_000 + return sge.Cast( + this=sge.DateDiff( + this=column.expr, expression=shifted, unit=sge.Identifier(this="DAY") + ) + * sge.convert(conversion_factor), + to="INT64", + ) + + @UNARY_OP_REGISTRATION.register(agg_ops.DenseRankOp) def _( op: agg_ops.DenseRankOp, @@ -306,7 +327,7 @@ def _( shift_op_impl = UNARY_OP_REGISTRATION[agg_ops.ShiftOp(0)] shifted = shift_op_impl(agg_ops.ShiftOp(op.periods), column, window) return sge.TimestampDiff( - this=column.expr, - expression=shifted, - unit=sge.Identifier(this="MICROSECOND"), - ) + this=column.expr, + expression=shifted, + unit=sge.Identifier(this="MICROSECOND"), + ) diff --git a/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_date_series_diff/out.sql b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_date_series_diff/out.sql new file mode 100644 index 0000000000..599d8333c9 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_date_series_diff/out.sql @@ -0,0 +1,17 @@ +WITH `bfcte_0` AS ( + SELECT + `date_col` AS `bfcol_0` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` +), `bfcte_1` AS ( + SELECT + *, + CAST(DATE_DIFF( + `bfcol_0`, + LAG(`bfcol_0`, 1) OVER (ORDER BY `bfcol_0` IS NULL ASC NULLS LAST, `bfcol_0` ASC NULLS LAST), + DAY + ) * 86400000000 AS INT64) AS `bfcol_1` + FROM `bfcte_0` +) +SELECT + `bfcol_1` AS `diff_date` +FROM `bfcte_1` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py index e9941920be..da388ccad1 100644 --- a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py +++ b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py @@ -127,6 +127,17 @@ def test_dense_rank(scalar_types_df: bpd.DataFrame, snapshot): snapshot.assert_match(sql, "out.sql") +def test_date_series_diff(scalar_types_df: bpd.DataFrame, snapshot): + col_name = "date_col" + bf_df = scalar_types_df[[col_name]] + window = window_spec.WindowSpec(ordering=(ordering.ascending_over(col_name),)) + op = agg_exprs.UnaryAggregation( + agg_ops.DateSeriesDiffOp(periods=1), expression.deref(col_name) + ) + sql = _apply_unary_window_op(bf_df, op, window, "diff_date") + snapshot.assert_match(sql, "out.sql") + + def test_diff(scalar_types_df: bpd.DataFrame, snapshot): # Test integer int_col = "int64_col"