Skip to content

Commit f658044

Browse files
committed
Core: Fix partition column projection with schema evolution
Use table schema instead of projected schema when building partition type to avoid 'Could not find field with id' errors during column projection on partitioned tables with schema evolution.
1 parent 5773b7f commit f658044

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,14 +1492,18 @@ def _field_id(self, field: pa.Field) -> int:
14921492

14931493

14941494
def _get_column_projection_values(
1495-
file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
1495+
file: DataFile,
1496+
projected_schema: Schema,
1497+
table_schema: Schema,
1498+
partition_spec: Optional[PartitionSpec],
1499+
file_project_field_ids: Set[int],
14961500
) -> Dict[int, Any]:
14971501
"""Apply Column Projection rules to File Schema."""
14981502
project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
14991503
if len(project_schema_diff) == 0 or partition_spec is None:
15001504
return EMPTY_DICT
15011505

1502-
partition_schema = partition_spec.partition_type(projected_schema)
1506+
partition_schema = partition_spec.partition_type(table_schema)
15031507
accessors = build_position_accessors(partition_schema)
15041508

15051509
projected_missing_fields = {}
@@ -1517,6 +1521,7 @@ def _task_to_record_batches(
15171521
task: FileScanTask,
15181522
bound_row_filter: BooleanExpression,
15191523
projected_schema: Schema,
1524+
table_schema: Schema,
15201525
projected_field_ids: Set[int],
15211526
positional_deletes: Optional[List[ChunkedArray]],
15221527
case_sensitive: bool,
@@ -1541,7 +1546,7 @@ def _task_to_record_batches(
15411546

15421547
# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
15431548
projected_missing_fields = _get_column_projection_values(
1544-
task.file, projected_schema, partition_spec, file_schema.field_ids
1549+
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
15451550
)
15461551

15471552
pyarrow_filter = None
@@ -1763,6 +1768,7 @@ def _record_batches_from_scan_tasks_and_deletes(
17631768
task,
17641769
self._bound_row_filter,
17651770
self._projected_schema,
1771+
self._table_metadata.schema(),
17661772
self._projected_field_ids,
17671773
deletes_per_file.get(task.file.file_path),
17681774
self._case_sensitive,

tests/io/test_pyarrow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2846,6 +2846,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str)
28462846
FileScanTask(data_file),
28472847
bound_row_filter=AlwaysTrue(),
28482848
projected_schema=table_schema,
2849+
table_schema=table_schema,
28492850
projected_field_ids={1},
28502851
positional_deletes=None,
28512852
case_sensitive=True,
@@ -4590,3 +4591,72 @@ def test_orc_stripe_based_batching(tmp_path: Path) -> None:
45904591
# Verify total rows
45914592
total_rows = sum(batch.num_rows for batch in batches)
45924593
assert total_rows == 10000, f"Expected 10000 total rows, got {total_rows}"
4594+
4595+
4596+
def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCatalog) -> None:
    """Test column projection on partitioned table after schema evolution (https://github.com/apache/iceberg-python/issues/2672)."""
    # Original table layout: identity-partitioned on `partition_date`.
    base_schema = Schema(
        NestedField(1, "partition_date", DateType(), required=False),
        NestedField(2, "id", IntegerType(), required=False),
        NestedField(3, "name", StringType(), required=False),
        NestedField(4, "value", IntegerType(), required=False),
    )
    spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_date"),
    )

    catalog.create_namespace("default")
    table = catalog.create_table(
        "default.test_schema_evolution_projection",
        schema=base_schema,
        partition_spec=spec,
    )

    # First batch is written against the original (pre-evolution) schema.
    arrow_schema_v1 = pa.schema(
        [
            ("partition_date", pa.date32()),
            ("id", pa.int32()),
            ("name", pa.string()),
            ("value", pa.int32()),
        ]
    )
    first_batch = pa.Table.from_pylist(
        [
            {"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
            {"partition_date": date(2024, 1, 1), "id": 2, "name": "Bob", "value": 200},
        ],
        schema=arrow_schema_v1,
    )
    table.append(first_batch)

    # Evolve the schema by adding a column, then reload so the table object
    # reflects the new metadata.
    with table.update_schema() as update:
        update.add_column("new_column", StringType())
    table = catalog.load_table("default.test_schema_evolution_projection")

    # Second batch carries the evolved schema, landing in a new partition.
    arrow_schema_v2 = pa.schema(
        [
            ("partition_date", pa.date32()),
            ("id", pa.int32()),
            ("name", pa.string()),
            ("value", pa.int32()),
            ("new_column", pa.string()),
        ]
    )
    second_batch = pa.Table.from_pylist(
        [
            {"partition_date": date(2024, 1, 2), "id": 3, "name": "Charlie", "value": 300, "new_column": "new1"},
            {"partition_date": date(2024, 1, 2), "id": 4, "name": "David", "value": 400, "new_column": "new2"},
        ],
        schema=arrow_schema_v2,
    )
    table.append(second_batch)

    # Projecting a subset of columns (excluding the partition column) used to
    # fail with "Could not find field with id" on pre-evolution data files.
    result = table.scan(selected_fields=("id", "name", "value", "new_column")).to_arrow()

    assert set(result.schema.names) == {"id", "name", "value", "new_column"}
    assert result.num_rows == 4
    ordered = result.sort_by("name")
    assert ordered["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
    # Rows written before the evolution read the new column as nulls.
    assert ordered["new_column"].to_pylist() == [None, None, "new1", "new2"]

0 commit comments

Comments (0)