diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index b9d160d55589f..17f5ac8bc9bc2 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -2114,4 +2114,69 @@ mod tests { " ) } + + /// Test for multiple EXISTS in OR clause followed by a join + /// Reproduces issue #20083 + #[test] + fn exists_or_exists_with_join() -> Result<()> { + use crate::OptimizerContext; + + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + + // Create subquery for EXISTS clause from table a + let subquery_a = LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?) + .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))? + .project(vec![lit(1)])? + .build()?; + + // Create subquery for EXISTS clause from table b + let subquery_b = LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?) + .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))? + .project(vec![lit(1)])? + .build()?; + + // Build plan: scan(a) with filter (EXISTS subquery_a OR EXISTS subquery_b), then LEFT JOIN with scan(b) + let plan = LogicalPlanBuilder::from(table_a) + .filter( + datafusion_expr::Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(datafusion_expr::Expr::Exists( + datafusion_expr::Exists::new( + datafusion_expr::Subquery { + subquery: Arc::new(subquery_a), + outer_ref_columns: vec![], + }, + false, + ), + )), + op: datafusion_expr::Operator::Or, + right: Box::new(datafusion_expr::Expr::Exists( + datafusion_expr::Exists::new( + datafusion_expr::Subquery { + subquery: Arc::new(subquery_b), + outer_ref_columns: vec![], + }, + false, + ), + )), + }), + )? + .join( + table_b, + datafusion_expr::JoinType::Left, + (vec!["a"], vec!["a"]), + None, + )? + .build()?; + + // Run through the full optimizer to ensure it doesn't fail with schema mismatch + let optimizer = crate::Optimizer::new(); + let config = OptimizerContext::new(); + let optimized = optimizer.optimize(plan, &config, &crate::test::observe)?; + + // If we get here without error, the fix is working + // The schema should be preserved correctly through optimization + assert!(optimized.schema().fields().len() > 0); + Ok(()) + } } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index f97b05ea68fbd..b408eeeda91dd 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -759,13 +759,27 @@ fn split_join_requirements( JoinType::Inner | JoinType::Left | JoinType::Right - | JoinType::Full - | JoinType::LeftMark - | JoinType::RightMark => { + | JoinType::Full => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: indices.split_off(left_len) } + // For LeftMark joins, the schema is [left columns, mark column] + // The mark column is at index `left_len` and is not from either child + JoinType::LeftMark => { + // Filter out the mark column (at index left_len) and route indices to left child only + // The right child in a LeftMark join only provides columns for the join condition + // and doesn't contribute to the output schema (except the synthetic mark column) + let (left_indices, _mark_and_beyond) = indices.split_off(left_len); + (left_indices, RequiredIndices::new()) + } + // For RightMark joins, the schema is [right columns, mark column] + // The mark column is at the end and is not from either child + JoinType::RightMark => { + // Filter out the mark column (at the last index) and route indices to right child only + let (right_indices, _mark_and_beyond) = indices.split_off(left_len); + (RequiredIndices::new(), right_indices) + } // All requirements can be re-routed to left child directly. JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()), // All requirements can be re-routed to right side directly.