Skip to content

Commit 08a51fe

Browse files
committed
x
1 parent b4d1bf5 commit 08a51fe

File tree

1 file changed

+33
-94
lines changed

1 file changed

+33
-94
lines changed

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 33 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,82 +1184,6 @@ impl PhysicalPlanBuilder {
11841184
.collect::<Result<_>>()
11851185
}
11861186

1187-
/// Creates a HashJoin physical plan
1188-
///
1189-
/// # Arguments
1190-
/// * `join` - Join operation
1191-
/// * `probe_side` - Probe side physical plan
1192-
/// * `build_side` - Build side physical plan
1193-
/// * `is_broadcast` - Whether this is a broadcast join
1194-
/// * `projections` - Column projections
1195-
/// * `probe_projections` - Probe side projections
1196-
/// * `build_projections` - Build side projections
1197-
/// * `left_join_conditions` - Left join conditions
1198-
/// * `right_join_conditions` - Right join conditions
1199-
/// * `is_null_equal` - Null equality flags
1200-
/// * `non_equi_conditions` - Non-equi conditions
1201-
/// * `probe_to_build` - Probe to build mapping
1202-
/// * `output_schema` - Output schema
1203-
/// * `build_side_cache_info` - Build side cache info
1204-
/// * `runtime_filter` - Runtime filter
1205-
/// * `stat_info` - Statistics info
1206-
///
1207-
/// # Returns
1208-
/// * `Result<PhysicalPlan>` - The HashJoin physical plan
1209-
#[allow(clippy::too_many_arguments)]
1210-
fn create_hash_join(
1211-
&self,
1212-
s_expr: &SExpr,
1213-
join: &Join,
1214-
probe_side: PhysicalPlan,
1215-
build_side: PhysicalPlan,
1216-
projections: ColumnSet,
1217-
probe_projections: ColumnSet,
1218-
build_projections: ColumnSet,
1219-
left_join_conditions: Vec<RemoteExpr>,
1220-
right_join_conditions: Vec<RemoteExpr>,
1221-
is_null_equal: Vec<bool>,
1222-
non_equi_conditions: Vec<RemoteExpr>,
1223-
probe_to_build: Vec<(usize, (bool, bool))>,
1224-
output_schema: DataSchemaRef,
1225-
build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
1226-
runtime_filter: PhysicalRuntimeFilters,
1227-
stat_info: PlanStatsInfo,
1228-
) -> Result<PhysicalPlan> {
1229-
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
1230-
let broadcast_id = if build_side_data_distribution
1231-
.as_ref()
1232-
.is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
1233-
{
1234-
Some(self.ctx.get_next_broadcast_id())
1235-
} else {
1236-
None
1237-
};
1238-
Ok(PhysicalPlan::new(HashJoin {
1239-
projections,
1240-
build_projections,
1241-
probe_projections,
1242-
build: build_side,
1243-
probe: probe_side,
1244-
join_type: join.join_type,
1245-
build_keys: right_join_conditions,
1246-
probe_keys: left_join_conditions,
1247-
is_null_equal,
1248-
non_equi_conditions,
1249-
marker_index: join.marker_index,
1250-
meta: PhysicalPlanMeta::new("HashJoin"),
1251-
from_correlated_subquery: join.from_correlated_subquery,
1252-
probe_to_build,
1253-
output_schema,
1254-
need_hold_hash_table: join.need_hold_hash_table,
1255-
stat_info: Some(stat_info),
1256-
single_to_inner: join.single_to_inner,
1257-
build_side_cache_info,
1258-
runtime_filter,
1259-
broadcast_id,
1260-
}))
1261-
}
1262-
12631187
pub async fn build_hash_join(
12641188
&mut self,
12651189
join: &Join,
@@ -1345,23 +1269,38 @@ impl PhysicalPlanBuilder {
13451269
.await?;
13461270

13471271
// Step 12: Create and return the HashJoin
1348-
self.create_hash_join(
1349-
s_expr,
1350-
join,
1351-
probe_side,
1352-
build_side,
1353-
projections,
1354-
probe_projections,
1355-
build_projections,
1356-
left_join_conditions,
1357-
right_join_conditions,
1358-
is_null_equal,
1359-
non_equi_conditions,
1360-
probe_to_build,
1361-
output_schema,
1362-
build_side_cache_info,
1363-
runtime_filter,
1364-
stat_info,
1365-
)
1272+
{
1273+
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
1274+
let broadcast_id = if build_side_data_distribution.as_ref().is_some_and(|e| {
1275+
matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_))
1276+
}) {
1277+
Some(self.ctx.get_next_broadcast_id())
1278+
} else {
1279+
None
1280+
};
1281+
Ok(PhysicalPlan::new(HashJoin {
1282+
projections: projections,
1283+
build_projections: build_projections,
1284+
probe_projections: probe_projections,
1285+
build: build_side,
1286+
probe: probe_side,
1287+
join_type: join.join_type,
1288+
build_keys: right_join_conditions,
1289+
probe_keys: left_join_conditions,
1290+
is_null_equal: is_null_equal,
1291+
non_equi_conditions: non_equi_conditions,
1292+
marker_index: join.marker_index,
1293+
meta: PhysicalPlanMeta::new("HashJoin"),
1294+
from_correlated_subquery: join.from_correlated_subquery,
1295+
probe_to_build: probe_to_build,
1296+
output_schema: output_schema,
1297+
need_hold_hash_table: join.need_hold_hash_table,
1298+
stat_info: Some(stat_info),
1299+
single_to_inner: join.single_to_inner,
1300+
build_side_cache_info: build_side_cache_info,
1301+
runtime_filter: runtime_filter,
1302+
broadcast_id,
1303+
}))
1304+
}
13661305
}
13671306
}

0 commit comments

Comments
 (0)