Skip to content

Commit 0a4e987

Browse files
committed
use limit rewrite for slice support
1 parent 35aba37 commit 0a4e987

File tree

5 files changed

+38
-44
lines changed

5 files changed

+38
-44
lines changed

bigframes/core/nodes.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,16 @@ def is_limit(self) -> bool:
154154
and (self.stop > 0)
155155
)
156156

157+
@property
158+
def is_noop(self) -> bool:
159+
"""Returns whether this node doesn't actually change the results."""
160+
# TODO: Handle tail case.
161+
return (
162+
((not self.start) or (self.start == 0))
163+
and (self.step == 1)
164+
and ((self.stop is None) or (self.stop == self.row_count))
165+
)
166+
157167
@property
158168
def row_count(self) -> typing.Optional[int]:
159169
child_length = self.child.row_count

bigframes/core/rewrite/scan_reduction.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from typing import Optional
1717

1818
from bigframes.core import nodes
19+
import bigframes.core.rewrite.slices
1920

2021

2122
def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTableNode]:
@@ -28,7 +29,15 @@ def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTab
2829
return None
2930

3031

31-
def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLocalNode]:
32+
def try_reduce_to_local_scan(
33+
node: nodes.BigFrameNode,
34+
) -> Optional[tuple[nodes.ReadLocalNode, Optional[int]]]:
35+
"""Create a ReadLocalNode with optional limit, if possible.
36+
37+
Similar to ReadApiSemiExecutor._try_adapt_plan.
38+
"""
39+
node, limit = bigframes.core.rewrite.slices.pull_out_limit(node)
40+
3241
if not all(
3342
map(
3443
lambda x: isinstance(x, (nodes.ReadLocalNode, nodes.SelectionNode)),
@@ -38,7 +47,7 @@ def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLoc
3847
return None
3948
result = node.bottom_up(merge_scan)
4049
if isinstance(result, nodes.ReadLocalNode):
41-
return result
50+
return result, limit
4251
return None
4352

4453

bigframes/core/rewrite/slices.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def pull_out_limit(
5757
if (prior_limit is not None) and (prior_limit < limit):
5858
limit = prior_limit
5959
return new_root, limit
60+
if root.is_noop:
61+
new_root, prior_limit = pull_out_limit(root.child)
62+
return new_root, prior_limit
6063
elif (
6164
isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
6265
and root.row_preserving

bigframes/session/local_scan_executor.py

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from typing import Optional
1717

18-
from bigframes.core import bigframe_node, nodes, rewrite
18+
from bigframes.core import bigframe_node, rewrite
1919
from bigframes.session import executor, semi_executor
2020

2121

@@ -30,30 +30,15 @@ def execute(
3030
ordered: bool,
3131
peek: Optional[int] = None,
3232
) -> Optional[executor.ExecuteResult]:
33-
rewrite_node = plan
34-
35-
# Implement top-level slice here so that we don't have to implement
36-
# slice in all ReadLocalNode compilers.
37-
slice_start: Optional[int] = None
38-
slice_stop: Optional[int] = None
39-
40-
if isinstance(plan, nodes.SliceNode):
41-
# These slice features are not supported by pyarrow.Table.slice. Must
42-
# have a non-negative start and stop and no custom step size.
43-
if (
44-
(plan.step is not None and plan.step != 1)
45-
or (plan.start is not None and plan.start < 0)
46-
or (plan.stop is not None and plan.stop < 0)
47-
):
48-
return None
33+
reduced_result = rewrite.try_reduce_to_local_scan(plan)
34+
if not reduced_result:
35+
return None
4936

50-
slice_start = plan.start
51-
slice_stop = plan.stop
52-
rewrite_node = plan.child
37+
node, limit = reduced_result
5338

54-
node = rewrite.try_reduce_to_local_scan(rewrite_node)
55-
if not node:
56-
return None
39+
if limit is not None:
40+
if peek is None or limit < peek:
41+
peek = limit
5742

5843
# TODO: Can support some sorting
5944
offsets_col = node.offsets_col.sql if (node.offsets_col is not None) else None
@@ -67,23 +52,7 @@ def execute(
6752

6853
arrow_table = arrow_table.select(needed_cols)
6954
arrow_table = arrow_table.rename_columns([id.sql for id in node.ids])
70-
71-
if slice_start is not None or slice_stop is not None:
72-
slice_length: Optional[int] = None
73-
74-
if slice_stop is not None:
75-
if slice_start is None:
76-
slice_length = slice_stop
77-
else:
78-
slice_length = slice_stop - slice_start
79-
80-
arrow_table = arrow_table.slice(
81-
offset=slice_start if slice_start is not None else 0,
82-
length=slice_length,
83-
)
84-
total_rows = arrow_table.num_rows
85-
else:
86-
total_rows = node.row_count
55+
total_rows = node.row_count
8756

8857
if (peek is not None) and (total_rows is not None):
8958
total_rows = min(peek, total_rows)

tests/unit/session/test_local_scan_executor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,13 @@ def create_read_local_node(arrow_table: pyarrow.Table):
5151
@pytest.mark.parametrize(
5252
("start", "stop", "expected_rows"),
5353
(
54+
# No-op slices.
5455
(None, None, 10),
5556
(0, None, 10),
56-
(4, None, 6),
5757
(None, 10, 10),
58+
# Slices equivalent to limits.
5859
(None, 7, 7),
59-
(1, 9, 8),
60+
(0, 3, 3),
6061
),
6162
)
6263
def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under_test):
@@ -87,6 +88,8 @@ def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under
8788
(None, -1, 1),
8889
(None, None, 2),
8990
(None, None, -1),
91+
(4, None, 6),
92+
(1, 9, 8),
9093
),
9194
)
9295
def test_local_scan_executor_with_slice_unsupported_inputs(

0 commit comments

Comments
 (0)