Skip to content

Commit fb56aba

Browse files
committed
x
1 parent 84027e1 commit fb56aba

File tree

2 files changed

+34
-27
lines changed
  • src/query/service/src/pipelines/processors/transforms

2 files changed

+34
-27
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_expression::DataBlock;
2222
use databend_common_expression::DataSchemaRef;
2323
use databend_common_expression::Evaluator;
2424
use databend_common_expression::Expr;
25+
use databend_common_expression::FilterExecutor;
2526
use databend_common_expression::FunctionContext;
2627
use databend_common_expression::RemoteExpr;
2728
use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -259,4 +260,35 @@ impl HashJoinDesc {
259260
}
260261
}
261262
}
263+
264+
pub fn create_nested_loop_filter(
265+
&self,
266+
function_ctx: &FunctionContext,
267+
block_size: usize,
268+
) -> Result<FilterExecutor> {
269+
let offset = self.probe_projections.len();
270+
271+
let predicate = self
272+
.probe_keys
273+
.iter()
274+
.zip(&self.build_keys)
275+
.map(|(probe, build)| {
276+
let build = build.project_column_ref(|old| Ok(old + offset))?;
277+
check_function(None, "eq", &[], &[probe.clone(), build], &BUILTIN_FUNCTIONS)
278+
})
279+
.chain(self.other_predicate.iter().map(|expr| Ok(expr.clone())))
280+
.reduce(|lhs, rhs| {
281+
check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
282+
})
283+
.unwrap()?;
284+
285+
Ok(FilterExecutor::new(
286+
predicate,
287+
function_ctx.clone(),
288+
block_size,
289+
None,
290+
&BUILTIN_FUNCTIONS,
291+
false,
292+
))
293+
}
262294
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,13 @@ use databend_common_catalog::table_context::TableContext;
2020
use databend_common_column::bitmap::Bitmap;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
23-
use databend_common_expression::type_check::check_function;
2423
use databend_common_expression::types::NullableColumn;
2524
use databend_common_expression::with_join_hash_method;
2625
use databend_common_expression::BlockEntry;
2726
use databend_common_expression::DataBlock;
2827
use databend_common_expression::FilterExecutor;
2928
use databend_common_expression::FunctionContext;
3029
use databend_common_expression::HashMethodKind;
31-
use databend_common_functions::BUILTIN_FUNCTIONS;
3230

3331
use super::basic::BasicHashJoin;
3432
use super::basic_state::BasicHashJoinState;
@@ -79,37 +77,14 @@ impl InnerHashJoin {
7977
settings.get_nested_loop_join_threshold()? as _,
8078
)?;
8179

82-
let offset = desc.probe_projections.len();
83-
let predicate = desc
84-
.probe_keys
85-
.iter()
86-
.zip(&desc.build_keys)
87-
.map(|(probe, build)| {
88-
let build = build.project_column_ref(|old| Ok(old + offset))?;
89-
check_function(None, "eq", &[], &[probe.clone(), build], &BUILTIN_FUNCTIONS)
90-
})
91-
.chain(desc.other_predicate.iter().map(|expr| Ok(expr.clone())))
92-
.reduce(|lhs, rhs| {
93-
check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
94-
})
95-
.unwrap()?;
96-
97-
let nested_loop_filter = Some(FilterExecutor::new(
98-
predicate,
99-
function_ctx.clone(),
100-
block_size,
101-
None,
102-
&BUILTIN_FUNCTIONS,
103-
false,
104-
));
105-
80+
let nested_loop_filter = desc.create_nested_loop_filter(&function_ctx, block_size)?;
10681
Ok(InnerHashJoin {
10782
desc,
10883
basic_hash_join,
10984
function_ctx,
11085
basic_state: state,
11186
performance_context: context,
112-
nested_loop_filter,
87+
nested_loop_filter: Some(nested_loop_filter),
11388
})
11489
}
11590
}

0 commit comments

Comments
 (0)