Skip to content

Commit 5ca4cbc

Browse files
committed
Merge remote-tracking branch 'origin/main' into b401261155-autosummary
2 parents e7ad2b3 + 956a5b0 commit 5ca4cbc

File tree

270 files changed

+1731
-1444
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

270 files changed

+1731
-1444
lines changed

bigframes/core/compile/sqlglot/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from bigframes.core.compile.sqlglot.compiler import SQLGlotCompiler
16+
from bigframes.core.compile.sqlglot.compiler import compile_sql
1717
import bigframes.core.compile.sqlglot.expressions.ai_ops # noqa: F401
1818
import bigframes.core.compile.sqlglot.expressions.array_ops # noqa: F401
1919
import bigframes.core.compile.sqlglot.expressions.blob_ops # noqa: F401
@@ -29,4 +29,4 @@
2929
import bigframes.core.compile.sqlglot.expressions.struct_ops # noqa: F401
3030
import bigframes.core.compile.sqlglot.expressions.timedelta_ops # noqa: F401
3131

32-
__all__ = ["SQLGlotCompiler"]
32+
__all__ = ["compile_sql"]

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 311 additions & 335 deletions
Large diffs are not rendered by default.

bigframes/core/compile/sqlglot/expressions/generic_ops.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ def _(
134134
)
135135

136136

137+
@register_binary_op(ops.fillna_op)
138+
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
139+
return sge.Coalesce(this=left.expr, expressions=[right.expr])
140+
141+
137142
@register_nary_op(ops.case_when_op)
138143
def _(*cases_and_outputs: TypedExpr) -> sge.Expression:
139144
# Need to upcast BOOL to INT if any output is numeric

bigframes/core/compile/sqlglot/expressions/numeric_ops.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,18 @@ def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
305305
return result
306306

307307

308+
@register_binary_op(ops.euclidean_distance_op)
309+
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
310+
return sge.Anonymous(
311+
this="ML.DISTANCE",
312+
expressions=[
313+
left.expr,
314+
right.expr,
315+
sge.Literal.string("EUCLIDEAN"),
316+
],
317+
)
318+
319+
308320
@register_binary_op(ops.floordiv_op)
309321
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
310322
left_expr = _coerce_bool_to_int(left)

bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import dataclasses
18+
import datetime
1819
import functools
1920
import typing
2021

@@ -118,6 +119,7 @@ def from_table(
118119
col_names: typing.Sequence[str],
119120
alias_names: typing.Sequence[str],
120121
uid_gen: guid.SequentialUIDGenerator,
122+
system_time: typing.Optional[datetime.datetime] = None,
121123
) -> SQLGlotIR:
122124
"""Builds a SQLGlotIR expression from a BigQuery table.
123125
@@ -128,18 +130,31 @@ def from_table(
128130
col_names (typing.Sequence[str]): The names of the columns to select.
129131
alias_names (typing.Sequence[str]): The aliases for the selected columns.
130132
uid_gen (guid.SequentialUIDGenerator): A generator for unique identifiers.
133+
system_time (typing.Optional[str]): An optional system time for time-travel queries.
131134
"""
132135
selections = [
133136
sge.Alias(
134137
this=sge.to_identifier(col_name, quoted=cls.quoted),
135138
alias=sge.to_identifier(alias_name, quoted=cls.quoted),
136139
)
140+
if col_name != alias_name
141+
else sge.to_identifier(col_name, quoted=cls.quoted)
137142
for col_name, alias_name in zip(col_names, alias_names)
138143
]
144+
version = (
145+
sge.Version(
146+
this="TIMESTAMP",
147+
expression=sge.Literal(this=system_time.isoformat(), is_string=True),
148+
kind="AS OF",
149+
)
150+
if system_time
151+
else None
152+
)
139153
table_expr = sge.Table(
140154
this=sg.to_identifier(table_id, quoted=cls.quoted),
141155
db=sg.to_identifier(dataset_id, quoted=cls.quoted),
142156
catalog=sg.to_identifier(project_id, quoted=cls.quoted),
157+
version=version,
143158
)
144159
select_expr = sge.Select().select(*selections).from_(table_expr)
145160
return cls(expr=select_expr, uid_gen=uid_gen)
@@ -227,6 +242,8 @@ def select(
227242
this=expr,
228243
alias=sge.to_identifier(id, quoted=self.quoted),
229244
)
245+
if expr.alias_or_name != id
246+
else expr
230247
for id, expr in selected_cols
231248
]
232249

bigframes/core/groupby/dataframe_group_by.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ def _agg_func(self, func) -> df.DataFrame:
593593
def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
594594
aggregations: typing.List[agg_expressions.Aggregation] = []
595595
column_labels = []
596+
function_labels = []
596597

597598
want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
598599

@@ -602,8 +603,10 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
602603
funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id]
603604
)
604605
for f in func_list:
605-
aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0]))
606+
f_op, f_label = agg_ops.lookup_agg_func(f)
607+
aggregations.append(aggs.agg(col_id, f_op))
606608
column_labels.append(label)
609+
function_labels.append(f_label)
607610
agg_block, _ = self._block.aggregate(
608611
by_column_ids=self._by_col_ids,
609612
aggregations=aggregations,
@@ -613,10 +616,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
613616
agg_block = agg_block.with_column_labels(
614617
utils.combine_indices(
615618
pd.Index(column_labels),
616-
pd.Index(
617-
typing.cast(agg_ops.AggregateOp, agg.op).name
618-
for agg in aggregations
619-
),
619+
pd.Index(function_labels),
620620
)
621621
)
622622
else:

bigframes/core/rewrite/select_pullup.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
# limitations under the License.
1414

1515
import dataclasses
16+
import functools
1617
from typing import cast
1718

18-
from bigframes.core import expression, nodes
19+
from bigframes.core import expression, identifiers, nodes
1920

2021

2122
def defer_selection(
@@ -26,12 +27,19 @@ def defer_selection(
2627
2728
In many cases, these nodes will be merged or eliminated entirely, simplifying the overall tree.
2829
"""
29-
return nodes.bottom_up(root, pull_up_select)
30+
return nodes.bottom_up(
31+
root, functools.partial(pull_up_select, prefer_source_names=True)
32+
)
3033

3134

32-
def pull_up_select(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
35+
def pull_up_select(
36+
node: nodes.BigFrameNode, prefer_source_names: bool
37+
) -> nodes.BigFrameNode:
3338
if isinstance(node, nodes.LeafNode):
34-
return node
39+
if prefer_source_names and isinstance(node, nodes.ReadTableNode):
40+
return pull_up_source_ids(node)
41+
else:
42+
return node
3543
if isinstance(node, nodes.JoinNode):
3644
return pull_up_selects_under_join(node)
3745
if isinstance(node, nodes.ConcatNode):
@@ -42,6 +50,32 @@ def pull_up_select(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
4250
return node
4351

4452

53+
def pull_up_source_ids(node: nodes.ReadTableNode) -> nodes.BigFrameNode:
54+
if all(id.sql == source_id for id, source_id in node.scan_list.items):
55+
return node
56+
else:
57+
source_ids = sorted(
58+
set(scan_item.source_id for scan_item in node.scan_list.items)
59+
)
60+
new_scan_list = nodes.ScanList.from_items(
61+
[
62+
nodes.ScanItem(identifiers.ColumnId(source_id), source_id)
63+
for source_id in source_ids
64+
]
65+
)
66+
new_source = dataclasses.replace(node, scan_list=new_scan_list)
67+
new_selection = nodes.SelectionNode(
68+
new_source,
69+
tuple(
70+
nodes.AliasedRef(
71+
expression.DerefOp(identifiers.ColumnId(source_id)), id
72+
)
73+
for id, source_id in node.scan_list.items
74+
),
75+
)
76+
return new_selection
77+
78+
4579
def pull_up_select_unary(node: nodes.UnaryNode) -> nodes.BigFrameNode:
4680
child = node.child
4781
if not isinstance(child, nodes.SelectionNode):

bigframes/operations/aggregations.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -717,9 +717,15 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
717717
np.all: all_op,
718718
np.any: any_op,
719719
np.unique: nunique_op,
720-
# TODO(b/443252872): Solve
721-
# list: ArrayAggOp(),
722720
np.size: size_op,
721+
# TODO(b/443252872): Solve
722+
list: ArrayAggOp(),
723+
len: size_op,
724+
sum: sum_op,
725+
min: min_op,
726+
max: max_op,
727+
any: any_op,
728+
all: all_op,
723729
}
724730

725731

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ def get_index_cols(
402402
| bigframes.enums.DefaultIndexKind,
403403
*,
404404
rename_to_schema: Optional[Dict[str, str]] = None,
405+
default_index_type: bigframes.enums.DefaultIndexKind = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
405406
) -> List[str]:
406407
"""
407408
If we can get a total ordering from the table, such as via primary key
@@ -471,7 +472,11 @@ def get_index_cols(
471472
# find index_cols to use. This is to avoid unexpected performance and
472473
# resource utilization because of the default sequential index. See
473474
# internal issue 335727141.
474-
if _is_table_clustered_or_partitioned(table) and not primary_keys:
475+
if (
476+
_is_table_clustered_or_partitioned(table)
477+
and not primary_keys
478+
and default_index_type == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
479+
):
475480
msg = bfe.format_message(
476481
f"Table '{str(table.reference)}' is clustered and/or "
477482
"partitioned, but BigQuery DataFrames was not able to find a "

bigframes/session/direct_gbq_execution.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ def __init__(
4040
):
4141
self.bqclient = bqclient
4242
self._compile_fn = (
43-
compile.compile_sql
44-
if compiler == "ibis"
45-
else sqlglot.SQLGlotCompiler()._compile_sql
43+
compile.compile_sql if compiler == "ibis" else sqlglot.compile_sql
4644
)
4745
self._publisher = publisher
4846

0 commit comments

Comments
 (0)