diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 26241d2351..64a4d10a5c 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -896,10 +896,15 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi field = predicate.term.ref().field file_column_name = self.file_schema.find_column_name(field.field_id) + # In the case of schema evolution or column projection, the field might not be present in the file schema if file_column_name is None: - # In the case of schema evolution, the column might not be present - # we can use the default value as a constant and evaluate it against - # the predicate + # If the field has no initial_default, return AlwaysTrue to include all rows for further evaluation. + # This ensures that the predicate is evaluated during row-level filtering rather than being eliminated + # at the file level, preserving the ability to apply more granular filtering later in the process. + if field.initial_default is None: + return AlwaysTrue() + + # If the field has initial_default, use the default value as a constant and evaluate it against the predicate pred: BooleanExpression if isinstance(predicate, BoundUnaryPredicate): pred = predicate.as_unbound(field.name) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 273bd24c9b..5248f84a62 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -72,6 +72,7 @@ expression_to_plain_format, rewrite_not, rewrite_to_dnf, + translate_column_names, visit, visit_bound_predicate, ) @@ -79,6 +80,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( + BooleanType, DoubleType, FloatType, IcebergType, @@ -1623,3 +1625,203 @@ def test_expression_evaluator_null() -> None: assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True + + +def test_translate_column_names_simple_case(table_schema_simple: Schema) -> None: + """Test translate_column_names with matching column names.""" + # Create a bound expression using the original schema + unbound_expr = EqualTo("foo", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=table_schema_simple, case_sensitive=True)) + + # File schema has the same column names + file_schema = Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should return the same unbound expression since column names match + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("foo") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_different_column_names() -> None: + """Test translate_column_names with different column names in file schema.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="original_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Create bound expression + unbound_expr = EqualTo("original_name", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has different column name but same field ID + file_schema = Schema( + NestedField(field_id=1, name="file_column_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should use the file schema's column name + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("file_column_name") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_missing_column() -> None: + """Test translate_column_names when column is missing from file schema (schema evolution).""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False, initial_default="default"), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue because the default value (42) matches the literal (42) + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_missing_column_false_evaluation() -> None: + """Test translate_column_names when missing column evaluates to false.""" + # Original schema + original_schema = Schema( + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), + schema_id=1, + ) + + # Create bound expression that won't match the default value + unbound_expr = EqualTo("missing_col", 42) # default is 10, literal is 42 + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema doesn't have this column + file_schema = Schema( + NestedField(field_id=1, name="other_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysFalse because default value (10) doesn't match literal (42) + assert translated_expr == AlwaysFalse() + + +def test_translate_column_names_complex_expression() -> None: + """Test translate_column_names with complex boolean expressions.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="col1", field_type=StringType(), required=False), + NestedField(field_id=2, name="col2", field_type=IntegerType(), required=True), + schema_id=1, + ) + + # Create complex bound expression + unbound_expr = And(EqualTo("col1", "test"), GreaterThan("col2", 10)) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has different column names + file_schema = Schema( + NestedField(field_id=1, name="file_col1", field_type=StringType(), required=False), + NestedField(field_id=2, name="file_col2", field_type=IntegerType(), required=True), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should be an And expression with translated column names + assert isinstance(translated_expr, And) + assert isinstance(translated_expr.left, EqualTo) + assert translated_expr.left.term == Reference("file_col1") + assert isinstance(translated_expr.right, GreaterThan) + assert translated_expr.right.term == Reference("file_col2") + + +def test_translate_column_names_case_sensitive() -> None: + """Test translate_column_names with case sensitivity.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="TestColumn", field_type=StringType(), required=False), + schema_id=1, + ) + + # Create bound expression + unbound_expr = EqualTo("TestColumn", "value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has same field ID but different case + file_schema = Schema( + NestedField(field_id=1, name="testcolumn", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate with case sensitivity + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should use the file schema's column name (different case) + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("testcolumn") + + +def test_translate_column_names_always_true_false() -> None: + """Test translate_column_names with AlwaysTrue and AlwaysFalse expressions.""" + file_schema = Schema( + NestedField(field_id=1, name="col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Test AlwaysTrue + translated_true = translate_column_names(AlwaysTrue(), file_schema, case_sensitive=True) + assert translated_true == AlwaysTrue() + + # Test AlwaysFalse + translated_false = translate_column_names(AlwaysFalse(), file_schema, case_sensitive=True) + assert translated_false == AlwaysFalse() + + +def test_translate_column_names_column_projection_missing_field_no_initial_default() -> None: + """Test translate_column_names for column projection when field is missing from file schema and has no initial_default.""" + # Original schema with a field that has no initial_default (defaults to None) + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), # No initial_default specified + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue when field has no initial_default, allowing for further evaluation + assert translated_expr == AlwaysTrue() diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4f121ba3bc..ada64b27ce 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1197,6 +1197,16 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa }, schema=schema, ) + # Test that the partition value is projected correctly + assert table.scan(row_filter="partition_id = 1").to_arrow() == pa.table( + { + "other_field": ["foo", "bar", "baz"], + "partition_id": [1, 1, 1], + }, + schema=schema, + ) + # Test that the partition value is not projected for a non-existing partition + assert len(table.scan(row_filter="partition_id = -1").to_arrow()) == 0 def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: