
Commit 447487b

Merge branch 'main' into ehsan/bodo_support
2 parents: f99171c + 86bf71c

File tree: 10 files changed, 337 additions and 162 deletions

poetry.lock

Lines changed: 156 additions & 152 deletions
(Generated file; diff not rendered by default.)

pyiceberg/table/__init__.py

Lines changed: 4 additions & 5 deletions
@@ -1852,13 +1852,11 @@ def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
     def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
         spec = self.table_metadata.specs()[spec_id]
 
+        from pyiceberg.expressions.visitors import residual_evaluator_of
+
         # The lambda created here is run in multiple threads.
         # So we avoid creating _EvaluatorExpression methods bound to a single
         # shared instance across multiple threads.
-        # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-        from pyiceberg.expressions.visitors import residual_evaluator_of
-
-        # assert self.row_filter == False
         return lambda datafile: (
             residual_evaluator_of(
                 spec=spec,

@@ -1868,7 +1866,8 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
             )
         )
 
-    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
+    @staticmethod
+    def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
         """Ensure that no manifests are loaded that contain deletes that are older than the data.
 
         Args:
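
The inline comment is the point of this hunk: the returned lambda runs on multiple threads, so it constructs a fresh residual evaluator per data file rather than binding one shared instance. A minimal illustration of the hazard it avoids; _StatefulEvaluator is hypothetical and only stands in for the pattern, not the real evaluator:

    # Illustrative sketch only: _StatefulEvaluator is hypothetical and mimics an
    # evaluator that mutates internal state while evaluating.
    class _StatefulEvaluator:
        def __init__(self) -> None:
            self.current = None

        def eval(self, value):
            self.current = value  # mutable state: racy if one instance is shared
            return self.current

    shared = _StatefulEvaluator()

    def unsafe(value):
        return shared.eval(value)  # every thread mutates the same instance

    def safe(value):
        return _StatefulEvaluator().eval(value)  # fresh instance per call, like the lambda above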

pyiceberg/table/snapshots.py

Lines changed: 3 additions & 0 deletions
@@ -58,6 +58,7 @@
 TOTAL_FILE_SIZE = "total-files-size"
 CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
 CHANGED_PARTITION_PREFIX = "partitions."
+PARTITION_SUMMARY_PROP = "partition-summaries-included"
 OPERATION = "operation"
 
 INITIAL_SEQUENCE_NUMBER = 0

@@ -306,6 +307,8 @@ def build(self) -> Dict[str, str]:
         changed_partitions_size = len(self.partition_metrics)
         set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
         if changed_partitions_size <= self.max_changed_partitions_for_summaries:
+            if changed_partitions_size > 0:
+                properties[PARTITION_SUMMARY_PROP] = "true"
             for partition_path, update_metrics_partition in self.partition_metrics.items():
                 if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
                     properties[CHANGED_PARTITION_PREFIX + partition_path] = summary
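
In effect, build() now advertises when per-partition summaries made it into the snapshot summary. A minimal sketch of the behavior, using only the collector calls exercised by the tests later in this commit:

    from pyiceberg.table.snapshots import SnapshotSummaryCollector

    ssc = SnapshotSummaryCollector()
    ssc.set_partition_summary_limit(10)

    summary = ssc.build()
    # Nothing was added, so neither the flag nor any "partitions.*" keys appear:
    assert "partition-summaries-included" not in summary

    # Once files with non-empty partitions are added (and the changed-partition
    # count stays within the limit), build() emits
    # "partition-summaries-included": "true" alongside the per-partition
    # "partitions.<path>" entries.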

pyiceberg/table/statistics.py

Lines changed: 3 additions & 3 deletions
@@ -14,7 +14,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Dict, List, Literal, Optional
+from typing import Dict, List, Literal, Optional, Union
 
 from pydantic import Field
 
@@ -48,7 +48,7 @@ class PartitionStatisticsFile(StatisticsCommonFields):
 
 
 def filter_statistics_by_snapshot_id(
-    statistics: List[StatisticsFile],
+    statistics: List[Union[StatisticsFile, PartitionStatisticsFile]],
     reject_snapshot_id: int,
-) -> List[StatisticsFile]:
+) -> List[Union[StatisticsFile, PartitionStatisticsFile]]:
     return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
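
The widened annotation lets callers filter PartitionStatisticsFile lists through the same helper. A short sketch, reusing the constructor arguments from the tests added below:

    from pyiceberg.table.statistics import PartitionStatisticsFile, filter_statistics_by_snapshot_id

    stats = [
        PartitionStatisticsFile(
            snapshot_id=123,
            statistics_path="s3://bucket/warehouse/stats.puffin",
            file_size_in_bytes=124,
        )
    ]
    # Every entry matching reject_snapshot_id is dropped:
    assert filter_statistics_by_snapshot_id(stats, reject_snapshot_id=123) == []
    assert filter_statistics_by_snapshot_id(stats, reject_snapshot_id=456) == stats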

pyiceberg/table/update/__init__.py

Lines changed: 40 additions & 1 deletion
@@ -36,7 +36,11 @@
     SnapshotLogEntry,
 )
 from pyiceberg.table.sorting import SortOrder
-from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
+from pyiceberg.table.statistics import (
+    PartitionStatisticsFile,
+    StatisticsFile,
+    filter_statistics_by_snapshot_id,
+)
 from pyiceberg.typedef import (
     IcebergBaseModel,
     Properties,

@@ -198,6 +202,16 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
     snapshot_id: int = Field(alias="snapshot-id")
 
 
+class SetPartitionStatisticsUpdate(IcebergBaseModel):
+    action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics")
+    partition_statistics: PartitionStatisticsFile
+
+
+class RemovePartitionStatisticsUpdate(IcebergBaseModel):
+    action: Literal["remove-partition-statistics"] = Field(default="remove-partition-statistics")
+    snapshot_id: int = Field(alias="snapshot-id")
+
+
 TableUpdate = Annotated[
     Union[
         AssignUUIDUpdate,

@@ -217,6 +231,8 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
         RemovePropertiesUpdate,
         SetStatisticsUpdate,
         RemoveStatisticsUpdate,
+        SetPartitionStatisticsUpdate,
+        RemovePartitionStatisticsUpdate,
     ],
     Field(discriminator="action"),
 ]

@@ -582,6 +598,29 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
     return base_metadata.model_copy(update={"statistics": statistics})
 
 
+@_apply_table_update.register(SetPartitionStatisticsUpdate)
+def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
+    partition_statistics = filter_statistics_by_snapshot_id(
+        base_metadata.partition_statistics, update.partition_statistics.snapshot_id
+    )
+    context.add_update(update)
+
+    return base_metadata.model_copy(update={"partition_statistics": partition_statistics + [update.partition_statistics]})
+
+
+@_apply_table_update.register(RemovePartitionStatisticsUpdate)
+def _(
+    update: RemovePartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext
+) -> TableMetadata:
+    if not any(part_stat.snapshot_id == update.snapshot_id for part_stat in base_metadata.partition_statistics):
+        raise ValueError(f"Partition Statistics with snapshot id {update.snapshot_id} does not exist")
+
+    statistics = filter_statistics_by_snapshot_id(base_metadata.partition_statistics, update.snapshot_id)
+    context.add_update(update)
+
+    return base_metadata.model_copy(update={"partition_statistics": statistics})
+
+
 def update_table_metadata(
     base_metadata: TableMetadata,
     updates: Tuple[TableUpdate, ...],
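
Together, the two handlers mirror the existing SetStatisticsUpdate/RemoveStatisticsUpdate pair: set replaces any entry for the same snapshot, and remove raises if no entry exists. A hedged usage sketch; set_then_remove is a hypothetical helper, and the path and size are illustrative:

    from pyiceberg.table.metadata import TableMetadata
    from pyiceberg.table.statistics import PartitionStatisticsFile
    from pyiceberg.table.update import (
        RemovePartitionStatisticsUpdate,
        SetPartitionStatisticsUpdate,
        update_table_metadata,
    )

    def set_then_remove(base_metadata: TableMetadata, snapshot_id: int) -> TableMetadata:
        stats_file = PartitionStatisticsFile(
            snapshot_id=snapshot_id,
            statistics_path="s3://bucket/warehouse/stats.puffin",  # illustrative path
            file_size_in_bytes=124,
        )
        # Set replaces any existing partition statistics for the same snapshot id.
        with_stats = update_table_metadata(base_metadata, (SetPartitionStatisticsUpdate(partition_statistics=stats_file),))
        # Remove raises ValueError if no entry exists for the given snapshot id.
        return update_table_metadata(with_stats, (RemovePartitionStatisticsUpdate(snapshot_id=snapshot_id),))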

pyiceberg/utils/schema_conversion.py

Lines changed: 2 additions & 0 deletions
@@ -69,8 +69,10 @@
 LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
     ("date", "int"): DateType(),
     ("time-micros", "long"): TimeType(),
+    ("timestamp-millis", "int"): TimestampType(),
     ("timestamp-micros", "long"): TimestampType(),
     ("uuid", "fixed"): UUIDType(),
+    ("uuid", "string"): UUIDType(),
 }
 
 AvroType = Union[str, Any]
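
Condensed from the tests added below, the new (logicalType, type) pairs resolve as follows (note that _convert_logical_type is a private helper):

    from pyiceberg.types import TimestampType, UUIDType
    from pyiceberg.utils.schema_conversion import AvroSchemaConversion

    conversion = AvroSchemaConversion()
    assert conversion._convert_logical_type({"type": "string", "logicalType": "uuid"}) == UUIDType()
    assert conversion._convert_logical_type({"type": "int", "logicalType": "timestamp-millis"}) == TimestampType()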

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -319,6 +319,8 @@ pyiceberg-core = ["pyiceberg-core"]
 datafusion = ["datafusion"]
 
 [tool.pytest.ini_options]
+testpaths = ["tests"]
+
 markers = [
     "unmarked: marks a test as a unittest",
     "s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint args)",

tests/table/test_init.py

Lines changed: 79 additions & 1 deletion
@@ -64,7 +64,7 @@
     SortField,
     SortOrder,
 )
-from pyiceberg.table.statistics import BlobMetadata, StatisticsFile
+from pyiceberg.table.statistics import BlobMetadata, PartitionStatisticsFile, StatisticsFile
 from pyiceberg.table.update import (
     AddSnapshotUpdate,
     AddSortOrderUpdate,

@@ -76,11 +76,13 @@
     AssertLastAssignedPartitionId,
     AssertRefSnapshotId,
     AssertTableUUID,
+    RemovePartitionStatisticsUpdate,
     RemovePropertiesUpdate,
     RemoveSnapshotRefUpdate,
     RemoveSnapshotsUpdate,
     RemoveStatisticsUpdate,
     SetDefaultSortOrderUpdate,
+    SetPartitionStatisticsUpdate,
     SetPropertiesUpdate,
     SetSnapshotRefUpdate,
     SetStatisticsUpdate,

@@ -1359,3 +1361,79 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
             table_v2_with_statistics.metadata,
             (RemoveStatisticsUpdate(snapshot_id=123456789),),
         )
+
+
+def test_set_partition_statistics_update(table_v2_with_statistics: Table) -> None:
+    snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
+
+    partition_statistics_file = PartitionStatisticsFile(
+        snapshot_id=snapshot_id,
+        statistics_path="s3://bucket/warehouse/stats.puffin",
+        file_size_in_bytes=124,
+    )
+
+    update = SetPartitionStatisticsUpdate(
+        partition_statistics=partition_statistics_file,
+    )
+
+    new_metadata = update_table_metadata(
+        table_v2_with_statistics.metadata,
+        (update,),
+    )
+
+    expected = """
+    {
+        "snapshot-id": 3055729675574597004,
+        "statistics-path": "s3://bucket/warehouse/stats.puffin",
+        "file-size-in-bytes": 124
+    }"""
+
+    assert len(new_metadata.partition_statistics) == 1
+
+    updated_statistics = [stat for stat in new_metadata.partition_statistics if stat.snapshot_id == snapshot_id]
+
+    assert len(updated_statistics) == 1
+    assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected)
+
+
+def test_remove_partition_statistics_update(table_v2_with_statistics: Table) -> None:
+    # Add partition statistics file.
+    snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
+
+    partition_statistics_file = PartitionStatisticsFile(
+        snapshot_id=snapshot_id,
+        statistics_path="s3://bucket/warehouse/stats.puffin",
+        file_size_in_bytes=124,
+    )
+
+    update = SetPartitionStatisticsUpdate(
+        partition_statistics=partition_statistics_file,
+    )
+
+    new_metadata = update_table_metadata(
+        table_v2_with_statistics.metadata,
+        (update,),
+    )
+    assert len(new_metadata.partition_statistics) == 1
+
+    # Remove the same partition statistics file.
+    remove_update = RemovePartitionStatisticsUpdate(snapshot_id=snapshot_id)
+
+    remove_metadata = update_table_metadata(
+        new_metadata,
+        (remove_update,),
+    )
+
+    assert len(remove_metadata.partition_statistics) == 0
+
+
+def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_with_statistics: Table) -> None:
+    # No partition statistics exist for this snapshot id, so removal must fail.
+    with pytest.raises(
+        ValueError,
+        match="Partition Statistics with snapshot id 123456789 does not exist",
+    ):
+        update_table_metadata(
+            table_v2_with_statistics.metadata,
+            (RemovePartitionStatisticsUpdate(snapshot_id=123456789),),
+        )

tests/table/test_snapshots.py

Lines changed: 22 additions & 0 deletions
@@ -224,6 +224,7 @@ def test_snapshot_summary_collector_with_partition() -> None:
         "added-records": "100",
         "deleted-records": "300",
         "changed-partition-count": "2",
+        "partition-summaries-included": "true",
         "partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
         "partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
     }

@@ -259,11 +260,32 @@ def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None:
         "added-records": "100",
         "deleted-records": "300",
         "changed-partition-count": "2",
+        "partition-summaries-included": "true",
         "partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
         "partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
     }
 
 
+@pytest.mark.integration
+def test_partition_summaries_included_not_set_when_no_change() -> None:
+    ssc = SnapshotSummaryCollector()
+    # No files added, so no partition_metrics
+    ssc.set_partition_summary_limit(10)
+    result = ssc.build()
+    assert "partition-summaries-included" not in result
+    assert result == {}  # Should be an empty dict
+
+
+@pytest.mark.integration
+def test_partition_summaries_included_not_set_when_unpartitioned_files(table_schema_simple: Schema) -> None:
+    ssc = SnapshotSummaryCollector()
+    data_file = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record())
+    ssc.add_file(data_file, schema=table_schema_simple)
+    ssc.set_partition_summary_limit(10)
+    result = ssc.build()
+    assert "partition-summaries-included" not in result
+
+
 def test_merge_snapshot_summaries_empty() -> None:
     assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
         operation=Operation.APPEND,

tests/utils/test_schema_conversion.py

Lines changed: 26 additions & 0 deletions
@@ -33,7 +33,9 @@
     NestedField,
     StringType,
     StructType,
+    TimestampType,
     UnknownType,
+    UUIDType,
 )
 from pyiceberg.utils.schema_conversion import AvroSchemaConversion

@@ -327,6 +329,30 @@ def test_convert_date_type() -> None:
     assert actual == DateType()
 
 
+def test_convert_uuid_str_type() -> None:
+    avro_logical_type = {"type": "string", "logicalType": "uuid"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == UUIDType()
+
+
+def test_convert_uuid_fixed_type() -> None:
+    avro_logical_type = {"type": "fixed", "logicalType": "uuid"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == UUIDType()
+
+
+def test_convert_timestamp_millis_type() -> None:
+    avro_logical_type = {"type": "int", "logicalType": "timestamp-millis"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == TimestampType()
+
+
+def test_convert_timestamp_micros_type() -> None:
+    avro_logical_type = {"type": "int", "logicalType": "timestamp-micros"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == TimestampType()
+
+
 def test_unknown_logical_type() -> None:
     """Test raising a ValueError when converting an unknown logical type as part of an Avro schema conversion"""
     avro_logical_type = {"type": "bytes", "logicalType": "date"}
