Skip to content

Commit e273d7a

Browse files
authored
Merge branch 'main' into main_chelsealin_compilejoin
2 parents bce9a69 + 81e4d64 commit e273d7a

File tree

64 files changed

+1947
-141
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1947
-141
lines changed

bigframes/_config/bigquery_options.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import google.auth.credentials
2323
import requests.adapters
2424

25+
import bigframes._importing
2526
import bigframes.enums
2627
import bigframes.exceptions as bfe
2728

@@ -94,6 +95,7 @@ def __init__(
9495
requests_transport_adapters: Sequence[
9596
Tuple[str, requests.adapters.BaseAdapter]
9697
] = (),
98+
enable_polars_execution: bool = False,
9799
):
98100
self._credentials = credentials
99101
self._project = project
@@ -113,6 +115,9 @@ def __init__(
113115
client_endpoints_override = {}
114116

115117
self._client_endpoints_override = client_endpoints_override
118+
if enable_polars_execution:
119+
bigframes._importing.import_polars()
120+
self._enable_polars_execution = enable_polars_execution
116121

117122
@property
118123
def application_name(self) -> Optional[str]:
@@ -424,3 +429,22 @@ def requests_transport_adapters(
424429
SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
425430
)
426431
self._requests_transport_adapters = value
432+
433+
@property
434+
def enable_polars_execution(self) -> bool:
435+
"""If True, will use polars to execute some simple query plans locally."""
436+
return self._enable_polars_execution
437+
438+
@enable_polars_execution.setter
439+
def enable_polars_execution(self, value: bool):
440+
if self._session_started and self._enable_polars_execution != value:
441+
raise ValueError(
442+
SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
443+
)
444+
if value is True:
445+
msg = bfe.format_message(
446+
"Polars execution is an experimental feature, and may not be stable. Must have polars installed."
447+
)
448+
warnings.warn(msg, category=bfe.PreviewWarning)
449+
bigframes._importing.import_polars()
450+
self._enable_polars_execution = value

bigframes/_importing.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import importlib
15+
from types import ModuleType
16+
17+
from packaging import version
18+
19+
# Keep this in sync with setup.py
20+
POLARS_MIN_VERSION = version.Version("1.7.0")
21+
22+
23+
def import_polars() -> ModuleType:
24+
polars_module = importlib.import_module("polars")
25+
imported_version = version.Version(polars_module.build_info()["version"])
26+
if imported_version < POLARS_MIN_VERSION:
27+
raise ImportError(
28+
f"Imported polars version: {imported_version} is below the minimum version: {POLARS_MIN_VERSION}"
29+
)
30+
return polars_module

bigframes/core/array_value.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,12 +330,27 @@ def create_constant(
330330

331331
return self.project_to_id(ex.const(value, dtype))
332332

333-
def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
333+
def select_columns(
334+
self, column_ids: typing.Sequence[str], allow_renames: bool = False
335+
) -> ArrayValue:
334336
# This basically just drops and reorders columns - logically a no-op except as a final step
335-
selections = (
336-
bigframes.core.nodes.AliasedRef.identity(ids.ColumnId(col_id))
337-
for col_id in column_ids
338-
)
337+
selections = []
338+
seen = set()
339+
340+
for id in column_ids:
341+
if id not in seen:
342+
ref = nodes.AliasedRef.identity(ids.ColumnId(id))
343+
elif allow_renames:
344+
ref = nodes.AliasedRef(
345+
ex.deref(id), ids.ColumnId(bigframes.core.guid.generate_guid())
346+
)
347+
else:
348+
raise ValueError(
349+
"Must set allow_renames=True to select columns repeatedly"
350+
)
351+
selections.append(ref)
352+
seen.add(id)
353+
339354
return ArrayValue(
340355
nodes.SelectionNode(
341356
child=self.node,

bigframes/core/blocks.py

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import bigframes.core.identifiers
5151
import bigframes.core.join_def as join_defs
5252
import bigframes.core.ordering as ordering
53+
import bigframes.core.pyarrow_utils as pyarrow_utils
5354
import bigframes.core.schema as bf_schema
5455
import bigframes.core.sql as sql
5556
import bigframes.core.utils as utils
@@ -156,6 +157,36 @@ def __init__(
156157
self._view_ref: Optional[bigquery.TableReference] = None
157158
self._view_ref_dry_run: Optional[bigquery.TableReference] = None
158159

160+
@classmethod
161+
def from_pyarrow(
162+
cls,
163+
data: pa.Table,
164+
session: bigframes.Session,
165+
) -> Block:
166+
column_labels = data.column_names
167+
168+
# TODO(tswast): Use array_value.promote_offsets() instead once that node is
169+
# supported by the local engine.
170+
offsets_col = bigframes.core.guid.generate_guid()
171+
index_ids = [offsets_col]
172+
index_labels = [None]
173+
174+
# TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859):
175+
# Allow users to specify the "total ordering" column(s) or allow multiple
176+
# such columns.
177+
data = pyarrow_utils.append_offsets(data, offsets_col=offsets_col)
178+
179+
# from_pyarrow will normalize the types for us.
180+
managed_data = local_data.ManagedArrowTable.from_pyarrow(data)
181+
array_value = core.ArrayValue.from_managed(managed_data, session=session)
182+
block = cls(
183+
array_value,
184+
column_labels=column_labels,
185+
index_columns=index_ids,
186+
index_labels=index_labels,
187+
)
188+
return block
189+
159190
@classmethod
160191
def from_local(
161192
cls,
@@ -1210,7 +1241,10 @@ def select_column(self, id: str) -> Block:
12101241
return self.select_columns([id])
12111242

12121243
def select_columns(self, ids: typing.Sequence[str]) -> Block:
1213-
expr = self._expr.select_columns([*self.index_columns, *ids])
1244+
# Allow renames as may end up selecting same columns multiple times
1245+
expr = self._expr.select_columns(
1246+
[*self.index_columns, *ids], allow_renames=True
1247+
)
12141248
col_labels = self._get_labels_for_columns(ids)
12151249
return Block(expr, self.index_columns, col_labels, self.index.names)
12161250

@@ -1996,7 +2030,7 @@ def _generate_resample_label(
19962030
return block.set_index([resample_label_id])
19972031

19982032
def _create_stack_column(self, col_label: typing.Tuple, stack_labels: pd.Index):
1999-
dtype = None
2033+
input_dtypes = []
20002034
input_columns: list[Optional[str]] = []
20012035
for uvalue in utils.index_as_tuples(stack_labels):
20022036
label_to_match = (*col_label, *uvalue)
@@ -2006,15 +2040,18 @@ def _create_stack_column(self, col_label: typing.Tuple, stack_labels: pd.Index):
20062040
matching_ids = self.label_to_col_id.get(label_to_match, [])
20072041
input_id = matching_ids[0] if len(matching_ids) > 0 else None
20082042
if input_id:
2009-
if dtype and dtype != self._column_type(input_id):
2010-
raise NotImplementedError(
2011-
"Cannot stack columns with non-matching dtypes."
2012-
)
2013-
else:
2014-
dtype = self._column_type(input_id)
2043+
input_dtypes.append(self._column_type(input_id))
20152044
input_columns.append(input_id)
20162045
# Input column i is the first one that
2017-
return tuple(input_columns), dtype or pd.Float64Dtype()
2046+
if len(input_dtypes) > 0:
2047+
output_dtype = bigframes.dtypes.lcd_type(*input_dtypes)
2048+
if output_dtype is None:
2049+
raise NotImplementedError(
2050+
"Cannot stack columns with non-matching dtypes."
2051+
)
2052+
else:
2053+
output_dtype = pd.Float64Dtype()
2054+
return tuple(input_columns), output_dtype
20182055

20192056
def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:
20202057
col_offset = self.value_columns.index(col_id)

bigframes/core/compile/polars/compiler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,15 +393,15 @@ class PolarsCompiler:
393393
expr_compiler = PolarsExpressionCompiler()
394394
agg_compiler = PolarsAggregateCompiler()
395395

396-
def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
396+
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
397397
if not polars_installed:
398398
raise ValueError(
399399
"Polars is not installed, cannot compile to polars engine."
400400
)
401401

402402
# TODO: Create standard way to configure BFET -> BFET rewrites
403403
# Polars has incomplete slice support in lazy mode
404-
node = array_value.node
404+
node = plan
405405
node = bigframes.core.rewrite.column_pruning(node)
406406
node = nodes.bottom_up(node, bigframes.core.rewrite.rewrite_slice)
407407
node = bigframes.core.rewrite.pull_out_window_order(node)

bigframes/core/compile/polars/lowering.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,31 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import dataclasses
16+
1517
from bigframes import dtypes
1618
from bigframes.core import bigframe_node, expression
1719
from bigframes.core.rewrite import op_lowering
18-
from bigframes.operations import numeric_ops
20+
from bigframes.operations import comparison_ops, numeric_ops
1921
import bigframes.operations as ops
2022

2123
# TODO: Would be more precise to actually have separate op set for polars ops (where they diverge from the original ops)
2224

2325

26+
@dataclasses.dataclass
27+
class CoerceArgsRule(op_lowering.OpLoweringRule):
28+
op_type: type[ops.BinaryOp]
29+
30+
@property
31+
def op(self) -> type[ops.ScalarOp]:
32+
return self.op_type
33+
34+
def lower(self, expr: expression.OpExpression) -> expression.Expression:
35+
assert isinstance(expr.op, self.op_type)
36+
larg, rarg = _coerce_comparables(expr.children[0], expr.children[1])
37+
return expr.op.as_expr(larg, rarg)
38+
39+
2440
class LowerFloorDivRule(op_lowering.OpLoweringRule):
2541
@property
2642
def op(self) -> type[ops.ScalarOp]:
@@ -40,7 +56,42 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression:
4056
return ops.where_op.as_expr(zero_result, divisor_is_zero, expr)
4157

4258

43-
POLARS_LOWERING_RULES = (LowerFloorDivRule(),)
59+
def _coerce_comparables(expr1: expression.Expression, expr2: expression.Expression):
60+
61+
target_type = dtypes.coerce_to_common(expr1.output_type, expr2.output_type)
62+
if expr1.output_type != target_type:
63+
expr1 = _lower_cast(ops.AsTypeOp(target_type), expr1)
64+
if expr2.output_type != target_type:
65+
expr2 = _lower_cast(ops.AsTypeOp(target_type), expr2)
66+
return expr1, expr2
67+
68+
69+
# TODO: Need to handle bool->string cast to get capitalization correct
70+
def _lower_cast(cast_op: ops.AsTypeOp, arg: expression.Expression):
71+
if arg.output_type == dtypes.BOOL_DTYPE and dtypes.is_numeric(cast_op.to_type):
72+
# bool -> decimal needs two-step cast
73+
new_arg = ops.AsTypeOp(to_type=dtypes.INT_DTYPE).as_expr(arg)
74+
return cast_op.as_expr(new_arg)
75+
return cast_op.as_expr(arg)
76+
77+
78+
LOWER_COMPARISONS = tuple(
79+
CoerceArgsRule(op)
80+
for op in (
81+
comparison_ops.EqOp,
82+
comparison_ops.EqNullsMatchOp,
83+
comparison_ops.NeOp,
84+
comparison_ops.LtOp,
85+
comparison_ops.GtOp,
86+
comparison_ops.LeOp,
87+
comparison_ops.GeOp,
88+
)
89+
)
90+
91+
POLARS_LOWERING_RULES = (
92+
*LOWER_COMPARISONS,
93+
LowerFloorDivRule(),
94+
)
4495

4596

4697
def lower_ops_to_polars(root: bigframe_node.BigFrameNode) -> bigframe_node.BigFrameNode:

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,7 @@ def eq_op(
14981498
x: ibis_types.Value,
14991499
y: ibis_types.Value,
15001500
):
1501+
x, y = _coerce_comparables(x, y)
15011502
return x == y
15021503

15031504

@@ -1507,6 +1508,7 @@ def eq_nulls_match_op(
15071508
y: ibis_types.Value,
15081509
):
15091510
"""Variant of eq_op where nulls match each other. Only use where dtypes are known to be same."""
1511+
x, y = _coerce_comparables(x, y)
15101512
literal = ibis_types.literal("$NULL_SENTINEL$")
15111513
if hasattr(x, "fill_null"):
15121514
left = x.cast(ibis_dtypes.str).fill_null(literal)
@@ -1523,6 +1525,7 @@ def ne_op(
15231525
x: ibis_types.Value,
15241526
y: ibis_types.Value,
15251527
):
1528+
x, y = _coerce_comparables(x, y)
15261529
return x != y
15271530

15281531

@@ -1534,6 +1537,17 @@ def _null_or_value(value: ibis_types.Value, where_value: ibis_types.BooleanValue
15341537
)
15351538

15361539

1540+
def _coerce_comparables(
1541+
x: ibis_types.Value,
1542+
y: ibis_types.Value,
1543+
):
1544+
if x.type().is_boolean() and not y.type().is_boolean():
1545+
x = x.cast(ibis_dtypes.int64)
1546+
elif y.type().is_boolean() and not x.type().is_boolean():
1547+
y = y.cast(ibis_dtypes.int64)
1548+
return x, y
1549+
1550+
15371551
@scalar_op_compiler.register_binary_op(ops.and_op)
15381552
def and_op(
15391553
x: ibis_types.Value,
@@ -1735,6 +1749,7 @@ def lt_op(
17351749
x: ibis_types.Value,
17361750
y: ibis_types.Value,
17371751
):
1752+
x, y = _coerce_comparables(x, y)
17381753
return x < y
17391754

17401755

@@ -1744,6 +1759,7 @@ def le_op(
17441759
x: ibis_types.Value,
17451760
y: ibis_types.Value,
17461761
):
1762+
x, y = _coerce_comparables(x, y)
17471763
return x <= y
17481764

17491765

@@ -1753,6 +1769,7 @@ def gt_op(
17531769
x: ibis_types.Value,
17541770
y: ibis_types.Value,
17551771
):
1772+
x, y = _coerce_comparables(x, y)
17561773
return x > y
17571774

17581775

@@ -1762,6 +1779,7 @@ def ge_op(
17621779
x: ibis_types.Value,
17631780
y: ibis_types.Value,
17641781
):
1782+
x, y = _coerce_comparables(x, y)
17651783
return x >= y
17661784

17671785

bigframes/core/indexes/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ def dtypes(self) -> pandas.Series:
174174
index=typing.cast(typing.Tuple, self._block.index.names),
175175
)
176176

177+
def __setitem__(self, key, value) -> None:
178+
"""Index objects are immutable. Use Index constructor to create
179+
modified Index."""
180+
raise TypeError("Index does not support mutable operations")
181+
177182
@property
178183
def size(self) -> int:
179184
return self.shape[0]

0 commit comments

Comments
 (0)