
Commit 562de40

Fix add_files with non-identity transforms (#1925)
Found out I broke this myself after doing a `git bisect`:

```
36d383d is the first bad commit
commit 36d383d
Author: Fokko Driesprong <fokko@apache.org>
Date:   Thu Jan 23 07:50:54 2025 +0100

    PyArrow: Avoid buffer-overflow by avoid doing a sort (#1555)

    Second attempt of #1539

    This was already being discussed back here: #208 (comment)

    This PR changes from doing a sort, and then a single pass over the table, to the
    approach where we determine the unique partition tuples and filter on them
    individually.

    Fixes #1491

    Because the sort caused buffers to be joined where it would overflow in Arrow.
    I think this is an issue on the Arrow side, and it should automatically break up
    into smaller buffers. The `combine_chunks` method does this correctly.

    Now:

        Run 0 took: 0.42877754200890195
        Run 1 took: 0.2507691659993725
        Run 2 took: 0.24833179199777078
        Run 3 took: 0.24401691700040828
        Run 4 took: 0.2419595829996979
        Average runtime of 0.28 seconds

    Before:

        Run 0 took: 1.0768639159941813
        Run 1 took: 0.8784021250030492
        Run 2 took: 0.8486490420036716
        Run 3 took: 0.8614017910003895
        Run 4 took: 0.8497851670108503
        Average runtime of 0.9 seconds

    So it comes with a nice speedup as well :)

    Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>

 pyiceberg/io/pyarrow.py                    |  129 ++-
 pyiceberg/partitioning.py                  |   39 +-
 pyiceberg/table/__init__.py                |    6 +-
 pyproject.toml                             |    1 +
 tests/benchmark/test_benchmark.py          |   72 ++
 tests/integration/test_partitioning_key.py | 1299 ++++++++++++++--------------
 tests/table/test_locations.py              |    2 +-
 7 files changed, 805 insertions(+), 743 deletions(-)
 create mode 100644 tests/benchmark/test_benchmark.py
```

Closes #1917
1 parent ddd7225 commit 562de40
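
The crux of the fix: with a non-identity transform such as `HourTransform`, the raw min/max taken from the parquet column statistics almost never match, but their transformed values do whenever all rows fall into the same partition, so the transform has to be applied before the two bounds are compared. A minimal sketch of that idea, using the same `Transform.transform(source_type)` API the patch itself calls and the microsecond timestamps from the new integration test below (assumes only that pyiceberg is installed):

```python
# A minimal sketch: the two timestamps from the new integration test differ as
# raw values but fall into the same hour, so the partition value can only be
# inferred after the transform has been applied.
from pyiceberg.transforms import HourTransform
from pyiceberg.types import TimestampType

# Transform.transform(source_type) returns a callable; for HourTransform on a
# TimestampType it maps microseconds since epoch to whole hours since epoch.
to_hours = HourTransform().transform(TimestampType())

lower = 1743465600155254  # column min from the parquet footer (micros)
upper = 1743469198047855  # column max from the parquet footer (micros)

assert lower != upper                      # the raw statistics disagree ...
assert to_hours(lower) == to_hours(upper)  # ... but the hour bucket is the same
print(to_hours(lower))                     # e.g. 484296 hours since the epoch
```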

2 files changed: +50, -14 lines changed

pyiceberg/io/pyarrow.py

Lines changed: 19 additions & 12 deletions
@@ -2205,29 +2205,36 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         if partition_field.source_id not in self.column_aggregates:
             return None
 
-        if not partition_field.transform.preserves_order:
+        source_field = schema.find_field(partition_field.source_id)
+        iceberg_transform = partition_field.transform
+
+        if not iceberg_transform.preserves_order:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_min,
-            schema=schema,
+        transform_func = iceberg_transform.transform(source_field.field_type)
+
+        lower_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_min,
+                schema=schema,
+            )
         )
-        upper_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_max,
-            schema=schema,
+        upper_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_max,
+                schema=schema,
+            )
         )
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
         return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
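
The behavioural change above is purely the order of operations: the old code compared the raw bounds first and only transformed the (already equal) value at the end, so any file whose raw min and max differed was rejected even when every row maps to the same partition. A standalone sketch of the two orderings, with hypothetical helper names standing in for the real pyiceberg functions:

```python
from typing import Any, Callable

LOWER, UPPER = 1743465600155254, 1743469198047855  # raw column bounds (micros)


def hours(micros: int) -> int:
    """Stand-in for HourTransform: microseconds since epoch -> hours since epoch."""
    return micros // 3_600_000_000


def old_order(lower: Any, upper: Any, transform: Callable[[Any], Any]) -> Any:
    # Before the fix: compare the raw bounds first, transform afterwards.
    if lower != upper:
        raise ValueError("more than one partition value")
    return transform(lower)


def new_order(lower: Any, upper: Any, transform: Callable[[Any], Any]) -> Any:
    # After the fix: transform both bounds, then compare.
    lower_t, upper_t = transform(lower), transform(upper)
    if lower_t != upper_t:
        raise ValueError("more than one partition value")
    return lower_t


print(new_order(LOWER, UPPER, hours))   # 484296 -> a single hourly partition
# old_order(LOWER, UPPER, hours) raises, even though both rows share the hour.
```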

tests/integration/test_add_files.py

Lines changed: 31 additions & 2 deletions
@@ -30,18 +30,20 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, schema_to_pyarrow
+from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
-from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
+from pyiceberg.transforms import BucketTransform, HourTransform, IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
     DateType,
     IntegerType,
     LongType,
     NestedField,
     StringType,
+    TimestampType,
     TimestamptzType,
 )
 
@@ -850,3 +852,30 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+def test_add_files_hour_transform(session_catalog: Catalog) -> None:
+    identifier = "default.test_add_files_hour_transform"
+
+    schema = Schema(NestedField(1, "hourly", TimestampType()))
+    schema_arrow = schema_to_pyarrow(schema, include_field_ids=False)
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="spec_hour"))
+
+    tbl = _create_table(session_catalog, identifier, format_version=1, schema=schema, partition_spec=spec)
+
+    file_path = "s3://warehouse/default/test_add_files_hour_transform/test.parquet"
+
+    from pyiceberg.utils.datetime import micros_to_timestamp
+
+    arrow_table = pa.Table.from_pylist(
+        [{"hourly": micros_to_timestamp(1743465600155254)}, {"hourly": micros_to_timestamp(1743469198047855)}],
+        schema=schema_arrow,
+    )
+
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=schema_arrow) as writer:
+            writer.write_table(arrow_table)
+
+    tbl.add_files(file_paths=[file_path])
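
A possible follow-up check one might append to the test body above, sketched under the assumption that `tbl.inspect.partitions()` is available in the installed pyiceberg version: both rows should be readable, and they should land in a single hourly partition.

```python
    # Hypothetical assertions continuing the test body above (same `tbl`):
    result = tbl.scan().to_arrow()
    assert len(result) == 2

    partitions = tbl.inspect.partitions().to_pylist()
    assert len(partitions) == 1
    print(partitions[0]["partition"])  # e.g. {'spec_hour': 484296}
```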
