
Conversation

@redpheonixx (Contributor)

This pull request addresses the handling of decimal physical type matching in Parquet. It implements rules such that:
- For precision ≤ 9, values are stored as INT32.
- For precision 10 to 18, values are stored as INT64.
- For higher precision, values are stored as FIXED_LEN_BYTE_ARRAY.
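
For reference, the rule can be sketched as a small helper (illustrative only; decimal_physical_type is a hypothetical name, not the PR's actual code):

def decimal_physical_type(precision: int) -> str:
    # Parquet physical type for a decimal of the given precision
    if precision <= 9:
        return "INT32"  # unscaled value fits in a 4-byte int
    if precision <= 18:
        return "INT64"  # unscaled value fits in an 8-byte int
    return "FIXED_LEN_BYTE_ARRAY"

assert decimal_physical_type(8) == "INT32"
assert decimal_physical_type(16) == "INT64"
assert decimal_physical_type(32) == "FIXED_LEN_BYTE_ARRAY"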

This pull request is associated with issue #1789


@kevinjqliu (Contributor) left a comment


Thanks for the PR! I added some comments

)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros

from decimal import Decimal
Contributor

I think we should use pyarrow.decimal128 instead.

Contributor Author

Hi @kevinjqliu,
thanks for the comments.
We need Decimal because pa.array with a decimal128 type accepts only integer and Decimal values, not floats. Please check the comment on the test_pyarrow_stats.py file changes.
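
To illustrate the constraint (a minimal sketch, assuming a recent pyarrow):

import pyarrow as pa
from decimal import Decimal

# int and Decimal inputs are accepted for a decimal128 array
ok = pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2))

# a plain float is rejected, which is why the Decimal import is needed
try:
    pa.array([123.45], pa.decimal128(8, 2))
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    print("float rejected:", exc)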

Comment on lines +476 to +478
decimal8 = [Decimal("123.45"), Decimal("678.91")]
decimal16 = [Decimal("123456789.123456"), Decimal("678912345.678912")]
decimal32 = [Decimal("12345678901234.123456"), Decimal("98765432109870.654321")]
Contributor

These should use the proper precision for decimal128.

@redpheonixx (Contributor Author) Mar 18, 2025

decimal8 = pa.array([Decimal('123.45'), Decimal('678.91')], pa.decimal128(8, 2))

@kevinjqliu please check whether we can go with this, since it now has the proper precision and scale.
Note that this will accept only int and Decimal values, not floats, hence the Decimal import above.
Do let me know if I am incorrect here or if there is another workaround.


with pa.BufferOutputStream() as f:
-    with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
+    with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer:
Contributor

Do we need store_decimal_as_integer=True?

Contributor Author

I tried without this parameter, but then the physical type for decimal(8, 2) is FIXED_LEN_BYTE_ARRAY. There is a check for this, as below:

if expected_physical_type != physical_type_string:
    # Allow promotable physical types
    # INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
    if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
        physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
    ):
        pass
    else:
        raise ValueError(
            f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
        )
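
For context, the flag's effect on the written physical type can be checked directly (a sketch; it assumes a pyarrow version where ParquetWriter supports store_decimal_as_integer):

import pyarrow as pa
import pyarrow.parquet as pq
from decimal import Decimal

table = pa.table({"d": pa.array([Decimal("123.45")], pa.decimal128(8, 2))})

for flag in (False, True):
    buf = pa.BufferOutputStream()
    with pq.ParquetWriter(buf, table.schema, store_decimal_as_integer=flag) as writer:
        writer.write_table(table)
    meta = pq.ParquetFile(pa.BufferReader(buf.getvalue())).metadata
    # expected: FIXED_LEN_BYTE_ARRAY when False, INT32 when True
    print(flag, meta.row_group(0).column(0).physical_type)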

Comment on lines +541 to +543
assert int.from_bytes(datafile.lower_bounds[13], byteorder="big", signed=True) == int(12345)
assert int.from_bytes(datafile.lower_bounds[14], byteorder="big", signed=True) == int(123456789123456)
assert int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True) == int(12345678901234123456)
Contributor

We should test the physical type, such as STRUCT_INT32, STRUCT_INT64, and b"he" (FIXED_LEN_BYTE_ARRAY).

Contributor Author

I am updating this as below:
assert datafile.upper_bounds[13][::-1].ljust(4, b'\x00') == STRUCT_INT32.pack(67891)
Please let me know if this will work.
The [::-1].ljust(4, b'\x00') converts the big-endian bound bytes into 4-byte little-endian form.
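
To make the byte manipulation concrete (a sketch; it assumes STRUCT_INT32 is the little-endian struct.Struct("<i") used by the tests, which is how the assert above reads):

import struct

STRUCT_INT32 = struct.Struct("<i")  # assumed little-endian int32, matching the assert

# decimal bounds are stored as minimal big-endian two's-complement bytes
big_endian = (67891).to_bytes(3, byteorder="big", signed=True)

# reverse to little-endian, then pad the high bytes with zeros up to 4 bytes
# (zero padding is only correct for non-negative values; negatives would need b'\xff')
little_endian = big_endian[::-1].ljust(4, b"\x00")
assert little_endian == STRUCT_INT32.pack(67891)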

Comment on lines +2343 to +2354
matches = DECIMAL_REGEX.search(str(stats_col.iceberg_type))
if matches and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
    precision = int(matches.group(1))
    scale = int(matches.group(2))
    local_context = Context(prec=precision)
    decoded_min = local_context.create_decimal(Decimal(statistics.min_raw) / (10**scale))
    decoded_max = local_context.create_decimal(Decimal(statistics.max_raw) / (10**scale))
    col_aggs[field_id].update_min(decoded_min)
    col_aggs[field_id].update_max(decoded_max)
else:
    col_aggs[field_id].update_min(statistics.min)
    col_aggs[field_id].update_max(statistics.max)
Contributor

I'm confused why we need to use the regex here.

Contributor

When we isinstance-check stats_col.iceberg_type as a DecimalType, we have access to the scale and precision.

Contributor Author

I have updated the above as below:

if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
    precision = stats_col.iceberg_type.precision
    scale = stats_col.iceberg_type.scale
    decimal_type = pa.decimal128(precision, scale)
    col_aggs[field_id].update_min(pa.array([Decimal(statistics.min_raw) / (10**scale)], decimal_type)[0].as_py())
    col_aggs[field_id].update_max(pa.array([Decimal(statistics.max_raw) / (10**scale)], decimal_type)[0].as_py())
else:
    col_aggs[field_id].update_min(statistics.min)
    col_aggs[field_id].update_max(statistics.max)
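
A worked example of this decoding path (a sketch using the unscaled lower bound 12345 for decimal(8, 2) from the test data):

import pyarrow as pa
from decimal import Decimal

precision, scale = 8, 2
min_raw = 12345  # unscaled integer statistic, as stored for INT32/INT64 decimals

decimal_type = pa.decimal128(precision, scale)
decoded = pa.array([Decimal(min_raw) / (10**scale)], decimal_type)[0].as_py()
assert decoded == Decimal("123.45")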

@redpheonixx deleted the decimal_physical_type_mapping branch on March 24, 2025 at 15:33.