6 changes: 3 additions & 3 deletions pyiceberg/table/inspect.py
@@ -285,7 +285,9 @@ def partitions(
]
)

partition_record = self.tbl.metadata.specs_struct()
snapshot = self._get_snapshot(snapshot_id)
spec_ids = {manifest.partition_spec_id for manifest in snapshot.manifests(self.tbl.io)}
partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
has_partitions = len(partition_record.fields) > 0

if has_partitions:
@@ -299,8 +301,6 @@ def partitions(

table_schema = pa.unify_schemas([partitions_schema, table_schema])

snapshot = self._get_snapshot(snapshot_id)

scan = DataScan(
table_metadata=self.tbl.metadata,
io=self.tbl.io,
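
Net effect of this hunk: `partitions()` derives the partition struct from the specs actually referenced by the inspected snapshot's manifests, rather than from every spec the table has ever had. A minimal sketch of the observable behavior (hypothetical names `tbl` and `old_snapshot_id`; assumes a table first partitioned by `dt` whose spec later evolved to add `category`):

# Inspecting a snapshot written before the evolution now yields partition
# structs containing only "dt"; the later "category" field is absent
# instead of appearing as an all-null column.
df = tbl.inspect.partitions(snapshot_id=old_snapshot_id)
assert all("category" not in part for part in df["partition"].to_pylist())
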
14 changes: 10 additions & 4 deletions pyiceberg/table/metadata.py
Expand Up @@ -18,6 +18,7 @@

import datetime
import uuid
from collections.abc import Iterable
from copy import copy
from typing import Annotated, Any, Literal

@@ -262,18 +263,23 @@ def specs(self) -> dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}

def specs_struct(self) -> StructType:
"""Produce a struct of all the combined PartitionSpecs.
def specs_struct(self, spec_ids: Iterable[int] | None = None) -> StructType:
"""Produce a struct of the combined PartitionSpecs.

The partition fields should be optional: Partition fields may be added later,
in which case not all files would have the result field, and it may be null.

:return: A StructType that represents all the combined PartitionSpecs of the table
Args:
spec_ids: Optional iterable of spec IDs to include. When not provided,
all table specs are used.

:return: A StructType that represents the combined PartitionSpecs of the table
"""
specs = self.specs()
selected_specs = specs.values() if spec_ids is None else [specs[spec_id] for spec_id in spec_ids if spec_id in specs]

# Collect all the fields
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
struct_fields = {field.field_id: field for spec in selected_specs for field in spec.fields}

schema = self.schema()

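
For reference, a small usage sketch of the new `spec_ids` parameter (assuming a `TableMetadata` instance `metadata` whose table carries specs 0 and 1; note that the comprehension above silently skips IDs that do not resolve to a spec):

all_fields = metadata.specs_struct()              # union of fields across every spec
old_fields = metadata.specs_struct(spec_ids=[0])  # fields from spec 0 only
assert {f.field_id for f in old_fields.fields} <= {f.field_id for f in all_fields.fields}
# Unknown IDs are skipped rather than raising, yielding an empty struct here:
assert len(metadata.specs_struct(spec_ids=[999]).fields) == 0
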
12 changes: 11 additions & 1 deletion tests/integration/test_inspect_table.py
Expand Up @@ -18,6 +18,7 @@

import math
from datetime import date, datetime
from typing import Any

import pyarrow as pa
import pytest
@@ -208,9 +209,18 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
def _check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
lhs = df.to_pandas().sort_values("last_updated_at")
rhs = spark_df.toPandas().sort_values("last_updated_at")

def _normalize_partition(d: dict[str, Any]) -> dict[str, Any]:
return {k: v for k, v in d.items() if v is not None}

for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list(), strict=True):
assert left == right, f"Difference in column {column}: {left} != {right}"
if column == "partition":
assert _normalize_partition(left) == _normalize_partition(right), (
f"Difference in column {column}: {left} != {right}"
)
else:
assert left == right, f"Difference in column {column}: {left} != {right}"


@pytest.mark.integration
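
The `_normalize_partition` helper makes the comparison robust to representation differences: after this change, one side may omit partition fields that a file's spec never had, while the other reports them as explicit nulls. A self-contained illustration with toy dicts (not real query output):

def _normalize_partition(d: dict) -> dict:
    return {k: v for k, v in d.items() if v is not None}

# One engine carries the evolved field as an explicit null; the other omits it.
left = {"dt": "2025-01-01", "category": None}
right = {"dt": "2025-01-01"}
assert _normalize_partition(left) == _normalize_partition(right)
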
28 changes: 28 additions & 0 deletions tests/io/test_pyarrow.py
@@ -2588,6 +2588,34 @@ def test_inspect_partition_for_nested_field(catalog: InMemoryCatalog) -> None:
assert {part["part"] for part in partitions} == {"data-a", "data-b"}


def test_inspect_partitions_respects_partition_evolution(catalog: InMemoryCatalog) -> None:
schema = Schema(
NestedField(id=1, name="dt", field_type=DateType(), required=False),
NestedField(id=2, name="category", field_type=StringType(), required=False),
)
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="dt"))
catalog.create_namespace("default")
table = catalog.create_table("default.test_partition_evolution_inspect", schema=schema, partition_spec=spec)

old_spec_id = table.spec().spec_id
old_data = [{"dt": date(2025, 1, 1), "category": "old"}]
table.append(pa.Table.from_pylist(old_data, schema=table.schema().as_arrow()))

table.update_spec().add_identity("category").commit()
new_spec_id = table.spec().spec_id
assert new_spec_id != old_spec_id

partitions_table = table.inspect.partitions()
partitions = partitions_table["partition"].to_pylist()
assert all("category" not in partition for partition in partitions)

new_data = [{"dt": date(2025, 1, 2), "category": "new"}]
table.append(pa.Table.from_pylist(new_data, schema=table.schema().as_arrow()))

partitions_table = table.inspect.partitions()
assert set(partitions_table["spec_id"].to_pylist()) == {old_spec_id, new_spec_id}


def test_identity_partition_on_multi_columns() -> None:
test_pa_schema = pa.schema([("born_year", pa.int64()), ("n_legs", pa.int64()), ("animal", pa.string())])
test_schema = Schema(