From 02bf9fbac9fb5dbebbc0763b1ee86d1163244b0c Mon Sep 17 00:00:00 2001 From: Peng-Jui Wang Date: Tue, 8 Jul 2025 23:35:47 -0700 Subject: [PATCH 1/4] add column name to the error message in StatsAggregator --- pyiceberg/io/pyarrow.py | 8 +++++--- tests/io/test_pyarrow.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3e49885e58..4b979a0265 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1982,11 +1982,13 @@ class StatsAggregator: current_min: Any current_max: Any trunc_length: Optional[int] + column_name: Optional[str] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None, column_name: Optional[str] = None) -> None: self.current_min = None self.current_max = None self.trunc_length = trunc_length + self.column_name = column_name expected_physical_type = _primitive_to_physical(iceberg_type) if expected_physical_type != physical_type_string: @@ -1998,7 +2000,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc pass else: raise ValueError( - f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}" + f"Unexpected physical type {physical_type_string} for {self.column_name or ''} with iceberg type {iceberg_type}, expected {expected_physical_type}" ) self.primitive_type = iceberg_type @@ -2405,7 +2407,7 @@ def data_file_statistics_from_parquet_metadata( if field_id not in col_aggs: col_aggs[field_id] = StatsAggregator( - stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length + stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length, stats_col.column_name ) if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 92494455af..6f69488b9f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2054,7 +2054,7 @@ def test_make_compatible_name() -> None: ], ) def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: - stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) + stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col") for val in vals: stats.update_min(val) @@ -2074,7 +2074,7 @@ def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveT ], ) def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: - stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) + stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col") for val in vals: stats.update_max(val) From fc1f6dd0d578468b92e6da6de03bf2cab385b8fc Mon Sep 17 00:00:00 2001 From: Peng-Jui Wang Date: Thu, 10 Jul 2025 00:38:59 -0700 Subject: [PATCH 2/4] refactor StatsAggregator to require column_name as a non-optional parameter --- pyiceberg/io/pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4b979a0265..fb3ef30bf3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1981,14 +1981,14 @@ def visit_unknown(self, unknown_type: UnknownType) -> str: class StatsAggregator: current_min: Any current_max: Any + column_name: str trunc_length: Optional[int] - column_name: Optional[str] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None, column_name: Optional[str] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, column_name: str, trunc_length: Optional[int] = None) -> None: self.current_min = None self.current_max = None - self.trunc_length = trunc_length self.column_name = column_name + self.trunc_length = trunc_length expected_physical_type = _primitive_to_physical(iceberg_type) if expected_physical_type != physical_type_string: @@ -2000,7 +2000,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc pass else: raise ValueError( - f"Unexpected physical type {physical_type_string} for {self.column_name or ''} with iceberg type {iceberg_type}, expected {expected_physical_type}" + f"Unexpected physical type {physical_type_string} for {self.column_name} with iceberg type {iceberg_type}, expected {expected_physical_type}" ) self.primitive_type = iceberg_type @@ -2407,7 +2407,7 @@ def data_file_statistics_from_parquet_metadata( if field_id not in col_aggs: col_aggs[field_id] = StatsAggregator( - stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length, stats_col.column_name + stats_col.iceberg_type, statistics.physical_type, stats_col.column_name, stats_col.mode.length ) if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": From a7828f07353eb2c206ad2e3d851eab47b4e69204 Mon Sep 17 00:00:00 2001 From: Peng-Jui Wang Date: Fri, 18 Jul 2025 18:36:49 -0700 Subject: [PATCH 3/4] refactor StatsAggregator to make column_name an optional parameter --- pyiceberg/io/pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index fb3ef30bf3..4b979a0265 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1981,14 +1981,14 @@ def visit_unknown(self, unknown_type: UnknownType) -> str: class StatsAggregator: current_min: Any current_max: Any - column_name: str trunc_length: Optional[int] + column_name: Optional[str] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, column_name: str, trunc_length: Optional[int] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None, column_name: Optional[str] = None) -> None: self.current_min = None self.current_max = None - self.column_name = column_name self.trunc_length = trunc_length + self.column_name = column_name expected_physical_type = _primitive_to_physical(iceberg_type) if expected_physical_type != physical_type_string: @@ -2000,7 +2000,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, colum pass else: raise ValueError( - f"Unexpected physical type {physical_type_string} for {self.column_name} with iceberg type {iceberg_type}, expected {expected_physical_type}" + f"Unexpected physical type {physical_type_string} for {self.column_name or ''} with iceberg type {iceberg_type}, expected {expected_physical_type}" ) self.primitive_type = iceberg_type @@ -2407,7 +2407,7 @@ def data_file_statistics_from_parquet_metadata( if field_id not in col_aggs: col_aggs[field_id] = StatsAggregator( - stats_col.iceberg_type, statistics.physical_type, stats_col.column_name, stats_col.mode.length + stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length, stats_col.column_name ) if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": From 0500478fd4ec39ba0758169cadfcbb9434d0561b Mon Sep 17 00:00:00 2001 From: Peng-Jui Wang Date: Sun, 20 Jul 2025 15:55:54 -0700 Subject: [PATCH 4/4] update error handling for StatsAggregator --- pyiceberg/io/pyarrow.py | 15 ++++++++------- tests/io/test_pyarrow.py | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4b979a0265..e93a514db6 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1982,13 +1982,11 @@ class StatsAggregator: current_min: Any current_max: Any trunc_length: Optional[int] - column_name: Optional[str] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None, column_name: Optional[str] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None: self.current_min = None self.current_max = None self.trunc_length = trunc_length - self.column_name = column_name expected_physical_type = _primitive_to_physical(iceberg_type) if expected_physical_type != physical_type_string: @@ -2000,7 +1998,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc pass else: raise ValueError( - f"Unexpected physical type {physical_type_string} for {self.column_name or ''} with iceberg type {iceberg_type}, expected {expected_physical_type}" + f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}" ) self.primitive_type = iceberg_type @@ -2406,9 +2404,12 @@ def data_file_statistics_from_parquet_metadata( continue if field_id not in col_aggs: - col_aggs[field_id] = StatsAggregator( - stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length, stats_col.column_name - ) + try: + col_aggs[field_id] = StatsAggregator( + stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length + ) + except ValueError as e: + raise ValueError(f"{e} for column '{stats_col.column_name}'") from e if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": scale = stats_col.iceberg_type.scale diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 6f69488b9f..92494455af 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2054,7 +2054,7 @@ def test_make_compatible_name() -> None: ], ) def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: - stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col") + stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) for val in vals: stats.update_min(val) @@ -2074,7 +2074,7 @@ def test_stats_aggregator_update_min(vals: List[Any], primitive_type: PrimitiveT ], ) def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveType, expected_result: Any) -> None: - stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type), column_name="test_col") + stats = StatsAggregator(primitive_type, _primitive_to_physical(primitive_type)) for val in vals: stats.update_max(val)