-
Notifications
You must be signed in to change notification settings - Fork 413
Fix projected fields predicate evaluation #2029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a8dbf6b
e88a6a2
befa05c
a93b4eb
10afbb8
1ce4889
7b2ecbb
682afc5
f9b53e0
bc8d5c9
7de4744
3d58180
4757d6f
69a850e
fa78926
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,13 +72,15 @@ | |
| expression_to_plain_format, | ||
| rewrite_not, | ||
| rewrite_to_dnf, | ||
| translate_column_names, | ||
| visit, | ||
| visit_bound_predicate, | ||
| ) | ||
| from pyiceberg.manifest import ManifestFile, PartitionFieldSummary | ||
| from pyiceberg.schema import Accessor, Schema | ||
| from pyiceberg.typedef import Record | ||
| from pyiceberg.types import ( | ||
| BooleanType, | ||
| DoubleType, | ||
| FloatType, | ||
| IcebergType, | ||
|
|
@@ -1623,3 +1625,282 @@ def test_expression_evaluator_null() -> None: | |
| assert expression_evaluator(schema, LessThan("a", 1), case_sensitive=True)(struct) is False | ||
| assert expression_evaluator(schema, StartsWith("a", 1), case_sensitive=True)(struct) is False | ||
| assert expression_evaluator(schema, NotStartsWith("a", 1), case_sensitive=True)(struct) is True | ||
|
|
||
|
|
||
| def test_translate_column_names_simple_case(table_schema_simple: Schema) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for all the test cases, I've stepped through them and they cover the cases I'd expect. 1.) Disjunctive/Conjunctive cases (Or, and, etc) where one field is missing from the file and one field is not. Maybe mix this in with the rename case where the file on disk has the field but with a different name (I see that's already tested in the single predicate case, but just to sanity check combined cases) 2.) Maybe a nested field case though it's really no different Down the line, when say Spark can support the DDL. for default values then we can have end to end verification tests as well |
||
| """Test translate_column_names with matching column names.""" | ||
| # Create a bound expression using the original schema | ||
| unbound_expr = EqualTo("foo", "test_value") | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=table_schema_simple, case_sensitive=True)) | ||
|
|
||
| # File schema has the same column names | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="foo", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), | ||
| NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # Should return an unbound expression with the same column name since they match | ||
| assert isinstance(translated_expr, EqualTo) | ||
| assert translated_expr.term == Reference("foo") | ||
| assert translated_expr.literal == literal("test_value") | ||
|
|
||
|
|
||
| def test_translate_column_names_different_column_names() -> None: | ||
| """Test translate_column_names with different column names in file schema.""" | ||
| # Original schema | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="original_name", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression | ||
| unbound_expr = EqualTo("original_name", "test_value") | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema has different column name but same field ID | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="file_column_name", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # Should use the file schema's column name | ||
| assert isinstance(translated_expr, EqualTo) | ||
| assert translated_expr.term == Reference("file_column_name") | ||
| assert translated_expr.literal == literal("test_value") | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column() -> None: | ||
| """Test translate_column_names when column is missing from file schema (such as in schema evolution).""" | ||
| # Original schema | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # missing_col's default initial_default (None) does not match the expression literal (42) | ||
| assert translated_expr == AlwaysFalse() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_match_null() -> None: | ||
| """Test translate_column_names when missing column matches null.""" | ||
| # Original schema | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = IsNull("missing_col") | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # Should evaluate to AlwaysTrue because the missing column is treated as null | ||
| # missing_col's default initial_default (None) satisfies the IsNull predicate | ||
| assert translated_expr == AlwaysTrue() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_with_initial_default() -> None: | ||
| """Test translate_column_names when missing column's initial_default matches expression.""" | ||
| # Original schema | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # Should evaluate to AlwaysTrue because the initial_default value (42) matches the literal (42) | ||
| assert translated_expr == AlwaysTrue() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_with_initial_default_mismatch() -> None: | ||
| """Test translate_column_names when missing column's initial_default doesn't match expression.""" | ||
| # Original schema | ||
| original_schema = Schema( | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression that won't match the default value | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema doesn't have this column | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="other_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names(bound_expr, file_schema, case_sensitive=True) | ||
|
|
||
| # Should evaluate to AlwaysFalse because initial_default value (10) doesn't match literal (42) | ||
| assert translated_expr == AlwaysFalse() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_with_projected_field_matches() -> None: | ||
| """Test translate_column_names with projected field value that matches expression.""" | ||
| # Original schema with a field that has no initial_default (defaults to None) | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Projected column that is missing in the file schema | ||
| projected_field_values = {"missing_col": 42} | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names( | ||
| bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values | ||
| ) | ||
|
|
||
| # Should evaluate to AlwaysTrue since projected field value matches the expression literal | ||
| # even though the field is missing in the file schema | ||
| assert translated_expr == AlwaysTrue() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_with_projected_field_mismatch() -> None: | ||
| """Test translate_column_names with projected field value that doesn't match expression.""" | ||
| # Original schema with a field that has no initial_default (defaults to None) | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Projected column that is missing in the file schema | ||
| projected_field_values = {"missing_col": 1} | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names( | ||
| bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values | ||
| ) | ||
|
|
||
| # Should evaluate to AlwaysFalse since projected field value does not match the expression literal | ||
| assert translated_expr == AlwaysFalse() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_projected_field_fallbacks_to_initial_default() -> None: | ||
| """Test translate_column_names when projected field value doesn't match but initial_default does.""" | ||
| # Original schema with a field that has an initial_default | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=42), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column that would match initial_default | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Projected field value that differs from both the expression literal and initial_default | ||
| projected_field_values = {"missing_col": 10} # This doesn't match expression literal (42) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names( | ||
| bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values | ||
| ) | ||
|
|
||
| # Should evaluate to AlwaysTrue since projected field value doesn't match but initial_default does | ||
| assert translated_expr == AlwaysTrue() | ||
|
|
||
|
|
||
| def test_translate_column_names_missing_column_projected_field_matches_initial_default_mismatch() -> None: | ||
| """Test translate_column_names when both projected field value and initial_default doesn't match.""" | ||
| # Original schema with a field that has an initial_default that doesn't match the expression | ||
| original_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| NestedField(field_id=2, name="missing_col", field_type=IntegerType(), required=False, initial_default=10), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Create bound expression for the missing column | ||
| unbound_expr = EqualTo("missing_col", 42) | ||
| bound_expr = visit(unbound_expr, visitor=BindVisitor(schema=original_schema, case_sensitive=True)) | ||
|
|
||
| # File schema only has the existing column (field_id=1), missing field_id=2 | ||
| file_schema = Schema( | ||
| NestedField(field_id=1, name="existing_col", field_type=StringType(), required=False), | ||
| schema_id=1, | ||
| ) | ||
|
|
||
| # Projected field value that matches the expression literal | ||
| projected_field_values = {"missing_col": 10} # This doesn't match expression literal (42) | ||
|
|
||
| # Translate column names | ||
| translated_expr = translate_column_names( | ||
| bound_expr, file_schema, case_sensitive=True, projected_field_values=projected_field_values | ||
| ) | ||
|
|
||
| # Should evaluate to AlwaysFalse since both projected field value and initial_default does not match | ||
| assert translated_expr == AlwaysFalse() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, what's the rationale for not including default value handling inside the logic that produces the
projected_field_values? Seems like the intent of_get_column_projection_valuesis to apply all the projection rules based on the comment but it looks like we apply most of them and then here we fall through to applying the initial default on 928. May be better if all of that logic is self contained in the function so that in case things move around you don't have a separate place where default values are propagatedThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about condensing this into: