From bc89cfac38a8675ac7aafbcaeeaa2788a6a02dfe Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 21:01:44 -0700 Subject: [PATCH 1/5] add unit tests --- tests/expressions/test_visitors.py | 176 +++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 273bd24c9b..236b1945a6 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -72,6 +72,7 @@ expression_to_plain_format, rewrite_not, rewrite_to_dnf, + translate_column_names, visit, visit_bound_predicate, ) @@ -79,6 +80,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( + BooleanType, DoubleType, FloatType, IcebergType, @@ -1623,3 +1625,177 @@ def test_expression_evaluator_null() -> None: assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True + + +def test_translate_column_names_simple_case(table_schema_simple: Schema) -> None: + """Test translate_column_names with matching column names.""" + # Create a bound expression using the original schema + unbound_expr = EqualTo("foo", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=table_schema_simple, case_sensitive=True)) + + # File schema has the same column names + file_schema = Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should return the same unbound expression since column names match + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("foo") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_different_column_names() -> None: + """Test translate_column_names with different column names in file schema.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="original_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Create bound expression + unbound_expr = EqualTo("original_name", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has different column name but same field ID + file_schema = Schema( + NestedField(field_id=1, name="file_column_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should use the file schema's column name + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("file_column_name") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_missing_column() -> None: + """Test translate_column_names when column is missing from file schema (schema evolution).""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False, initial_default="default"), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue because the default value (42) matches the literal (42) + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_missing_column_false_evaluation() -> None: + """Test translate_column_names when missing column evaluates to false.""" + # Original schema + original_schema = Schema( + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), + schema_id=1, + ) + + # Create bound expression that won't match the default value + unbound_expr = EqualTo("missing_col", 42) # default is 10, literal is 42 + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema doesn't have this column + file_schema = Schema( + NestedField(field_id=1, name="other_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysFalse because default value (10) doesn't match literal (42) + assert translated_expr == AlwaysFalse() + + +def test_translate_column_names_complex_expression() -> None: + """Test translate_column_names with complex boolean expressions.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="col1", field_type=StringType(), required=False), + NestedField(field_id=2, name="col2", field_type=IntegerType(), required=True), + schema_id=1, + ) + + # Create complex bound expression + unbound_expr = And(EqualTo("col1", "test"), GreaterThan("col2", 10)) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has different column names + file_schema = Schema( + NestedField(field_id=1, name="file_col1", field_type=StringType(), required=False), + NestedField(field_id=2, name="file_col2", field_type=IntegerType(), required=True), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should be an And expression with translated column names + assert isinstance(translated_expr, And) + assert isinstance(translated_expr.left, EqualTo) + assert translated_expr.left.term == Reference("file_col1") + assert isinstance(translated_expr.right, GreaterThan) + assert translated_expr.right.term == Reference("file_col2") + + +def test_translate_column_names_case_sensitive() -> None: + """Test translate_column_names with case sensitivity.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="TestColumn", field_type=StringType(), required=False), + schema_id=1, + ) + + # Create bound expression + unbound_expr = EqualTo("TestColumn", "value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has same field ID but different case + file_schema = Schema( + NestedField(field_id=1, name="testcolumn", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate with case sensitivity + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should use the file schema's column name (different case) + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("testcolumn") + + +def test_translate_column_names_always_true_false() -> None: + """Test translate_column_names with AlwaysTrue and AlwaysFalse expressions.""" + file_schema = Schema( + NestedField(field_id=1, name="col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Test AlwaysTrue + translated_true = translate_column_names(AlwaysTrue(), file_schema, case_sensitive=True) + assert translated_true == AlwaysTrue() + + # Test AlwaysFalse + translated_false = translate_column_names(AlwaysFalse(), file_schema, case_sensitive=True) + assert translated_false == AlwaysFalse() From c6b0f5ffcd68f5eb7927b71cc6f43368b8aa4835 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 21:11:00 -0700 Subject: [PATCH 2/5] add case for column projection --- pyiceberg/expressions/visitors.py | 6 ++++++ tests/expressions/test_visitors.py | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 26241d2351..ba5ab382a3 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -897,6 +897,12 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi file_column_name = self.file_schema.find_column_name(field.field_id) if file_column_name is None: + # In the case of column projection, the field might not be present in the file schema + # If the field has no initial_default, return AlwaysTrue to include all rows + # for further evaluation + if field.initial_default is None: + return AlwaysTrue() + # In the case of schema evolution, the column might not be present # we can use the default value as a constant and evaluate it against # the predicate diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 236b1945a6..5248f84a62 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -1799,3 +1799,29 @@ def test_translate_column_names_always_true_false() -> None: # Test AlwaysFalse translated_false = translate_column_names(AlwaysFalse(), file_schema, case_sensitive=True) assert translated_false == AlwaysFalse() + + +def test_translate_column_names_column_projection_missing_field_no_initial_default() -> None: + """Test translate_column_names for column projection when field is missing from file schema and has no initial_default.""" + # Original schema with a field that has no initial_default (defaults to None) + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), # No initial_default specified + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue when field has no initial_default, allowing for further evaluation + assert translated_expr == AlwaysTrue() From c8d41df77597d7380ac9beb8231a73568211ea4a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 21:24:55 -0700 Subject: [PATCH 3/5] add column projection test --- tests/io/test_pyarrow.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4f121ba3bc..ada64b27ce 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1197,6 +1197,16 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa }, schema=schema, ) + # Test that the partition value is projected correctly + assert table.scan(row_filter="partition_id = 1").to_arrow() == pa.table( + { + "other_field": ["foo", "bar", "baz"], + "partition_id": [1, 1, 1], + }, + schema=schema, + ) + # Test that the partition value is not projected for a non-existing partition + assert len(table.scan(row_filter="partition_id = -1").to_arrow()) == 0 def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: From 7568b0dbbd98e3eab2fa04565af8b866524d2c99 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 21:32:48 -0700 Subject: [PATCH 4/5] better comments --- pyiceberg/expressions/visitors.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index ba5ab382a3..94e7d34b44 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -896,16 +896,13 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi field = predicate.term.ref().field file_column_name = self.file_schema.find_column_name(field.field_id) + # In the case of schema evolution or column projection, the field might not be present in the file schema if file_column_name is None: - # In the case of column projection, the field might not be present in the file schema - # If the field has no initial_default, return AlwaysTrue to include all rows - # for further evaluation + # If the field has no initial_default, return AlwaysTrue to include all rows for further evaluation if field.initial_default is None: return AlwaysTrue() - # In the case of schema evolution, the column might not be present - # we can use the default value as a constant and evaluate it against - # the predicate + # If the field has initial_default, use the default value as a constant and evaluate it against the predicate pred: BooleanExpression if isinstance(predicate, BoundUnaryPredicate): pred = predicate.as_unbound(field.name) From 50b7b396208e4af394ee14dd637fc00e6960b3ee Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 21:47:59 -0700 Subject: [PATCH 5/5] Update pyiceberg/expressions/visitors.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pyiceberg/expressions/visitors.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 94e7d34b44..64a4d10a5c 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -898,7 +898,9 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi # In the case of schema evolution or column projection, the field might not be present in the file schema if file_column_name is None: - # If the field has no initial_default, return AlwaysTrue to include all rows for further evaluation + # If the field has no initial_default, return AlwaysTrue to include all rows for further evaluation. + # This ensures that the predicate is evaluated during row-level filtering rather than being eliminated + # at the file level, preserving the ability to apply more granular filtering later in the process. if field.initial_default is None: return AlwaysTrue()