From a8dbf6bbc022048efb30f3928725e3924e7a398b Mon Sep 17 00:00:00 2001 From: Roman Shanin Date: Thu, 12 Jun 2025 18:22:29 +0300 Subject: [PATCH 01/14] extend signature of translate_column_names --- pyiceberg/expressions/visitors.py | 18 +++++++++++++++--- pyiceberg/io/pyarrow.py | 18 ++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index abac19bc19..a3b077fcef 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -860,7 +860,9 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): Args: file_schema (Schema): The schema of the file. + projected_schema (Schema): The schema to project onto the data files. case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. + projected_missing_fields(dict[str, Any]): Map of fields missing in file_schema, but present as partition values. Raises: TypeError: In the case of an UnboundPredicate. 
@@ -870,9 +872,13 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): file_schema: Schema case_sensitive: bool - def __init__(self, file_schema: Schema, case_sensitive: bool) -> None: + def __init__( + self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any] + ) -> None: self.file_schema = file_schema + self.projected_schema = projected_schema self.case_sensitive = case_sensitive + self.projected_missing_fields = projected_missing_fields def visit_true(self) -> BooleanExpression: return AlwaysTrue() @@ -913,8 +919,14 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi raise ValueError(f"Unsupported predicate: {predicate}") -def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression: - return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive)) +def translate_column_names( + expr: BooleanExpression, + file_schema: Schema, + projected_schema: Schema, + case_sensitive: bool, + projected_missing_fields: dict[str, Any], +) -> BooleanExpression: + return visit(expr, _ColumnNameTranslator(file_schema, projected_schema, case_sensitive, projected_missing_fields)) class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3e49885e58..7ad3935fc5 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1458,18 +1458,24 @@ def _task_to_record_batches( # the table format version. 
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) - pyarrow_filter = None - if bound_row_filter is not AlwaysTrue(): - translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) - bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) - pyarrow_filter = expression_to_pyarrow(bound_file_filter) - # Apply column projection rules # https://iceberg.apache.org/spec/#column-projection should_project_columns, projected_missing_fields = _get_column_projection_values( task.file, projected_schema, partition_spec, file_schema.field_ids ) + pyarrow_filter = None + if bound_row_filter is not AlwaysTrue(): + translated_row_filter = translate_column_names( + bound_row_filter, + file_schema, + projected_schema, + case_sensitive=case_sensitive, + projected_missing_fields=projected_missing_fields, + ) + bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) + pyarrow_filter = expression_to_pyarrow(bound_file_filter) + file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) fragment_scanner = ds.Scanner.from_fragment( From e88a6a205d10fcb035ccd5d5ae64d854c45cb851 Mon Sep 17 00:00:00 2001 From: Roman Shanin Date: Thu, 12 Jun 2025 18:22:56 +0300 Subject: [PATCH 02/14] add test to check projected field predicate evaluator --- tests/expressions/test_visitors.py | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 273bd24c9b..e3eb1bcc61 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -68,10 +68,12 @@ BooleanExpressionVisitor, BoundBooleanExpressionVisitor, _ManifestEvalVisitor, + bind, expression_evaluator, expression_to_plain_format, rewrite_not, rewrite_to_dnf, + translate_column_names, visit, visit_bound_predicate, ) @@ -1623,3 
+1625,40 @@ def test_expression_evaluator_null() -> None: assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True + + +@pytest.mark.parametrize( + "before_expression,after_expression", + [ + (In("id", {1, 2, 3}), AlwaysTrue()), + (EqualTo("id", 3), AlwaysFalse()), + ( + And(EqualTo("id", 1), EqualTo("all_same_value_or_null", "string")), + And(AlwaysTrue(), EqualTo("all_same_value_or_null", "string")), + ), + ( + And(EqualTo("all_same_value_or_null", "string"), GreaterThan("id", 2)), + And(EqualTo("all_same_value_or_null", "string"), AlwaysFalse()), + ), + ( + Or( + And(EqualTo("id", 1), EqualTo("all_same_value_or_null", "string")), + And(EqualTo("all_same_value_or_null", "string"), GreaterThan("id", 2)), + ), + Or( + And(AlwaysTrue(), EqualTo("all_same_value_or_null", "string")), + And(EqualTo("all_same_value_or_null", "string"), AlwaysFalse()), + ), + ), + ], +) +def test_translate_column_names_eval_projected_fields( + schema: Schema, before_expression: BooleanExpression, after_expression: BooleanExpression +) -> None: + # exclude id from file_schema pretending that it's part of partition values + file_schema = Schema(*[field for field in schema.columns if field.name != "id"]) + projected_missing_fields = {"id": 1} + assert ( + translate_column_names(bind(schema, before_expression, True), file_schema, schema, True, projected_missing_fields) + == after_expression + ) From befa05ccc251e157cd1fe778c4d5c1224f8b0de0 Mon Sep 17 00:00:00 2001 From: Roman Shanin Date: Thu, 12 Jun 2025 18:23:06 +0300 Subject: [PATCH 03/14] evaluate projected fields in predicate --- pyiceberg/expressions/visitors.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 
a3b077fcef..4e819ea5bd 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -906,6 +906,24 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi # in the file schema when reading older data if isinstance(predicate, BoundIsNull): return AlwaysTrue() + # Evaluate projected field by value extracted from partition + elif (field_name := predicate.term.ref().field.name) in self.projected_missing_fields: + unbound_predicate: BooleanExpression + if isinstance(predicate, BoundUnaryPredicate): + unbound_predicate = predicate.as_unbound(field_name) + elif isinstance(predicate, BoundLiteralPredicate): + unbound_predicate = predicate.as_unbound(field_name, predicate.literal) + elif isinstance(predicate, BoundSetPredicate): + unbound_predicate = predicate.as_unbound(field_name, predicate.literals) + else: + raise ValueError(f"Unsupported predicate: {predicate}") + field = self.projected_schema.find_field(field_name) + schema = Schema(field) + evaluator = expression_evaluator(schema, unbound_predicate, self.case_sensitive) + if evaluator(Record(self.projected_missing_fields[field_name])): + return AlwaysTrue() + else: + return AlwaysFalse() else: return AlwaysFalse() From a93b4ebc508276dc872c7dcc732e7c1ddfa980d2 Mon Sep 17 00:00:00 2001 From: Roman Shanin Date: Tue, 1 Jul 2025 11:45:23 +0300 Subject: [PATCH 04/14] extract projected columns evaluator into separate visitor --- pyiceberg/expressions/visitors.py | 100 +++++++++++++++++++++-------- pyiceberg/io/pyarrow.py | 8 ++- tests/expressions/test_visitors.py | 13 ++-- 3 files changed, 85 insertions(+), 36 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 4e819ea5bd..bd9f30f2f0 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -860,9 +860,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): Args: file_schema (Schema): The schema of the file. 
- projected_schema (Schema): The schema to project onto the data files. case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. - projected_missing_fields(dict[str, Any]): Map of fields missing in file_schema, but present as partition values. Raises: TypeError: In the case of an UnboundPredicate. @@ -872,13 +870,9 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): file_schema: Schema case_sensitive: bool - def __init__( - self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any] - ) -> None: + def __init__(self, file_schema: Schema, case_sensitive: bool) -> None: self.file_schema = file_schema - self.projected_schema = projected_schema self.case_sensitive = case_sensitive - self.projected_missing_fields = projected_missing_fields def visit_true(self) -> BooleanExpression: return AlwaysTrue() @@ -906,24 +900,6 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi # in the file schema when reading older data if isinstance(predicate, BoundIsNull): return AlwaysTrue() - # Evaluate projected field by value extracted from partition - elif (field_name := predicate.term.ref().field.name) in self.projected_missing_fields: - unbound_predicate: BooleanExpression - if isinstance(predicate, BoundUnaryPredicate): - unbound_predicate = predicate.as_unbound(field_name) - elif isinstance(predicate, BoundLiteralPredicate): - unbound_predicate = predicate.as_unbound(field_name, predicate.literal) - elif isinstance(predicate, BoundSetPredicate): - unbound_predicate = predicate.as_unbound(field_name, predicate.literals) - else: - raise ValueError(f"Unsupported predicate: {predicate}") - field = self.projected_schema.find_field(field_name) - schema = Schema(field) - evaluator = expression_evaluator(schema, unbound_predicate, self.case_sensitive) - if evaluator(Record(self.projected_missing_fields[field_name])): - 
return AlwaysTrue() - else: - return AlwaysFalse() else: return AlwaysFalse() @@ -937,14 +913,84 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi raise ValueError(f"Unsupported predicate: {predicate}") -def translate_column_names( +def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression: + return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive)) + + +class _ProjectedColumnsEvaluator(BooleanExpressionVisitor[BooleanExpression]): + """Evaluated predicates which involve projected columns missing from the file. + + Args: + file_schema (Schema): The schema of the file. + projected_schema (Schema): The schema to project onto the data files. + case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. + projected_missing_fields(dict[str, Any]): Map of fields missing in file_schema, but present as partition values. + + Raises: + TypeError: In the case of an UnboundPredicate. 
+ """ + + file_schema: Schema + case_sensitive: bool + + def __init__( + self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any] + ) -> None: + self.file_schema = file_schema + self.projected_schema = projected_schema + self.case_sensitive = case_sensitive + self.projected_missing_fields = projected_missing_fields + + def visit_true(self) -> BooleanExpression: + return AlwaysTrue() + + def visit_false(self) -> BooleanExpression: + return AlwaysFalse() + + def visit_not(self, child_result: BooleanExpression) -> BooleanExpression: + return Not(child=child_result) + + def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: + return And(left=left_result, right=right_result) + + def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: + return Or(left=left_result, right=right_result) + + def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: + raise TypeError(f"Expected Bound Predicate, got: {predicate.term}") + + def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression: + file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id) + + if file_column_name is None and (field_name := predicate.term.ref().field.name) in self.projected_missing_fields: + unbound_predicate: BooleanExpression + if isinstance(predicate, BoundUnaryPredicate): + unbound_predicate = predicate.as_unbound(field_name) + elif isinstance(predicate, BoundLiteralPredicate): + unbound_predicate = predicate.as_unbound(field_name, predicate.literal) + elif isinstance(predicate, BoundSetPredicate): + unbound_predicate = predicate.as_unbound(field_name, predicate.literals) + else: + raise ValueError(f"Unsupported predicate: {predicate}") + field = self.projected_schema.find_field(field_name) + schema = Schema(field) + evaluator = expression_evaluator(schema, 
unbound_predicate, self.case_sensitive) + if evaluator(Record(self.projected_missing_fields[field_name])): + return AlwaysTrue() + else: + return AlwaysFalse() + + return predicate + + +def evaluate_projected_columns( expr: BooleanExpression, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any], ) -> BooleanExpression: - return visit(expr, _ColumnNameTranslator(file_schema, projected_schema, case_sensitive, projected_missing_fields)) + return visit(expr, _ProjectedColumnsEvaluator(file_schema, projected_schema, case_sensitive, projected_missing_fields)) class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7ad3935fc5..17793b6330 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -78,6 +78,7 @@ from pyiceberg.expressions.visitors import ( BoundBooleanExpressionVisitor, bind, + evaluate_projected_columns, extract_field_ids, translate_column_names, ) @@ -1466,13 +1467,18 @@ def _task_to_record_batches( pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): - translated_row_filter = translate_column_names( + evaluated_projected_columns_filter = evaluate_projected_columns( bound_row_filter, file_schema, projected_schema, case_sensitive=case_sensitive, projected_missing_fields=projected_missing_fields, ) + translated_row_filter = translate_column_names( + evaluated_projected_columns_filter, + file_schema, + case_sensitive=case_sensitive, + ) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index e3eb1bcc61..7d39dc96d7 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -69,11 +69,11 @@ BoundBooleanExpressionVisitor, _ManifestEvalVisitor, bind, + evaluate_projected_columns, 
expression_evaluator, expression_to_plain_format, rewrite_not, rewrite_to_dnf, - translate_column_names, visit, visit_bound_predicate, ) @@ -1652,13 +1652,10 @@ def test_expression_evaluator_null() -> None: ), ], ) -def test_translate_column_names_eval_projected_fields( - schema: Schema, before_expression: BooleanExpression, after_expression: BooleanExpression -) -> None: +def test_eval_projected_fields(schema: Schema, before_expression: BooleanExpression, after_expression: BooleanExpression) -> None: # exclude id from file_schema pretending that it's part of partition values file_schema = Schema(*[field for field in schema.columns if field.name != "id"]) projected_missing_fields = {"id": 1} - assert ( - translate_column_names(bind(schema, before_expression, True), file_schema, schema, True, projected_missing_fields) - == after_expression - ) + assert evaluate_projected_columns( + bind(schema, before_expression, True), file_schema, schema, True, projected_missing_fields + ) == bind(schema, after_expression, True) From 1ce4889138ea526b699e67b5d22c46ac6836d689 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 25 Jul 2025 12:20:54 -0700 Subject: [PATCH 05/14] project value in _ColumnNameTranslator --- pyiceberg/expressions/visitors.py | 16 ++++++++++++---- pyiceberg/io/pyarrow.py | 12 +----------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 1a6f9eb1be..a9326637c0 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -23,6 +23,7 @@ Dict, Generic, List, + Optional, Set, SupportsFloat, Tuple, @@ -861,6 +862,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): Args: file_schema (Schema): The schema of the file. case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. 
+ projected_fields (Dict[str, Any]): Partition field values for missing fields from projection. Raises: TypeError: In the case of an UnboundPredicate. @@ -869,10 +871,12 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): file_schema: Schema case_sensitive: bool + projected_fields: Dict[str, Any] - def __init__(self, file_schema: Schema, case_sensitive: bool) -> None: + def __init__(self, file_schema: Schema, case_sensitive: bool, projected_fields: Optional[Dict[str, Any]] = None) -> None: self.file_schema = file_schema self.case_sensitive = case_sensitive + self.projected_fields = projected_fields or {} def visit_true(self) -> BooleanExpression: return AlwaysTrue() @@ -912,7 +916,9 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi return ( AlwaysTrue() - if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default)) + if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)( + Record(field.initial_default or self.projected_fields.get(field.name, None)) + ) else AlwaysFalse() ) @@ -926,8 +932,10 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi raise ValueError(f"Unsupported predicate: {predicate}") -def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression: - return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive)) +def translate_column_names( + expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_fields: Optional[Dict[str, Any]] = None +) -> BooleanExpression: + return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_fields)) class _ProjectedColumnsEvaluator(BooleanExpressionVisitor[BooleanExpression]): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 9108dbe55b..5d62c54a05 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -78,7 +78,6 @@ from 
pyiceberg.expressions.visitors import ( BoundBooleanExpressionVisitor, bind, - evaluate_projected_columns, extract_field_ids, translate_column_names, ) @@ -1469,17 +1468,8 @@ def _task_to_record_batches( pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): - evaluated_projected_columns_filter = evaluate_projected_columns( - bound_row_filter, - file_schema, - projected_schema, - case_sensitive=case_sensitive, - projected_missing_fields=projected_missing_fields, - ) translated_row_filter = translate_column_names( - evaluated_projected_columns_filter, - file_schema, - case_sensitive=case_sensitive, + bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_fields=projected_missing_fields ) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) From 7b2ecbb3262e341c7815aec608e0947f7fd1a757 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 26 Jul 2025 11:43:41 -0700 Subject: [PATCH 06/14] remove other changes --- pyiceberg/expressions/visitors.py | 76 ------------------------------ tests/expressions/test_visitors.py | 36 -------------- 2 files changed, 112 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index a9326637c0..7b337640a9 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -938,82 +938,6 @@ def translate_column_names( return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_fields)) -class _ProjectedColumnsEvaluator(BooleanExpressionVisitor[BooleanExpression]): - """Evaluated predicates which involve projected columns missing from the file. - - Args: - file_schema (Schema): The schema of the file. - projected_schema (Schema): The schema to project onto the data files. - case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. 
- projected_missing_fields(dict[str, Any]): Map of fields missing in file_schema, but present as partition values. - - Raises: - TypeError: In the case of an UnboundPredicate. - """ - - file_schema: Schema - case_sensitive: bool - - def __init__( - self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any] - ) -> None: - self.file_schema = file_schema - self.projected_schema = projected_schema - self.case_sensitive = case_sensitive - self.projected_missing_fields = projected_missing_fields - - def visit_true(self) -> BooleanExpression: - return AlwaysTrue() - - def visit_false(self) -> BooleanExpression: - return AlwaysFalse() - - def visit_not(self, child_result: BooleanExpression) -> BooleanExpression: - return Not(child=child_result) - - def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: - return And(left=left_result, right=right_result) - - def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: - return Or(left=left_result, right=right_result) - - def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: - raise TypeError(f"Expected Bound Predicate, got: {predicate.term}") - - def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression: - file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id) - - if file_column_name is None and (field_name := predicate.term.ref().field.name) in self.projected_missing_fields: - unbound_predicate: BooleanExpression - if isinstance(predicate, BoundUnaryPredicate): - unbound_predicate = predicate.as_unbound(field_name) - elif isinstance(predicate, BoundLiteralPredicate): - unbound_predicate = predicate.as_unbound(field_name, predicate.literal) - elif isinstance(predicate, BoundSetPredicate): - unbound_predicate = predicate.as_unbound(field_name, predicate.literals) - else: - raise 
ValueError(f"Unsupported predicate: {predicate}") - field = self.projected_schema.find_field(field_name) - schema = Schema(field) - evaluator = expression_evaluator(schema, unbound_predicate, self.case_sensitive) - if evaluator(Record(self.projected_missing_fields[field_name])): - return AlwaysTrue() - else: - return AlwaysFalse() - - return predicate - - -def evaluate_projected_columns( - expr: BooleanExpression, - file_schema: Schema, - projected_schema: Schema, - case_sensitive: bool, - projected_missing_fields: dict[str, Any], -) -> BooleanExpression: - return visit(expr, _ProjectedColumnsEvaluator(file_schema, projected_schema, case_sensitive, projected_missing_fields)) - - class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]): """Extracts the field IDs used in the BooleanExpression.""" diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 7d39dc96d7..273bd24c9b 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -68,8 +68,6 @@ BooleanExpressionVisitor, BoundBooleanExpressionVisitor, _ManifestEvalVisitor, - bind, - evaluate_projected_columns, expression_evaluator, expression_to_plain_format, rewrite_not, @@ -1625,37 +1623,3 @@ def test_expression_evaluator_null() -> None: assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True - - -@pytest.mark.parametrize( - "before_expression,after_expression", - [ - (In("id", {1, 2, 3}), AlwaysTrue()), - (EqualTo("id", 3), AlwaysFalse()), - ( - And(EqualTo("id", 1), EqualTo("all_same_value_or_null", "string")), - And(AlwaysTrue(), EqualTo("all_same_value_or_null", "string")), - ), - ( - And(EqualTo("all_same_value_or_null", "string"), GreaterThan("id", 2)), - And(EqualTo("all_same_value_or_null", "string"), 
AlwaysFalse()), - ), - ( - Or( - And(EqualTo("id", 1), EqualTo("all_same_value_or_null", "string")), - And(EqualTo("all_same_value_or_null", "string"), GreaterThan("id", 2)), - ), - Or( - And(AlwaysTrue(), EqualTo("all_same_value_or_null", "string")), - And(EqualTo("all_same_value_or_null", "string"), AlwaysFalse()), - ), - ), - ], -) -def test_eval_projected_fields(schema: Schema, before_expression: BooleanExpression, after_expression: BooleanExpression) -> None: - # exclude id from file_schema pretending that it's part of partition values - file_schema = Schema(*[field for field in schema.columns if field.name != "id"]) - projected_missing_fields = {"id": 1} - assert evaluate_projected_columns( - bind(schema, before_expression, True), file_schema, schema, True, projected_missing_fields - ) == bind(schema, after_expression, True) From 682afc554f7551d62da736bfbe049248497e155e Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 26 Jul 2025 18:15:17 -0700 Subject: [PATCH 07/14] fix --- pyiceberg/expressions/visitors.py | 21 +-- pyiceberg/io/pyarrow.py | 2 +- tests/expressions/test_visitors.py | 251 +++++++++++++++++++++++++++++ tests/io/test_pyarrow.py | 10 ++ 4 files changed, 273 insertions(+), 11 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 7b337640a9..8e9908690f 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -862,7 +862,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): Args: file_schema (Schema): The schema of the file. case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True. - projected_fields (Dict[str, Any]): Partition field values for missing fields from projection. + projected_field_values (Dict[str, Any]): Values for projected fields not present in the data file. Raises: TypeError: In the case of an UnboundPredicate. 
@@ -871,12 +871,14 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): file_schema: Schema case_sensitive: bool - projected_fields: Dict[str, Any] + projected_field_values: Dict[str, Any] - def __init__(self, file_schema: Schema, case_sensitive: bool, projected_fields: Optional[Dict[str, Any]] = None) -> None: + def __init__( + self, file_schema: Schema, case_sensitive: bool, projected_field_values: Optional[Dict[str, Any]] = None + ) -> None: self.file_schema = file_schema self.case_sensitive = case_sensitive - self.projected_fields = projected_fields or {} + self.projected_field_values = projected_field_values or {} def visit_true(self) -> BooleanExpression: return AlwaysTrue() @@ -901,9 +903,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi file_column_name = self.file_schema.find_column_name(field.field_id) if file_column_name is None: - # In the case of schema evolution, the column might not be present - # we can use the default value as a constant and evaluate it against - # the predicate + # In the case of schema evolution or column projection, the column might not be present in the file schema. 
+ # we can use the projected value or the field's default value as a constant and evaluate it against the predicate pred: BooleanExpression if isinstance(predicate, BoundUnaryPredicate): pred = predicate.as_unbound(field.name) @@ -917,7 +918,7 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi return ( AlwaysTrue() if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)( - Record(field.initial_default or self.projected_fields.get(field.name, None)) + Record(self.projected_field_values.get(field.name, None) or field.initial_default) ) else AlwaysFalse() ) @@ -933,9 +934,9 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi def translate_column_names( - expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_fields: Optional[Dict[str, Any]] = None + expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_field_values: Optional[Dict[str, Any]] = None ) -> BooleanExpression: - return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_fields)) + return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_field_values)) class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5d62c54a05..e6992843ca 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1469,7 +1469,7 @@ def _task_to_record_batches( pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names( - bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_fields=projected_missing_fields + bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields ) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) diff --git 
a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 273bd24c9b..4d4f26c530 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -72,6 +72,7 @@ expression_to_plain_format, rewrite_not, rewrite_to_dnf, + translate_column_names, visit, visit_bound_predicate, ) @@ -79,6 +80,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( + BooleanType, DoubleType, FloatType, IcebergType, @@ -1623,3 +1625,252 @@ def test_expression_evaluator_null() -> None: assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True + + +def test_translate_column_names_simple_case(table_schema_simple: Schema) -> None: + """Test translate_column_names with matching column names.""" + # Create a bound expression using the original schema + unbound_expr = EqualTo("foo", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=table_schema_simple, case_sensitive=True)) + + # File schema has the same column names + file_schema = Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should return an unbound expression with the same column name since they match + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("foo") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_different_column_names() -> None: + """Test 
translate_column_names with different column names in file schema.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="original_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Create bound expression + unbound_expr = EqualTo("original_name", "test_value") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema has different column name but same field ID + file_schema = Schema( + NestedField(field_id=1, name="file_column_name", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should use the file schema's column name + assert isinstance(translated_expr, EqualTo) + assert translated_expr.term == Reference("file_column_name") + assert translated_expr.literal == literal("test_value") + + +def test_translate_column_names_missing_column() -> None: + """Test translate_column_names when column is missing from file schema (such as in schema evolution).""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # missing_col's initial_default (None) does not match the expression literal (42) + assert translated_expr 
== AlwaysFalse() + + +def test_translate_column_names_missing_column_is_null() -> None: + """Test translate_column_names when missing column is checked for null.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = IsNull("missing_col") + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue because the missing column is treated as null + # missing_col's initial_default (None) satisfies the IsNull predicate + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_missing_column_with_initial_default() -> None: + """Test translate_column_names when missing column has initial_default that matches expression.""" + # Original schema + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # 
Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysTrue because the initial_default value (42) matches the literal (42) + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_missing_column_initial_default_mismatch() -> None: + """Test translate_column_names when missing column's initial_default doesn't match expression.""" + # Original schema + original_schema = Schema( + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), + schema_id=1, + ) + + # Create bound expression that won't match the default value + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema doesn't have this column + file_schema = Schema( + NestedField(field_id=1, name="other_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Translate column names + translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) + + # Should evaluate to AlwaysFalse because initial_default value (10) doesn't match literal (42) + assert translated_expr == AlwaysFalse() + + +def test_translate_column_names_projected_field_matches() -> None: + """Test translate_column_names with projected field value that matches expression.""" + # Original schema with a field that has no initial_default (defaults to None) + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing 
field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Projected column that is missing in the file schema + projected_field_values = {"missing_col": 42} + + # Translate column names + translated_expr = translate_column_names( + bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values + ) + + # Should evaluate to AlwaysTrue since projected field value matches the expression literal + # even though the field is missing in the file schema + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_projected_field_mismatch() -> None: + """Test translate_column_names with projected field value that doesn't match expression.""" + # Original schema with a field that has no initial_default (defaults to None) + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Projected column that is missing in the file schema + projected_field_values = {"missing_col": 1} + + # Translate column names + translated_expr = translate_column_names( + bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values + ) + + # Should evaluate to AlwaysFalse since projected field value does not match the expression literal + assert translated_expr == AlwaysFalse() + + +def test_translate_column_names_projected_field_and_initial_default() -> None: + 
"""Test translate_column_names with both projected field values and initial_default values.""" + # Original schema with mixed field configurations + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col_1", field_type=IntegerType(), required=False), + NestedField(field_id=3, name="missing_col_2", field_type=StringType(), required=False, initial_default="test"), + schema_id=1, + ) + + # Create bound expression for both missing columns + unbound_expr = And(EqualTo("missing_col_1", 42), EqualTo("missing_col_2", "test")) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 and field_id=3 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Projected value for one missing column + projected_field_values = {"missing_col_1": 42} + + # Translate column names + translated_expr = translate_column_names( + bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values + ) + + # Should evaluate to AlwaysTrue since both missing_col_1's projected value and missing_col_2's initial_default match their respective expression literals + assert translated_expr == AlwaysTrue() diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4f121ba3bc..ac16ef18f6 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1197,6 +1197,16 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa }, schema=schema, ) + # Test that row filter works with partition value projection + assert table.scan(row_filter="partition_id = 1").to_arrow() == pa.table( + { + "other_field": ["foo", "bar", "baz"], + "partition_id": [1, 1, 1], + }, + schema=schema, + ) + # Test that row filter does not return any rows for a 
non-existing partition value + assert len(table.scan(row_filter="partition_id = -1").to_arrow()) == 0 def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: From f9b53e0137e19566967416954eb42bd846b2148d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 26 Jul 2025 23:11:09 -0700 Subject: [PATCH 08/14] fix --- tests/expressions/test_visitors.py | 48 +++++------------------------- 1 file changed, 8 insertions(+), 40 deletions(-) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 4d4f26c530..a57b3d302e 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -1699,12 +1699,12 @@ def test_translate_column_names_missing_column() -> None: # Translate column names translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) - # missing_col's initial_default (None) does not match the expression literal (42) + # missing_col's default initial_default (None) does not match the expression literal (42) assert translated_expr == AlwaysFalse() -def test_translate_column_names_missing_column_is_null() -> None: - """Test translate_column_names when missing column is checked for null.""" +def test_translate_column_names_missing_column_match_null() -> None: + """Test translate_column_names when missing column matches null.""" # Original schema original_schema = Schema( NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), @@ -1726,12 +1726,12 @@ def test_translate_column_names_missing_column_is_null() -> None: translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) # Should evaluate to AlwaysTrue because the missing column is treated as null - # missing_col's initial_default (None) satisfies the IsNull predicate + # missing_col's default initial_default (None) satisfies the IsNull predicate assert translated_expr == AlwaysTrue() def 
test_translate_column_names_missing_column_with_initial_default() -> None: - """Test translate_column_names when missing column has initial_default that matches expression.""" + """Test translate_column_names when missing column's initial_default matches expression.""" # Original schema original_schema = Schema( NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), @@ -1756,7 +1756,7 @@ def test_translate_column_names_missing_column_with_initial_default() -> None: assert translated_expr == AlwaysTrue() -def test_translate_column_names_missing_column_initial_default_mismatch() -> None: +def test_translate_column_names_missing_column_with_initial_default_mismatch() -> None: """Test translate_column_names when missing column's initial_default doesn't match expression.""" # Original schema original_schema = Schema( @@ -1781,7 +1781,7 @@ def test_translate_column_names_missing_column_initial_default_mismatch() -> Non assert translated_expr == AlwaysFalse() -def test_translate_column_names_projected_field_matches() -> None: +def test_translate_column_names_missing_column_with_projected_field_matches() -> None: """Test translate_column_names with projected field value that matches expression.""" # Original schema with a field that has no initial_default (defaults to None) original_schema = Schema( @@ -1813,7 +1813,7 @@ def test_translate_column_names_projected_field_matches() -> None: assert translated_expr == AlwaysTrue() -def test_translate_column_names_projected_field_mismatch() -> None: +def test_translate_column_names_missing_column_with_projected_field_mismatch() -> None: """Test translate_column_names with projected field value that doesn't match expression.""" # Original schema with a field that has no initial_default (defaults to None) original_schema = Schema( @@ -1842,35 +1842,3 @@ def test_translate_column_names_projected_field_mismatch() -> None: # Should evaluate to AlwaysFalse since projected field value does not match the 
expression literal assert translated_expr == AlwaysFalse() - - -def test_translate_column_names_projected_field_and_initial_default() -> None: - """Test translate_column_names with both projected field values and initial_default values.""" - # Original schema with mixed field configurations - original_schema = Schema( - NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), - NestedField(field_id=2, name="missing_col_1", field_type=IntegerType(), required=False), - NestedField(field_id=3, name="missing_col_2", field_type=StringType(), required=False, initial_default="test"), - schema_id=1, - ) - - # Create bound expression for both missing columns - unbound_expr = And(EqualTo("missing_col_1", 42), EqualTo("missing_col_2", "test")) - bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) - - # File schema only has the existing column (field_id=1), missing field_id=2 and field_id=3 - file_schema = Schema( - NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), - schema_id=1, - ) - - # Projected value for one missing column - projected_field_values = {"missing_col_1": 42} - - # Translate column names - translated_expr = translate_column_names( - bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values - ) - - # Should evaluate to AlwaysTrue since both missing_col_1's projected value and missing_col_2's initial_default match their respective expression literals - assert translated_expr == AlwaysTrue() From bc8d5c99c39f3595a70859ea06eb16ce14206c68 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 27 Jul 2025 09:21:46 -0700 Subject: [PATCH 09/14] fix logic --- pyiceberg/expressions/visitors.py | 10 +++-- tests/expressions/test_visitors.py | 62 ++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 
8e9908690f..3e4dd1b103 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -915,11 +915,15 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi else: raise ValueError(f"Unsupported predicate: {predicate}") + # Evaluate column projection first if it exists + if projected_field_value := self.projected_field_values.get(field.name): + if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(projected_field_value)): + return AlwaysTrue() + + # Evaluate initial_default value return ( AlwaysTrue() - if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)( - Record(self.projected_field_values.get(field.name, None) or field.initial_default) - ) + if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default)) else AlwaysFalse() ) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index a57b3d302e..c317f93b34 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -1842,3 +1842,65 @@ def test_translate_column_names_missing_column_with_projected_field_mismatch() - # Should evaluate to AlwaysFalse since projected field value does not match the expression literal assert translated_expr == AlwaysFalse() + + +def test_translate_column_names_missing_column_projected_field_fallbacks_to_initial_default() -> None: + """Test translate_column_names when projected field value doesn't match but initial_default does.""" + # Original schema with a field that has an initial_default + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), + schema_id=1, + ) + + # Create bound expression for the missing column that would match initial_default + unbound_expr = EqualTo("missing_col", 42) + 
bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Projected field value that differs from both the expression literal and initial_default + projected_field_values = {"missing_col_1": 10} # This doesn't match expression literal (42) + + # Translate column names + translated_expr = translate_column_names( + bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values + ) + + # Should evaluate to AlwaysTrue since projected field value doesn't match but initial_default does + assert translated_expr == AlwaysTrue() + + +def test_translate_column_names_missing_column_projected_field_matches_initial_default_mismatch() -> None: + """Test translate_column_names when both projected field value and initial_default doesn't match.""" + # Original schema with a field that has an initial_default that doesn't match the expression + original_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), + schema_id=1, + ) + + # Create bound expression for the missing column + unbound_expr = EqualTo("missing_col", 42) + bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) + + # File schema only has the existing column (field_id=1), missing field_id=2 + file_schema = Schema( + NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), + schema_id=1, + ) + + # Projected field value that matches the expression literal + projected_field_values = {"missing_col_1": 10} # This doesn't match expression literal (42) + + # Translate column names + translated_expr = 
translate_column_names( + bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values + ) + + # Should evaluate to AlwaysFalse since neither the projected field value nor the initial_default matches + assert translated_expr == AlwaysFalse() From 7de474450a74a77f41f3aa91bc3d9be185d3dd5c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 27 Jul 2025 09:30:06 -0700 Subject: [PATCH 10/14] comments --- pyiceberg/expressions/visitors.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 3e4dd1b103..e3ad011cd8 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -903,7 +903,7 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi file_column_name = self.file_schema.find_column_name(field.field_id) if file_column_name is None: - # In the case of schema evolution or column projection, the column might not be present in the file schema. + # In the case of schema evolution or column projection, the field might not be present in the file schema.
# we can use the projected value or the field's default value as a constant and evaluate it against the predicate pred: BooleanExpression if isinstance(predicate, BoundUnaryPredicate): @@ -915,6 +915,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi else: raise ValueError(f"Unsupported predicate: {predicate}") + # In the order described by the "Column Projection" section of the Iceberg spec: + # https://iceberg.apache.org/spec/#column-projection # Evaluate column projection first if it exists if projected_field_value := self.projected_field_values.get(field.name): if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(projected_field_value)): From 3d581809976bbdd1bf46bd3328d1ce12d16a4b1b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 27 Jul 2025 09:45:27 -0700 Subject: [PATCH 11/14] Update tests/expressions/test_visitors.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/expressions/test_visitors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index c317f93b34..5d500f7af4 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -1864,7 +1864,7 @@ def test_translate_column_names_missing_column_projected_field_fallbacks_to_init ) # Projected field value that differs from both the expression literal and initial_default - projected_field_values = {"missing_col_1": 10} # This doesn't match expression literal (42) + projected_field_values = {"missing_col": 10} # This doesn't match expression literal (42) # Translate column names translated_expr = translate_column_names( From 4757d6ffff22d7e23ea1c1c51dccb370afc9afd5 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 27 Jul 2025 09:45:55 -0700 Subject: [PATCH 12/14] Update tests/expressions/test_visitors.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- 
tests/expressions/test_visitors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 5d500f7af4..f02aadfe44 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -1895,7 +1895,7 @@ def test_translate_column_names_missing_column_projected_field_matches_initial_d ) # Projected field value that matches the expression literal - projected_field_values = {"missing_col_1": 10} # This doesn't match expression literal (42) + projected_field_values = {"missing_col": 10} # This doesn't match expression literal (42) # Translate column names translated_expr = translate_column_names( From 69a850ef3a4ed3b86c192f954697844e902f9052 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 15:21:56 -0700 Subject: [PATCH 13/14] Update pyiceberg/expressions/visitors.py Co-authored-by: Fokko Driesprong --- pyiceberg/expressions/visitors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index e3ad011cd8..f1d3494f36 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -874,7 +874,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): projected_field_values: Dict[str, Any] def __init__( - self, file_schema: Schema, case_sensitive: bool, projected_field_values: Optional[Dict[str, Any]] = None + self, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT ) -> None: self.file_schema = file_schema self.case_sensitive = case_sensitive From fa789265b830f770b1e135d0c6caa0de308a4419 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 15:28:29 -0700 Subject: [PATCH 14/14] use EMPTY_DICT --- pyiceberg/expressions/visitors.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py 
index f1d3494f36..99cbc0fb66 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -23,7 +23,6 @@ Dict, Generic, List, - Optional, Set, SupportsFloat, Tuple, @@ -873,9 +872,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): case_sensitive: bool projected_field_values: Dict[str, Any] - def __init__( - self, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT - ) -> None: + def __init__(self, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT) -> None: self.file_schema = file_schema self.case_sensitive = case_sensitive self.projected_field_values = projected_field_values or {} @@ -940,7 +937,7 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi def translate_column_names( - expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_field_values: Optional[Dict[str, Any]] = None + expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT ) -> BooleanExpression: return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_field_values))