Skip to content

Commit 6e63033

Browse files
refactor: build the runtime filter during the HashJoin block collection process (#19058)
* refactor old hash join * refactor new * fix * use separate runtime * add log * optimize build * use Runtime * Update src/query/catalog/src/sbbf.rs Co-authored-by: Winter Zhang <coswde@gmail.com> * make lint --------- Co-authored-by: Winter Zhang <coswde@gmail.com>
1 parent 8305bcd commit 6e63033

File tree

13 files changed

+561
-145
lines changed

13 files changed

+561
-145
lines changed

src/query/catalog/src/sbbf.rs

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@
7575
7676
use core::simd::cmp::SimdPartialEq;
7777
use core::simd::Simd;
78+
use std::mem::size_of;
79+
use std::sync::atomic::AtomicU32;
80+
use std::sync::atomic::Ordering;
81+
use std::sync::Arc;
82+
83+
use databend_common_base::runtime::Runtime;
84+
use databend_common_base::runtime::TrySpawn;
7885

7986
/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
8087
const SALT: [u32; 8] = [
@@ -165,16 +172,41 @@ impl std::ops::IndexMut<usize> for Block {
165172
}
166173
}
167174

175+
#[derive(Debug)]
176+
#[repr(transparent)]
177+
struct BlockAtomic([AtomicU32; 8]);
178+
179+
impl BlockAtomic {
180+
fn new() -> Self {
181+
Self(std::array::from_fn(|_| AtomicU32::new(0)))
182+
}
183+
184+
fn insert(&self, hash: u32) {
185+
let mask = Block::mask(hash);
186+
for (slot, value) in self.0.iter().zip(mask.0.iter()) {
187+
slot.fetch_or(*value, Ordering::Relaxed);
188+
}
189+
}
190+
}
191+
168192
/// A split block Bloom filter.
169193
///
170194
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
171195
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
172196
#[derive(Debug, Clone)]
173197
pub struct Sbbf(Vec<Block>);
174198

199+
#[derive(Debug)]
200+
pub struct SbbfAtomic(Vec<BlockAtomic>);
201+
175202
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
176203
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
177204

205+
/// Maps a 64-bit hash onto a block index in `0..num_blocks`.
///
/// The upper 32 bits of `hash` are multiplied by `num_blocks` and the product is shifted
/// right by 32 bits (multiply-shift range reduction), so no modulo is needed.
/// Returns `0` when `num_blocks` is `0`.
#[inline]
fn hash_to_block_index_for_blocks(hash: u64, num_blocks: usize) -> usize {
    // Widen to 128 bits so the product cannot overflow for any `num_blocks`. This keeps the
    // function safe without relying on callers to bound the block count, and yields the
    // same value as the 64-bit product whenever that product fits in 64 bits.
    let product = u128::from(hash >> 32) * (num_blocks as u128);
    // The shifted product is at most `num_blocks`, so it always fits back into `usize`.
    (product >> 32) as usize
}
209+
178210
#[inline]
179211
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
180212
let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
@@ -219,7 +251,7 @@ impl Sbbf {
219251
fn hash_to_block_index(&self, hash: u64) -> usize {
220252
// unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul
221253
// but it will not saturate
222-
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
254+
hash_to_block_index_for_blocks(hash, self.0.len())
223255
}
224256

225257
/// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash.
@@ -277,6 +309,109 @@ impl Sbbf {
277309
}
278310
}
279311

312+
impl SbbfAtomic {
313+
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, String> {
314+
if !(0.0..1.0).contains(&fpp) {
315+
return Err(format!(
316+
"False positive probability must be between 0.0 and 1.0, got {fpp}"
317+
));
318+
}
319+
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
320+
Ok(Self::new_with_num_of_bytes(num_bits / 8))
321+
}
322+
323+
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
324+
let num_bytes = optimal_num_of_bytes(num_bytes);
325+
assert_eq!(size_of::<BlockAtomic>(), size_of::<Block>());
326+
assert_eq!(num_bytes % size_of::<BlockAtomic>(), 0);
327+
let num_blocks = num_bytes / size_of::<BlockAtomic>();
328+
let bitset = (0..num_blocks).map(|_| BlockAtomic::new()).collect();
329+
Self(bitset)
330+
}
331+
332+
#[inline]
333+
fn hash_to_block_index(&self, hash: u64) -> usize {
334+
hash_to_block_index_for_blocks(hash, self.0.len())
335+
}
336+
337+
pub fn insert_hash(&self, hash: u64) {
338+
let block_index = self.hash_to_block_index(hash);
339+
self.0[block_index].insert(hash as u32)
340+
}
341+
342+
pub fn insert_hash_batch(&self, hashes: &[u64]) {
343+
for &hash in hashes {
344+
self.insert_hash(hash);
345+
}
346+
}
347+
348+
pub fn insert_hash_batch_parallel(self, hashes: Vec<u64>, max_threads: usize) -> Self {
349+
if hashes.is_empty() || max_threads <= 1 || self.0.len() < 2 {
350+
self.insert_hash_batch(&hashes);
351+
return self;
352+
}
353+
354+
let worker_nums = max_threads.min(hashes.len()).max(1);
355+
let chunk_size = hashes.len().div_ceil(worker_nums).max(1);
356+
let runtime = Runtime::with_worker_threads(worker_nums, Some("sbbf-insert".to_string()))
357+
.expect("failed to create runtime for inserting bloom filter hashes");
358+
359+
let hashes = Arc::new(hashes);
360+
let builder = Arc::new(self);
361+
let total = hashes.len();
362+
let mut join_handlers = Vec::with_capacity(total.div_ceil(chunk_size));
363+
364+
for start in (0..total).step_by(chunk_size) {
365+
let end = (start + chunk_size).min(total);
366+
let hashes = hashes.clone();
367+
let builder = builder.clone();
368+
369+
let handler = runtime
370+
.try_spawn(
371+
async move {
372+
for hash in &hashes[start..end] {
373+
builder.insert_hash(*hash);
374+
}
375+
},
376+
None,
377+
)
378+
.expect("failed to spawn runtime task for inserting bloom filter hashes");
379+
join_handlers.push(handler);
380+
}
381+
382+
runtime
383+
.block_on(async move {
384+
for handler in join_handlers {
385+
handler.await?;
386+
}
387+
Ok(())
388+
})
389+
.expect("runtime bloom filter insert tasks failed");
390+
391+
Arc::try_unwrap(builder)
392+
.expect("unexpected extra references when finishing bloom filter insert")
393+
}
394+
395+
pub fn finish(self) -> Sbbf {
396+
let blocks: Vec<Block> = self
397+
.0
398+
.into_iter()
399+
.map(|block| {
400+
let mut arr = [0u32; 8];
401+
for (dst, src) in arr.iter_mut().zip(block.0.iter()) {
402+
*dst = src.load(Ordering::Relaxed);
403+
}
404+
Block(arr)
405+
})
406+
.collect();
407+
Sbbf(blocks)
408+
}
409+
410+
pub fn estimated_memory_size(&self) -> usize {
411+
self.0.capacity() * size_of::<BlockAtomic>()
412+
}
413+
}
414+
280415
#[cfg(test)]
281416
mod tests {
282417
use super::*;
@@ -336,6 +471,28 @@ mod tests {
336471
}
337472
}
338473

474+
/// The parallel atomic builder must produce exactly the filter that serial insertion
/// into a plain `Sbbf` produces for the same hashes.
#[test]
fn test_sbbf_atomic_parallel_matches_serial() {
    // Deterministic 64-bit inputs: one multiply-add step applied to 0..100_000.
    let hashes: Vec<u64> = (0..100_000u64)
        .map(|val| {
            val.wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407)
        })
        .collect();

    let mut serial = Sbbf::new_with_ndv_fpp(hashes.len() as u64, 0.01).unwrap();
    serial.insert_hash_batch(&hashes);

    let builder = SbbfAtomic::new_with_ndv_fpp(hashes.len() as u64, 0.01).unwrap();
    let builder = builder.insert_hash_batch_parallel(hashes.clone(), 8);
    let atomic = builder.finish();

    // Compare the bitsets word for word. Membership checks alone would also pass for a
    // filter with extra bits set, so they cannot prove the two builds are identical.
    assert_eq!(serial.0.len(), atomic.0.len());
    for (index, (expected, actual)) in serial.0.iter().zip(atomic.0.iter()).enumerate() {
        assert_eq!(
            expected.0, actual.0,
            "block {index} differs between serial and parallel build"
        );
    }

    for hash in &hashes {
        // Every inserted hash must be reported as present by the parallel-built filter.
        assert!(atomic.check_hash(*hash));
        assert_eq!(serial.check_hash(*hash), atomic.check_hash(*hash));
    }
}
495+
339496
#[test]
340497
fn test_optimal_num_of_bytes() {
341498
for (input, expected) in &[

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ impl HashJoin {
459459
stage_sync_barrier.clone(),
460460
self.projections.clone(),
461461
rf_desc.clone(),
462-
);
462+
)?;
463463

464464
join_pipe_items.push(PipeItem::create(
465465
hash_join,

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use parking_lot::RwLock;
5959

6060
use super::concat_buffer::ConcatBuffer;
6161
use super::desc::RuntimeFilterDesc;
62+
use super::runtime_filter::JoinRuntimeFilterPacket;
6263
use crate::pipelines::memory_settings::MemorySettingsExt;
6364
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
6465
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
@@ -104,6 +105,7 @@ pub struct HashJoinBuildState {
104105
pub(crate) concat_buffer: Mutex<ConcatBuffer>,
105106
pub(crate) broadcast_id: Option<u32>,
106107
pub(crate) is_runtime_filter_added: AtomicBool,
108+
runtime_filter_packets: Mutex<Vec<JoinRuntimeFilterPacket>>,
107109
}
108110

109111
impl HashJoinBuildState {
@@ -154,6 +156,7 @@ impl HashJoinBuildState {
154156
concat_buffer: Mutex::new(ConcatBuffer::new(concat_threshold)),
155157
broadcast_id,
156158
is_runtime_filter_added: AtomicBool::new(false),
159+
runtime_filter_packets: Mutex::new(Vec::new()),
157160
}))
158161
}
159162

@@ -875,6 +878,15 @@ impl HashJoinBuildState {
875878
&self.hash_join_state.hash_join_desc.runtime_filter.filters
876879
}
877880

881+
/// Appends `packet` to the list of runtime filter packets kept by this build state.
pub fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) {
    let mut packets = self.runtime_filter_packets.lock();
    packets.push(packet);
}
884+
885+
/// Removes and returns every runtime filter packet stored so far, leaving the list empty.
pub fn take_runtime_filter_packets(&self) -> Vec<JoinRuntimeFilterPacket> {
    // Move the whole vector out while the lock is held instead of draining it element by
    // element into a freshly allocated one; an empty vector is left behind.
    std::mem::take(&mut *self.runtime_filter_packets.lock())
}
889+
878890
/// only used for test
879891
pub fn get_enable_bloom_runtime_filter(&self) -> bool {
880892
self.hash_join_state

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs

Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
2020
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
2121
use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
2222
use databend_common_catalog::sbbf::Sbbf;
23+
use databend_common_catalog::sbbf::SbbfAtomic;
2324
use databend_common_exception::ErrorCode;
2425
use databend_common_exception::Result;
2526
use databend_common_expression::type_check;
@@ -74,7 +75,10 @@ pub async fn build_runtime_filter_infos(
7475
probe_expr: probe_key.clone(),
7576
bloom: if enabled {
7677
if let Some(ref bloom) = packet.bloom {
77-
Some(build_bloom_filter(bloom.clone(), probe_key, max_threads).await?)
78+
Some(
79+
build_bloom_filter(bloom.clone(), probe_key, max_threads, desc.id)
80+
.await?,
81+
)
7882
} else {
7983
None
8084
}
@@ -256,6 +260,7 @@ async fn build_bloom_filter(
256260
bloom: Vec<u64>,
257261
probe_key: &Expr<String>,
258262
max_threads: usize,
263+
filter_id: usize,
259264
) -> Result<RuntimeFilterBloom> {
260265
let probe_key = match probe_key {
261266
Expr::ColumnRef(col) => col,
@@ -269,7 +274,7 @@ async fn build_bloom_filter(
269274
let column_name = probe_key.id.to_string();
270275
let total_items = bloom.len();
271276

272-
if total_items < 50000 {
277+
if total_items < 3_000_000 {
273278
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
274279
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
275280
filter.insert_hash_batch(&bloom);
@@ -279,65 +284,23 @@ async fn build_bloom_filter(
279284
});
280285
}
281286

282-
let chunk_size = total_items.div_ceil(max_threads);
283-
284-
let chunks: Vec<Vec<u64>> = bloom
285-
.chunks(chunk_size)
286-
.map(|chunk| chunk.to_vec())
287-
.collect();
288-
289-
let tasks: Vec<_> = chunks
290-
.into_iter()
291-
.map(|chunk| {
292-
databend_common_base::runtime::spawn(async move {
293-
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
294-
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
295-
296-
filter.insert_hash_batch(&chunk);
297-
Ok::<Sbbf, ErrorCode>(filter)
298-
})
299-
})
300-
.collect();
301-
302-
let task_results = futures::future::join_all(tasks).await;
303-
304-
let filters: Vec<Sbbf> = task_results
305-
.into_iter()
306-
.map(|r| r.expect("Task panicked"))
307-
.collect::<Result<Vec<_>>>()?;
308-
309-
let merged_filter = merge_bloom_filters_tree(filters);
287+
let start = std::time::Instant::now();
288+
let builder = SbbfAtomic::new_with_ndv_fpp(total_items as u64, 0.01)
289+
.map_err(|e| ErrorCode::Internal(e.to_string()))?
290+
.insert_hash_batch_parallel(bloom, max_threads);
291+
let filter = builder.finish();
292+
log::info!(
293+
"filter_id: {}, build_time: {:?}",
294+
filter_id,
295+
start.elapsed()
296+
);
310297

311298
Ok(RuntimeFilterBloom {
312299
column_name,
313-
filter: merged_filter,
300+
filter,
314301
})
315302
}
316303

317-
fn merge_bloom_filters_tree(mut filters: Vec<Sbbf>) -> Sbbf {
318-
if filters.is_empty() {
319-
return Sbbf::new_with_ndv_fpp(1, 0.01).unwrap();
320-
}
321-
322-
while filters.len() > 1 {
323-
let mut next_level = Vec::new();
324-
let mut iter = filters.into_iter();
325-
326-
while let Some(mut left) = iter.next() {
327-
if let Some(right) = iter.next() {
328-
left.union(&right);
329-
next_level.push(left);
330-
} else {
331-
next_level.push(left);
332-
}
333-
}
334-
335-
filters = next_level;
336-
}
337-
338-
filters.pop().unwrap()
339-
}
340-
341304
#[cfg(test)]
342305
mod tests {
343306
use std::collections::HashMap;

0 commit comments

Comments
 (0)