Skip to content

Commit 3c39d3a

Browse files
committed
xx
1 parent 2d4fa49 commit 3c39d3a

File tree

3 files changed

+68
-36
lines changed

3 files changed

+68
-36
lines changed

src/query/service/src/physical_plans/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub use physical_exchange::Exchange;
8282
pub use physical_exchange_sink::ExchangeSink;
8383
pub use physical_exchange_source::ExchangeSource;
8484
pub use physical_filter::Filter;
85-
pub use physical_hash_join::HashJoin;
85+
pub use physical_hash_join::*;
8686
pub use physical_limit::Limit;
8787
pub use physical_materialized_cte::*;
8888
pub use physical_multi_table_insert::*;

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ use databend_common_pipeline::core::Pipe;
3535
use databend_common_pipeline::core::PipeItem;
3636
use databend_common_pipeline::core::ProcessorPtr;
3737
use databend_common_sql::optimizer::ir::SExpr;
38+
use databend_common_sql::plans::FunctionCall;
3839
use databend_common_sql::plans::Join;
40+
use databend_common_sql::plans::JoinEquiCondition;
3941
use databend_common_sql::plans::JoinType;
4042
use databend_common_sql::ColumnEntry;
4143
use databend_common_sql::ColumnSet;
@@ -52,6 +54,7 @@ use crate::physical_plans::format::PhysicalFormat;
5254
use crate::physical_plans::physical_plan::IPhysicalPlan;
5355
use crate::physical_plans::physical_plan::PhysicalPlan;
5456
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
57+
use crate::physical_plans::resolve_scalar;
5558
use crate::physical_plans::runtime_filter::build_runtime_filter;
5659
use crate::physical_plans::Exchange;
5760
use crate::physical_plans::PhysicalPlanBuilder;
@@ -99,6 +102,12 @@ type MergedFieldsResult = (
99102
Vec<(usize, (bool, bool))>,
100103
);
101104

105+
/// Predicate bundle carried on a `HashJoin` plan node as its `nested_loop_filter`
/// field and copied into `HashJoinDesc` for the nested loop filter path.
///
/// Derives `Clone`/`Debug` plus serde `Serialize`/`Deserialize`, matching the
/// derives on `HashJoin` itself.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
106+
pub struct NestedLoopFilterInfo {
107+
/// Join conditions as remote expressions. The plan builder fills this with
/// the non-equi conditions followed by one `eq` call per equi condition;
/// the consumer converts each to an expression and folds them together
/// with `and_filters`.
pub predicates: Vec<RemoteExpr>,
108+
/// Column index list. NOTE(review): the builder currently always sets this
/// to an empty vector and the filter executor is constructed with `None`
/// instead of this projection, so the field looks like a placeholder —
/// confirm the intended contents before relying on it.
pub projection: Vec<usize>,
109+
}
110+
102111
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
103112
pub struct HashJoin {
104113
pub meta: PhysicalPlanMeta,
@@ -140,6 +149,7 @@ pub struct HashJoin {
140149

141150
pub runtime_filter: PhysicalRuntimeFilters,
142151
pub broadcast_id: Option<u32>,
152+
pub nested_loop_filter: NestedLoopFilterInfo,
143153
}
144154

145155
#[typetag::serde]
@@ -261,6 +271,7 @@ impl IPhysicalPlan for HashJoin {
261271
build_side_cache_info: self.build_side_cache_info.clone(),
262272
runtime_filter: self.runtime_filter.clone(),
263273
broadcast_id: self.broadcast_id,
274+
nested_loop_filter: self.nested_loop_filter.clone(),
264275
})
265276
}
266277

@@ -1184,6 +1195,25 @@ impl PhysicalPlanBuilder {
11841195
.collect::<Result<_>>()
11851196
}
11861197

1198+
/// Builds the [`NestedLoopFilterInfo`] for `join`.
///
/// The predicate list is the join's non-equi conditions (cloned) followed by
/// each equi condition rewritten into a scalar expression by
/// `condition_to_expr`. Every scalar is then resolved against `merged_schema`
/// with `resolve_scalar`.
///
/// # Errors
///
/// Returns the first error produced by `condition_to_expr` or
/// `resolve_scalar`; collection stops at that point.
fn build_nested_loop_filter_info(
1199+
&self,
1200+
join: &Join,
1201+
merged_schema: &DataSchemaRef,
1202+
) -> Result<NestedLoopFilterInfo> {
1203+
// Order matters for readers of `predicates`: non-equi conditions first,
// then the equi conditions. The `Ok(..)` wrapper gives both halves of the
// chain the same `Result` item type so they can share one `resolve_scalar` step.
let predicates = join
1204+
.non_equi_conditions
1205+
.iter()
1206+
.map(|c| Ok(c.clone()))
1207+
.chain(join.equi_conditions.iter().map(condition_to_expr))
1208+
.map(|scalar| resolve_scalar(&scalar?, merged_schema))
1209+
.collect::<Result<_>>()?;
1210+
1211+
Ok(NestedLoopFilterInfo {
1212+
predicates,
1213+
// No projection is computed here; the field is always left empty.
projection: vec![],
1214+
})
1215+
}
1216+
11871217
pub async fn build_hash_join(
11881218
&mut self,
11891219
join: &Join,
@@ -1256,6 +1286,8 @@ impl PhysicalPlanBuilder {
12561286
// Step 10: Process non-equi conditions
12571287
let non_equi_conditions = self.process_non_equi_conditions(join, &merged_schema)?;
12581288

1289+
let nested_loop_filter = self.build_nested_loop_filter_info(join, &merged_schema)?;
1290+
12591291
// Step 11: Build runtime filter
12601292
let runtime_filter = build_runtime_filter(
12611293
self.ctx.clone(),
@@ -1300,6 +1332,32 @@ impl PhysicalPlanBuilder {
13001332
build_side_cache_info,
13011333
runtime_filter,
13021334
broadcast_id,
1335+
nested_loop_filter,
13031336
}))
13041337
}
13051338
}
1339+
1340+
/// Rewrites an equi-join condition into an `eq(left, right)` function call.
///
/// When exactly one side has type `Nullable(T)` and the other side has the
/// matching inner type `T`, the non-nullable side is passed through
/// `unify_to_data_type` with the nullable type so that both arguments have the
/// same type. In every other case both sides are used unchanged.
///
/// The resulting call takes its span from the left expression and has no
/// function parameters.
///
/// # Errors
///
/// Propagates any error from `data_type()` on either side of the condition.
///
/// NOTE(review): only `condition.left` and `condition.right` are read here. If
/// `JoinEquiCondition` carries additional semantics (for example a null-safe
/// equality flag), a plain `eq` would not reflect them — confirm against the
/// type's definition.
fn condition_to_expr(condition: &JoinEquiCondition) -> Result<ScalarExpr> {
1341+
let left_type = condition.left.data_type()?;
1342+
let right_type = condition.right.data_type()?;
1343+
1344+
let arguments = match (&left_type, &right_type) {
1345+
// Left is Nullable(T), right is T: lift the right side to the left type.
(DataType::Nullable(left), right) if **left == *right => vec![
1346+
condition.left.clone(),
1347+
condition.right.clone().unify_to_data_type(&left_type),
1348+
],
1349+
// Left is T, right is Nullable(T): lift the left side to the right type.
(left, DataType::Nullable(right)) if *left == **right => vec![
1350+
condition.left.clone().unify_to_data_type(&right_type),
1351+
condition.right.clone(),
1352+
],
1353+
// Any other combination: no unification is attempted.
_ => vec![condition.left.clone(), condition.right.clone()],
1354+
};
1355+
1356+
Ok(FunctionCall {
1357+
span: condition.left.span(),
1358+
func_name: "eq".to_string(),
1359+
params: vec![],
1360+
arguments,
1361+
}
1362+
.into())
1363+
}

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use databend_common_column::bitmap::Bitmap;
16-
use databend_common_exception::ErrorCode;
1716
use databend_common_exception::Result;
1817
use databend_common_expression::arrow::and_validities;
1918
use databend_common_expression::type_check::check_function;
@@ -32,6 +31,7 @@ use databend_common_sql::ColumnSet;
3231
use parking_lot::RwLock;
3332

3433
use crate::physical_plans::HashJoin;
34+
use crate::physical_plans::NestedLoopFilterInfo;
3535
use crate::physical_plans::PhysicalRuntimeFilter;
3636
use crate::physical_plans::PhysicalRuntimeFilters;
3737
use crate::pipelines::processors::transforms::wrap_true_validity;
@@ -65,7 +65,7 @@ pub struct HashJoinDesc {
6565
pub(crate) probe_projection: ColumnSet,
6666
pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
6767
pub(crate) build_schema: DataSchemaRef,
68-
pub(crate) probe_schema: DataSchemaRef,
68+
pub(crate) nested_loop_filter: NestedLoopFilterInfo,
6969
}
7070

7171
#[derive(Debug, Clone)]
@@ -141,7 +141,7 @@ impl HashJoinDesc {
141141
build_projection: join.build_projections.clone(),
142142
probe_projection: join.probe_projections.clone(),
143143
build_schema: join.build.output_schema()?,
144-
probe_schema: join.probe.output_schema()?,
144+
nested_loop_filter: join.nested_loop_filter.clone(),
145145
})
146146
}
147147

@@ -269,47 +269,21 @@ impl HashJoinDesc {
269269
function_ctx: &FunctionContext,
270270
block_size: usize,
271271
) -> Result<FilterExecutor> {
272-
let probe_len = self.probe_schema.num_fields();
273-
274-
let projection = self
275-
.probe_projection
276-
.iter()
277-
.copied()
278-
.chain(self.build_projection.iter().map(|idx| probe_len + *idx))
279-
.collect::<Vec<_>>();
280-
281-
let eq_predicates = self
282-
.probe_keys
272+
let predicates = self
273+
.nested_loop_filter
274+
.predicates
283275
.iter()
284-
.zip(&self.build_keys)
285-
.map(|(probe, build)| {
286-
let build = build.project_column_ref(|old| Ok(old + probe_len))?;
287-
check_function(None, "eq", &[], &[probe.clone(), build], &BUILTIN_FUNCTIONS)
288-
});
289-
290-
let other_predicate = self.other_predicate.as_ref().map(|expr| {
291-
expr.project_column_ref(|index| {
292-
projection.get(*index).copied().ok_or_else(|| {
293-
ErrorCode::Internal(format!(
294-
"Invalid column index {} for nested loop predicate projection",
295-
index
296-
))
297-
})
298-
})
299-
});
300-
301-
let predicate = eq_predicates
302-
.chain(other_predicate)
276+
.map(|x| Ok(x.as_expr(&BUILTIN_FUNCTIONS)))
303277
.reduce(|lhs, rhs| {
304278
check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
305279
})
306280
.unwrap()?;
307281

308282
Ok(FilterExecutor::new(
309-
predicate,
283+
predicates,
310284
function_ctx.clone(),
311285
block_size,
312-
Some(projection.into_iter().collect()),
286+
None, // Some(self.nested_loop_filter.projection.iter().copied().collect()),
313287
&BUILTIN_FUNCTIONS,
314288
false,
315289
))

0 commit comments

Comments
 (0)