167 changes: 165 additions & 2 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -18,8 +18,8 @@
use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
-    coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec,
-    sort_preserving_merge_exec, stream_exec,
+    coalesce_partitions_exec, global_limit_exec, hash_join_exec, local_limit_exec,
+    sort_exec, sort_preserving_merge_exec, stream_exec,
};

use arrow::compute::SortOptions;
@@ -29,6 +29,7 @@
use datafusion_common::error::Result;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
@@ -161,6 +162,168 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
    Ok(())
}

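/// Builds a single equi-join `on` pair (`left_col = right_col`) for the
/// HashJoinExec tests below; both columns are looked up at index 0 of their
/// respective schemas.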
fn join_on_columns(
    left_col: &str,
    right_col: &str,
) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    vec![(
        Arc::new(datafusion_physical_expr::expressions::Column::new(
            left_col, 0,
        )) as _,
        Arc::new(datafusion_physical_expr::expressions::Column::new(
            right_col, 0,
        )) as _,
    )]
}

#[test]
fn absorbs_limit_into_hash_join_inner() -> Result<()> {
    // HashJoinExec with Inner join should absorb limit via with_fetch
    let schema = create_schema();
    let left = empty_exec(Arc::clone(&schema));
    let right = empty_exec(Arc::clone(&schema));
    let on = join_on_columns("c1", "c1");
    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
    let global_limit = global_limit_exec(hash_join, 0, Some(5));

    let initial = format_plan(&global_limit);
    insta::assert_snapshot!(
        initial,
        @r"
    GlobalLimitExec: skip=0, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let after_optimize =
        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
    let optimized = format_plan(&after_optimize);
    // The limit should be absorbed by the hash join (not pushed to children)
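    // (The rule offers the fetch to the child via `ExecutionPlan::with_fetch`;
    // since HashJoinExec now returns a fetching copy of itself, the
    // GlobalLimitExec wrapper can be dropped entirely.)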
    insta::assert_snapshot!(
        optimized,
        @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}

#[test]
fn absorbs_limit_into_hash_join_right() -> Result<()> {
    // HashJoinExec with Right join should absorb limit via with_fetch
    let schema = create_schema();
    let left = empty_exec(Arc::clone(&schema));
    let right = empty_exec(Arc::clone(&schema));
    let on = join_on_columns("c1", "c1");
    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Right)?;
    let global_limit = global_limit_exec(hash_join, 0, Some(10));

    let initial = format_plan(&global_limit);
    insta::assert_snapshot!(
        initial,
        @r"
    GlobalLimitExec: skip=0, fetch=10
      HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let after_optimize =
        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
    let optimized = format_plan(&after_optimize);
    // The limit should be absorbed by the hash join
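    // (same mechanism as the Inner case: the fetch caps the join's total output)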
    insta::assert_snapshot!(
        optimized,
        @r"
    HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}

#[test]
fn absorbs_limit_into_hash_join_left() -> Result<()> {
    // HashJoinExec with Left join should absorb limit via with_fetch: matched
    // rows are emitted during probing, then unmatched rows at the end,
    // stopping when limit is reached
    let schema = create_schema();
    let left = empty_exec(Arc::clone(&schema));
    let right = empty_exec(Arc::clone(&schema));
    let on = join_on_columns("c1", "c1");
    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Left)?;
    let global_limit = global_limit_exec(hash_join, 0, Some(5));

    let initial = format_plan(&global_limit);
    insta::assert_snapshot!(
        initial,
        @r"
    GlobalLimitExec: skip=0, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let after_optimize =
        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
    let optimized = format_plan(&after_optimize);
    // Left join now absorbs the limit
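    // (unmatched left rows are appended after probing, and the fetch counts
    // total output rows, so truncation remains correct)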
    insta::assert_snapshot!(
        optimized,
        @r"
    HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}

#[test]
fn absorbs_limit_with_skip_into_hash_join() -> Result<()> {
    let schema = create_schema();
    let left = empty_exec(Arc::clone(&schema));
    let right = empty_exec(Arc::clone(&schema));
    let on = join_on_columns("c1", "c1");
    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
    let global_limit = global_limit_exec(hash_join, 3, Some(5));

    let initial = format_plan(&global_limit);
    insta::assert_snapshot!(
        initial,
        @r"
    GlobalLimitExec: skip=3, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let after_optimize =
        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
    let optimized = format_plan(&after_optimize);
    // With a skip, the GlobalLimitExec is kept, but the combined fetch
    // (skip + fetch = 3 + 5 = 8) is absorbed by the join
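    // (the join cannot apply the skip itself, so it over-fetches enough rows
    // for GlobalLimitExec to discard the first 3 and still return 5)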
    insta::assert_snapshot!(
        optimized,
        @r"
    GlobalLimitExec: skip=3, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8
        EmptyExec
        EmptyExec
    "
    );

    Ok(())
}
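
// Hedged sketch (not part of this diff): absorbing a fetch by hand goes
// through the same `ExecutionPlan::with_fetch` hook the LimitPushdown rule
// uses; it returns None for operators that cannot absorb a limit.
//
//     let join: Arc<dyn ExecutionPlan> = /* e.g. a HashJoinExec */;
//     if let Some(fetching_join) = join.with_fetch(Some(8)) {
//         // `fetching_join` caps its own output at 8 rows; no
//         // GlobalLimitExec wrapper is needed above it.
//     }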

#[test]
fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
    let schema = create_schema();