Skip to content

Commit 84027e1

Browse files
committed
x
1 parent 66c6793 commit 84027e1

File tree

5 files changed

+47
-15
lines changed

5 files changed

+47
-15
lines changed

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ enum PhysicalJoinType {
4343
}
4444

4545
// Choose physical join type by join conditions
46-
fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<PhysicalJoinType> {
46+
fn physical_join(join: &Join, s_expr: &SExpr, _settings: &Settings) -> Result<PhysicalJoinType> {
4747
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
4848
return Err(ErrorCode::SemanticError(
4949
"ANY JOIN only supports equality-based hash joins",
@@ -53,7 +53,7 @@ fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<Phy
5353
let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
5454
let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
5555
let right_stat_info = right_rel_expr.derive_cardinality()?;
56-
let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()?;
56+
let nested_loop_join_threshold = 0;
5757
if matches!(join.join_type, JoinType::Inner | JoinType::Cross)
5858
&& (right_stat_info
5959
.statistics

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use std::sync::PoisonError;
1818

1919
use databend_common_base::base::ProgressValues;
20-
use databend_common_catalog::table_context::TableContext;
2120
use databend_common_exception::Result;
2221
use databend_common_expression::Column;
2322
use databend_common_expression::DataBlock;
@@ -27,6 +26,7 @@ use databend_common_expression::HashMethodSerializer;
2726
use databend_common_expression::HashMethodSingleBinary;
2827
use databend_common_hashtable::BinaryHashJoinHashMap;
2928
use databend_common_hashtable::HashJoinHashMap;
29+
use databend_common_settings::Settings;
3030
use databend_common_sql::plans::JoinType;
3131
use ethnum::U256;
3232

@@ -40,7 +40,6 @@ use crate::pipelines::processors::transforms::SkipDuplicatesFixedKeyHashJoinHash
4040
use crate::pipelines::processors::transforms::SkipDuplicatesSerializerHashJoinHashTable;
4141
use crate::pipelines::processors::transforms::SkipDuplicatesSingleBinaryHashJoinHashTable;
4242
use crate::pipelines::processors::HashJoinDesc;
43-
use crate::sessions::QueryContext;
4443

4544
pub struct BasicHashJoin {
4645
pub(crate) desc: Arc<HashJoinDesc>,
@@ -54,14 +53,13 @@ pub struct BasicHashJoin {
5453

5554
impl BasicHashJoin {
5655
pub fn create(
57-
ctx: &QueryContext,
56+
settings: &Settings,
5857
function_ctx: FunctionContext,
5958
method: HashMethodKind,
6059
desc: Arc<HashJoinDesc>,
6160
state: Arc<BasicHashJoinState>,
61+
nested_loop_join_threshold: usize,
6262
) -> Result<Self> {
63-
let settings = ctx.get_settings();
64-
let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()? as _;
6563
let squash_block = SquashBlocks::new(
6664
settings.get_max_block_size()? as _,
6765
settings.get_max_block_bytes()? as _,
@@ -92,7 +90,7 @@ impl BasicHashJoin {
9290

9391
pub(crate) fn final_build(&mut self) -> Result<Option<ProgressValues>> {
9492
if let Some(true) = self.init_memory_hash_table() {
95-
return Ok(Some(self.build_plain()));
93+
return Ok(Some(self.build_nested_loop()));
9694
};
9795

9896
let Some(chunk_index) = self.state.steal_chunk_index() else {
@@ -352,7 +350,7 @@ impl BasicHashJoin {
352350
Ok(())
353351
}
354352

355-
fn build_plain(&self) -> ProgressValues {
353+
fn build_nested_loop(&self) -> ProgressValues {
356354
let mut progress = ProgressValues::default();
357355
let mut plain = vec![];
358356
while let Some(chunk_index) = self.state.steal_chunk_index() {
@@ -361,8 +359,6 @@ impl BasicHashJoin {
361359
let mut chunk_block = DataBlock::empty();
362360
std::mem::swap(chunk_mut, &mut chunk_block);
363361

364-
chunk_block = chunk_block.project(&self.desc.build_projection);
365-
366362
progress.rows += chunk_block.num_rows();
367363
progress.bytes += chunk_block.memory_size();
368364

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ use databend_common_catalog::table_context::TableContext;
2020
use databend_common_column::bitmap::Bitmap;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
23+
use databend_common_expression::type_check::check_function;
2324
use databend_common_expression::types::NullableColumn;
2425
use databend_common_expression::with_join_hash_method;
2526
use databend_common_expression::BlockEntry;
2627
use databend_common_expression::DataBlock;
2728
use databend_common_expression::FilterExecutor;
2829
use databend_common_expression::FunctionContext;
2930
use databend_common_expression::HashMethodKind;
31+
use databend_common_functions::BUILTIN_FUNCTIONS;
3032

3133
use super::basic::BasicHashJoin;
3234
use super::basic_state::BasicHashJoinState;
@@ -52,6 +54,7 @@ pub struct InnerHashJoin {
5254
pub(crate) function_ctx: FunctionContext,
5355
pub(crate) basic_state: Arc<BasicHashJoinState>,
5456
pub(crate) performance_context: PerformanceContext,
57+
nested_loop_filter: Option<FilterExecutor>,
5558
}
5659

5760
impl InnerHashJoin {
@@ -68,19 +71,45 @@ impl InnerHashJoin {
6871
let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone());
6972

7073
let basic_hash_join = BasicHashJoin::create(
71-
ctx,
74+
&settings,
7275
function_ctx.clone(),
7376
method,
7477
desc.clone(),
7578
state.clone(),
79+
settings.get_nested_loop_join_threshold()? as _,
7680
)?;
7781

82+
let offset = desc.probe_projections.len();
83+
let predicate = desc
84+
.probe_keys
85+
.iter()
86+
.zip(&desc.build_keys)
87+
.map(|(probe, build)| {
88+
let build = build.project_column_ref(|old| Ok(old + offset))?;
89+
check_function(None, "eq", &[], &[probe.clone(), build], &BUILTIN_FUNCTIONS)
90+
})
91+
.chain(desc.other_predicate.iter().map(|expr| Ok(expr.clone())))
92+
.reduce(|lhs, rhs| {
93+
check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
94+
})
95+
.unwrap()?;
96+
97+
let nested_loop_filter = Some(FilterExecutor::new(
98+
predicate,
99+
function_ctx.clone(),
100+
block_size,
101+
None,
102+
&BUILTIN_FUNCTIONS,
103+
false,
104+
));
105+
78106
Ok(InnerHashJoin {
79107
desc,
80108
basic_hash_join,
81109
function_ctx,
82110
basic_state: state,
83111
performance_context: context,
112+
nested_loop_filter,
84113
})
85114
}
86115
}
@@ -123,7 +152,13 @@ impl Join for InnerHashJoin {
123152
}
124153
HashJoinHashTable::NestedLoop(build_blocks) => {
125154
let probe_block = data.project(&self.desc.probe_projections);
126-
return Ok(Box::new(LoopJoinStream::new(probe_block, build_blocks)));
155+
let nested = Box::new(LoopJoinStream::new(probe_block, build_blocks));
156+
return match self.nested_loop_filter.as_mut() {
157+
Some(filter_executor) => {
158+
Ok(InnerHashJoinFilterStream::create(nested, filter_executor))
159+
}
160+
None => Ok(nested),
161+
};
127162
}
128163
_ => (),
129164
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,12 @@ impl OuterLeftHashJoin {
6868
let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone());
6969

7070
let basic_hash_join = BasicHashJoin::create(
71-
ctx,
71+
&settings,
7272
function_ctx.clone(),
7373
method,
7474
desc.clone(),
7575
state.clone(),
76+
0,
7677
)?;
7778

7879
Ok(OuterLeftHashJoin {

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ impl DefaultSettings {
496496
range: Some(SettingRange::Numeric(0..=u64::MAX)),
497497
}),
498498
("nested_loop_join_threshold", DefaultSettingValue {
499-
value: UserSettingValue::UInt64(0),
499+
value: UserSettingValue::UInt64(1024),
500500
desc: "Set the threshold for use nested loop join. Setting it to 0 disable nested loop join.",
501501
mode: SettingMode::Both,
502502
scope: SettingScope::Both,

0 commit comments

Comments
 (0)