24 changes: 19 additions & 5 deletions pyiceberg/io/pyarrow.py
@@ -175,6 +175,7 @@
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
from decimal import Decimal, Context

if TYPE_CHECKING:
from pyiceberg.table import FileScanTask, WriteTask
@@ -194,7 +195,7 @@
UTC_ALIASES = {"UTC", "+00:00", "Etc/UTC", "Z"}

T = TypeVar("T")

DECIMAL_REGEX = re.compile(r"decimal\((\d+),\s*(\d+)\)")

@lru_cache
def _cached_resolve_s3_region(bucket: str) -> Optional[str]:
@@ -1868,7 +1869,11 @@ def visit_fixed(self, fixed_type: FixedType) -> str:
return "FIXED_LEN_BYTE_ARRAY"

def visit_decimal(self, decimal_type: DecimalType) -> str:
return "FIXED_LEN_BYTE_ARRAY"
return (
"INT32" if decimal_type.precision <= 9
else "INT64" if decimal_type.precision <= 18
else "FIXED_LEN_BYTE_ARRAY"
)

def visit_boolean(self, boolean_type: BooleanType) -> str:
return "BOOLEAN"
@@ -2335,9 +2340,18 @@ def data_file_statistics_from_parquet_metadata(
col_aggs[field_id] = StatsAggregator(
stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
)

-col_aggs[field_id].update_min(statistics.min)
-col_aggs[field_id].update_max(statistics.max)
+matches = DECIMAL_REGEX.search(str(stats_col.iceberg_type))
+if matches and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
+    precision = int(matches.group(1))
+    scale = int(matches.group(2))
+    local_context = Context(prec=precision)
+    decoded_min = local_context.create_decimal(Decimal(statistics.min_raw) / (10**scale))
+    decoded_max = local_context.create_decimal(Decimal(statistics.max_raw) / (10**scale))
+    col_aggs[field_id].update_min(decoded_min)
+    col_aggs[field_id].update_max(decoded_max)
+else:
+    col_aggs[field_id].update_min(statistics.min)
+    col_aggs[field_id].update_max(statistics.max)
Comment on lines +2343 to +2354
Contributor: I'm confused why we need to use the regex here.

Contributor: When we isinstance-check stats_col.iceberg_type as a DecimalType, we have access to the scale and precision directly.

Contributor Author: I have updated the above as below:

if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
    precision = stats_col.iceberg_type.precision
    scale = stats_col.iceberg_type.scale
    decimal_type = pa.decimal128(precision, scale)
    col_aggs[field_id].update_min(pa.array([Decimal(statistics.min_raw) / (10**scale)], decimal_type)[0].as_py())
    col_aggs[field_id].update_max(pa.array([Decimal(statistics.max_raw) / (10**scale)], decimal_type)[0].as_py())
else:
    col_aggs[field_id].update_min(statistics.min)
    col_aggs[field_id].update_max(statistics.max)
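
As a standalone illustration of the decoding above (values assumed for the example): when Parquet stores a decimal column as INT32/INT64, min_raw and max_raw are unscaled integers, so they must be rescaled before feeding the aggregator.

from decimal import Decimal

import pyarrow as pa

precision, scale = 8, 2
min_raw = 12345  # unscaled Parquet statistic for Decimal("123.45")
decoded = pa.array([Decimal(min_raw) / (10**scale)], pa.decimal128(precision, scale))[0].as_py()
assert decoded == Decimal("123.45")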


except pyarrow.lib.ArrowNotImplementedError as e:
invalidate_col.add(field_id)
28 changes: 22 additions & 6 deletions tests/io/test_pyarrow_stats.py
@@ -72,7 +72,7 @@
StringType,
)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros

from decimal import Decimal
Contributor: I think we should use pyarrow.decimal128 instead.

Contributor Author: Hi @kevinjqliu, thanks for the comments. We need Decimal because pa.array with a decimal128 type accepts only integer and Decimal values, not floats. Please check the comment on the test_pyarrow_stats.py file changes.
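
A small sketch of that constraint (the float-rejection behavior is taken from the author's note above, so treat it as an assumption about pyarrow's conversion rules rather than documented API behavior):

from decimal import Decimal

import pyarrow as pa

pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2))  # int/Decimal: accepted
try:
    pa.array([123.45], pa.decimal128(8, 2))  # float: expected to be rejected
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    print(f"float rejected: {exc}")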


@dataclass(frozen=True)
class TestStruct:
@@ -446,6 +446,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table
{"id": 10, "name": "strings", "required": False, "type": "string"},
{"id": 11, "name": "uuids", "required": False, "type": "uuid"},
{"id": 12, "name": "binaries", "required": False, "type": "binary"},
{"id": 13, "name": "decimal8", "required": False, "type": "decimal(8, 2)"},
{"id": 14, "name": "decimal16", "required": False, "type": "decimal(16, 6)"},
{"id": 15, "name": "decimal32", "required": False, "type": "decimal(20, 6)"},
],
},
],
@@ -470,6 +473,9 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table
strings = ["hello", "world"]
uuids = [uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes, uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes]
binaries = [b"hello", b"world"]
decimal8 = [Decimal("123.45"), Decimal("678.91")]
decimal16 = [Decimal("123456789.123456"), Decimal("678912345.678912")]
decimal32 = [Decimal("12345678901234.123456"), Decimal("98765432109870.654321")]
Comment on lines +476 to +478
Contributor: These should use the proper precision for decimal128.

Contributor Author (@redpheonixx, Mar 18, 2025):

decimal8 = pa.array([Decimal('123.45'), Decimal('678.91')], pa.decimal128(8, 2))

@kevinjqliu, please check whether we can go with this, as it now has the proper precision and scale. Note that pa.array here accepts only int and Decimal values, not floats, hence the Decimal import above. Let me know if I am incorrect or if there is another workaround.


table = pa.Table.from_pydict(
{
@@ -485,14 +491,17 @@ def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[Table
"strings": strings,
"uuids": uuids,
"binaries": binaries,
"decimal8": decimal8,
"decimal16": decimal16,
"decimal32": decimal32,
},
schema=arrow_schema,
)

metadata_collector: List[Any] = []

with pa.BufferOutputStream() as f:
-with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
+with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer:
Contributor: Do we need store_decimal_as_integer=True?

Contributor Author: I tried without this parameter, but then the physical type for decimal(8, 2) is FIXED_LEN_BYTE_ARRAY. There is a check for this, as below:

if expected_physical_type != physical_type_string:
    # Allow promotable physical types
    # INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
    if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
        physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
    ):
        pass
    else:
        raise ValueError(
            f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
        )
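
A runnable sketch of the flag's effect (schema and values are made up for the example, and it assumes a pyarrow version that supports store_decimal_as_integer): without the flag pyarrow writes decimal(8, 2) as FIXED_LEN_BYTE_ARRAY, which would trip the check above; with it, the column comes out as INT32.

import io
from decimal import Decimal

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"d": pa.array([Decimal("1.23")], pa.decimal128(8, 2))})
for store_as_int in (False, True):
    buf = io.BytesIO()
    pq.write_table(table, buf, store_decimal_as_integer=store_as_int)
    buf.seek(0)
    physical = pq.ParquetFile(buf).metadata.row_group(0).column(0).physical_type
    print(store_as_int, physical)  # expected: False FIXED_LEN_BYTE_ARRAY, True INT32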

writer.write_table(table)

return metadata_collector[0], table_metadata
@@ -510,13 +519,13 @@ def test_metrics_primitive_types() -> None:
)
datafile = DataFile(**statistics.to_serialized_dict())

-assert len(datafile.value_counts) == 12
-assert len(datafile.null_value_counts) == 12
+assert len(datafile.value_counts) == 15
+assert len(datafile.null_value_counts) == 15
assert len(datafile.nan_value_counts) == 0

tz = timezone(timedelta(seconds=19800))

-assert len(datafile.lower_bounds) == 12
+assert len(datafile.lower_bounds) == 15
assert datafile.lower_bounds[1] == STRUCT_BOOL.pack(False)
assert datafile.lower_bounds[2] == STRUCT_INT32.pack(23)
assert datafile.lower_bounds[3] == STRUCT_INT64.pack(2)
@@ -529,8 +538,12 @@
assert datafile.lower_bounds[10] == b"he"
assert datafile.lower_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes
assert datafile.lower_bounds[12] == b"he"
assert int.from_bytes(datafile.lower_bounds[13], byteorder="big", signed=True) == int(12345)
assert int.from_bytes(datafile.lower_bounds[14], byteorder="big", signed=True) == int(123456789123456)
assert int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True) == int(12345678901234123456)
Comment on lines +541 to +543
Contributor: We should test the physical type, such as STRUCT_INT32, STRUCT_INT64, and b"he" (FIXED_LEN_BYTE_ARRAY).

Contributor Author: I am updating this as below; please let me know if this will work. The [::-1].ljust(4, b'\x00') converts the value into 4-byte little-endian form.

assert datafile.upper_bounds[13][::-1].ljust(4, b'\x00') == STRUCT_INT32.pack(67891)
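
A self-contained check of that byte manipulation (assuming, per the Iceberg spec, that decimal bounds are minimal big-endian two's-complement bytes, and that STRUCT_INT32 in the test helpers packs little-endian; note the zero padding is only correct for non-negative values):

import struct

STRUCT_INT32 = struct.Struct("<i")  # little-endian int32, assumed to match the test helper

bound = (67891).to_bytes(3, byteorder="big", signed=True)  # minimal big-endian bytes
assert bound[::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(67891)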



-assert len(datafile.upper_bounds) == 12
+assert len(datafile.upper_bounds) == 15
assert datafile.upper_bounds[1] == STRUCT_BOOL.pack(True)
assert datafile.upper_bounds[2] == STRUCT_INT32.pack(89)
assert datafile.upper_bounds[3] == STRUCT_INT64.pack(54)
Expand All @@ -543,6 +556,9 @@ def test_metrics_primitive_types() -> None:
assert datafile.upper_bounds[10] == b"wp"
assert datafile.upper_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes
assert datafile.upper_bounds[12] == b"wp"
assert int.from_bytes(datafile.upper_bounds[13], byteorder="big", signed=True) == int(67891)
assert int.from_bytes(datafile.upper_bounds[14], byteorder="big", signed=True) == int(678912345678912)
assert int.from_bytes(datafile.upper_bounds[15], byteorder="big", signed=True) == int(98765432109870654321)


def construct_test_table_invalid_upper_bound() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]: