Skip to content

Commit b4d1bf5

Browse files
committed
x
1 parent 5001ec6 commit b4d1bf5

File tree

3 files changed

+35
-41
lines changed

3 files changed

+35
-41
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub struct HashJoinDesc {
6262
pub(crate) runtime_filter: RuntimeFiltersDesc,
6363

6464
pub(crate) build_projection: ColumnSet,
65-
pub(crate) probe_projections: ColumnSet,
65+
pub(crate) probe_projection: ColumnSet,
6666
pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
6767
pub(crate) build_schema: DataSchemaRef,
6868
pub(crate) probe_schema: DataSchemaRef,
@@ -139,7 +139,7 @@ impl HashJoinDesc {
139139
runtime_filter: (&join.runtime_filter).into(),
140140
probe_to_build: join.probe_to_build.clone(),
141141
build_projection: join.build_projections.clone(),
142-
probe_projections: join.probe_projections.clone(),
142+
probe_projection: join.probe_projections.clone(),
143143
build_schema: join.build.output_schema()?,
144144
probe_schema: join.probe.output_schema()?,
145145
})
@@ -268,11 +268,11 @@ impl HashJoinDesc {
268268
&self,
269269
function_ctx: &FunctionContext,
270270
block_size: usize,
271-
) -> Result<Option<FilterExecutor>> {
271+
) -> Result<FilterExecutor> {
272272
let probe_len = self.probe_schema.num_fields();
273273

274274
let projection = self
275-
.probe_projections
275+
.probe_projection
276276
.iter()
277277
.copied()
278278
.chain(self.build_projection.iter().map(|idx| probe_len + *idx))
@@ -305,13 +305,13 @@ impl HashJoinDesc {
305305
})
306306
.unwrap()?;
307307

308-
Ok(Some(FilterExecutor::new(
308+
Ok(FilterExecutor::new(
309309
predicate,
310310
function_ctx.clone(),
311311
block_size,
312-
Some(projection),
312+
Some(projection.into_iter().collect()),
313313
&BUILTIN_FUNCTIONS,
314314
false,
315-
)))
315+
))
316316
}
317317
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub struct InnerHashJoin {
5252
pub(crate) function_ctx: FunctionContext,
5353
pub(crate) basic_state: Arc<BasicHashJoinState>,
5454
pub(crate) performance_context: PerformanceContext,
55-
nested_loop_filter: Option<FilterExecutor>,
55+
nested_loop_filter: FilterExecutor,
5656
}
5757

5858
impl InnerHashJoin {
@@ -84,7 +84,7 @@ impl InnerHashJoin {
8484
function_ctx,
8585
basic_state: state,
8686
performance_context: context,
87-
nested_loop_filter: Some(nested_loop_filter),
87+
nested_loop_filter,
8888
})
8989
}
9090
}
@@ -127,12 +127,10 @@ impl Join for InnerHashJoin {
127127
}
128128
HashJoinHashTable::NestedLoop(build_blocks) => {
129129
let nested = Box::new(LoopJoinStream::new(data, build_blocks));
130-
return match self.nested_loop_filter.as_mut() {
131-
Some(filter_executor) => {
132-
Ok(InnerHashJoinFilterStream::create(nested, filter_executor))
133-
}
134-
None => Ok(nested),
135-
};
130+
return Ok(InnerHashJoinFilterStream::create(
131+
nested,
132+
&mut self.nested_loop_filter,
133+
));
136134
}
137135
_ => (),
138136
}
@@ -146,7 +144,7 @@ impl Join for InnerHashJoin {
146144
};
147145

148146
self.desc.remove_keys_nullable(&mut keys);
149-
let probe_block = data.project(&self.desc.probe_projections);
147+
let probe_block = data.project(&self.desc.probe_projection);
150148

151149
let joined_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() {
152150
HashJoinHashTable::T(table) => {
@@ -252,31 +250,27 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> {
252250
(None, None) => DataBlock::new(vec![], self.probed_rows.matched_build.len()),
253251
};
254252

255-
if !self.desc.probe_to_build.is_empty() {
256-
for (index, (is_probe_nullable, is_build_nullable)) in
257-
self.desc.probe_to_build.iter()
258-
{
259-
let entry = match (is_probe_nullable, is_build_nullable) {
260-
(true, true) | (false, false) => result_block.get_by_offset(*index).clone(),
261-
(true, false) => {
262-
result_block.get_by_offset(*index).clone().remove_nullable()
263-
}
264-
(false, true) => {
265-
let entry = result_block.get_by_offset(*index);
266-
let col = entry.to_column();
267-
268-
match col.is_null() || col.is_nullable() {
269-
true => entry.clone(),
270-
false => BlockEntry::from(NullableColumn::new_column(
271-
col,
272-
Bitmap::new_constant(true, result_block.num_rows()),
273-
)),
274-
}
253+
for (index, (is_probe_nullable, is_build_nullable)) in
254+
self.desc.probe_to_build.iter().cloned()
255+
{
256+
let entry = match (is_probe_nullable, is_build_nullable) {
257+
(true, true) | (false, false) => result_block.get_by_offset(index).clone(),
258+
(true, false) => result_block.get_by_offset(index).clone().remove_nullable(),
259+
(false, true) => {
260+
let entry = result_block.get_by_offset(index);
261+
let col = entry.to_column();
262+
263+
match col.is_null() || col.is_nullable() {
264+
true => entry.clone(),
265+
false => BlockEntry::from(NullableColumn::new_column(
266+
col,
267+
Bitmap::new_constant(true, result_block.num_rows()),
268+
)),
275269
}
276-
};
270+
}
271+
};
277272

278-
result_block.add_entry(entry);
279-
}
273+
result_block.add_entry(entry);
280274
}
281275

282276
return Ok(Some(result_block));

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl Join for OuterLeftHashJoin {
112112
.collect::<Vec<_>>();
113113

114114
let build_block = null_build_block(&types, data.num_rows());
115-
let probe_block = Some(data.project(&self.desc.probe_projections));
115+
let probe_block = Some(data.project(&self.desc.probe_projection));
116116
let result_block = final_result_block(&self.desc, probe_block, build_block, num_rows);
117117
return Ok(Box::new(OneBlockJoinStream(Some(result_block))));
118118
}
@@ -128,7 +128,7 @@ impl Join for OuterLeftHashJoin {
128128
};
129129

130130
self.desc.remove_keys_nullable(&mut keys);
131-
let probe_block = data.project(&self.desc.probe_projections);
131+
let probe_block = data.project(&self.desc.probe_projection);
132132

133133
let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() {
134134
HashJoinHashTable::T(table) => {

0 commit comments

Comments
 (0)