handle decimal physical type mapping #1799
Conversation
kevinjqliu left a comment
Thanks for the PR! I added some comments
)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros

from decimal import Decimal
I think we should use pyarrow.decimal128 instead.
Hi @kevinjqliu,
thanks for the comments.
We still need Decimal: creating a pa.array of decimal128 type accepts only integer and Decimal values, not float values. Please check the comment on the test_pyarrow_stats.py file changes.
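As a side note on why Decimal rather than float: a minimal stdlib sketch (independent of pyarrow) of the rounding problem that float literals would introduce for exact decimal values:

```python
from decimal import Decimal

# 123.45 has no exact binary floating-point representation, so a Decimal
# built from the float literal carries the rounding error along:
assert Decimal(123.45) != Decimal("123.45")

# Building from a string (or an int) keeps the value exact:
assert str(Decimal("123.45")) == "123.45"
```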
decimal8 = [Decimal("123.45"), Decimal("678.91")]
decimal16 = [Decimal("123456789.123456"), Decimal("678912345.678912")]
decimal32 = [Decimal("12345678901234.123456"), Decimal("98765432109870.654321")]
These should use the proper precision for decimal128.
decimal8 = pa.array([Decimal('123.45'), Decimal('678.91')], pa.decimal128(8, 2))
@kevinjqliu please check whether we can go with this, as it now has the proper precision and scale.
Please note that this accepts only int and Decimal values, not floats, hence we require the Decimal import above.
Do let me know if I am incorrect here or if there is another workaround.
with pa.BufferOutputStream() as f:
-    with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
+    with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer:
Do we need store_decimal_as_integer=True?
I tried without this parameter, but then the physical type for decimal(8, 2) is FIXED_LEN_BYTE_ARRAY. There is a check for this, as below:
if expected_physical_type != physical_type_string:
# Allow promotable physical types
# INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
):
pass
else:
raise ValueError(
f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
)
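The promotion rule in that check can be distilled into a small standalone predicate (a sketch for illustration only; the function name is hypothetical and not part of the PR):

```python
def physical_type_ok(physical_type: str, expected: str) -> bool:
    """Accept an exact match or one of the safe promotions
    INT32 -> INT64 and FLOAT -> DOUBLE."""
    if physical_type == expected:
        return True
    return (physical_type, expected) in {("INT32", "INT64"), ("FLOAT", "DOUBLE")}

assert physical_type_ok("INT32", "INT64")          # safe promotion
assert not physical_type_ok("FIXED_LEN_BYTE_ARRAY", "INT32")  # would raise in the real check
```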
assert int.from_bytes(datafile.lower_bounds[13], byteorder="big", signed=True) == int(12345)
assert int.from_bytes(datafile.lower_bounds[14], byteorder="big", signed=True) == int(123456789123456)
assert int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True) == int(12345678901234123456)
We should test the physical type, such as STRUCT_INT32, STRUCT_INT64, and b"he" (FIXED_LEN_BYTE_ARRAY).
I am updating this as below:
assert datafile.upper_bounds[13][::-1].ljust(4, b'\x00') == STRUCT_INT32.pack(67891)
Please let me know if this will work. The [::-1].ljust(4, b'\x00') reverses the bytes and zero-pads them to convert the value into its 4-byte little-endian form.
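For context, a self-contained sketch of that byte manipulation, assuming STRUCT_INT32 is a little-endian struct.Struct("<i") as in pyiceberg's test helpers (note the zero-padding is only valid for non-negative values):

```python
import struct

STRUCT_INT32 = struct.Struct("<i")  # assumed little-endian 4-byte int helper
value = 67891

# Iceberg serializes decimal bounds as minimal big-endian two's-complement bytes.
raw_big_endian = value.to_bytes((value.bit_length() + 7) // 8, byteorder="big")

# Reversing gives little-endian order; zero-padding extends it to 4 bytes.
# Negative values would need 0xff sign extension instead of zero padding.
assert raw_big_endian[::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(value)
```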
matches = DECIMAL_REGEX.search(str(stats_col.iceberg_type))
if matches and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
    precision = int(matches.group(1))
    scale = int(matches.group(2))
    local_context = Context(prec=precision)
    decoded_min = local_context.create_decimal(Decimal(statistics.min_raw) / (10 ** scale))
    decoded_max = local_context.create_decimal(Decimal(statistics.max_raw) / (10 ** scale))
    col_aggs[field_id].update_min(decoded_min)
    col_aggs[field_id].update_max(decoded_max)
else:
    col_aggs[field_id].update_min(statistics.min)
    col_aggs[field_id].update_max(statistics.max)
I'm confused why we need to use the regex here.
When we isinstance-check stats_col.iceberg_type as a DecimalType, we have direct access to the scale and precision.
I have updated the above as below:

if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
    precision = stats_col.iceberg_type.precision
    scale = stats_col.iceberg_type.scale
    decimal_type = pa.decimal128(precision, scale)
    col_aggs[field_id].update_min(pa.array([Decimal(statistics.min_raw) / (10 ** scale)], decimal_type)[0].as_py())
    col_aggs[field_id].update_max(pa.array([Decimal(statistics.max_raw) / (10 ** scale)], decimal_type)[0].as_py())
else:
    col_aggs[field_id].update_min(statistics.min)
    col_aggs[field_id].update_max(statistics.max)
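The core of that decoding step is dividing the raw unscaled integer by 10**scale; a minimal stdlib illustration with a made-up raw value (pyarrow omitted here for brevity):

```python
from decimal import Decimal

# Hypothetical raw Parquet stats for a decimal(8, 2) column stored as INT32:
# the physical value is the unscaled integer, e.g. 12345 for "123.45".
scale = 2
min_raw = 12345

# Dividing by 10**scale recovers the logical decimal value exactly.
decoded_min = Decimal(min_raw) / (10 ** scale)
assert decoded_min == Decimal("123.45")
```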
This pull request addresses the handling of decimal physical type mapping in Parquet. It implements rules such that:
- For precision ≤ 9, values are stored as int32.
- For precision ≤ 18, values are stored as int64.
- For higher precision, values are stored as a FIXED_LEN_BYTE_ARRAY.

This pull request is associated with issue #1789.
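The three rules above can be summarized in a small helper (a sketch for illustration; the function name is hypothetical and not part of the PR):

```python
def decimal_physical_type(precision: int) -> str:
    """Map a decimal precision to the Parquet physical type used to store it."""
    if precision <= 9:
        return "INT32"
    if precision <= 18:
        return "INT64"
    return "FIXED_LEN_BYTE_ARRAY"

assert decimal_physical_type(8) == "INT32"
assert decimal_physical_type(16) == "INT64"
assert decimal_physical_type(38) == "FIXED_LEN_BYTE_ARRAY"
```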