Skip to content

Commit 5001ec6

Browse files
committed
x
1 parent fb56aba commit 5001ec6

File tree

2 files changed

+32
-10
lines changed
  • src/query/service/src/pipelines/processors/transforms

2 files changed

+32
-10
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use databend_common_column::bitmap::Bitmap;
16+
use databend_common_exception::ErrorCode;
1617
use databend_common_exception::Result;
1718
use databend_common_expression::arrow::and_validities;
1819
use databend_common_expression::type_check::check_function;
@@ -64,6 +65,7 @@ pub struct HashJoinDesc {
6465
pub(crate) probe_projections: ColumnSet,
6566
pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
6667
pub(crate) build_schema: DataSchemaRef,
68+
pub(crate) probe_schema: DataSchemaRef,
6769
}
6870

6971
#[derive(Debug, Clone)]
@@ -139,6 +141,7 @@ impl HashJoinDesc {
139141
build_projection: join.build_projections.clone(),
140142
probe_projections: join.probe_projections.clone(),
141143
build_schema: join.build.output_schema()?,
144+
probe_schema: join.probe.output_schema()?,
142145
})
143146
}
144147

@@ -265,30 +268,50 @@ impl HashJoinDesc {
265268
&self,
266269
function_ctx: &FunctionContext,
267270
block_size: usize,
268-
) -> Result<FilterExecutor> {
269-
let offset = self.probe_projections.len();
271+
) -> Result<Option<FilterExecutor>> {
272+
let probe_len = self.probe_schema.num_fields();
270273

271-
let predicate = self
274+
let projection = self
275+
.probe_projections
276+
.iter()
277+
.copied()
278+
.chain(self.build_projection.iter().map(|idx| probe_len + *idx))
279+
.collect::<Vec<_>>();
280+
281+
let eq_predicates = self
272282
.probe_keys
273283
.iter()
274284
.zip(&self.build_keys)
275285
.map(|(probe, build)| {
276-
let build = build.project_column_ref(|old| Ok(old + offset))?;
286+
let build = build.project_column_ref(|old| Ok(old + probe_len))?;
277287
check_function(None, "eq", &[], &[probe.clone(), build], &BUILTIN_FUNCTIONS)
288+
});
289+
290+
let other_predicate = self.other_predicate.as_ref().map(|expr| {
291+
expr.project_column_ref(|index| {
292+
projection.get(*index).copied().ok_or_else(|| {
293+
ErrorCode::Internal(format!(
294+
"Invalid column index {} for nested loop predicate projection",
295+
index
296+
))
297+
})
278298
})
279-
.chain(self.other_predicate.iter().map(|expr| Ok(expr.clone())))
299+
});
300+
301+
let predicate = eq_predicates
302+
.chain(other_predicate)
280303
.reduce(|lhs, rhs| {
281304
check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
282305
})
283306
.unwrap()?;
284307

285-
Ok(FilterExecutor::new(
308+
Ok(Some(FilterExecutor::new(
286309
predicate,
287310
function_ctx.clone(),
288311
block_size,
289-
None,
312+
Some(projection),
290313
&BUILTIN_FUNCTIONS,
291314
false,
292-
))
315+
)))
293316
}
294317
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ impl Join for InnerHashJoin {
126126
))
127127
}
128128
HashJoinHashTable::NestedLoop(build_blocks) => {
129-
let probe_block = data.project(&self.desc.probe_projections);
130-
let nested = Box::new(LoopJoinStream::new(probe_block, build_blocks));
129+
let nested = Box::new(LoopJoinStream::new(data, build_blocks));
131130
return match self.nested_loop_filter.as_mut() {
132131
Some(filter_executor) => {
133132
Ok(InnerHashJoinFilterStream::create(nested, filter_executor))

0 commit comments

Comments
 (0)