Skip to content

Commit 57a0ed6

Browse files
committed
Respect partition evolution in inspect.partitions
1 parent c542e99 commit 57a0ed6

File tree

3 files changed

+41
-7
lines changed

3 files changed

+41
-7
lines changed

pyiceberg/table/inspect.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,9 @@ def partitions(
285285
]
286286
)
287287

288-
partition_record = self.tbl.metadata.specs_struct()
288+
snapshot = self._get_snapshot(snapshot_id)
289+
spec_ids = {manifest.partition_spec_id for manifest in snapshot.manifests(self.tbl.io)}
290+
partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
289291
has_partitions = len(partition_record.fields) > 0
290292

291293
if has_partitions:
@@ -299,8 +301,6 @@ def partitions(
299301

300302
table_schema = pa.unify_schemas([partitions_schema, table_schema])
301303

302-
snapshot = self._get_snapshot(snapshot_id)
303-
304304
scan = DataScan(
305305
table_metadata=self.tbl.metadata,
306306
io=self.tbl.io,

pyiceberg/table/metadata.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import datetime
2020
import uuid
21+
from collections.abc import Iterable
2122
from copy import copy
2223
from typing import Annotated, Any, Literal
2324

@@ -262,18 +263,23 @@ def specs(self) -> dict[int, PartitionSpec]:
262263
"""Return a dict the partition specs this table."""
263264
return {spec.spec_id: spec for spec in self.partition_specs}
264265

265-
def specs_struct(self) -> StructType:
266-
"""Produce a struct of all the combined PartitionSpecs.
266+
def specs_struct(self, spec_ids: Iterable[int] | None = None) -> StructType:
267+
"""Produce a struct of the combined PartitionSpecs.
267268
268269
The partition fields should be optional: Partition fields may be added later,
269270
in which case not all files would have the result field, and it may be null.
270271
271-
:return: A StructType that represents all the combined PartitionSpecs of the table
272+
Args:
273+
spec_ids: Optional iterable of spec IDs to include. When not provided,
274+
all table specs are used.
275+
276+
:return: A StructType that represents the combined PartitionSpecs of the table
272277
"""
273278
specs = self.specs()
279+
selected_specs = specs.values() if spec_ids is None else [specs[spec_id] for spec_id in spec_ids if spec_id in specs]
274280

275281
# Collect all the fields
276-
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
282+
struct_fields = {field.field_id: field for spec in selected_specs for field in spec.fields}
277283

278284
schema = self.schema()
279285

tests/io/test_pyarrow.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2588,6 +2588,34 @@ def test_inspect_partition_for_nested_field(catalog: InMemoryCatalog) -> None:
25882588
assert {part["part"] for part in partitions} == {"data-a", "data-b"}
25892589

25902590

2591+
def test_inspect_partitions_respects_partition_evolution(catalog: InMemoryCatalog) -> None:
2592+
schema = Schema(
2593+
NestedField(id=1, name="dt", field_type=DateType(), required=False),
2594+
NestedField(id=2, name="category", field_type=StringType(), required=False),
2595+
)
2596+
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="dt"))
2597+
catalog.create_namespace("default")
2598+
table = catalog.create_table("default.test_partition_evolution_inspect", schema=schema, partition_spec=spec)
2599+
2600+
old_spec_id = table.spec().spec_id
2601+
old_data = [{"dt": date(2025, 1, 1), "category": "old"}]
2602+
table.append(pa.Table.from_pylist(old_data, schema=table.schema().as_arrow()))
2603+
2604+
table.update_spec().add_identity("category").commit()
2605+
new_spec_id = table.spec().spec_id
2606+
assert new_spec_id != old_spec_id
2607+
2608+
partitions_table = table.inspect.partitions()
2609+
partitions = partitions_table["partition"].to_pylist()
2610+
assert all("category" not in partition for partition in partitions)
2611+
2612+
new_data = [{"dt": date(2025, 1, 2), "category": "new"}]
2613+
table.append(pa.Table.from_pylist(new_data, schema=table.schema().as_arrow()))
2614+
2615+
partitions_table = table.inspect.partitions()
2616+
assert set(partitions_table["spec_id"].to_pylist()) == {old_spec_id, new_spec_id}
2617+
2618+
25912619
def test_identity_partition_on_multi_columns() -> None:
25922620
test_pa_schema = pa.schema([("born_year", pa.int64()), ("n_legs", pa.int64()), ("animal", pa.string())])
25932621
test_schema = Schema(

0 commit comments

Comments
 (0)