Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2114,4 +2114,69 @@ mod tests {
"
)
}

/// Regression test: two correlated `EXISTS` subqueries combined with `OR`
/// in a filter, followed by a `LEFT JOIN`, must survive the full optimizer.
///
/// Reproduces issue #20083.
#[test]
fn exists_or_exists_with_join() -> Result<()> {
    use crate::OptimizerContext;

    let table_a = test_table_scan_with_name("a")?;
    let table_b = test_table_scan_with_name("b")?;

    // Correlated subquery over `sq_a`, referencing the outer column `a.a`.
    let subquery_a = LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?)
        .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))?
        .project(vec![lit(1)])?
        .build()?;

    // Correlated subquery over `sq_b`, referencing the outer column `a.b`.
    let subquery_b = LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?)
        .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))?
        .project(vec![lit(1)])?
        .build()?;

    // `exists(..)` builds the `Subquery` wrapper itself, including the list of
    // outer-referenced columns collected from the subquery plan, so the test
    // does not have to spell out (and keep in sync) the struct's fields.
    let predicate = datafusion_expr::exists(Arc::new(subquery_a))
        .or(datafusion_expr::exists(Arc::new(subquery_b)));

    // scan(a) -> Filter(EXISTS(..) OR EXISTS(..)) -> LEFT JOIN scan(b) ON a = a
    let plan = LogicalPlanBuilder::from(table_a)
        .filter(predicate)?
        .join(
            table_b,
            datafusion_expr::JoinType::Left,
            (vec!["a"], vec!["a"]),
            None,
        )?
        .build()?;

    // Remember the output width before `plan` is moved into the optimizer.
    let expected_field_count = plan.schema().fields().len();

    // Run the complete rule set; the `?` fails the test if any rule reports an
    // error (the original report was a schema mismatch during optimization).
    let optimizer = crate::Optimizer::new();
    let config = OptimizerContext::new();
    let optimized = optimizer.optimize(plan, &config, &crate::test::observe)?;

    // Optimization must not add or drop output columns.
    assert_eq!(optimized.schema().fields().len(), expected_field_count);
    Ok(())
}
}
20 changes: 17 additions & 3 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,27 @@ fn split_join_requirements(
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftMark
| JoinType::RightMark => {
| JoinType::Full => {
// Decrease right side indices by `left_len` so that they point to valid
// positions within the right child:
indices.split_off(left_len)
}
// For LeftMark joins, the schema is [left columns, mark column]
// The mark column is at index `left_len` and is not from either child
JoinType::LeftMark => {
// Filter out the mark column (at index left_len) and route indices to left child only
// The right child in a LeftMark join only provides columns for the join condition
// and doesn't contribute to the output schema (except the synthetic mark column)
let (left_indices, _mark_and_beyond) = indices.split_off(left_len);
(left_indices, RequiredIndices::new())
}
// For RightMark joins, the schema is [right columns, mark column]
// The mark column is at the end and is not from either child
JoinType::RightMark => {
// Filter out the mark column (at the last index) and route indices to right child only
let (right_indices, _mark_and_beyond) = indices.split_off(left_len);
(RequiredIndices::new(), right_indices)
}
// All requirements can be re-routed to left child directly.
JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
// All requirements can be re-routed to right side directly.
Expand Down
Loading