Skip to content

Commit 8da46e9

Browse files
committed
x
1 parent 6e82cf8 commit 8da46e9

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod transform_hash_join;
2626
pub use grace::GraceHashJoin;
2727
pub use hash_join_factory::HashJoinFactory;
2828
pub use join::Join;
29+
pub use join::JoinStream;
2930
pub use memory::BasicHashJoinState;
3031
pub use memory::InnerHashJoin;
3132
pub use runtime_filter::RuntimeFiltersDesc;

src/query/service/src/pipelines/processors/transforms/transform_loop_join.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818
use std::sync::Mutex;
1919
use std::sync::RwLock;
2020

21+
use databend_common_base::base::ProgressValues;
2122
use databend_common_exception::Result;
2223
use databend_common_expression::BlockEntry;
2324
use databend_common_expression::DataBlock;
@@ -28,6 +29,8 @@ use databend_common_pipeline::core::Processor;
2829
use databend_common_pipeline::sinks::Sink;
2930
use databend_common_sql::plans::JoinType;
3031

32+
use super::Join;
33+
use super::JoinStream;
3134
use crate::physical_plans::NestedLoopJoin;
3235
use crate::pipelines::executor::WatchNotify;
3336
use crate::sessions::QueryContext;
@@ -137,7 +140,6 @@ pub struct LoopJoinState {
137140

138141
right_sinker_count: RwLock<usize>,
139142

140-
#[allow(dead_code)]
141143
join_type: JoinType,
142144
}
143145

@@ -255,3 +257,49 @@ impl LoopJoinState {
255257
Ok(iter)
256258
}
257259
}
260+
261+
impl Join for LoopJoinState {
262+
fn add_block(&mut self, data: Option<DataBlock>) -> Result<()> {
263+
let Some(right_block) = data else {
264+
return Ok(());
265+
};
266+
267+
let right = if matches!(self.join_type, JoinType::Left | JoinType::Full) {
268+
let rows = right_block.num_rows();
269+
let entries = right_block
270+
.take_columns()
271+
.into_iter()
272+
.map(|entry| entry.into_nullable())
273+
.collect::<Vec<_>>();
274+
DataBlock::new(entries, rows)
275+
} else {
276+
right_block
277+
};
278+
self.right_table.write()?.push(right);
279+
Ok(())
280+
}
281+
282+
fn final_build(&mut self) -> Result<Option<ProgressValues>> {
283+
let progress = self.right_table.read()?.iter().fold(
284+
ProgressValues::default(),
285+
|mut progress, block| {
286+
progress.rows += block.num_rows();
287+
progress.bytes += block.memory_size();
288+
progress
289+
},
290+
);
291+
Ok(Some(progress))
292+
}
293+
294+
fn probe_block(&mut self, data: DataBlock) -> Result<Box<dyn JoinStream + '_>> {
295+
todo!();
296+
}
297+
}
298+
299+
struct LoopJoinStream {}
300+
301+
impl JoinStream for LoopJoinStream {
302+
fn next(&mut self) -> Result<Option<DataBlock>> {
303+
todo!()
304+
}
305+
}

0 commit comments

Comments
 (0)