
Commit ac5d869

Merge branch 'main' into migrate-manhatton-dist
2 parents 8fa638f + 956a5b0 commit ac5d869


17 files changed (+187, -18 lines)


bigframes/core/compile/sqlglot/compiler.py

Lines changed: 1 addition & 0 deletions
@@ -172,6 +172,7 @@ def compile_readtable(node: nodes.ReadTableNode, child: ir.SQLGlotIR):
         col_names=[col.source_id for col in node.scan_list.items],
         alias_names=[col.id.sql for col in node.scan_list.items],
         uid_gen=child.uid_gen,
+        system_time=node.source.at_time,
     )

bigframes/core/compile/sqlglot/expressions/generic_ops.py

Lines changed: 5 additions & 0 deletions
@@ -134,6 +134,11 @@ def _(
     )


+@register_binary_op(ops.fillna_op)
+def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    return sge.Coalesce(this=left.expr, expressions=[right.expr])
+
+
 @register_nary_op(ops.case_when_op)
 def _(*cases_and_outputs: TypedExpr) -> sge.Expression:
     # Need to upcast BOOL to INT if any output is numeric

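For reference, here is a minimal standalone sketch (not part of the commit; column name and fill value invented) of what the new fillna_op lowering produces: filling one expression with another compiles to a SQL COALESCE.

import sqlglot
import sqlglot.expressions as sge

left = sqlglot.column("int64_col")   # expression being filled
right = sge.Literal.number(0)        # fill value
expr = sge.Coalesce(this=left, expressions=[right])
print(expr.sql(dialect="bigquery"))  # COALESCE(int64_col, 0)
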
bigframes/core/compile/sqlglot/expressions/numeric_ops.py

Lines changed: 12 additions & 0 deletions
@@ -305,6 +305,18 @@ def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
     return result


+@register_binary_op(ops.euclidean_distance_op)
+def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    return sge.Anonymous(
+        this="ML.DISTANCE",
+        expressions=[
+            left.expr,
+            right.expr,
+            sge.Literal.string("EUCLIDEAN"),
+        ],
+    )
+
+
 @register_binary_op(ops.floordiv_op)
 def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
     left_expr = _coerce_bool_to_int(left)

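Purely illustrative as well (column names assumed), the new euclidean_distance_op renders as a BigQuery ML.DISTANCE call with the 'EUCLIDEAN' distance type:

import sqlglot
import sqlglot.expressions as sge

expr = sge.Anonymous(
    this="ML.DISTANCE",
    expressions=[
        sqlglot.column("vector_a"),
        sqlglot.column("vector_b"),
        sge.Literal.string("EUCLIDEAN"),
    ],
)
print(expr.sql(dialect="bigquery"))  # ML.DISTANCE(vector_a, vector_b, 'EUCLIDEAN')
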
bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 13 additions & 0 deletions
@@ -15,6 +15,7 @@
 from __future__ import annotations

 import dataclasses
+import datetime
 import functools
 import typing

@@ -118,6 +119,7 @@ def from_table(
         col_names: typing.Sequence[str],
         alias_names: typing.Sequence[str],
         uid_gen: guid.SequentialUIDGenerator,
+        system_time: typing.Optional[datetime.datetime] = None,
     ) -> SQLGlotIR:
         """Builds a SQLGlotIR expression from a BigQuery table.
@@ -128,6 +130,7 @@ def from_table(
             col_names (typing.Sequence[str]): The names of the columns to select.
             alias_names (typing.Sequence[str]): The aliases for the selected columns.
             uid_gen (guid.SequentialUIDGenerator): A generator for unique identifiers.
+            system_time (typing.Optional[datetime.datetime]): An optional system time for time-travel queries.
         """
         selections = [
             sge.Alias(
@@ -138,10 +141,20 @@ def from_table(
             else sge.to_identifier(col_name, quoted=cls.quoted)
             for col_name, alias_name in zip(col_names, alias_names)
         ]
+        version = (
+            sge.Version(
+                this="TIMESTAMP",
+                expression=sge.Literal(this=system_time.isoformat(), is_string=True),
+                kind="AS OF",
+            )
+            if system_time
+            else None
+        )
         table_expr = sge.Table(
             this=sg.to_identifier(table_id, quoted=cls.quoted),
             db=sg.to_identifier(dataset_id, quoted=cls.quoted),
             catalog=sg.to_identifier(project_id, quoted=cls.quoted),
+            version=version,
         )
         select_expr = sge.Select().select(*selections).from_(table_expr)
         return cls(expr=select_expr, uid_gen=uid_gen)

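A hedged standalone sketch (identifiers and timestamp invented for illustration) of what the new version= argument contributes: when system_time is set, the compiled table reference carries a time-travel clause.

import datetime

import sqlglot as sg
import sqlglot.expressions as sge

system_time = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
table_expr = sge.Table(
    this=sg.to_identifier("scalar_types", quoted=True),
    db=sg.to_identifier("sqlglot_test", quoted=True),
    catalog=sg.to_identifier("bigframes-dev", quoted=True),
    version=sge.Version(
        this="TIMESTAMP",
        expression=sge.Literal(this=system_time.isoformat(), is_string=True),
        kind="AS OF",
    ),
)
# Prints the fully-qualified table reference followed by a time-travel clause
# ending in "AS OF '2025-01-01T00:00:00+00:00'".
print(table_expr.sql(dialect="bigquery"))
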
bigframes/core/groupby/dataframe_group_by.py

Lines changed: 5 additions & 5 deletions
@@ -593,6 +593,7 @@ def _agg_func(self, func) -> df.DataFrame:
     def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
         aggregations: typing.List[agg_expressions.Aggregation] = []
         column_labels = []
+        function_labels = []

         want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())

@@ -602,8 +603,10 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
                 funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id]
             )
             for f in func_list:
-                aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0]))
+                f_op, f_label = agg_ops.lookup_agg_func(f)
+                aggregations.append(aggs.agg(col_id, f_op))
                 column_labels.append(label)
+                function_labels.append(f_label)
         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
             aggregations=aggregations,
@@ -613,10 +616,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
             agg_block = agg_block.with_column_labels(
                 utils.combine_indices(
                     pd.Index(column_labels),
-                    pd.Index(
-                        typing.cast(agg_ops.AggregateOp, agg.op).name
-                        for agg in aggregations
-                    ),
+                    pd.Index(function_labels),
                 )
             )
         else:

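For context, a small pandas-only sketch (data invented) of the labelling this change lines up with: when callables are passed in a dict-of-lists agg, the second level of the resulting column MultiIndex uses the function's label rather than an internal op name.

import pandas as pd

df = pd.DataFrame({"key": [0, 1, 0, 1], "x": [1, 2, 3, 4]})
out = df.groupby("key").agg({"x": [sum, max]})
print(out.columns.tolist())  # typically [('x', 'sum'), ('x', 'max')]
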
bigframes/operations/aggregations.py

Lines changed: 8 additions & 2 deletions
@@ -717,9 +717,15 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
     np.all: all_op,
     np.any: any_op,
     np.unique: nunique_op,
-    # TODO(b/443252872): Solve
-    # list: ArrayAggOp(),
     np.size: size_op,
+    # TODO(b/443252872): Solve
+    list: ArrayAggOp(),
+    len: size_op,
+    sum: sum_op,
+    min: min_op,
+    max: max_op,
+    any: any_op,
+    all: all_op,
 }

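A hedged usage sketch of what the expanded lookup table enables (the table name is taken from the test fixtures in this commit; running it requires a configured BigQuery project with access to it): Python builtins can now be passed to groupby aggregation wherever string aliases or numpy functions were already accepted.

import bigframes.pandas as bpd

df = bpd.read_gbq("bigframes-dev.sqlglot_test.scalar_types")
result = df.groupby("bool_col").agg({"int64_col": [len, sum, min, max]})
print(result.to_pandas())
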
tests/system/small/engines/test_generic_ops.py

Lines changed: 1 addition & 1 deletion
@@ -343,7 +343,7 @@ def test_engines_coalesce_op(scalars_array_value: array_value.ArrayValue, engine
     assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)


-@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
 def test_engines_fillna_op(scalars_array_value: array_value.ArrayValue, engine):
     arr, _ = scalars_array_value.compute_values(
         [

tests/system/small/test_dataframe.py

Lines changed: 22 additions & 0 deletions
@@ -6151,6 +6151,28 @@ def test_agg_with_dict_strs(scalars_dfs):
     )


+def test_df_agg_with_builtins(scalars_dfs):
+    bf_df, pd_df = scalars_dfs
+
+    bf_result = (
+        bf_df[["int64_col", "bool_col"]]
+        .dropna()
+        .groupby(bf_df.int64_too % 2)
+        .agg({"int64_col": [len, sum, min, max, list], "bool_col": [all, any, max]})
+        .to_pandas()
+    )
+    pd_result = (
+        pd_df[["int64_col", "bool_col"]]
+        .dropna()
+        .groupby(pd_df.int64_too % 2)
+        .agg({"int64_col": [len, sum, min, max, list], "bool_col": [all, any, max]})
+    )
+
+    pd.testing.assert_frame_equal(
+        bf_result, pd_result, check_dtype=False, check_index_type=False
+    )
+
+
 def test_agg_with_dict_containing_non_existing_col_raise_key_error(scalars_dfs):
     bf_df, _ = scalars_dfs
     agg_funcs = {

tests/system/small/test_groupby.py

Lines changed: 0 additions & 2 deletions
@@ -282,8 +282,6 @@ def test_dataframe_groupby_agg_dict_with_list(
     )
     bf_result_computed = bf_result.to_pandas()

-    # some inconsistency between versions, so normalize to bigframes behavior
-    pd_result = pd_result.rename({"amax": "max"}, axis="columns")
     pd.testing.assert_frame_equal(
         pd_result, bf_result_computed, check_dtype=False, check_index_type=False
     )

tests/unit/core/compile/sqlglot/conftest.py

Lines changed: 13 additions & 7 deletions
@@ -97,7 +97,10 @@ def scalar_types_table_schema() -> typing.Sequence[bigquery.SchemaField]:
 def scalar_types_df(compiler_session) -> bpd.DataFrame:
     """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex`
     column as the index."""
-    bf_df = compiler_session.read_gbq_table("bigframes-dev.sqlglot_test.scalar_types")
+    bf_df = compiler_session._loader.read_gbq_table(
+        "bigframes-dev.sqlglot_test.scalar_types",
+        enable_snapshot=False,
+    )
     bf_df = bf_df.set_index("rowindex", drop=False)
     return bf_df

@@ -154,8 +157,9 @@ def nested_structs_types_table_schema() -> typing.Sequence[bigquery.SchemaField]
 def nested_structs_types_df(compiler_session_w_nested_structs_types) -> bpd.DataFrame:
     """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex`
     column as the index."""
-    bf_df = compiler_session_w_nested_structs_types.read_gbq_table(
-        "bigframes-dev.sqlglot_test.nested_structs_types"
+    bf_df = compiler_session_w_nested_structs_types._loader.read_gbq_table(
+        "bigframes-dev.sqlglot_test.nested_structs_types",
+        enable_snapshot=False,
     )
     bf_df = bf_df.set_index("id", drop=False)
     return bf_df
@@ -204,8 +208,9 @@ def repeated_types_table_schema() -> typing.Sequence[bigquery.SchemaField]:
 def repeated_types_df(compiler_session_w_repeated_types) -> bpd.DataFrame:
     """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex`
     column as the index."""
-    bf_df = compiler_session_w_repeated_types.read_gbq_table(
-        "bigframes-dev.sqlglot_test.repeated_types"
+    bf_df = compiler_session_w_repeated_types._loader.read_gbq_table(
+        "bigframes-dev.sqlglot_test.repeated_types",
+        enable_snapshot=False,
     )
     bf_df = bf_df.set_index("rowindex", drop=False)
     return bf_df
@@ -237,8 +242,9 @@ def json_types_table_schema() -> typing.Sequence[bigquery.SchemaField]:
 def json_types_df(compiler_session_w_json_types) -> bpd.DataFrame:
     """Returns a BigFrames DataFrame containing JSON types and using the `rowindex`
     column as the index."""
-    bf_df = compiler_session_w_json_types.read_gbq_table(
-        "bigframes-dev.sqlglot_test.json_types"
+    bf_df = compiler_session_w_json_types._loader.read_gbq_table(
+        "bigframes-dev.sqlglot_test.json_types",
+        enable_snapshot=False,
     )
     # TODO(b/427305807): Why `drop=False` will produce two "rowindex" columns?
     bf_df = bf_df.set_index("rowindex", drop=True)
