Skip to content

Commit b0a7878

Browse files
010Soham and kevinjqliu authored
Core: Respect partition evolution in inspect.partitions (#2845)
What does this change do? - Build the inspect.partitions schema using only the partition specs present in the selected snapshot’s manifests, so newer partition fields don’t appear as None for older snapshots. - Normalize partition comparisons in integration tests to ignore trailing None fields from dropped partition columns. Why is this needed? - Partition evolution should show the partition shape that actually existed when the data was written. Mixing all specs produced misleading None fields for older partitions. How was this tested? - make lint - uv run python -m pytest tests/io/test_pyarrow.py -k "partition_evolution" -v - CI: full integration suite (GitHub Actions) Closes #1120 --------- Co-authored-by: Soham <010Soham@users.noreply.github.com> Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com> Co-authored-by: Kevin Liu <kevin.jq.liu@gmail.com>
1 parent 1754064 commit b0a7878

File tree

4 files changed

+54
-8
lines changed

4 files changed

+54
-8
lines changed

pyiceberg/table/inspect.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,9 @@ def partitions(
285285
]
286286
)
287287

288-
partition_record = self.tbl.metadata.specs_struct()
288+
snapshot = self._get_snapshot(snapshot_id)
289+
spec_ids = {manifest.partition_spec_id for manifest in snapshot.manifests(self.tbl.io)}
290+
partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
289291
has_partitions = len(partition_record.fields) > 0
290292

291293
if has_partitions:
@@ -299,8 +301,6 @@ def partitions(
299301

300302
table_schema = pa.unify_schemas([partitions_schema, table_schema])
301303

302-
snapshot = self._get_snapshot(snapshot_id)
303-
304304
scan = DataScan(
305305
table_metadata=self.tbl.metadata,
306306
io=self.tbl.io,

pyiceberg/table/metadata.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import datetime
2020
import uuid
21+
from collections.abc import Iterable
2122
from copy import copy
2223
from typing import Annotated, Any, Literal
2324

@@ -262,18 +263,23 @@ def specs(self) -> dict[int, PartitionSpec]:
262263
"""Return a dict the partition specs this table."""
263264
return {spec.spec_id: spec for spec in self.partition_specs}
264265

265-
def specs_struct(self) -> StructType:
266-
"""Produce a struct of all the combined PartitionSpecs.
266+
def specs_struct(self, spec_ids: Iterable[int] | None = None) -> StructType:
267+
"""Produce a struct of the combined PartitionSpecs.
267268
268269
The partition fields should be optional: Partition fields may be added later,
269270
in which case not all files would have the result field, and it may be null.
270271
271-
:return: A StructType that represents all the combined PartitionSpecs of the table
272+
Args:
273+
spec_ids: Optional iterable of spec IDs to include. When not provided,
274+
all table specs are used.
275+
276+
:return: A StructType that represents the combined PartitionSpecs of the table
272277
"""
273278
specs = self.specs()
279+
selected_specs = specs.values() if spec_ids is None else [specs[spec_id] for spec_id in spec_ids if spec_id in specs]
274280

275281
# Collect all the fields
276-
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
282+
struct_fields = {field.field_id: field for spec in selected_specs for field in spec.fields}
277283

278284
schema = self.schema()
279285

tests/integration/test_inspect_table.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import math
2020
from datetime import date, datetime
21+
from typing import Any
2122

2223
import pyarrow as pa
2324
import pytest
@@ -208,9 +209,18 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
208209
def _check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
209210
lhs = df.to_pandas().sort_values("last_updated_at")
210211
rhs = spark_df.toPandas().sort_values("last_updated_at")
212+
213+
def _normalize_partition(d: dict[str, Any]) -> dict[str, Any]:
214+
return {k: v for k, v in d.items() if v is not None}
215+
211216
for column in df.column_names:
212217
for left, right in zip(lhs[column].to_list(), rhs[column].to_list(), strict=True):
213-
assert left == right, f"Difference in column {column}: {left} != {right}"
218+
if column == "partition":
219+
assert _normalize_partition(left) == _normalize_partition(right), (
220+
f"Difference in column {column}: {left} != {right}"
221+
)
222+
else:
223+
assert left == right, f"Difference in column {column}: {left} != {right}"
214224

215225

216226
@pytest.mark.integration

tests/io/test_pyarrow.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2594,6 +2594,36 @@ def test_inspect_partition_for_nested_field(catalog: InMemoryCatalog) -> None:
25942594
assert {part["part"] for part in partitions} == {"data-a", "data-b"}
25952595

25962596

2597+
def test_inspect_partitions_respects_partition_evolution(catalog: InMemoryCatalog) -> None:
2598+
schema = Schema(
2599+
NestedField(id=1, name="dt", field_type=DateType(), required=False),
2600+
NestedField(id=2, name="category", field_type=StringType(), required=False),
2601+
)
2602+
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="dt"))
2603+
catalog.create_namespace("default")
2604+
table = catalog.create_table(
2605+
"default.test_inspect_partitions_respects_partition_evolution", schema=schema, partition_spec=spec
2606+
)
2607+
2608+
old_spec_id = table.spec().spec_id
2609+
old_data = [{"dt": date(2025, 1, 1), "category": "old"}]
2610+
table.append(pa.Table.from_pylist(old_data, schema=table.schema().as_arrow()))
2611+
2612+
table.update_spec().add_identity("category").commit()
2613+
new_spec_id = table.spec().spec_id
2614+
assert new_spec_id != old_spec_id
2615+
2616+
partitions_table = table.inspect.partitions()
2617+
partitions = partitions_table["partition"].to_pylist()
2618+
assert all("category" not in partition for partition in partitions)
2619+
2620+
new_data = [{"dt": date(2025, 1, 2), "category": "new"}]
2621+
table.append(pa.Table.from_pylist(new_data, schema=table.schema().as_arrow()))
2622+
2623+
partitions_table = table.inspect.partitions()
2624+
assert set(partitions_table["spec_id"].to_pylist()) == {old_spec_id, new_spec_id}
2625+
2626+
25972627
def test_identity_partition_on_multi_columns() -> None:
25982628
test_pa_schema = pa.schema([("born_year", pa.int64()), ("n_legs", pa.int64()), ("animal", pa.string())])
25992629
test_schema = Schema(

0 commit comments

Comments
 (0)