Skip to content

Commit 0b79a14

Browse files
authored
feat: improve runtime filter check via SIMD (#19039)
* check batch * update
1 parent ace8d8b commit 0b79a14

File tree

7 files changed

+242
-317
lines changed

7 files changed

+242
-317
lines changed

src/query/catalog/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#![feature(portable_simd)]
1516
#![allow(clippy::uninlined_format_args)]
1617
#![allow(clippy::large_enum_variant)]
1718

src/query/catalog/src/sbbf.rs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@
7373
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
7474
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf
7575
76+
use core::simd::cmp::SimdPartialEq;
77+
use core::simd::Simd;
78+
7679
/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
7780
const SALT: [u32; 8] = [
7881
0x47b6137b_u32,
@@ -90,20 +93,16 @@ const SALT: [u32; 8] = [
9093
#[derive(Debug, Copy, Clone)]
9194
#[repr(transparent)]
9295
struct Block([u32; 8]);
96+
97+
type U32x8 = Simd<u32, 8>;
98+
9399
impl Block {
94100
const ZERO: Block = Block([0; 8]);
95101

96102
/// takes as its argument a single unsigned 32-bit integer and returns a block in which each
97103
/// word has exactly one bit set.
98104
fn mask(x: u32) -> Self {
99-
let mut result = [0_u32; 8];
100-
for i in 0..8 {
101-
// wrapping instead of checking for overflow
102-
let y = x.wrapping_mul(SALT[i]);
103-
let y = y >> 27;
104-
result[i] = 1 << y;
105-
}
106-
Self(result)
105+
Self(Self::mask_simd(x).to_array())
107106
}
108107

109108
#[inline]
@@ -136,13 +135,17 @@ impl Block {
136135

137136
/// Returns true when every bit that is set in the result of mask is also set in the block.
138137
fn check(&self, hash: u32) -> bool {
139-
let mask = Self::mask(hash);
140-
for i in 0..8 {
141-
if self[i] & mask[i] == 0 {
142-
return false;
143-
}
144-
}
145-
true
138+
let mask = Self::mask_simd(hash);
139+
let block_vec = U32x8::from_array(self.0);
140+
(block_vec & mask).simd_ne(U32x8::splat(0)).all()
141+
}
142+
143+
#[inline(always)]
144+
fn mask_simd(x: u32) -> U32x8 {
145+
let hash_vec = U32x8::splat(x);
146+
let salt_vec = U32x8::from_array(SALT);
147+
let bit_index = (hash_vec * salt_vec) >> U32x8::splat(27);
148+
U32x8::splat(1) << bit_index
146149
}
147150
}
148151

@@ -225,6 +228,14 @@ impl Sbbf {
225228
self.0[block_index].insert(hash as u32)
226229
}
227230

231+
/// Insert a batch of hashes into the filter.
232+
pub fn insert_hash_batch(&mut self, hashes: &[u64]) {
233+
for &hash in hashes {
234+
let block_index = self.hash_to_block_index(hash);
235+
self.0[block_index].insert(hash as u32);
236+
}
237+
}
238+
228239
/// Check if a hash is in the filter. May return
229240
/// true for values that was never inserted ("false positive")
230241
/// but will always return false if a hash has not been inserted.
@@ -233,6 +244,17 @@ impl Sbbf {
233244
self.0[block_index].check(hash as u32)
234245
}
235246

247+
/// Check a batch of hashes. The callback is triggered for each matching hash index.
248+
pub fn check_hash_batch<F>(&self, hashes: &[u64], mut on_match: F)
249+
where F: FnMut(usize) {
250+
for (idx, &hash) in hashes.iter().enumerate() {
251+
let block_index = self.hash_to_block_index(hash);
252+
if self.0[block_index].check(hash as u32) {
253+
on_match(idx);
254+
}
255+
}
256+
}
257+
236258
/// Merge another bloom filter into this one (bitwise OR operation)
237259
/// Panics if the filters have different sizes
238260
pub fn union(&mut self, other: &Self) {
@@ -285,6 +307,16 @@ mod tests {
285307
}
286308
}
287309

310+
#[test]
311+
fn test_sbbf_batch_insert_and_check() {
312+
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
313+
let hashes: Vec<u64> = (0..10_000).collect();
314+
sbbf.insert_hash_batch(&hashes);
315+
let mut matched = 0;
316+
sbbf.check_hash_batch(&hashes, |_| matched += 1);
317+
assert_eq!(matched, hashes.len());
318+
}
319+
288320
#[test]
289321
fn test_sbbf_union() {
290322
let mut filter1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_hashtable::BloomHash;
17+
use databend_common_hashtable::FastHash;
18+
19+
use crate::HashMethod;
20+
use crate::HashMethodKind;
21+
use crate::ProjectedBlock;
22+
23+
/// Get row hash by [`HashMethod`] (uses [`FastHash`], i.e. the hashtable hash).
24+
pub fn hash_by_method<T>(
25+
method: &HashMethodKind,
26+
columns: ProjectedBlock,
27+
num_rows: usize,
28+
hashes: &mut T,
29+
) -> Result<()>
30+
where
31+
T: Extend<u64>,
32+
{
33+
match method {
34+
HashMethodKind::Serializer(method) => {
35+
let keys_state = method.build_keys_state(columns, num_rows)?;
36+
hashes.extend(
37+
method
38+
.build_keys_iter(&keys_state)?
39+
.map(|key| key.fast_hash()),
40+
);
41+
}
42+
HashMethodKind::SingleBinary(method) => {
43+
let keys_state = method.build_keys_state(columns, num_rows)?;
44+
hashes.extend(
45+
method
46+
.build_keys_iter(&keys_state)?
47+
.map(|key| key.fast_hash()),
48+
);
49+
}
50+
HashMethodKind::KeysU8(method) => {
51+
let keys_state = method.build_keys_state(columns, num_rows)?;
52+
hashes.extend(
53+
method
54+
.build_keys_iter(&keys_state)?
55+
.map(|key| key.fast_hash()),
56+
);
57+
}
58+
HashMethodKind::KeysU16(method) => {
59+
let keys_state = method.build_keys_state(columns, num_rows)?;
60+
hashes.extend(
61+
method
62+
.build_keys_iter(&keys_state)?
63+
.map(|key| key.fast_hash()),
64+
);
65+
}
66+
HashMethodKind::KeysU32(method) => {
67+
let keys_state = method.build_keys_state(columns, num_rows)?;
68+
hashes.extend(
69+
method
70+
.build_keys_iter(&keys_state)?
71+
.map(|key| key.fast_hash()),
72+
);
73+
}
74+
HashMethodKind::KeysU64(method) => {
75+
let keys_state = method.build_keys_state(columns, num_rows)?;
76+
hashes.extend(
77+
method
78+
.build_keys_iter(&keys_state)?
79+
.map(|key| key.fast_hash()),
80+
);
81+
}
82+
HashMethodKind::KeysU128(method) => {
83+
let keys_state = method.build_keys_state(columns, num_rows)?;
84+
hashes.extend(
85+
method
86+
.build_keys_iter(&keys_state)?
87+
.map(|key| key.fast_hash()),
88+
);
89+
}
90+
HashMethodKind::KeysU256(method) => {
91+
let keys_state = method.build_keys_state(columns, num_rows)?;
92+
hashes.extend(
93+
method
94+
.build_keys_iter(&keys_state)?
95+
.map(|key| key.fast_hash()),
96+
);
97+
}
98+
}
99+
Ok(())
100+
}
101+
102+
/// Get row hash for Bloom filter by [`HashMethod`]. This always uses [`BloomHash`],
103+
/// which is independent of SSE4.2 and provides a well-distributed 64-bit hash.
104+
pub fn hash_by_method_for_bloom<T>(
105+
method: &HashMethodKind,
106+
columns: ProjectedBlock,
107+
num_rows: usize,
108+
hashes: &mut T,
109+
) -> Result<()>
110+
where
111+
T: Extend<u64>,
112+
{
113+
match method {
114+
HashMethodKind::Serializer(method) => {
115+
let keys_state = method.build_keys_state(columns, num_rows)?;
116+
hashes.extend(
117+
method
118+
.build_keys_iter(&keys_state)?
119+
.map(|key| key.bloom_hash()),
120+
);
121+
}
122+
HashMethodKind::SingleBinary(method) => {
123+
let keys_state = method.build_keys_state(columns, num_rows)?;
124+
hashes.extend(
125+
method
126+
.build_keys_iter(&keys_state)?
127+
.map(|key| key.bloom_hash()),
128+
);
129+
}
130+
HashMethodKind::KeysU8(method) => {
131+
let keys_state = method.build_keys_state(columns, num_rows)?;
132+
hashes.extend(
133+
method
134+
.build_keys_iter(&keys_state)?
135+
.map(|key| key.bloom_hash()),
136+
);
137+
}
138+
HashMethodKind::KeysU16(method) => {
139+
let keys_state = method.build_keys_state(columns, num_rows)?;
140+
hashes.extend(
141+
method
142+
.build_keys_iter(&keys_state)?
143+
.map(|key| key.bloom_hash()),
144+
);
145+
}
146+
HashMethodKind::KeysU32(method) => {
147+
let keys_state = method.build_keys_state(columns, num_rows)?;
148+
hashes.extend(
149+
method
150+
.build_keys_iter(&keys_state)?
151+
.map(|key| key.bloom_hash()),
152+
);
153+
}
154+
HashMethodKind::KeysU64(method) => {
155+
let keys_state = method.build_keys_state(columns, num_rows)?;
156+
hashes.extend(
157+
method
158+
.build_keys_iter(&keys_state)?
159+
.map(|key| key.bloom_hash()),
160+
);
161+
}
162+
HashMethodKind::KeysU128(method) => {
163+
let keys_state = method.build_keys_state(columns, num_rows)?;
164+
hashes.extend(
165+
method
166+
.build_keys_iter(&keys_state)?
167+
.map(|key| key.bloom_hash()),
168+
);
169+
}
170+
HashMethodKind::KeysU256(method) => {
171+
let keys_state = method.build_keys_state(columns, num_rows)?;
172+
hashes.extend(
173+
method
174+
.build_keys_iter(&keys_state)?
175+
.map(|key| key.bloom_hash()),
176+
);
177+
}
178+
}
179+
Ok(())
180+
}

src/query/expression/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod evaluator;
5454
mod expression;
5555
pub mod filter;
5656
mod function;
57+
pub mod hash_util;
5758
mod hilbert;
5859
mod kernels;
5960
mod projected_block;

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,7 @@ async fn build_bloom_filter(
272272
if total_items < 50000 {
273273
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
274274
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
275-
for hash in bloom {
276-
filter.insert_hash(hash);
277-
}
275+
filter.insert_hash_batch(&bloom);
278276
return Ok(RuntimeFilterBloom {
279277
column_name,
280278
filter,
@@ -295,9 +293,7 @@ async fn build_bloom_filter(
295293
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
296294
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
297295

298-
for hash in chunk {
299-
filter.insert_hash(hash);
300-
}
296+
filter.insert_hash_batch(&chunk);
301297
Ok::<Sbbf, ErrorCode>(filter)
302298
})
303299
})

0 commit comments

Comments
 (0)