Skip to content

Commit e86feda

Browse files
authored
Merge branch 'main' into fd-add-test-for-migrated-tables
2 parents f88b1ab + 2a11b25 commit e86feda

File tree

10 files changed

+341
-44
lines changed

10 files changed

+341
-44
lines changed

.github/workflows/pypi-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
if: startsWith(matrix.os, 'ubuntu')
6363

6464
- name: Build wheels
65-
uses: pypa/cibuildwheel@v3.0.1
65+
uses: pypa/cibuildwheel@v3.1.3
6666
with:
6767
output-dir: wheelhouse
6868
config-file: "pyproject.toml"

.github/workflows/svn-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ jobs:
5757
if: startsWith(matrix.os, 'ubuntu')
5858

5959
- name: Build wheels
60-
uses: pypa/cibuildwheel@v3.0.1
60+
uses: pypa/cibuildwheel@v3.1.3
6161
with:
6262
output-dir: wheelhouse
6363
config-file: "pyproject.toml"

poetry.lock

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/expressions/visitors.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,7 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
861861
Args:
862862
file_schema (Schema): The schema of the file.
863863
case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.
864+
projected_field_values (Dict[str, Any]): Values for projected fields not present in the data file.
864865
865866
Raises:
866867
TypeError: In the case of an UnboundPredicate.
@@ -869,10 +870,12 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
869870

870871
file_schema: Schema
871872
case_sensitive: bool
873+
projected_field_values: Dict[str, Any]
872874

873-
def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
875+
def __init__(self, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT) -> None:
874876
self.file_schema = file_schema
875877
self.case_sensitive = case_sensitive
878+
self.projected_field_values = projected_field_values or {}
876879

877880
def visit_true(self) -> BooleanExpression:
878881
return AlwaysTrue()
@@ -897,9 +900,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
897900
file_column_name = self.file_schema.find_column_name(field.field_id)
898901

899902
if file_column_name is None:
900-
# In the case of schema evolution, the column might not be present
901-
# we can use the default value as a constant and evaluate it against
902-
# the predicate
903+
# In the case of schema evolution or column projection, the field might not be present in the file schema.
904+
# We can use the projected value or the field's default value as a constant and evaluate it against the predicate.
903905
pred: BooleanExpression
904906
if isinstance(predicate, BoundUnaryPredicate):
905907
pred = predicate.as_unbound(field.name)
@@ -910,6 +912,14 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
910912
else:
911913
raise ValueError(f"Unsupported predicate: {predicate}")
912914

915+
# In the order described by the "Column Projection" section of the Iceberg spec:
916+
# https://iceberg.apache.org/spec/#column-projection
917+
# Evaluate column projection first if it exists
918+
if projected_field_value := self.projected_field_values.get(field.name):
919+
if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(projected_field_value)):
920+
return AlwaysTrue()
921+
922+
# Evaluate initial_default value
913923
return (
914924
AlwaysTrue()
915925
if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default))
@@ -926,8 +936,10 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
926936
raise ValueError(f"Unsupported predicate: {predicate}")
927937

928938

929-
def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression:
930-
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))
939+
def translate_column_names(
940+
expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_field_values: Dict[str, Any] = EMPTY_DICT
941+
) -> BooleanExpression:
942+
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_field_values))
931943

932944

933945
class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):

pyiceberg/io/pyarrow.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,18 +1460,20 @@ def _task_to_record_batches(
14601460
# the table format version.
14611461
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
14621462

1463-
pyarrow_filter = None
1464-
if bound_row_filter is not AlwaysTrue():
1465-
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
1466-
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1467-
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1468-
14691463
# Apply column projection rules
14701464
# https://iceberg.apache.org/spec/#column-projection
14711465
should_project_columns, projected_missing_fields = _get_column_projection_values(
14721466
task.file, projected_schema, partition_spec, file_schema.field_ids
14731467
)
14741468

1469+
pyarrow_filter = None
1470+
if bound_row_filter is not AlwaysTrue():
1471+
translated_row_filter = translate_column_names(
1472+
bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields
1473+
)
1474+
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1475+
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1476+
14751477
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
14761478

14771479
fragment_scanner = ds.Scanner.from_fragment(

pyiceberg/table/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2096,14 +2096,6 @@ def generate_data_file_filename(self, extension: str) -> str:
20962096
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
20972097

20982098

2099-
@dataclass(frozen=True)
2100-
class AddFileTask:
2101-
"""Task with the parameters for adding a Parquet file as a DataFile."""
2102-
2103-
file_path: str
2104-
partition_field_value: Record
2105-
2106-
21072099
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
21082100
"""Convert a list of files into DataFiles.
21092101

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ sqlalchemy = { version = "^2.0.18", optional = true }
8282
bodo = { version = ">=2025.7.4", optional = true }
8383
daft = { version = ">=0.5.0", optional = true }
8484
cachetools = ">=5.5,<7.0"
85-
pyiceberg-core = { version = "^0.5.1", optional = true }
85+
pyiceberg-core = { version = ">=0.5.1,<0.7.0", optional = true }
8686
polars = { version = "^1.21.0", optional = true }
8787
thrift-sasl = { version = ">=0.4.3", optional = true }
8888
kerberos = {version = "^1.3.1", optional = true}
@@ -109,14 +109,14 @@ mypy-boto3-dynamodb = ">=1.28.18"
109109
[tool.poetry.group.docs.dependencies]
110110
# for mkdocs
111111
mkdocs = "1.6.1"
112-
griffe = "1.7.3"
112+
griffe = "1.9.0"
113113
jinja2 = "3.1.6"
114114
mkdocstrings = "0.30.0"
115115
mkdocstrings-python = "1.16.12"
116116
mkdocs-literate-nav = "0.6.2"
117117
mkdocs-autorefs = "1.4.2"
118118
mkdocs-gen-files = "0.5.0"
119-
mkdocs-material = "9.6.15"
119+
mkdocs-material = "9.6.16"
120120
mkdocs-material-extensions = "1.3.1"
121121
mkdocs-section-index = "0.3.10"
122122

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2501,7 +2501,7 @@ def spark() -> "SparkSession":
25012501
# Remember to also update `dev/Dockerfile`
25022502
spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
25032503
scala_version = "2.12"
2504-
iceberg_version = "1.9.0"
2504+
iceberg_version = "1.9.2"
25052505
# Should match with Spark:
25062506
hadoop_version = "3.3.4"
25072507
aws_sdk_version = "1.12.753"

0 commit comments

Comments
 (0)