From e57b252f8be20c1d59f56398c62b28fc33981d4b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Sat, 22 Mar 2025 15:34:17 +0100 Subject: [PATCH 1/3] Use a balanced tree instead of unbalanced one to prevent recursion error in create_match_filter --- pyiceberg/table/upsert_util.py | 40 +++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index d2bd48bc99..a76f5f6b1e 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -16,6 +16,7 @@ # under the License. import functools import operator +from typing import Callable, List, TypeVar import pyarrow as pa from pyarrow import Table as pyarrow_table @@ -28,6 +29,43 @@ In, ) +T = TypeVar("T") + + +def build_balanced_tree(operator_: Callable[[T, T], T], items: List[T]) -> T: + """ + Recursively constructs a balanced binary tree of expressions using the provided binary operator. + + This function is a safer and more scalable alternative to: + reduce(operator_, items) + + Using reduce creates a deeply nested, unbalanced tree (e.g., operator_(a, operator_(b, operator_(c, ...)))), + which grows linearly with the number of items. This can lead to RecursionError exceptions in Python + when the number of expressions is large (e.g., >1000). + + In contrast, this function builds a balanced binary tree with logarithmic depth (O(log n)), + helping avoid recursion issues and ensuring that expression trees remain stable, predictable, + and safe to traverse — especially in tools like PyIceberg that operate on large logical trees. + + Parameters: + operator_ (Callable[[T, T], T]): A binary operator function (e.g., pyiceberg.expressions.Or, And). + items (List[T]): A list of expression objects to combine. + + Returns: + T: An expression object representing the balanced combination of all input expressions. + + Raises: + ValueError: If the input list is empty. 
+ """ + if not items: + raise ValueError("No expressions to combine") + if len(items) == 1: + return items[0] + mid = len(items) // 2 + left = build_balanced_tree(operator_, items[:mid]) + right = build_balanced_tree(operator_, items[mid:]) + return operator_(left, right) + def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) @@ -39,7 +77,7 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre functools.reduce(operator.and_, [EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist() ] - return AlwaysFalse() if len(filters) == 0 else functools.reduce(operator.or_, filters) + return AlwaysFalse() if len(filters) == 0 else build_balanced_tree(operator.or_, filters) def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: From f978d01cc9d07a8e815ce203286ca36ceea5467d Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 24 Mar 2025 16:18:11 +0100 Subject: [PATCH 2/3] Move build_balanced_tree to Or constructor --- pyiceberg/expressions/__init__.py | 43 ++++++++++++++++++++++++- pyiceberg/table/upsert_util.py | 46 ++++----------------------- tests/expressions/test_expressions.py | 2 +- tests/expressions/test_visitors.py | 20 ++++++------ tests/io/test_pyarrow_visitor.py | 2 +- 5 files changed, 61 insertions(+), 52 deletions(-) diff --git a/pyiceberg/expressions/__init__.py b/pyiceberg/expressions/__init__.py index 8b006a28f1..c09b6132b9 100644 --- a/pyiceberg/expressions/__init__.py +++ b/pyiceberg/expressions/__init__.py @@ -21,8 +21,10 @@ from functools import cached_property, reduce from typing import ( Any, + Callable, Generic, Iterable, + Sequence, Set, Tuple, Type, @@ -79,6 +81,45 @@ def __or__(self, other: BooleanExpression) -> BooleanExpression: return Or(self, other) +def _build_balanced_tree( + operator_: Callable[[BooleanExpression, BooleanExpression], BooleanExpression], items: 
Sequence[BooleanExpression] +) -> BooleanExpression: + """ + Recursively constructs a balanced binary tree of BooleanExpressions using the provided binary operator. + + This function is a safer and more scalable alternative to: + reduce(operator_, items) + + Using `reduce` creates a deeply nested, unbalanced tree (e.g., operator_(operator_(operator_(a, b), c), ...)), + which grows linearly with the number of items. This can lead to RecursionError exceptions in Python + when the number of expressions is large (e.g., >1000). + + In contrast, this function builds a balanced binary tree with logarithmic depth (O(log n)), + helping avoid recursion issues and ensuring that expression trees remain stable, predictable, + and safe to traverse — especially in tools like PyIceberg that operate on large logical trees. + + Parameters: + operator_ (Callable): A binary operator function (e.g., pyiceberg.expressions.Or, And) that takes two + BooleanExpressions and returns a combined BooleanExpression. + items (Sequence[BooleanExpression]): A sequence of BooleanExpression objects to combine. + + Returns: + BooleanExpression: The balanced combination of all input BooleanExpressions. + + Raises: + ValueError: If the input sequence is empty. 
+ """ + if not items: + raise ValueError("No expressions to combine") + if len(items) == 1: + return items[0] + mid = len(items) // 2 + + left = _build_balanced_tree(operator_, items[:mid]) + right = _build_balanced_tree(operator_, items[mid:]) + return operator_(left, right) + + class Term(Generic[L], ABC): """A simple expression that evaluates to a value.""" @@ -257,7 +298,7 @@ class Or(BooleanExpression): def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression: # type: ignore if rest: - return reduce(Or, (left, right, *rest)) + return _build_balanced_tree(Or, (left, right, *rest)) if left is AlwaysTrue() or right is AlwaysTrue(): return AlwaysTrue() elif left is AlwaysFalse(): diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index a76f5f6b1e..e67f6c0232 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -16,7 +16,6 @@ # under the License. import functools import operator -from typing import Callable, List, TypeVar import pyarrow as pa from pyarrow import Table as pyarrow_table @@ -27,45 +26,9 @@ BooleanExpression, EqualTo, In, + Or, ) -T = TypeVar("T") - - -def build_balanced_tree(operator_: Callable[[T, T], T], items: List[T]) -> T: - """ - Recursively constructs a balanced binary tree of expressions using the provided binary operator. - - This function is a safer and more scalable alternative to: - reduce(operator_, items) - - Using reduce creates a deeply nested, unbalanced tree (e.g., operator_(a, operator_(b, operator_(c, ...)))), - which grows linearly with the number of items. This can lead to RecursionError exceptions in Python - when the number of expressions is large (e.g., >1000). 
- - In contrast, this function builds a balanced binary tree with logarithmic depth (O(log n)), - helping avoid recursion issues and ensuring that expression trees remain stable, predictable, - and safe to traverse — especially in tools like PyIceberg that operate on large logical trees. - - Parameters: - operator_ (Callable[[T, T], T]): A binary operator function (e.g., pyiceberg.expressions.Or, And). - items (List[T]): A list of expression objects to combine. - - Returns: - T: An expression object representing the balanced combination of all input expressions. - - Raises: - ValueError: If the input list is empty. - """ - if not items: - raise ValueError("No expressions to combine") - if len(items) == 1: - return items[0] - mid = len(items) // 2 - left = build_balanced_tree(operator_, items[:mid]) - right = build_balanced_tree(operator_, items[mid:]) - return operator_(left, right) - def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) @@ -77,7 +40,12 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre functools.reduce(operator.and_, [EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist() ] - return AlwaysFalse() if len(filters) == 0 else build_balanced_tree(operator.or_, filters) + if len(filters) == 0: + return AlwaysFalse() + elif len(filters) == 1: + return filters[0] + else: + return Or(*filters) def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 4926b70121..4f2b3ab848 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -578,7 +578,7 @@ def test_negate(lhs: BooleanExpression, rhs: BooleanExpression) -> None: ), ( Or(ExpressionA(), ExpressionB(), ExpressionA()), - Or(Or(ExpressionA(), ExpressionB()), ExpressionA()), + Or(ExpressionA(), 
Or(ExpressionB(), ExpressionA())), ), (Not(Not(ExpressionA())), ExpressionA()), ], diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 94bfcf076c..a52b10c4e7 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -230,9 +230,9 @@ def test_boolean_expression_visitor() -> None: "NOT", "OR", "EQUALTO", - "OR", "NOTEQUALTO", "OR", + "OR", "EQUALTO", "NOT", "AND", @@ -408,28 +408,28 @@ def test_and_expression_binding( ), ), Or( + BoundIn( + BoundReference( + field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + accessor=Accessor(position=0, inner=None), + ), + {literal("bar"), literal("baz")}, + ), Or( BoundIn( BoundReference( field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), accessor=Accessor(position=0, inner=None), ), - {literal("bar"), literal("baz")}, + {literal("bar")}, ), BoundIn( BoundReference( field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), accessor=Accessor(position=0, inner=None), ), - {literal("bar")}, - ), - ), - BoundIn( - BoundReference( - field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - accessor=Accessor(position=0, inner=None), + {literal("baz")}, ), - {literal("baz")}, ), ), ), diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 9f5aff3f70..9d5772d01c 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -836,5 +836,5 @@ def test_expression_to_complementary_pyarrow( # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or and thus will not appear in the pc.expr. 
assert ( repr(result) - == """ 100)) or (is_nan(float_field) and (double_field == 0))) or (float_field > 100)) and invert(is_null(double_field, {nan_is_null=false})))) or is_null(float_field, {nan_is_null=false})) or is_null(string_field, {nan_is_null=false})) or is_nan(double_field))>""" + == """ 100)) or ((is_nan(float_field) and (double_field == 0)) or (float_field > 100))) and invert(is_null(double_field, {nan_is_null=false})))) or is_null(float_field, {nan_is_null=false})) or is_null(string_field, {nan_is_null=false})) or is_nan(double_field))>""" ) From 773392395f415093fc90da0204ab3fcb8044a7f2 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 24 Mar 2025 18:56:57 +0100 Subject: [PATCH 3/3] Use build_balanced_tree in And expression --- pyiceberg/expressions/__init__.py | 4 ++-- tests/expressions/test_expressions.py | 2 +- tests/expressions/test_visitors.py | 28 +++++++++++++-------------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pyiceberg/expressions/__init__.py b/pyiceberg/expressions/__init__.py index c09b6132b9..2adf898fea 100644 --- a/pyiceberg/expressions/__init__.py +++ b/pyiceberg/expressions/__init__.py @@ -18,7 +18,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from functools import cached_property, reduce +from functools import cached_property from typing import ( Any, Callable, @@ -255,7 +255,7 @@ class And(BooleanExpression): def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression: # type: ignore if rest: - return reduce(And, (left, right, *rest)) + return _build_balanced_tree(And, (left, right, *rest)) if left is AlwaysFalse() or right is AlwaysFalse(): return AlwaysFalse() elif left is AlwaysTrue(): diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 4f2b3ab848..a9b1935356 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -574,7 +574,7 @@ 
def test_negate(lhs: BooleanExpression, rhs: BooleanExpression) -> None: [ ( And(ExpressionA(), ExpressionB(), ExpressionA()), - And(And(ExpressionA(), ExpressionB()), ExpressionA()), + And(ExpressionA(), And(ExpressionB(), ExpressionA())), ), ( Or(ExpressionA(), ExpressionB(), ExpressionA()), diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index a52b10c4e7..586ba9f5d4 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -235,9 +235,9 @@ def test_boolean_expression_visitor() -> None: "OR", "EQUALTO", "NOT", - "AND", "NOTEQUALTO", "AND", + "AND", ] @@ -335,14 +335,14 @@ def test_always_false_or_always_true_expression_binding(table_schema_simple: Sch ), ), And( - And( - BoundIn( - BoundReference( - field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - accessor=Accessor(position=0, inner=None), - ), - {literal("bar"), literal("baz")}, + BoundIn( + BoundReference( + field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + accessor=Accessor(position=0, inner=None), ), + {literal("bar"), literal("baz")}, + ), + And( BoundEqualTo[int]( BoundReference( field=NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), @@ -350,13 +350,13 @@ def test_always_false_or_always_true_expression_binding(table_schema_simple: Sch ), literal(1), ), - ), - BoundEqualTo( - BoundReference( - field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - accessor=Accessor(position=0, inner=None), + BoundEqualTo( + BoundReference( + field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + accessor=Accessor(position=0, inner=None), + ), + literal("baz"), ), - literal("baz"), ), ), ),