From 895dff5ebfa81f5f9993e228e866d64d4b5fe37c Mon Sep 17 00:00:00 2001 From: redpheonixx Date: Mon, 24 Mar 2025 22:24:15 +0530 Subject: [PATCH 1/3] changes_as_per_pr1799_comments --- pyiceberg/io/pyarrow.py | 18 +++++++++++++++--- tests/io/test_pyarrow_stats.py | 27 +++++++++++++++++++++------ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 88be6abac7..58c90ba499 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -177,6 +177,7 @@ from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string +from decimal import Decimal if TYPE_CHECKING: from pyiceberg.table import FileScanTask, WriteTask @@ -1876,7 +1877,11 @@ def visit_fixed(self, fixed_type: FixedType) -> str: return "FIXED_LEN_BYTE_ARRAY" def visit_decimal(self, decimal_type: DecimalType) -> str: - return "FIXED_LEN_BYTE_ARRAY" + return ( + "INT32" if decimal_type.precision <= 9 + else "INT64" if decimal_type.precision <= 18 + else "FIXED_LEN_BYTE_ARRAY" + ) def visit_boolean(self, boolean_type: BooleanType) -> str: return "BOOLEAN" @@ -2350,8 +2355,15 @@ def data_file_statistics_from_parquet_metadata( stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length ) - col_aggs[field_id].update_min(statistics.min) - col_aggs[field_id].update_max(statistics.max) + if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": + precision= stats_col.iceberg_type.precision + scale = stats_col.iceberg_type.scale + decimal_type = pa.decimal128(precision, scale) + col_aggs[field_id].update_min(pa.array([Decimal(statistics.min_raw)/ (10 ** scale)], decimal_type)[0].as_py()) + col_aggs[field_id].update_max(pa.array([Decimal(statistics.max_raw)/ (10 ** scale)], decimal_type)[0].as_py()) + else: + col_aggs[field_id].update_min(statistics.min) + col_aggs[field_id].update_max(statistics.max) except pyarrow.lib.ArrowNotImplementedError as e: invalidate_col.add(field_id) diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index 788891711e..2d8593a186 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -72,7 +72,7 @@ StringType, ) from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros - +from decimal import Decimal @dataclass(frozen=True) class TestStruct: @@ -446,6 +446,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table {"id": 10, "name": "strings", "required": False, "type": "string"}, {"id": 11, "name": "uuids", "required": False, "type": "uuid"}, {"id": 12, "name": "binaries", "required": False, "type": "binary"}, + {"id": 13, "name": "decimal8", "required": False, "type": "decimal(5, 2)"}, + {"id": 14, "name": "decimal16", "required": False, "type": "decimal(16, 6)"}, + {"id": 15, "name": "decimal32", "required": False, "type": "decimal(19, 6)"}, ], }, ], @@ -470,6 +473,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table strings = ["hello", "world"] uuids = [uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes, uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes] binaries = [b"hello", b"world"] + decimal8 = pa.array([Decimal('123.45'), Decimal('678.91')], pa.decimal128(8, 2)) + decimal16 = pa.array([Decimal("12345679.123456"), Decimal("67891234.678912")], pa.decimal128(16, 6)) + decimal32 = pa.array([Decimal("1234567890123.123456"), Decimal("9876543210703.654321")], pa.decimal128(19, 6)) table = pa.Table.from_pydict( { @@ -485,6 +491,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table "strings": strings, "uuids": uuids, "binaries": binaries, + "decimal8": decimal8, + "decimal16": decimal16, + "decimal32": decimal32, }, schema=arrow_schema, ) @@ -492,7 +501,7 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table metadata_collector: List[Any] = [] with pa.BufferOutputStream() as f: - with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer: + with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer: writer.write_table(table) return metadata_collector[0], table_metadata @@ -510,13 +519,13 @@ def test_metrics_primitive_types() -> None: ) datafile = DataFile(**statistics.to_serialized_dict()) - assert len(datafile.value_counts) == 12 - assert len(datafile.null_value_counts) == 12 + assert len(datafile.value_counts) == 15 + assert len(datafile.null_value_counts) == 15 assert len(datafile.nan_value_counts) == 0 tz = timezone(timedelta(seconds=19800)) - assert len(datafile.lower_bounds) == 12 + assert len(datafile.lower_bounds) == 15 assert datafile.lower_bounds[1] == STRUCT_BOOL.pack(False) assert datafile.lower_bounds[2] == STRUCT_INT32.pack(23) assert datafile.lower_bounds[3] == STRUCT_INT64.pack(2) @@ -529,8 +538,11 @@ def test_metrics_primitive_types() -> None: assert datafile.lower_bounds[10] == b"he" assert datafile.lower_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes assert datafile.lower_bounds[12] == b"he" + assert datafile.lower_bounds[13][::-1].ljust(4, b'\x00') == STRUCT_INT32.pack(12345) + assert datafile.lower_bounds[14][::-1].ljust(8, b'\x00') == STRUCT_INT64.pack(12345679123456) + assert str(int.from_bytes(datafile.lower_bounds[15], byteorder='big', signed=True)).encode('utf-8')== b"1234567890123123456" - assert len(datafile.upper_bounds) == 12 + assert len(datafile.upper_bounds) == 15 assert datafile.upper_bounds[1] == STRUCT_BOOL.pack(True) assert datafile.upper_bounds[2] == STRUCT_INT32.pack(89) assert datafile.upper_bounds[3] == STRUCT_INT64.pack(54) @@ -543,6 +555,9 @@ def test_metrics_primitive_types() -> None: assert datafile.upper_bounds[10] == b"wp" assert datafile.upper_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes assert datafile.upper_bounds[12] == b"wp" + assert datafile.upper_bounds[13][::-1].ljust(4, b'\x00')== STRUCT_INT32.pack(67891) + assert datafile.upper_bounds[14][::-1].ljust(8, b'\x00')== STRUCT_INT64.pack(67891234678912) + assert str(int.from_bytes(datafile.upper_bounds[15], byteorder='big', signed=True)).encode('utf-8')== b"9876543210703654321" def construct_test_table_invalid_upper_bound() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]: From 7dca26a3013a6c7342af05a695bfcc7b073b3ecd Mon Sep 17 00:00:00 2001 From: redpheonixx Date: Tue, 25 Mar 2025 17:22:06 +0530 Subject: [PATCH 2/3] trim whitespace --- pyiceberg/io/pyarrow.py | 18 +++++++++--------- tests/io/test_pyarrow_stats.py | 17 +++++++++-------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 58c90ba499..354f3ce54e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -39,6 +39,7 @@ from concurrent.futures import Future from copy import copy from dataclasses import dataclass +from decimal import Decimal from enum import Enum from functools import lru_cache, singledispatch from typing import ( @@ -177,7 +178,6 @@ from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string -from decimal import Decimal if TYPE_CHECKING: from pyiceberg.table import FileScanTask, WriteTask @@ -1877,11 +1877,7 @@ def visit_fixed(self, fixed_type: FixedType) -> str: return "FIXED_LEN_BYTE_ARRAY" def visit_decimal(self, decimal_type: DecimalType) -> str: - return ( - "INT32" if decimal_type.precision <= 9 - else "INT64" if decimal_type.precision <= 18 - else "FIXED_LEN_BYTE_ARRAY" - ) + return "INT32" if decimal_type.precision <= 9 else "INT64" if decimal_type.precision <= 18 else "FIXED_LEN_BYTE_ARRAY" def visit_boolean(self, boolean_type: BooleanType) -> str: return "BOOLEAN" @@ -2356,11 +2352,15 @@ def data_file_statistics_from_parquet_metadata( ) if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": - precision= stats_col.iceberg_type.precision + precision = stats_col.iceberg_type.precision scale = stats_col.iceberg_type.scale decimal_type = pa.decimal128(precision, scale) - col_aggs[field_id].update_min(pa.array([Decimal(statistics.min_raw)/ (10 ** scale)], decimal_type)[0].as_py()) - col_aggs[field_id].update_max(pa.array([Decimal(statistics.max_raw)/ (10 ** scale)], decimal_type)[0].as_py()) + col_aggs[field_id].update_min( + pa.array([Decimal(statistics.min_raw) / (10**scale)], decimal_type)[0].as_py() + ) + col_aggs[field_id].update_max( + pa.array([Decimal(statistics.max_raw) / (10**scale)], decimal_type)[0].as_py() + ) else: col_aggs[field_id].update_min(statistics.min) col_aggs[field_id].update_max(statistics.max) diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index 2d8593a186..09c9eb4a67 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -27,6 +27,7 @@ timedelta, timezone, ) +from decimal import Decimal from typing import ( Any, Dict, @@ -72,7 +73,7 @@ StringType, ) from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros -from decimal import Decimal + @dataclass(frozen=True) class TestStruct: @@ -473,7 +474,7 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table strings = ["hello", "world"] uuids = [uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes, uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes] binaries = [b"hello", b"world"] - decimal8 = pa.array([Decimal('123.45'), Decimal('678.91')], pa.decimal128(8, 2)) + decimal8 = pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2)) decimal16 = pa.array([Decimal("12345679.123456"), Decimal("67891234.678912")], pa.decimal128(16, 6)) decimal32 = pa.array([Decimal("1234567890123.123456"), Decimal("9876543210703.654321")], pa.decimal128(19, 6)) @@ -538,9 +539,9 @@ def test_metrics_primitive_types() -> None: assert datafile.lower_bounds[10] == b"he" assert datafile.lower_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes assert datafile.lower_bounds[12] == b"he" - assert datafile.lower_bounds[13][::-1].ljust(4, b'\x00') == STRUCT_INT32.pack(12345) - assert datafile.lower_bounds[14][::-1].ljust(8, b'\x00') == STRUCT_INT64.pack(12345679123456) - assert str(int.from_bytes(datafile.lower_bounds[15], byteorder='big', signed=True)).encode('utf-8')== b"1234567890123123456" + assert datafile.lower_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(12345) + assert datafile.lower_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(12345679123456) + assert str(int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"1234567890123123456" assert len(datafile.upper_bounds) == 15 assert datafile.upper_bounds[1] == STRUCT_BOOL.pack(True) @@ -555,9 +556,9 @@ def test_metrics_primitive_types() -> None: assert datafile.upper_bounds[10] == b"wp" assert datafile.upper_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes assert datafile.upper_bounds[12] == b"wp" - assert datafile.upper_bounds[13][::-1].ljust(4, b'\x00')== STRUCT_INT32.pack(67891) - assert datafile.upper_bounds[14][::-1].ljust(8, b'\x00')== STRUCT_INT64.pack(67891234678912) - assert str(int.from_bytes(datafile.upper_bounds[15], byteorder='big', signed=True)).encode('utf-8')== b"9876543210703654321" + assert datafile.upper_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(67891) + assert datafile.upper_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(67891234678912) + assert str(int.from_bytes(datafile.upper_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"9876543210703654321" def construct_test_table_invalid_upper_bound() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]: From 4d73212b178e7e1dab564c8accda43c09434e98d Mon Sep 17 00:00:00 2001 From: redpheonixx Date: Mon, 31 Mar 2025 18:41:42 +0530 Subject: [PATCH 3/3] using unscaled_to_decimal from pyiceberg.utils.decimal --- pyiceberg/io/pyarrow.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 74e3b18613..5c70636e64 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -39,7 +39,6 @@ from concurrent.futures import Future from copy import copy from dataclasses import dataclass -from decimal import Decimal from enum import Enum from functools import lru_cache, singledispatch from typing import ( @@ -176,6 +175,7 @@ from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime +from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton @@ -2364,15 +2364,9 @@ def data_file_statistics_from_parquet_metadata( ) if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": - precision = stats_col.iceberg_type.precision scale = stats_col.iceberg_type.scale - decimal_type = pa.decimal128(precision, scale) - col_aggs[field_id].update_min( - pa.array([Decimal(statistics.min_raw) / (10**scale)], decimal_type)[0].as_py() - ) - col_aggs[field_id].update_max( - pa.array([Decimal(statistics.max_raw) / (10**scale)], decimal_type)[0].as_py() - ) + col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale)) + col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale)) else: col_aggs[field_id].update_min(statistics.min) col_aggs[field_id].update_max(statistics.max)