
Commit d9c8ddf

Merge branch 'main' of github.com:apache/iceberg-python into fd-integrate-residuals
2 parents: 93debf1 + 4d4714a

File tree

19 files changed: +581 / -358 lines

poetry.lock

Lines changed: 233 additions & 230 deletions
Generated file; the diff is not rendered by default.

pyiceberg/expressions/__init__.py

Lines changed: 44 additions & 3 deletions
@@ -18,11 +18,13 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from functools import cached_property, reduce
+from functools import cached_property
 from typing import (
     Any,
+    Callable,
     Generic,
     Iterable,
+    Sequence,
     Set,
     Tuple,
     Type,
@@ -79,6 +81,45 @@ def __or__(self, other: BooleanExpression) -> BooleanExpression:
         return Or(self, other)
 
 
+def _build_balanced_tree(
+    operator_: Callable[[BooleanExpression, BooleanExpression], BooleanExpression], items: Sequence[BooleanExpression]
+) -> BooleanExpression:
+    """
+    Recursively constructs a balanced binary tree of BooleanExpressions using the provided binary operator.
+
+    This function is a safer and more scalable alternative to:
+        reduce(operator_, items)
+
+    Using `reduce` creates a deeply nested, unbalanced tree (e.g., operator_(a, operator_(b, operator_(c, ...)))),
+    which grows linearly with the number of items. This can lead to RecursionError exceptions in Python
+    when the number of expressions is large (e.g., >1000).
+
+    In contrast, this function builds a balanced binary tree with logarithmic depth (O(log n)),
+    helping avoid recursion issues and ensuring that expression trees remain stable, predictable,
+    and safe to traverse — especially in tools like PyIceberg that operate on large logical trees.
+
+    Parameters:
+        operator_ (Callable): A binary operator function (e.g., pyiceberg.expressions.Or, And) that takes two
+            BooleanExpressions and returns a combined BooleanExpression.
+        items (Sequence[BooleanExpression]): A sequence of BooleanExpression objects to combine.
+
+    Returns:
+        BooleanExpression: The balanced combination of all input BooleanExpressions.
+
+    Raises:
+        ValueError: If the input sequence is empty.
+    """
+    if not items:
+        raise ValueError("No expressions to combine")
+    if len(items) == 1:
+        return items[0]
+    mid = len(items) // 2
+
+    left = _build_balanced_tree(operator_, items[:mid])
+    right = _build_balanced_tree(operator_, items[mid:])
+    return operator_(left, right)
+
+
 class Term(Generic[L], ABC):
     """A simple expression that evaluates to a value."""
 
@@ -214,7 +255,7 @@ class And(BooleanExpression):
 
     def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression:  # type: ignore
         if rest:
-            return reduce(And, (left, right, *rest))
+            return _build_balanced_tree(And, (left, right, *rest))
         if left is AlwaysFalse() or right is AlwaysFalse():
             return AlwaysFalse()
         elif left is AlwaysTrue():
@@ -257,7 +298,7 @@ class Or(BooleanExpression):
 
     def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression:  # type: ignore
         if rest:
-            return reduce(Or, (left, right, *rest))
+            return _build_balanced_tree(Or, (left, right, *rest))
         if left is AlwaysTrue() or right is AlwaysTrue():
             return AlwaysTrue()
         elif left is AlwaysFalse():
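
The practical effect is easiest to see with a wide predicate. A minimal sketch, not part of the commit (the column name and values are illustrative):

    from pyiceberg.expressions import EqualTo, Or

    predicates = [EqualTo("id", i) for i in range(5_000)]

    # With reduce() this produced a nested chain ~5000 levels deep, which expression
    # visitors could only walk by recursing once per level; with _build_balanced_tree
    # the result is a balanced tree of depth ~log2(5000) ≈ 13.
    expr = Or(*predicates)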

pyiceberg/io/pyarrow.py

Lines changed: 33 additions & 28 deletions
@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.decimal import unscaled_to_decimal
 from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
@@ -1384,7 +1385,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1420,13 +1420,7 @@
 
     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
-        # With PyArrow 16.0.0 there is an issue with casting record-batches:
-        # https://github.com/apache/arrow/issues/41884
-        # https://github.com/apache/arrow/issues/43183
-        # Would be good to remove this later on
-        schema=_pyarrow_schema_ensure_large_types(physical_schema)
-        if use_large_types
-        else (_pyarrow_schema_ensure_small_types(physical_schema)),
+        schema=physical_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
         filter=pyarrow_filter if not positional_deletes else None,
@@ -1461,7 +1455,6 @@
             file_project_schema,
             current_batch,
             downcast_ns_timestamp_to_us=True,
-            use_large_types=use_large_types,
         )
 
         # Inject projected column values if available
@@ -1555,14 +1548,6 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._limit = limit
 
-    @property
-    def _use_large_types(self) -> bool:
-        """Whether to represent data as large arrow types.
-
-        Defaults to True.
-        """
-        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
     def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         """Scan the Iceberg table and return a pa.Table.
 
@@ -1618,11 +1603,21 @@ def _table_from_scan_task(task: FileScanTask) -> Optional[pa.Table]:
 
         tables = [f.result() for f in completed_futures if f.result()]
 
+        arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+
         if len(tables) < 1:
-            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+            return pa.Table.from_batches([], schema=arrow_schema)
 
         result = pa.concat_tables(tables, promote_options="permissive")
 
+        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
+            )
+            result = result.cast(arrow_schema)
+
         if self._limit is not None:
             return result.slice(0, self._limit)
 
@@ -1666,7 +1661,6 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                self._use_large_types,
                 self._table_metadata.spec(),
             )
             for batch in batches:
@@ -1685,13 +1679,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
    )
    return pa.RecordBatch.from_struct_array(struct_array)
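
With the per-batch large/small-type coercion gone, the read path scans with the file's physical schema and only casts the concatenated result when the deprecated PYARROW_USE_LARGE_TYPES_ON_READ property is set. A caller who still wants large Arrow types can cast explicitly instead; a minimal sketch, with illustrative catalog and table names:

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    tbl = load_catalog("default").load_table("examples.events")
    result = tbl.scan().to_arrow()

    # Rebuild the schema with large variants where desired (here: strings only)
    # and cast the table once, rather than relying on the deprecated read property.
    large_schema = pa.schema(
        pa.field(f.name, pa.large_string() if pa.types.is_string(f.type) else f.type, f.nullable)
        for f in result.schema
    )
    result = result.cast(large_schema)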
@@ -1701,20 +1694,27 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool
+    _use_large_types: Optional[bool]
 
     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool = True,
+        use_large_types: Optional[bool] = None,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
 
+        if use_large_types is not None:
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
+            )
+
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
 
@@ -1723,7 +1723,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
             target_schema = schema_to_pyarrow(
                 promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
             )
-            if not self._use_large_types:
+            if self._use_large_types is False:
                 target_schema = _pyarrow_schema_ensure_small_types(target_schema)
             return values.cast(target_schema)
         elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
@@ -1784,7 +1784,7 @@ def struct(
                 field_arrays.append(array)
                 fields.append(self._construct_field(field, array.type))
             elif field.optional:
-                arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=False)
+                arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
                 field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 fields.append(self._construct_field(field, arrow_type))
             else:
@@ -1896,7 +1896,7 @@ def visit_fixed(self, fixed_type: FixedType) -> str:
         return "FIXED_LEN_BYTE_ARRAY"
 
     def visit_decimal(self, decimal_type: DecimalType) -> str:
-        return "FIXED_LEN_BYTE_ARRAY"
+        return "INT32" if decimal_type.precision <= 9 else "INT64" if decimal_type.precision <= 18 else "FIXED_LEN_BYTE_ARRAY"
 
     def visit_boolean(self, boolean_type: BooleanType) -> str:
         return "BOOLEAN"
@@ -2370,8 +2370,13 @@ def data_file_statistics_from_parquet_metadata(
                         stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                     )
 
-                col_aggs[field_id].update_min(statistics.min)
-                col_aggs[field_id].update_max(statistics.max)
+                if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
+                    scale = stats_col.iceberg_type.scale
+                    col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale))
+                    col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale))
+                else:
+                    col_aggs[field_id].update_min(statistics.min)
+                    col_aggs[field_id].update_max(statistics.max)
 
             except pyarrow.lib.ArrowNotImplementedError as e:
                 invalidate_col.add(field_id)
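
The last two hunks go together: small decimals are now written with INT32/INT64 physical types, and their Parquet column statistics arrive as raw unscaled integers that must be turned back into Decimal values. A minimal sketch of both rules, with helper names and values that are illustrative only (the real conversion lives in pyiceberg.utils.decimal.unscaled_to_decimal):

    from decimal import Decimal

    def parquet_physical_type(precision: int) -> str:
        # Mirrors visit_decimal above: small precisions fit in INT32/INT64,
        # everything else falls back to FIXED_LEN_BYTE_ARRAY.
        if precision <= 9:
            return "INT32"
        if precision <= 18:
            return "INT64"
        return "FIXED_LEN_BYTE_ARRAY"

    def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
        # Re-apply the scale to the raw integer statistic, e.g. 12345 at scale 2 -> 123.45.
        return Decimal(unscaled).scaleb(-scale)

    assert parquet_physical_type(9) == "INT32"
    assert parquet_physical_type(18) == "INT64"
    assert unscaled_to_decimal(12345, 2) == Decimal("123.45")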

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1793,7 +1793,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         return pa.RecordBatchReader.from_batches(
             target_schema,
             batches,
-        )
+        ).cast(target_schema)
 
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         """Read a Pandas DataFrame eagerly from this Iceberg table.

pyiceberg/table/upsert_util.py

Lines changed: 17 additions & 2 deletions
@@ -26,6 +26,7 @@
     BooleanExpression,
     EqualTo,
     In,
+    Or,
 )
 
 
@@ -39,7 +40,12 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre
         functools.reduce(operator.and_, [EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()
     ]
 
-    return AlwaysFalse() if len(filters) == 0 else functools.reduce(operator.or_, filters)
+    if len(filters) == 0:
+        return AlwaysFalse()
+    elif len(filters) == 1:
+        return filters[0]
+    else:
+        return Or(*filters)
 
 
 def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool:
@@ -65,7 +71,16 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
-    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
+    diff_expr = functools.reduce(
+        operator.or_,
+        [
+            pc.or_kleene(
+                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
+                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
+            )
+            for col in non_key_cols
+        ],
+    )
 
     return (
         source_table
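
The or_kleene/is_null pair treats a NULL on either side as a difference, which a plain != would silently drop. A minimal standalone illustration (array values are made up):

    import pyarrow as pa
    import pyarrow.compute as pc

    lhs = pa.array([1, None, 3])
    rhs = pa.array([1, 2, None])

    plain = pc.not_equal(lhs, rhs)                      # [false, null, null] -> nulls vanish in a filter
    null_safe = pc.or_kleene(plain, pc.is_null(plain))  # [false, true, true] -> null mismatches are kept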

pyiceberg/transforms.py

Lines changed: 1 addition & 1 deletion
@@ -703,7 +703,7 @@ def hour_func(v: Any) -> int:
 
     elif isinstance(source, (TimestampNanoType, TimestamptzNanoType)):
 
-        def day_func(v: Any) -> int:
+        def hour_func(v: Any) -> int:
             # python datetime has no nanoseconds support.
             # nanosecond datetimes will be expressed as int as a workaround
             return datetime.nanos_to_hours(v)
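
The inner function for nanosecond sources was misnamed day_func; the fix restores the intended hour_func. A minimal sketch of the arithmetic it delegates to (constant and value are illustrative; the real helper is pyiceberg.utils.datetime.nanos_to_hours):

    NANOS_PER_HOUR = 60 * 60 * 1_000_000_000

    def nanos_to_hours(nanos: int) -> int:
        # hours since the epoch for a nanosecond timestamp
        return nanos // NANOS_PER_HOUR

    # 2024-01-01T01:30:00+00:00 expressed as nanoseconds since 1970-01-01
    assert nanos_to_hours(1_704_072_600_000_000_000) == 473_353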

pyiceberg/types.py

Lines changed: 20 additions & 1 deletion
@@ -47,6 +47,7 @@
     Field,
     PrivateAttr,
     SerializeAsAny,
+    field_validator,
     model_serializer,
     model_validator,
 )
@@ -310,6 +311,14 @@ class NestedField(IcebergType):
         ...     doc="Just a long"
         ... ))
         '2: bar: required long (Just a long)'
+        >>> str(NestedField(
+        ...     field_id=3,
+        ...     name='baz',
+        ...     field_type="string",
+        ...     required=True,
+        ...     doc="A string field"
+        ... ))
+        '3: baz: required string (A string field)'
     """
 
     field_id: int = Field(alias="id")
@@ -320,11 +329,21 @@ class NestedField(IcebergType):
     initial_default: Optional[Any] = Field(alias="initial-default", default=None, repr=False)
     write_default: Optional[L] = Field(alias="write-default", default=None, repr=False)  # type: ignore
 
+    @field_validator("field_type", mode="before")
+    def convert_field_type(cls, v: Any) -> IcebergType:
+        """Convert string values into IcebergType instances."""
+        if isinstance(v, str):
+            try:
+                return IcebergType.handle_primitive_type(v, None)
+            except ValueError as e:
+                raise ValueError(f"Unsupported field type: '{v}'") from e
+        return v
+
     def __init__(
         self,
         field_id: Optional[int] = None,
         name: Optional[str] = None,
-        field_type: Optional[IcebergType] = None,
+        field_type: Optional[IcebergType | str] = None,
         required: bool = False,
         doc: Optional[str] = None,
         initial_default: Optional[Any] = None,
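
With the before-mode validator in place, primitive type names can be passed as plain strings and are converted to the corresponding IcebergType. A minimal sketch (field names are illustrative):

    from pyiceberg.types import NestedField, StringType

    # These two definitions are equivalent; the validator turns "string" into StringType().
    a = NestedField(field_id=1, name="name", field_type="string", required=True)
    b = NestedField(field_id=1, name="name", field_type=StringType(), required=True)
    assert a == b

    # An unknown name (e.g. "strng") is rejected with a validation error.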

pyproject.toml

Lines changed: 5 additions & 5 deletions
@@ -53,7 +53,7 @@ python = "^3.9.2, !=3.9.7"
 mmh3 = ">=4.0.0,<6.0.0"
 requests = ">=2.20.0,<3.0.0"
 click = ">=7.1.1,<9.0.0"
-rich = ">=10.11.0,<14.0.0"
+rich = ">=10.11.0,<15.0.0"
 strictyaml = ">=1.7.0,<2.0.0"  # CVE-2020-14343 was fixed in 5.4.
 pydantic = ">=2.0,<3.0,!=2.4.0,!=2.4.1"  # 2.4.0, 2.4.1 has a critical bug
 sortedcontainers = "2.4.0"
@@ -98,20 +98,20 @@ pytest-mock = "3.14.0"
 pyspark = "3.5.5"
 cython = "3.0.12"
 deptry = ">=0.14,<0.24"
-datafusion = ">=44,<46"
+datafusion = ">=44,<47"
 docutils = "!=0.21.post1"  # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
 
 [tool.poetry.group.docs.dependencies]
 # for mkdocs
 mkdocs = "1.6.1"
-griffe = "1.6.3"
+griffe = "1.7.1"
 jinja2 = "3.1.6"
-mkdocstrings = "0.29.0"
+mkdocstrings = "0.29.1"
 mkdocstrings-python = "1.16.8"
 mkdocs-literate-nav = "0.6.2"
 mkdocs-autorefs = "1.4.1"
 mkdocs-gen-files = "0.5.0"
-mkdocs-material = "9.6.9"
+mkdocs-material = "9.6.10"
 mkdocs-material-extensions = "1.3.1"
 mkdocs-section-index = "0.3.9"