Skip to content

Commit 0c5ec51

Browse files
committed
add read support for parquet bloom filters Add read support for parquet bloom filters.
Closes #2649
1 parent bb41a6d commit 0c5ec51

File tree

5 files changed

+790
-0
lines changed

5 files changed

+790
-0
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from typing import Any
20+
21+
from pyiceberg.expressions import (
22+
BoundEqualTo,
23+
BoundGreaterThan,
24+
BoundGreaterThanOrEqual,
25+
BoundIn,
26+
BoundIsNaN,
27+
BoundIsNull,
28+
BoundLessThan,
29+
BoundLessThanOrEqual,
30+
BoundLiteralPredicate,
31+
BoundNotEqualTo,
32+
BoundNotIn,
33+
BoundNotNaN,
34+
BoundNotNull,
35+
BoundNotStartsWith,
36+
BoundPredicate,
37+
BoundSetPredicate,
38+
BoundStartsWith,
39+
BoundUnaryPredicate,
40+
)
41+
from pyiceberg.expressions.visitors import BooleanExpressionVisitor
42+
from pyiceberg.manifest import DataFile
43+
from pyiceberg.schema import Schema
44+
from pyiceberg.table.bloom_filter import BloomFilter
45+
46+
47+
class BloomFilterEvaluator(BooleanExpressionVisitor[bool]):
48+
"""Evaluator that uses bloom filters to check if a file might contain matching rows.
49+
50+
This evaluator helps prune data files that definitely cannot contain rows matching
51+
a query predicate by using bloom filters for column values.
52+
"""
53+
54+
def __init__(self, data_file: DataFile, schema: Schema):
55+
"""Initialize the bloom filter evaluator.
56+
57+
Args:
58+
data_file: The data file to evaluate bloom filters for.
59+
schema: The table schema for column resolution.
60+
"""
61+
self.data_file = data_file
62+
self.schema = schema
63+
64+
def visit_true(self) -> bool:
65+
"""Visit AlwaysTrue - file might contain matching rows."""
66+
return True
67+
68+
def visit_false(self) -> bool:
69+
"""Visit AlwaysFalse - file definitely contains no matching rows."""
70+
return False
71+
72+
def visit_not(self, child_result: bool) -> bool:
73+
"""Visit Not - invert the child result."""
74+
return not child_result
75+
76+
def visit_and(self, left_result: bool, right_result: bool) -> bool:
77+
"""Visit And - both conditions must allow the file."""
78+
return left_result and right_result
79+
80+
def visit_or(self, left_result: bool, right_result: bool) -> bool:
81+
"""Visit Or - at least one condition must allow the file."""
82+
return left_result or right_result
83+
84+
def visit_unbound_predicate(self, predicate: object) -> bool:
85+
"""Visit an unbound predicate - conservatively allow the file."""
86+
# Unbound predicates haven't been bound to a schema, so we can't evaluate them
87+
return True
88+
89+
def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> bool:
90+
"""Visit a bound predicate and evaluate using bloom filter if available."""
91+
if isinstance(predicate, BoundUnaryPredicate):
92+
# Unary predicates (IsNull, IsNaN, etc.)
93+
return self._visit_unary_predicate(predicate)
94+
elif isinstance(predicate, BoundLiteralPredicate):
95+
# Literal predicates with a single value (EqualTo, NotEqualTo, etc.)
96+
return self._visit_literal_predicate(predicate)
97+
elif isinstance(predicate, BoundSetPredicate):
98+
# Set predicates (In, NotIn)
99+
return self._visit_set_predicate(predicate)
100+
else:
101+
# Unknown predicate type - be conservative and allow the file
102+
return True
103+
104+
def visit_predicate(self, predicate: BoundPredicate[Any]) -> bool:
105+
"""Visit a bound predicate and evaluate using bloom filter if available."""
106+
if isinstance(predicate, BoundUnaryPredicate):
107+
# Unary predicates (IsNull, IsNaN, etc.)
108+
return self._visit_unary_predicate(predicate)
109+
elif isinstance(predicate, BoundLiteralPredicate):
110+
# Literal predicates with a single value (EqualTo, NotEqualTo, etc.)
111+
return self._visit_literal_predicate(predicate)
112+
elif isinstance(predicate, BoundSetPredicate):
113+
# Set predicates (In, NotIn)
114+
return self._visit_set_predicate(predicate)
115+
else:
116+
# Unknown predicate type - be conservative and allow the file
117+
return True
118+
119+
def _visit_unary_predicate(self, predicate: BoundUnaryPredicate[Any]) -> bool:
120+
"""Evaluate unary predicates using bloom filter."""
121+
if isinstance(predicate, BoundIsNull):
122+
# IsNull cannot use bloom filter (nulls not in BF)
123+
return True
124+
elif isinstance(predicate, BoundIsNaN):
125+
# IsNaN cannot use bloom filter (NaN not in BF)
126+
return True
127+
elif isinstance(predicate, BoundNotNull):
128+
# NotNull cannot use bloom filter effectively
129+
return True
130+
elif isinstance(predicate, BoundNotNaN):
131+
# NotNaN cannot use bloom filter effectively
132+
return True
133+
else:
134+
# Unknown unary predicate
135+
return True
136+
137+
def _visit_literal_predicate(self, predicate: BoundLiteralPredicate[Any]) -> bool:
138+
"""Evaluate literal predicates using bloom filter."""
139+
term = predicate.term
140+
literal = predicate.literal
141+
column_id = term.ref().field.field_id
142+
143+
# Get the bloom filter for this column
144+
bloom_filter_bytes = self.data_file.get_bloom_filter(column_id)
145+
if bloom_filter_bytes is None:
146+
# No bloom filter for this column - can't prune
147+
return True
148+
149+
# Deserialize the bloom filter
150+
try:
151+
bloom_filter = BloomFilter.from_bytes(bloom_filter_bytes)
152+
except Exception:
153+
# Error deserializing - be conservative
154+
return True
155+
156+
if isinstance(predicate, BoundEqualTo):
157+
# For EqualTo, check if value might be in the filter
158+
return bloom_filter.might_contain(literal.value)
159+
elif isinstance(predicate, BoundNotEqualTo):
160+
# For NotEqualTo, we can't prune based on bloom filter
161+
# (we need to be in the filter to exclude based on NOT)
162+
return True
163+
elif isinstance(predicate, BoundLessThan):
164+
# For LessThan, we can't use bloom filter effectively
165+
return True
166+
elif isinstance(predicate, BoundLessThanOrEqual):
167+
# For LessThanOrEqual, we can't use bloom filter effectively
168+
return True
169+
elif isinstance(predicate, BoundGreaterThan):
170+
# For GreaterThan, we can't use bloom filter effectively
171+
return True
172+
elif isinstance(predicate, BoundGreaterThanOrEqual):
173+
# For GreaterThanOrEqual, we can't use bloom filter effectively
174+
return True
175+
elif isinstance(predicate, BoundStartsWith):
176+
# For StartsWith, we can't use exact bloom filter matching
177+
return True
178+
elif isinstance(predicate, BoundNotStartsWith):
179+
# For NotStartsWith, we can't prune based on bloom filter
180+
return True
181+
else:
182+
# Unknown literal predicate
183+
return True
184+
185+
def _visit_set_predicate(self, predicate: BoundSetPredicate[Any]) -> bool:
186+
"""Evaluate set predicates using bloom filter."""
187+
term = predicate.term
188+
column_id = term.ref().field.field_id
189+
190+
# Get the bloom filter for this column
191+
bloom_filter_bytes = self.data_file.get_bloom_filter(column_id)
192+
if bloom_filter_bytes is None:
193+
# No bloom filter for this column - can't prune
194+
return True
195+
196+
# Deserialize the bloom filter
197+
try:
198+
bloom_filter = BloomFilter.from_bytes(bloom_filter_bytes)
199+
except Exception:
200+
# Error deserializing - be conservative
201+
return True
202+
203+
if isinstance(predicate, BoundIn):
204+
# For IN predicate, check if any value might be in the filter
205+
# If at least one value might be in the filter, we can't prune the file
206+
for value in predicate.literals:
207+
if bloom_filter.might_contain(value.value):
208+
return True
209+
# None of the values are in the filter - can prune the file
210+
return False
211+
elif isinstance(predicate, BoundNotIn):
212+
# For NOT IN predicate, we can't prune based on bloom filter
213+
return True
214+
else:
215+
# Unknown set predicate
216+
return True

pyiceberg/manifest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,13 @@ def __repr__(self) -> str:
288288
required=False,
289289
doc="ID representing sort order for this file",
290290
),
291+
NestedField(
292+
field_id=146,
293+
name="bloom_filter_bytes",
294+
field_type=MapType(key_id=147, key_type=IntegerType(), value_id=148, value_type=BinaryType()),
295+
required=False,
296+
doc="Map of column id to bloom filter",
297+
),
291298
),
292299
3: StructType(
293300
NestedField(
@@ -411,6 +418,13 @@ def __repr__(self) -> str:
411418
required=False,
412419
doc="The length of a referenced content stored in the file; required if content_offset is present",
413420
),
421+
NestedField(
422+
field_id=146,
423+
name="bloom_filter_bytes",
424+
field_type=MapType(key_id=147, key_type=IntegerType(), value_id=148, value_type=BinaryType()),
425+
required=False,
426+
doc="Map of column id to bloom filter",
427+
),
414428
),
415429
}
416430

@@ -514,6 +528,17 @@ def equality_ids(self) -> List[int] | None:
514528
def sort_order_id(self) -> int | None:
515529
return self._data[15]
516530

531+
@property
532+
def bloom_filter_bytes(self) -> Dict[int, bytes] | None:
533+
"""Get bloom filter bytes for all columns.
534+
535+
Returns a dict mapping column ID to bloom filter bytes.
536+
"""
537+
# Get bloom_filter_bytes which is the last field in the struct
538+
if len(self._data) > 16:
539+
return self._data[16]
540+
return None
541+
517542
# Spec ID should not be stored in the file
518543
_spec_id: int
519544

@@ -536,6 +561,19 @@ def __hash__(self) -> int:
536561
"""Return the hash of the file path."""
537562
return hash(self.file_path)
538563

564+
def get_bloom_filter(self, column_id: int) -> bytes | None:
565+
"""Get bloom filter bytes for a specific column.
566+
567+
Args:
568+
column_id: The column ID to get the bloom filter for.
569+
570+
Returns:
571+
Bloom filter bytes for the column, or None if not available.
572+
"""
573+
if self.bloom_filter_bytes and column_id in self.bloom_filter_bytes:
574+
return self.bloom_filter_bytes[column_id]
575+
return None
576+
539577
def __eq__(self, other: Any) -> bool:
540578
"""Compare the datafile with another object.
541579

pyiceberg/table/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,6 +1922,30 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
19221922
)
19231923
)
19241924

1925+
def _should_keep_file_with_bloom_filter(self, data_file: DataFile) -> bool:
1926+
"""Check if a data file should be kept based on bloom filter evaluation.
1927+
1928+
Args:
1929+
data_file: The data file to evaluate.
1930+
1931+
Returns:
1932+
True if the file should be kept, False if it can be pruned.
1933+
"""
1934+
if data_file.bloom_filter_bytes is None:
1935+
# No bloom filter for this file
1936+
return True
1937+
1938+
try:
1939+
from pyiceberg.expressions.bloom_filter import BloomFilterEvaluator
1940+
from pyiceberg.expressions.visitors import visit
1941+
1942+
# Use the bloom filter evaluator to check if the file might contain matching rows
1943+
evaluator = BloomFilterEvaluator(data_file, self.table_metadata.schema())
1944+
return visit(self.row_filter, evaluator)
1945+
except Exception:
1946+
# If there's any error evaluating bloom filters, be conservative and keep the file
1947+
return True
1948+
19251949
@staticmethod
19261950
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
19271951
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
@@ -1997,6 +2021,10 @@ def plan_files(self) -> Iterable[FileScanTask]:
19972021
for manifest_entry in chain.from_iterable(self.scan_plan_helper()):
19982022
data_file = manifest_entry.data_file
19992023
if data_file.content == DataFileContent.DATA:
2024+
# Apply bloom filter evaluation to prune files that definitely don't match the filter
2025+
if not self._should_keep_file_with_bloom_filter(data_file):
2026+
# Skip this file as it cannot contain matching rows
2027+
continue
20002028
data_entries.append(manifest_entry)
20012029
elif data_file.content == DataFileContent.POSITION_DELETES:
20022030
positional_delete_entries.add(manifest_entry)

0 commit comments

Comments
 (0)