Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1982,11 +1982,13 @@ class StatsAggregator:
current_min: Any
current_max: Any
trunc_length: Optional[int]
column_name: Optional[str]

def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None:
def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None, column_name: Optional[str] = None) -> None:
self.current_min = None
self.current_max = None
self.trunc_length = trunc_length
self.column_name = column_name

expected_physical_type = _primitive_to_physical(iceberg_type)
if expected_physical_type != physical_type_string:
Expand All @@ -1998,7 +2000,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc
pass
else:
raise ValueError(
f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
f"Unexpected physical type {physical_type_string} for {self.column_name or '<unknown column>'} with iceberg type {iceberg_type}, expected {expected_physical_type}"
)

self.primitive_type = iceberg_type
Expand Down Expand Up @@ -2405,7 +2407,7 @@ def data_file_statistics_from_parquet_metadata(

if field_id not in col_aggs:
col_aggs[field_id] = StatsAggregator(
stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length, stats_col.column_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that StatsAggregator is only used here. WDYT of this approach?

try:
    col_aggs[field_id] = StatsAggregator(
        stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
    )
except ValueError as e:
    raise ValueError(f"{e} for column '{stats_col.column_name}'") from e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I’ll update the PR to follow this approach. Thanks!

)

if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
Expand Down
4 changes: 2 additions & 2 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,7 @@ def test_make_compatible_name() -> None:
],
)
def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None:
stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type))
stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col")

for val in vals:
stats.update_min(val)
Expand All @@ -2074,7 +2074,7 @@ def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveT
],
)
def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None:
stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type))
stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col")

for val in vals:
stats.update_max(val)
Expand Down
Loading