Skip to content

Commit 32a0df7

Browse files
author
Roman Shanin
committed
extend signature of translate_column_names
1 parent 38ebb19 commit 32a0df7

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

pyiceberg/expressions/visitors.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,9 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
860860
861861
Args:
862862
file_schema (Schema): The schema of the file.
863+
projected_schema (Schema): The schema to project onto the data files.
863864
case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.
865+
projected_missing_fields (dict[str, Any]): Map of fields missing in file_schema, but present as partition values.
864866
865867
Raises:
866868
TypeError: In the case of an UnboundPredicate.
@@ -870,9 +872,13 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
870872
file_schema: Schema
871873
case_sensitive: bool
872874

873-
def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
875+
def __init__(
876+
self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any]
877+
) -> None:
874878
self.file_schema = file_schema
879+
self.projected_schema = projected_schema
875880
self.case_sensitive = case_sensitive
881+
self.projected_missing_fields = projected_missing_fields
876882

877883
def visit_true(self) -> BooleanExpression:
878884
return AlwaysTrue()
@@ -913,8 +919,14 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
913919
raise ValueError(f"Unsupported predicate: {predicate}")
914920

915921

916-
def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression:
917-
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))
922+
def translate_column_names(
923+
expr: BooleanExpression,
924+
file_schema: Schema,
925+
projected_schema: Schema,
926+
case_sensitive: bool,
927+
projected_missing_fields: dict[str, Any],
928+
) -> BooleanExpression:
929+
return visit(expr, _ColumnNameTranslator(file_schema, projected_schema, case_sensitive, projected_missing_fields))
918930

919931

920932
class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):

pyiceberg/io/pyarrow.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,18 +1404,24 @@ def _task_to_record_batches(
14041404
# the table format version.
14051405
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
14061406

1407-
pyarrow_filter = None
1408-
if bound_row_filter is not AlwaysTrue():
1409-
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
1410-
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1411-
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1412-
14131407
# Apply column projection rules
14141408
# https://iceberg.apache.org/spec/#column-projection
14151409
should_project_columns, projected_missing_fields = _get_column_projection_values(
14161410
task.file, projected_schema, partition_spec, file_schema.field_ids
14171411
)
14181412

1413+
pyarrow_filter = None
1414+
if bound_row_filter is not AlwaysTrue():
1415+
translated_row_filter = translate_column_names(
1416+
bound_row_filter,
1417+
file_schema,
1418+
projected_schema,
1419+
case_sensitive=case_sensitive,
1420+
projected_missing_fields=projected_missing_fields,
1421+
)
1422+
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1423+
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1424+
14191425
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
14201426

14211427
fragment_scanner = ds.Scanner.from_fragment(

0 commit comments

Comments
 (0)