Skip to content

Commit 5508ed2

Browse files
committed
Core: Fix partition column projection with schema evolution
Use table schema instead of projected schema when building partition type to avoid 'Could not find field with id' errors during column projection on partitioned tables with schema evolution.
1 parent 5773b7f commit 5508ed2

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,14 +1492,14 @@ def _field_id(self, field: pa.Field) -> int:
14921492

14931493

14941494
def _get_column_projection_values(
1495-
file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
1495+
file: DataFile, projected_schema: Schema, table_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
14961496
) -> Dict[int, Any]:
14971497
"""Apply Column Projection rules to File Schema."""
14981498
project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
14991499
if len(project_schema_diff) == 0 or partition_spec is None:
15001500
return EMPTY_DICT
15011501

1502-
partition_schema = partition_spec.partition_type(projected_schema)
1502+
partition_schema = partition_spec.partition_type(table_schema)
15031503
accessors = build_position_accessors(partition_schema)
15041504

15051505
projected_missing_fields = {}
@@ -1517,6 +1517,7 @@ def _task_to_record_batches(
15171517
task: FileScanTask,
15181518
bound_row_filter: BooleanExpression,
15191519
projected_schema: Schema,
1520+
table_schema: Schema,
15201521
projected_field_ids: Set[int],
15211522
positional_deletes: Optional[List[ChunkedArray]],
15221523
case_sensitive: bool,
@@ -1541,7 +1542,7 @@ def _task_to_record_batches(
15411542

15421543
# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
15431544
projected_missing_fields = _get_column_projection_values(
1544-
task.file, projected_schema, partition_spec, file_schema.field_ids
1545+
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
15451546
)
15461547

15471548
pyarrow_filter = None
@@ -1763,6 +1764,7 @@ def _record_batches_from_scan_tasks_and_deletes(
17631764
task,
17641765
self._bound_row_filter,
17651766
self._projected_schema,
1767+
self._table_metadata.schema(),
17661768
self._projected_field_ids,
17671769
deletes_per_file.get(task.file.file_path),
17681770
self._case_sensitive,

tests/io/test_pyarrow.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2846,6 +2846,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str)
28462846
FileScanTask(data_file),
28472847
bound_row_filter=AlwaysTrue(),
28482848
projected_schema=table_schema,
2849+
table_schema=table_schema,
28492850
projected_field_ids={1},
28502851
positional_deletes=None,
28512852
case_sensitive=True,
@@ -4590,3 +4591,73 @@ def test_orc_stripe_based_batching(tmp_path: Path) -> None:
45904591
# Verify total rows
45914592
total_rows = sum(batch.num_rows for batch in batches)
45924593
assert total_rows == 10000, f"Expected 10000 total rows, got {total_rows}"
4594+
4595+
4596+
def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCatalog) -> None:
4597+
"""Test column projection on partitioned table after schema evolution (https://github.com/apache/iceberg-python/issues/2672)."""
4598+
initial_schema = Schema(
4599+
NestedField(1, "partition_date", DateType(), required=False),
4600+
NestedField(2, "id", IntegerType(), required=False),
4601+
NestedField(3, "name", StringType(), required=False),
4602+
NestedField(4, "value", IntegerType(), required=False),
4603+
)
4604+
4605+
partition_spec = PartitionSpec(
4606+
PartitionField(
4607+
source_id=1,
4608+
field_id=1000,
4609+
transform=IdentityTransform(),
4610+
name="partition_date"
4611+
),
4612+
)
4613+
4614+
catalog.create_namespace("default")
4615+
table = catalog.create_table(
4616+
"default.test_schema_evolution_projection",
4617+
schema=initial_schema,
4618+
partition_spec=partition_spec,
4619+
)
4620+
4621+
data_v1 = pa.Table.from_pylist(
4622+
[
4623+
{"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
4624+
{"partition_date": date(2024, 1, 1), "id": 2, "name": "Bob", "value": 200},
4625+
],
4626+
schema=pa.schema([
4627+
("partition_date", pa.date32()),
4628+
("id", pa.int32()),
4629+
("name", pa.string()),
4630+
("value", pa.int32()),
4631+
])
4632+
)
4633+
4634+
table.append(data_v1)
4635+
4636+
with table.update_schema() as update:
4637+
update.add_column("new_column", StringType())
4638+
4639+
table = catalog.load_table("default.test_schema_evolution_projection")
4640+
4641+
data_v2 = pa.Table.from_pylist(
4642+
[
4643+
{"partition_date": date(2024, 1, 2), "id": 3, "name": "Charlie", "value": 300, "new_column": "new1"},
4644+
{"partition_date": date(2024, 1, 2), "id": 4, "name": "David", "value": 400, "new_column": "new2"},
4645+
],
4646+
schema=pa.schema([
4647+
("partition_date", pa.date32()),
4648+
("id", pa.int32()),
4649+
("name", pa.string()),
4650+
("value", pa.int32()),
4651+
("new_column", pa.string()),
4652+
])
4653+
)
4654+
4655+
table.append(data_v2)
4656+
4657+
result = table.scan(selected_fields=("id", "name", "value", "new_column")).to_arrow()
4658+
4659+
assert set(result.schema.names) == {"id", "name", "value", "new_column"}
4660+
assert result.num_rows == 4
4661+
result_sorted = result.sort_by("name")
4662+
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
4663+
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]

0 commit comments

Comments (0)