Skip to content

Commit 1e53fa9

Browse files
add documentation
1 parent b2e4efa commit 1e53fa9

File tree

3 files changed

+59
-8
lines changed

3 files changed

+59
-8
lines changed

bigframes/core/array_value.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,28 @@ def compute_values(self, assignments: Sequence[ex.Expression]):
266266
)
267267

268268
def compute_general_expression(self, assignments: Sequence[ex.Expression]):
269+
"""
270+
Applies arbitrary column expressions to the current execution block.
271+
272+
This method transforms the logical plan by applying a sequence of expressions that
273+
preserve the length of the input columns. It supports both scalar operations
274+
and window functions. Each expression is assigned a unique internal column identifier.
275+
276+
Args:
277+
assignments (Sequence[ex.Expression]): A sequence of expression objects
278+
representing the transformations to apply to the columns.
279+
280+
Returns:
281+
Tuple[ArrayValue, Tuple[str, ...]]: A tuple containing:
282+
- An `ArrayValue` wrapping the new root node of the updated logical plan.
283+
- A tuple of strings representing the unique column IDs generated for
284+
each expression in the assignments.
285+
"""
269286
named_exprs = [
270287
nodes.ColumnDef(expr, ids.ColumnId.unique()) for expr in assignments
271288
]
272289
# TODO: Push this to rewrite later to go from block expression to planning form
273-
new_root = expression_factoring.plan_general_col_exprs(self.node, named_exprs)
290+
new_root = expression_factoring.apply_col_exprs_to_plan(self.node, named_exprs)
274291

275292
target_ids = tuple(named_expr.id for named_expr in named_exprs)
276293
return (ArrayValue(new_root), target_ids)
@@ -282,7 +299,29 @@ def compute_general_reduction(
282299
*,
283300
dropna: bool = False,
284301
):
285-
# Warning: this function does not check if the expression is a valid reduction, and may fail spectacularly on invalid inputs
302+
"""
303+
Applies arbitrary aggregation expressions to the block, optionally grouped by keys.
304+
305+
This method handles reduction operations (e.g., sum, mean, count) that collapse
306+
multiple input rows into a single scalar value per group. If grouping keys are
307+
provided, the operation is performed per group; otherwise, it is a global reduction.
308+
309+
Args:
310+
assignments (Sequence[ex.Expression]): A sequence of aggregation expressions
311+
to be calculated.
312+
by_column_ids (typing.Sequence[str], optional): A sequence of column IDs
313+
to use as grouping keys. Defaults to an empty tuple (global reduction).
314+
dropna (bool, optional): If True, rows containing null values in the
315+
`by_column_ids` columns will be filtered out before the reduction
316+
is applied. Defaults to False.
317+
318+
Returns:
319+
Tuple[ArrayValue, Tuple[str, ...]]: A tuple containing:
320+
- An `ArrayValue` wrapping the new root node representing the
321+
aggregation/group-by result.
322+
- A tuple of strings representing the unique column IDs assigned to the
323+
resulting aggregate columns.
324+
"""
286325
plan = self.node
287326
if dropna:
288327
for col_id in by_column_ids:
@@ -292,7 +331,7 @@ def compute_general_reduction(
292331
nodes.ColumnDef(expr, ids.ColumnId.unique()) for expr in assignments
293332
]
294333
# TODO: Push this to rewrite later to go from block expression to planning form
295-
new_root = expression_factoring.plan_general_aggregation(
334+
new_root = expression_factoring.apply_agg_exprs_to_plan(
296335
plan, named_exprs, grouping_keys=[ex.deref(by) for by in by_column_ids]
297336
)
298337
target_ids = tuple(named_expr.id for named_expr in named_exprs)

bigframes/core/blocks.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,13 +1146,15 @@ def project_exprs(
11461146
index_labels=self._index_labels,
11471147
)
11481148

1149-
# This is a new experimental version of the project_exprs that supports mixing analytic and scalar expressions
11501149
def project_block_exprs(
11511150
self,
11521151
exprs: Sequence[ex.Expression],
11531152
labels: Union[Sequence[Label], pd.Index],
11541153
drop=False,
11551154
) -> Block:
1155+
"""
1156+
Version of the project_exprs that supports mixing analytic and scalar expressions
1157+
"""
11561158
new_array, _ = self.expr.compute_general_expression(exprs)
11571159
if drop:
11581160
new_array = new_array.drop_columns(self.value_columns)
@@ -1167,7 +1169,6 @@ def project_block_exprs(
11671169
index_labels=self._index_labels,
11681170
)
11691171

1170-
# This is a new experimental version of the aggregate that supports mixing analytic and scalar expressions\
11711172
def reduce_general(
11721173
self,
11731174
aggregations: typing.Sequence[ex.Expression] = (),
@@ -1176,6 +1177,9 @@ def reduce_general(
11761177
*,
11771178
dropna: bool = True,
11781179
) -> typing.Tuple[Block, typing.Sequence[str]]:
1180+
"""
1181+
Version of the aggregate that supports mixing analytic and scalar expressions.
1182+
"""
11791183
if column_labels is None:
11801184
column_labels = pd.Index(range(len(aggregations)))
11811185

bigframes/core/expression_factoring.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
_MAX_INLINE_COMPLEXITY = 10
4242

4343

44-
def plan_general_col_exprs(
44+
def apply_col_exprs_to_plan(
4545
plan: nodes.BigFrameNode, col_exprs: Sequence[nodes.ColumnDef]
4646
) -> nodes.BigFrameNode:
4747
# TODO: Jointly fragmentize expressions to more efficiently reuse common sub-expressions
@@ -55,7 +55,7 @@ def plan_general_col_exprs(
5555
return push_into_tree(plan, fragments, target_ids)
5656

5757

58-
def plan_general_aggregation(
58+
def apply_agg_exprs_to_plan(
5959
plan: nodes.BigFrameNode,
6060
agg_defs: Sequence[nodes.ColumnDef],
6161
grouping_keys: Sequence[expression.DerefOp],
@@ -69,7 +69,7 @@ def plan_general_aggregation(
6969
nodes.ColumnDef(windowize(cdef.expression, window_def), cdef.id)
7070
for cdef in all_inputs
7171
]
72-
plan = plan_general_col_exprs(plan, windowized_inputs)
72+
plan = apply_col_exprs_to_plan(plan, windowized_inputs)
7373
all_aggs = list(
7474
itertools.chain(*(factored_agg.agg_exprs for factored_agg in factored_aggs))
7575
)
@@ -113,6 +113,14 @@ def fragmentize_expression(root: nodes.ColumnDef) -> Sequence[nodes.ColumnDef]:
113113

114114
@dataclasses.dataclass(frozen=True, eq=False)
115115
class FactoredAggregation:
116+
"""
117+
A three part recomposition of a general aggregating expression.
118+
119+
1. agg_inputs: This is a set of (*col) -> col transformation that preprocess inputs for the aggregations ops
120+
2. agg_exprs: This is a set of pure aggregations (eg sum, mean, min, max) ops referencing the outputs of (1)
121+
3. root_scalar_expr: This is the final set, takes outputs of (2), applies scalar expression to produce final result.
122+
"""
123+
116124
# pure scalar expression
117125
root_scalar_expr: nodes.ColumnDef
118126
# pure agg expression, only refs cols and consts

0 commit comments

Comments
 (0)