Commit d6ba77f

refactor
1 parent fc11a1c commit d6ba77f

5 files changed: +219 -110 lines changed

bigframes/core/groupby/dataframe_group_by.py

Lines changed: 9 additions & 55 deletions
@@ -15,7 +15,6 @@
 from __future__ import annotations

 import datetime
-import functools
 import typing
 from typing import Iterable, Literal, Optional, Sequence, Tuple, Union

@@ -30,7 +29,7 @@
 from bigframes.core import log_adapter
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
-from bigframes.core.groupby import aggs, series_group_by
+from bigframes.core.groupby import aggs, group_by, series_group_by
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
 import bigframes.core.validations as validations

@@ -39,8 +38,6 @@
 import bigframes.core.window_spec as window_specs
 import bigframes.dataframe as df
 import bigframes.dtypes as dtypes
-import bigframes.enums
-import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.series as series

@@ -157,57 +154,14 @@ def head(self, n: int = 5) -> df.DataFrame:
         )

     def __iter__(self) -> Iterable[Tuple[blocks.Label, df.DataFrame]]:
-        original_index_columns = self._block._index_columns
-        original_index_labels = self._block._index_labels
-        by_col_ids = self._by_col_ids
-        block = self._block.reset_index(
-            level=None,
-            # Keep the original index columns so they can be recovered.
-            drop=False,
-            allow_duplicates=True,
-            replacement=bigframes.enums.DefaultIndexKind.NULL,
-        ).set_index(
-            by_col_ids,
-            # Keep by_col_ids in-place so the ordering doesn't change.
-            drop=False,
-            append=False,
-        )
-        block.cached(
-            force=True,
-            # All DataFrames will be filtered by by_col_ids, so
-            # force block.cached() to cluster by the new index by explicitly
-            # setting `session_aware=False`. This will ensure that the filters
-            # are more efficient.
-            session_aware=False,
-        )
-        keys_block, _ = block.aggregate(by_col_ids, dropna=self._dropna)
-        for chunk in keys_block.to_pandas_batches():
-            for by_keys in pd.MultiIndex.from_frame(chunk.index.to_frame()):
-                filtered_df = df.DataFrame(
-                    # To ensure the cache is used, filter first, then reset the
-                    # index before yielding the DataFrame.
-                    block.filter(
-                        functools.reduce(
-                            ops.and_op.as_expr,
-                            (
-                                ops.eq_op.as_expr(by_col, ex.const(by_key))
-                                for by_col, by_key in zip(by_col_ids, by_keys)
-                            ),
-                        ),
-                    ).set_index(
-                        original_index_columns,
-                        # We retained by_col_ids in the set_index call above,
-                        # so it's safe to drop the duplicates now.
-                        drop=True,
-                        append=False,
-                        index_labels=original_index_labels,
-                    )
-                )
-
-                if self._by_key_is_singular:
-                    yield by_keys[0], filtered_df
-                else:
-                    yield by_keys, filtered_df
+        for group_keys, filtered_block in group_by.block_groupby_iter(
+            self._block,
+            by_col_ids=self._by_col_ids,
+            by_key_is_singular=self._by_key_is_singular,
+            dropna=self._dropna,
+        ):
+            filtered_df = df.DataFrame(filtered_block)
+            yield group_keys, filtered_df

     def size(self) -> typing.Union[df.DataFrame, series.Series]:
         agg_block, _ = self._block.aggregate_size(
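
This hunk reduces DataFrameGroupBy.__iter__ to a thin wrapper over the shared helper; the user-facing iteration contract is unchanged. A minimal usage sketch, assuming hypothetical column names and a configured BigQuery session:

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"species": ["a", "a", "b"], "weight": [1.0, 2.0, 3.0]})
    for key, group_df in df.groupby("species"):
        # With a single grouping column, by_key_is_singular is True, so
        # `key` is a scalar label; with multiple columns it is a tuple.
        print(key, group_df.shape)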

bigframes/core/groupby/group_by.py

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import functools
+from typing import Sequence
+
+import pandas as pd
+
+from bigframes.core import blocks
+from bigframes.core import expression as ex
+import bigframes.enums
+import bigframes.operations as ops
+
+
+def block_groupby_iter(
+    block: blocks.Block,
+    *,
+    by_col_ids: Sequence[str],
+    by_key_is_singular: bool,
+    dropna: bool,
+):
+    original_index_columns = block._index_columns
+    original_index_labels = block._index_labels
+    by_col_ids = by_col_ids
+    block = block.reset_index(
+        level=None,
+        # Keep the original index columns so they can be recovered.
+        drop=False,
+        allow_duplicates=True,
+        replacement=bigframes.enums.DefaultIndexKind.NULL,
+    ).set_index(
+        by_col_ids,
+        # Keep by_col_ids in-place so the ordering doesn't change.
+        drop=False,
+        append=False,
+    )
+    block.cached(
+        force=True,
+        # All DataFrames will be filtered by by_col_ids, so
+        # force block.cached() to cluster by the new index by explicitly
+        # setting `session_aware=False`. This will ensure that the filters
+        # are more efficient.
+        session_aware=False,
+    )
+    keys_block, _ = block.aggregate(by_col_ids, dropna=dropna)
+    for chunk in keys_block.to_pandas_batches():
+        # Convert to MultiIndex to make sure we get tuples,
+        # even for singular keys.
+        by_keys_index = chunk.index
+        if not isinstance(by_keys_index, pd.MultiIndex):
+            by_keys_index = pd.MultiIndex.from_frame(by_keys_index.to_frame())
+
+        for by_keys in by_keys_index:
+            filtered_block = (
+                # To ensure the cache is used, filter first, then reset the
+                # index before yielding the DataFrame.
+                block.filter(
+                    functools.reduce(
+                        ops.and_op.as_expr,
+                        (
+                            ops.eq_op.as_expr(by_col, ex.const(by_key))
+                            for by_col, by_key in zip(by_col_ids, by_keys)
+                        ),
+                    ),
+                ).set_index(
+                    original_index_columns,
+                    # We retained by_col_ids in the set_index call above,
+                    # so it's safe to drop the duplicates now.
+                    drop=True,
+                    append=False,
+                    index_labels=original_index_labels,
+                )
+            )
+
+            if by_key_is_singular:
+                yield by_keys[0], filtered_block
+            else:
+                yield by_keys, filtered_block
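
Two details of the helper are worth calling out. First, block.cached(force=True, session_aware=False) materializes the data clustered by the new group-key index before iteration, so each per-group filter scans a favorably clustered result. Second, the per-group predicate is a conjunction built by reducing one equality expression per grouping column. The same reduce-over-AND pattern in plain Python, as a self-contained sketch (the dictionaries are illustrative, not bigframes API):

    import functools
    import operator

    group_key = {"species": "a", "year": 2024}  # hypothetical composite key
    row = {"species": "a", "year": 2024, "weight": 1.5}

    # Mirrors functools.reduce(ops.and_op.as_expr, ...) above, over booleans.
    in_group = functools.reduce(
        operator.and_,
        (row[col] == value for col, value in group_key.items()),
    )
    print(in_group)  # True: the row matches every grouping column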

bigframes/core/groupby/series_group_by.py

Lines changed: 18 additions & 54 deletions
@@ -15,7 +15,6 @@
 from __future__ import annotations

 import datetime
-import functools
 import typing
 from typing import Iterable, Literal, Sequence, Tuple, Union

@@ -29,7 +28,7 @@
 from bigframes.core import log_adapter
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
-from bigframes.core.groupby import aggs
+from bigframes.core.groupby import aggs, group_by
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
 import bigframes.core.validations as validations

@@ -38,8 +37,6 @@
 import bigframes.core.window_spec as window_specs
 import bigframes.dataframe as df
 import bigframes.dtypes
-import bigframes.enums
-import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.series as series

@@ -55,6 +52,8 @@ def __init__(
         by_col_ids: typing.Sequence[str],
         value_name: blocks.Label = None,
         dropna=True,
+        *,
+        by_key_is_singular: bool = False,
     ):
         # TODO(tbergeron): Support more group-by expression types
         self._block = block

@@ -63,6 +62,10 @@
         self._value_name = value_name
         self._dropna = dropna  # Applies to aggregations but not windowing

+        self._by_key_is_singular = by_key_is_singular
+        if by_key_is_singular:
+            assert len(by_col_ids) == 1, "singular key should be exactly one group key"
+
     @property
     def _session(self) -> session.Session:
         return self._block.session

@@ -79,56 +82,17 @@ def head(self, n: int = 5) -> series.Series:
         )

     def __iter__(self) -> Iterable[Tuple[blocks.Label, series.Series]]:
-        original_index_columns = self._block._index_columns
-        original_index_labels = self._block._index_labels
-        by_col_ids = self._by_col_ids
-        block = self._block.reset_index(
-            level=None,
-            # Keep the original index columns so they can be recovered.
-            drop=False,
-            allow_duplicates=True,
-            replacement=bigframes.enums.DefaultIndexKind.NULL,
-        ).set_index(
-            by_col_ids,
-            # Keep by_col_ids in-place so the ordering doesn't change.
-            drop=False,
-            append=False,
-        )
-        block.cached(
-            force=True,
-            # All DataFrames will be filtered by by_col_ids, so
-            # force block.cached() to cluster by the new index by explicitly
-            # setting `session_aware=False`. This will ensure that the filters
-            # are more efficient.
-            session_aware=False,
-        )
-        keys_block, _ = block.aggregate(by_col_ids, dropna=self._dropna)
-        for chunk in keys_block.to_pandas_batches():
-            for by_keys in chunk.index:
-                filtered_series = series.Series(
-                    # To ensure the cache is used, filter first, then reset the
-                    # index before yielding the DataFrame.
-                    block.filter(
-                        functools.reduce(
-                            ops.and_op.as_expr,
-                            (
-                                ops.eq_op.as_expr(by_col, ex.const(by_key))
-                                for by_col, by_key in zip(by_col_ids, by_keys)
-                            ),
-                        ),
-                    )
-                    .set_index(
-                        original_index_columns,
-                        # We retained by_col_ids in the set_index call above,
-                        # so it's safe to drop the duplicates now.
-                        drop=True,
-                        append=False,
-                        index_labels=original_index_labels,
-                    )
-                    .select_column(self._value_column),
-                )
-                filtered_series.name = self._value_name
-                yield by_keys, filtered_series
+        for group_keys, filtered_block in group_by.block_groupby_iter(
+            self._block,
+            by_col_ids=self._by_col_ids,
+            by_key_is_singular=self._by_key_is_singular,
+            dropna=self._dropna,
+        ):
+            filtered_series = series.Series(
+                filtered_block.select_column(self._value_column)
+            )
+            filtered_series.name = self._value_name
+            yield group_keys, filtered_series

     def all(self) -> series.Series:
         return self._aggregate(agg_ops.all_op)
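
The Series variant now differs from the DataFrame one only in selecting the value column from each yielded block. A usage sketch with hypothetical data (grouping by an index level goes through _groupby_level below, which marks a non-list level as singular):

    import bigframes.pandas as bpd

    s = bpd.Series([1.0, 2.0, 3.0], index=["a", "a", "b"])
    for key, group in s.groupby(level=0):
        # level=0 is a single, non-list level, so `key` is the scalar
        # index label rather than a 1-tuple.
        print(key, group.sum())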

bigframes/series.py

Lines changed: 9 additions & 0 deletions
@@ -1854,12 +1854,18 @@ def _groupby_level(
         level: int | str | typing.Sequence[int] | typing.Sequence[str],
         dropna: bool = True,
     ) -> bigframes.core.groupby.SeriesGroupBy:
+        if utils.is_list_like(level):
+            by_key_is_singular = False
+        else:
+            by_key_is_singular = True
+
         return groupby.SeriesGroupBy(
             self._block,
             self._value_column,
             by_col_ids=self._resolve_levels(level),
             value_name=self.name,
             dropna=dropna,
+            by_key_is_singular=by_key_is_singular,
         )

     def _groupby_values(

@@ -1871,8 +1877,10 @@ def _groupby_values(
     ) -> bigframes.core.groupby.SeriesGroupBy:
         if not isinstance(by, Series) and _is_list_like(by):
            by = list(by)
+           by_key_is_singular = False
         else:
            by = [typing.cast(typing.Union[blocks.Label, Series], by)]
+           by_key_is_singular = True

         block = self._block
         grouping_cols: typing.Sequence[str] = []

@@ -1904,6 +1912,7 @@
             by_col_ids=grouping_cols,
             value_name=self.name,
             dropna=dropna,
+            by_key_is_singular=by_key_is_singular,
        )

     def apply(
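
Both call sites derive by_key_is_singular from whether the grouping argument is list-like, matching pandas, where s.groupby("col") yields scalar keys but s.groupby(["col"]) yields 1-tuples. The decision in isolation, sketched with pandas' public is_list_like (the diff itself uses bigframes' internal utils.is_list_like and _is_list_like helpers):

    from pandas.api.types import is_list_like

    def key_is_singular(by) -> bool:
        # A bare label or scalar level means keys are yielded as scalars;
        # any list-like `by` means tuples. (_groupby_values additionally
        # treats a Series passed as `by` as a single key.)
        return not is_list_like(by)

    assert key_is_singular("species")
    assert not key_is_singular(["species"])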
