Skip to content

Commit fcbeaa9

Browse files
committed
loop join
1 parent 3efed0c commit fcbeaa9

File tree

18 files changed

+754
-68
lines changed

18 files changed

+754
-68
lines changed

src/common/exception/src/exception_into.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::error::Error;
1616
use std::fmt::Debug;
1717
use std::fmt::Display;
1818
use std::fmt::Formatter;
19+
use std::sync::PoisonError;
1920

2021
use geozero::error::GeozeroError;
2122

@@ -436,3 +437,9 @@ impl From<redis::RedisError> for ErrorCode {
436437
ErrorCode::DictionarySourceError(format!("Dictionary Redis Error, cause: {}", error))
437438
}
438439
}
440+
441+
/// Converts a `std::sync::PoisonError` into an `ErrorCode::Internal` whose
/// message is the `Display` rendering of the original error.
impl<T> From<PoisonError<T>> for ErrorCode {
442+
fn from(error: PoisonError<T>) -> Self {
443+
ErrorCode::Internal(format!("{error}"))
444+
}
445+
}

src/query/expression/src/block.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,18 @@ impl BlockEntry {
191191
BlockEntry::Column(column) => Ok(ColumnView::Column(T::try_downcast_column(column)?)),
192192
}
193193
}
194+
195+
/// Consumes the entry and returns one whose type is nullable.
///
/// - A constant whose data type is not nullable-or-null keeps its scalar and
///   row count, and has its data type wrapped in `DataType::Nullable`.
/// - Any other constant, and `Column::Nullable` / `Column::Null` columns,
///   are returned unchanged.
/// - Every other column goes through `Column::wrap_nullable(None)`.
pub fn into_nullable(self) -> BlockEntry {
196+
match self {
197+
// Only constants that are not already nullable/null need a new data type.
BlockEntry::Const(scalar, data_type, n) if !data_type.is_nullable_or_null() => {
198+
BlockEntry::Const(scalar, DataType::Nullable(Box::new(data_type)), n)
199+
}
200+
// Already nullable (or null): pass the entry through untouched.
entry @ BlockEntry::Const(_, _, _)
201+
| entry @ BlockEntry::Column(Column::Nullable(_))
202+
| entry @ BlockEntry::Column(Column::Null { .. }) => entry,
203+
// NOTE(review): `None` is presumably "no explicit validity bitmap" — confirm
// against `Column::wrap_nullable`.
BlockEntry::Column(column) => column.wrap_nullable(None).into(),
204+
}
205+
}
194206
}
195207

196208
impl From<Column> for BlockEntry {

src/query/expression/src/schema.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,10 @@ impl DataSchema {
362362
}
363363
}
364364

365+
/// Builds a schema from `fields` with `Self::new` and converts it into an
/// `Arc<Self>`.
pub fn new_ref(fields: Vec<DataField>) -> Arc<Self> {
366+
Self::new(fields).into()
367+
}
368+
365369
/// Builds a schema directly from the given `fields` and `metadata` map.
pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
366370
Self { fields, metadata }
367371
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_ast::ast::FormatTreeNode;
16+
use databend_common_exception::Result;
17+
use databend_common_functions::BUILTIN_FUNCTIONS;
18+
19+
use crate::physical_plans::format::append_output_rows_info;
20+
use crate::physical_plans::format::format_output_columns;
21+
use crate::physical_plans::format::plan_stats_info_to_format_tree;
22+
use crate::physical_plans::format::FormatContext;
23+
use crate::physical_plans::format::PhysicalFormat;
24+
use crate::physical_plans::IPhysicalPlan;
25+
use crate::physical_plans::NestedLoopJoin;
26+
use crate::physical_plans::PhysicalPlanMeta;
27+
28+
/// Formatter that borrows a `NestedLoopJoin` plan node for the lifetime `'a`.
pub struct NestedLoopJoinFormatter<'a> {
29+
// The plan node being formatted.
inner: &'a NestedLoopJoin,
30+
}
31+
32+
impl<'a> NestedLoopJoinFormatter<'a> {
33+
/// Wraps `inner` in a `NestedLoopJoinFormatter` and returns it boxed as a
/// `dyn PhysicalFormat` trait object that borrows `inner` for `'a`.
pub fn create(inner: &'a NestedLoopJoin) -> Box<dyn PhysicalFormat + 'a> {
34+
Box::new(NestedLoopJoinFormatter { inner })
35+
}
36+
}
37+
38+
/// Renders a `NestedLoopJoin` plan node as `FormatTreeNode<String>` trees in
/// three flavours: the full `format`, the join-only `format_join`, and
/// `partial_format`.
impl<'a> PhysicalFormat for NestedLoopJoinFormatter<'a> {
39+
/// Returns the metadata of the wrapped plan node.
fn get_meta(&self) -> &PhysicalPlanMeta {
40+
self.inner.get_meta()
41+
}
42+
43+
/// Builds the full tree for this node.
///
/// The root payload is `"NestedLoopJoin"`. Its children are, in order: the
/// output columns, the join type, the join conditions, any statistics nodes
/// (only when `stat_info` is set), then the left and right subtrees with
/// `(Left)` / `(Right)` appended to their payloads.
///
/// Errors from `output_schema()`, `formatter()` and `dispatch()` are
/// propagated with `?`.
#[recursive::recursive]
44+
fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
45+
// Turn every condition into its `sql_display()` text and join them with ", ".
let conditions = self
46+
.inner
47+
.conditions
48+
.iter()
49+
.map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS).sql_display())
50+
.collect::<Vec<_>>()
51+
.join(", ");
52+
53+
let mut node_children = vec![
54+
FormatTreeNode::new(format!(
55+
"output columns: [{}]",
56+
format_output_columns(self.inner.output_schema()?, ctx.metadata, true)
57+
)),
58+
FormatTreeNode::new(format!("join type: {}", self.inner.join_type)),
59+
FormatTreeNode::new(format!("conditions: [{conditions}]")),
60+
];
61+
62+
// Statistics are optional; append their nodes only when present.
if let Some(info) = &self.inner.stat_info {
63+
let items = plan_stats_info_to_format_tree(info);
64+
node_children.extend(items);
65+
}
66+
67+
// Format each input and tag its payload so the two sides can be told apart.
let left_formatter = self.inner.left.formatter()?;
68+
let mut left_child = left_formatter.dispatch(ctx)?;
69+
left_child.payload = format!("{}(Left)", left_child.payload);
70+
71+
let right_formatter = self.inner.right.formatter()?;
72+
let mut right_child = right_formatter.dispatch(ctx)?;
73+
right_child.payload = format!("{}(Right)", right_child.payload);
74+
75+
node_children.push(left_child);
76+
node_children.push(right_child);
77+
78+
Ok(FormatTreeNode::with_children(
79+
"NestedLoopJoin".to_string(),
80+
node_children,
81+
))
82+
}
83+
84+
/// Builds the join-only tree: a root labelled `NestedLoopJoin: <join type>`
/// with a `Left` and a `Right` child, each holding the `format_join` output
/// of the corresponding input.
#[recursive::recursive]
85+
fn format_join(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
86+
let left_child = self.inner.left.formatter()?.format_join(ctx)?;
87+
let right_child = self.inner.right.formatter()?.format_join(ctx)?;
88+
89+
let children = vec![
90+
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
91+
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
92+
];
93+
94+
Ok(FormatTreeNode::with_children(
95+
format!("NestedLoopJoin: {}", self.inner.join_type),
96+
children,
97+
))
98+
}
99+
100+
/// Builds the partial tree: a root labelled `NestedLoopJoin: <join type>`
/// whose children are, in order, any statistics nodes, whatever
/// `append_output_rows_info` adds for this node's id, and then the `Left` /
/// `Right` wrappers around the inputs' `partial_format` output.
#[recursive::recursive]
101+
fn partial_format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
102+
// Both inputs are formatted up front, before any child of this node is added.
let left_child = self.inner.left.formatter()?.partial_format(ctx)?;
103+
let right_child = self.inner.right.formatter()?.partial_format(ctx)?;
104+
105+
let mut children = vec![];
106+
if let Some(info) = &self.inner.stat_info {
107+
let items = plan_stats_info_to_format_tree(info);
108+
children.extend(items);
109+
}
110+
111+
append_output_rows_info(&mut children, &ctx.profs, self.inner.get_id());
112+
113+
children.push(FormatTreeNode::with_children("Left".to_string(), vec![
114+
left_child,
115+
]));
116+
children.push(FormatTreeNode::with_children("Right".to_string(), vec![
117+
right_child,
118+
]));
119+
120+
Ok(FormatTreeNode::with_children(
121+
format!("NestedLoopJoin: {}", self.inner.join_type),
122+
children,
123+
))
124+
}
125+
}

src/query/service/src/physical_plans/format/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ mod format_mutation_into_organize;
4545
mod format_mutation_into_split;
4646
mod format_mutation_manipulate;
4747
mod format_mutation_source;
48+
mod format_nested_loop_join;
4849
mod format_project_set;
4950
mod format_range_join;
5051
mod format_replace_into;
@@ -93,6 +94,7 @@ pub use format_mutation_into_organize::*;
9394
pub use format_mutation_into_split::*;
9495
pub use format_mutation_manipulate::*;
9596
pub use format_mutation_source::*;
97+
pub use format_nested_loop_join::*;
9698
pub use format_project_set::*;
9799
pub use format_range_join::*;
98100
pub use format_replace_into::*;

src/query/service/src/physical_plans/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ mod physical_mutation_into_organize;
4242
mod physical_mutation_into_split;
4343
mod physical_mutation_manipulate;
4444
mod physical_mutation_source;
45+
mod physical_nested_loop_join;
4546
mod physical_project_set;
4647
mod physical_r_cte_scan;
4748
mod physical_range_join;
@@ -90,6 +91,7 @@ pub use physical_mutation_into_organize::MutationOrganize;
9091
pub use physical_mutation_into_split::MutationSplit;
9192
pub use physical_mutation_manipulate::MutationManipulate;
9293
pub use physical_mutation_source::*;
94+
pub use physical_nested_loop_join::NestedLoopJoin;
9395
pub use physical_project_set::ProjectSet;
9496
pub use physical_r_cte_scan::RecursiveCteScan;
9597
pub use physical_range_join::*;

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -527,13 +527,15 @@ impl PhysicalPlanBuilder {
527527
required: &mut ColumnSet,
528528
others_required: &mut ColumnSet,
529529
) -> (Vec<IndexType>, Vec<IndexType>) {
530-
let retained_columns = self.metadata.read().get_retained_column().clone();
531-
*required = required.union(&retained_columns).cloned().collect();
532-
let column_projections = required.clone().into_iter().collect::<Vec<_>>();
533-
534-
*others_required = others_required.union(&retained_columns).cloned().collect();
535-
let pre_column_projections = others_required.clone().into_iter().collect::<Vec<_>>();
530+
{
531+
let metadata = self.metadata.read();
532+
let retained_columns = metadata.get_retained_column();
533+
required.extend(retained_columns);
534+
others_required.extend(retained_columns);
535+
}
536536

537+
let column_projections = required.iter().copied().collect();
538+
let pre_column_projections = others_required.iter().copied().collect();
537539
(column_projections, pre_column_projections)
538540
}
539541

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use databend_common_exception::ErrorCode;
1616
use databend_common_exception::Result;
17+
use databend_common_settings::Settings;
1718
use databend_common_sql::binder::is_range_join_condition;
1819
use databend_common_sql::optimizer::ir::RelExpr;
1920
use databend_common_sql::optimizer::ir::SExpr;
@@ -30,17 +31,52 @@ use crate::physical_plans::PhysicalPlanBuilder;
3031
// Physical join strategy chosen by `physical_join` for a logical join.
enum PhysicalJoinType {
3132
// Execute the join with the hash join operator.
Hash,
3233
// The first arg is range conditions, the second arg is other conditions
33-
RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
34+
RangeJoin {
35+
// Conditions recognised by `is_range_join_condition`.
range: Vec<ScalarExpr>,
36+
// The remaining non-equi conditions.
other: Vec<ScalarExpr>,
37+
},
38+
// Execute the join with the nested loop join operator.
LoopJoin {
39+
// All join conditions: the non-equi conditions plus each equi condition
// rewritten as an `eq` function call.
conditions: Vec<ScalarExpr>,
40+
},
3941
}
3542

3643
// Choose physical join type by join conditions
37-
fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
44+
fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<PhysicalJoinType> {
3845
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
3946
return Err(ErrorCode::SemanticError(
4047
"ANY JOIN only supports equality-based hash joins",
4148
));
4249
}
4350

51+
let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
52+
let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
53+
let right_stat_info = right_rel_expr.derive_cardinality()?;
54+
let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()?;
55+
if matches!(join.join_type, JoinType::Inner | JoinType::Cross)
56+
&& (right_stat_info
57+
.statistics
58+
.precise_cardinality
59+
.map(|n| n < nested_loop_join_threshold)
60+
.unwrap_or(false)
61+
|| right_stat_info.cardinality < nested_loop_join_threshold as _)
62+
{
63+
let conditions = join
64+
.non_equi_conditions
65+
.iter()
66+
.cloned()
67+
.chain(join.equi_conditions.iter().cloned().map(|condition| {
68+
FunctionCall {
69+
span: condition.left.span(),
70+
func_name: "eq".to_string(),
71+
params: vec![],
72+
arguments: vec![condition.left, condition.right],
73+
}
74+
.into()
75+
}))
76+
.collect();
77+
return Ok(PhysicalJoinType::LoopJoin { conditions });
78+
};
79+
4480
if !join.equi_conditions.is_empty() {
4581
// Contain equi condition, use hash join
4682
return Ok(PhysicalJoinType::Hash);
@@ -51,32 +87,29 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
5187
return Ok(PhysicalJoinType::Hash);
5288
}
5389

54-
let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
55-
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
56-
let right_stat_info = right_rel_expr.derive_cardinality()?;
5790
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
5891
|| right_stat_info.cardinality == 1.0
5992
{
6093
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
6194
return Ok(PhysicalJoinType::Hash);
6295
}
6396

64-
let left_prop = left_rel_expr.derive_relational_prop()?;
65-
let right_prop = right_rel_expr.derive_relational_prop()?;
66-
let (range_conditions, other_conditions) = join
67-
.non_equi_conditions
68-
.iter()
69-
.cloned()
70-
.partition::<Vec<_>, _>(|condition| {
71-
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
72-
});
73-
74-
if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
75-
return Ok(PhysicalJoinType::RangeJoin(
76-
range_conditions,
77-
other_conditions,
78-
));
97+
if matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
98+
let left_prop = left_rel_expr.derive_relational_prop()?;
99+
let right_prop = right_rel_expr.derive_relational_prop()?;
100+
let (range, other) = join
101+
.non_equi_conditions
102+
.iter()
103+
.cloned()
104+
.partition::<Vec<_>, _>(|condition| {
105+
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
106+
});
107+
108+
if !range.is_empty() {
109+
return Ok(PhysicalJoinType::RangeJoin { range, other });
110+
}
79111
}
112+
80113
// Leverage hash join to execute nested loop join
81114
Ok(PhysicalJoinType::Hash)
82115
}
@@ -157,7 +190,8 @@ impl PhysicalPlanBuilder {
157190
)
158191
.await
159192
} else {
160-
match physical_join(join, s_expr)? {
193+
let settings = self.ctx.get_settings();
194+
match physical_join(join, s_expr, &settings)? {
161195
PhysicalJoinType::Hash => {
162196
self.build_hash_join(
163197
join,
@@ -170,7 +204,7 @@ impl PhysicalPlanBuilder {
170204
)
171205
.await
172206
}
173-
PhysicalJoinType::RangeJoin(range, other) => {
207+
PhysicalJoinType::RangeJoin { range, other } => {
174208
self.build_range_join(
175209
join.join_type,
176210
s_expr,
@@ -181,6 +215,16 @@ impl PhysicalPlanBuilder {
181215
)
182216
.await
183217
}
218+
PhysicalJoinType::LoopJoin { conditions } => {
219+
self.build_loop_join(
220+
join.join_type,
221+
s_expr,
222+
left_required,
223+
right_required,
224+
conditions,
225+
)
226+
.await
227+
}
184228
}
185229
}
186230
}

0 commit comments

Comments
 (0)