Skip to content

Commit 591d4f6

Browse files
committed
update
1 parent b0936f2 commit 591d4f6

File tree

1 file changed

+12
-3
lines changed
  • src/query/service/src/pipelines/processors/transforms/new_hash_join/memory

1 file changed

+12
-3
lines changed

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl<T: Join> Join for NestedLoopJoin<T> {
113113
matches: Vec::with_capacity(max_block_size),
114114
build_block_index: 0,
115115
build_row_index: 0,
116+
use_range: false,
116117
}))
117118
}
118119
}
@@ -126,6 +127,7 @@ struct NestedLoopJoinStream<'a> {
126127
build_block_index: usize,
127128
build_row_index: usize,
128129
matches: Vec<(u32, RowPtr)>,
130+
use_range: bool,
129131
}
130132

131133
impl<'a> NestedLoopJoinStream<'a> {
@@ -171,16 +173,23 @@ impl<'a> NestedLoopJoinStream<'a> {
171173
}
172174

173175
fn emit_block(&mut self, count: usize) -> Result<DataBlock> {
174-
let use_range = count as f64 > SELECTIVITY_THRESHOLD * self.max_block_size as f64;
176+
if !self.use_range
177+
&& self.matches.len() as f64
178+
> SELECTIVITY_THRESHOLD * self.probe_block.num_rows() as f64
179+
{
180+
// Need to test the scenario where a probe matches multiple builds
181+
self.use_range = true;
182+
}
183+
175184
let block = {
176-
if use_range {
185+
if self.use_range {
177186
self.matches.sort_unstable_by_key(|(probe, _)| *probe);
178187
}
179188
let (probe_indices, build_indices): (Vec<_>, Vec<_>) =
180189
self.matches.drain(..count).unzip();
181190

182191
let probe = self.probe_block.clone().project(&self.desc.projections);
183-
let probe = if use_range {
192+
let probe = if self.use_range {
184193
let ranges = DataBlock::merge_indices_to_ranges(&probe_indices);
185194
probe.take_ranges(&ranges, count)?
186195
} else {

0 commit comments

Comments
 (0)