Skip to content

Commit be7ccb0

Browse files
committed
fix
1 parent f822514 commit be7ccb0

File tree

5 files changed

+92
-16
lines changed

5 files changed

+92
-16
lines changed

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
use databend_common_exception::ErrorCode;
1616
use databend_common_exception::Result;
17+
use databend_common_expression::types::DataType;
1718
use databend_common_settings::Settings;
1819
use databend_common_sql::binder::is_range_join_condition;
1920
use databend_common_sql::optimizer::ir::RelExpr;
2021
use databend_common_sql::optimizer::ir::SExpr;
2122
use databend_common_sql::plans::FunctionCall;
2223
use databend_common_sql::plans::Join;
24+
use databend_common_sql::plans::JoinEquiCondition;
2325
use databend_common_sql::plans::JoinType;
2426
use databend_common_sql::ColumnSet;
2527
use databend_common_sql::ScalarExpr;
@@ -63,17 +65,9 @@ fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<Phy
6365
let conditions = join
6466
.non_equi_conditions
6567
.iter()
66-
.cloned()
67-
.chain(join.equi_conditions.iter().cloned().map(|condition| {
68-
FunctionCall {
69-
span: condition.left.span(),
70-
func_name: "eq".to_string(),
71-
params: vec![],
72-
arguments: vec![condition.left, condition.right],
73-
}
74-
.into()
75-
}))
76-
.collect();
68+
.map(|c| Ok(c.clone()))
69+
.chain(join.equi_conditions.iter().map(condition_to_expr))
70+
.collect::<Result<_>>()?;
7771
return Ok(PhysicalJoinType::LoopJoin { conditions });
7872
};
7973

@@ -229,3 +223,28 @@ impl PhysicalPlanBuilder {
229223
}
230224
}
231225
}
226+
227+
fn condition_to_expr(condition: &JoinEquiCondition) -> Result<ScalarExpr> {
228+
let left_type = condition.left.data_type()?;
229+
let right_type = condition.right.data_type()?;
230+
231+
let arguments = match (&left_type, &right_type) {
232+
(DataType::Nullable(left), right) if **left == *right => vec![
233+
condition.left.clone(),
234+
condition.right.clone().unify_to_data_type(&left_type),
235+
],
236+
(left, DataType::Nullable(right)) if *left == **right => vec![
237+
condition.left.clone().unify_to_data_type(&right_type),
238+
condition.right.clone(),
239+
],
240+
_ => vec![condition.left.clone(), condition.right.clone()],
241+
};
242+
243+
Ok(FunctionCall {
244+
span: condition.left.span(),
245+
func_name: "eq".to_string(),
246+
params: vec![],
247+
arguments,
248+
}
249+
.into())
250+
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,20 @@ impl BasicHashJoin {
6060
state: Arc<BasicHashJoinState>,
6161
) -> Result<Self> {
6262
let settings = ctx.get_settings();
63-
let block_size = settings.get_max_block_size()? as usize;
64-
let block_bytes = settings.get_max_block_size()? as usize;
63+
let squash_block = SquashBlocks::new(
64+
settings.get_max_block_size()? as _,
65+
settings.get_max_block_bytes()? as _,
66+
);
6567

6668
Ok(BasicHashJoin {
6769
desc,
6870
state,
6971
method,
7072
function_ctx,
71-
squash_block: SquashBlocks::new(block_size, block_bytes),
73+
squash_block,
7274
})
7375
}
76+
7477
pub(crate) fn add_block(&mut self, mut data: Option<DataBlock>) -> Result<()> {
7578
let mut squashed_block = match data.take() {
7679
None => self.squash_block.finalize()?,
@@ -128,6 +131,7 @@ impl BasicHashJoin {
128131
std::mem::swap(&mut chunks[chunk_index], &mut chunk_block);
129132
}
130133

134+
log::info!("build_hash_table chunk_index{chunk_index}");
131135
self.build_hash_table(keys_block, chunk_index)?;
132136

133137
Ok(Some(ProgressValues {

src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod transform_hash_join;
2626
pub use grace::GraceHashJoin;
2727
pub use hash_join_factory::HashJoinFactory;
2828
pub use join::Join;
29+
pub use join::JoinStream;
2930
pub use memory::BasicHashJoinState;
3031
pub use memory::InnerHashJoin;
3132
pub use runtime_filter::RuntimeFiltersDesc;

src/query/service/src/pipelines/processors/transforms/transform_loop_join.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818
use std::sync::Mutex;
1919
use std::sync::RwLock;
2020

21+
use databend_common_base::base::ProgressValues;
2122
use databend_common_exception::Result;
2223
use databend_common_expression::BlockEntry;
2324
use databend_common_expression::DataBlock;
@@ -28,6 +29,8 @@ use databend_common_pipeline::core::Processor;
2829
use databend_common_pipeline::sinks::Sink;
2930
use databend_common_sql::plans::JoinType;
3031

32+
use super::Join;
33+
use super::JoinStream;
3134
use crate::physical_plans::NestedLoopJoin;
3235
use crate::pipelines::executor::WatchNotify;
3336
use crate::sessions::QueryContext;
@@ -137,7 +140,6 @@ pub struct LoopJoinState {
137140

138141
right_sinker_count: RwLock<usize>,
139142

140-
#[allow(dead_code)]
141143
join_type: JoinType,
142144
}
143145

@@ -255,3 +257,49 @@ impl LoopJoinState {
255257
Ok(iter)
256258
}
257259
}
260+
261+
impl Join for LoopJoinState {
262+
fn add_block(&mut self, data: Option<DataBlock>) -> Result<()> {
263+
let Some(right_block) = data else {
264+
return Ok(());
265+
};
266+
267+
let right = if matches!(self.join_type, JoinType::Left | JoinType::Full) {
268+
let rows = right_block.num_rows();
269+
let entries = right_block
270+
.take_columns()
271+
.into_iter()
272+
.map(|entry| entry.into_nullable())
273+
.collect::<Vec<_>>();
274+
DataBlock::new(entries, rows)
275+
} else {
276+
right_block
277+
};
278+
self.right_table.write()?.push(right);
279+
Ok(())
280+
}
281+
282+
fn final_build(&mut self) -> Result<Option<ProgressValues>> {
283+
let progress = self.right_table.read()?.iter().fold(
284+
ProgressValues::default(),
285+
|mut progress, block| {
286+
progress.rows += block.num_rows();
287+
progress.bytes += block.memory_size();
288+
progress
289+
},
290+
);
291+
Ok(Some(progress))
292+
}
293+
294+
fn probe_block(&mut self, data: DataBlock) -> Result<Box<dyn JoinStream + '_>> {
295+
todo!();
296+
}
297+
}
298+
299+
struct LoopJoinStream {}
300+
301+
impl JoinStream for LoopJoinStream {
302+
fn next(&mut self) -> Result<Option<DataBlock>> {
303+
todo!()
304+
}
305+
}

src/query/sql/src/planner/plans/join.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,11 @@ impl JoinEquiCondition {
222222
left.into_iter()
223223
.zip(right)
224224
.enumerate()
225-
.map(|(index, (l, r))| JoinEquiCondition::new(l, r, is_null_equal.contains(&index)))
225+
.map(|(index, (left, right))| Self {
226+
left,
227+
right,
228+
is_null_equal: is_null_equal.contains(&index),
229+
})
226230
.collect()
227231
}
228232
}

0 commit comments

Comments
 (0)