Skip to content

optimize_projections fails when a mark join is involved #20083

@yjerry-fortinet

Description

@yjerry-fortinet

Describe the bug

Optimization fails when the plan contains a filter of the form `Filter: EXISTS (<subquery>) OR EXISTS (<subquery>)`.

To Reproduce

use datafusion::logical_expr::expr::Exists;
use datafusion::logical_expr::utils::disjunction;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableSource};
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
use datafusion::prelude::{Expr, lit};
use datafusion::{
    arrow::datatypes::{DataType, Field, Schema},
    datasource::{DefaultTableSource, MemTable},
};
use datafusion_common::{Column, JoinType};
use std::sync::Arc;

/// Builds an empty in-memory table source with two non-nullable `Int32`
/// columns, `uid` and `val`.
///
/// The backing `MemTable` has a single partition holding no record batches.
async fn get_table() -> datafusion_common::Result<Arc<dyn TableSource>> {
    let columns = vec![
        Field::new("uid", DataType::Int32, false),
        Field::new("val", DataType::Int32, false),
    ];
    // One partition, zero batches: the table has a schema but no rows.
    let partitions = vec![vec![]];
    let mem_table = MemTable::try_new(Arc::new(Schema::new(columns)), partitions)?;

    let source: Arc<dyn TableSource> = Arc::new(DefaultTableSource::new(Arc::new(mem_table)));
    Ok(source)
}

/// Builds an uncorrelated, non-negated `EXISTS (SELECT 1 FROM <table_name>)`
/// expression over a fresh empty table from `get_table`.
async fn build_exists_expr(table_name: &str) -> datafusion_common::Result<Expr> {
    let source = get_table().await?;

    // Inner query: SELECT 1 FROM <table_name>
    let select_one = LogicalPlanBuilder::scan(table_name, source, None)?
        .project(vec![lit(1)])?
        .build()?;

    // No outer references: the subquery does not depend on the enclosing row.
    let subquery = datafusion::logical_expr::Subquery {
        subquery: Arc::new(select_one),
        outer_ref_columns: vec![],
        spans: Default::default(),
    };

    let negated = false;
    Ok(Expr::Exists(Exists::new(subquery, negated)))
}

async fn build_plan() -> datafusion_common::Result<LogicalPlan> {
    let exists_a = build_exists_expr("a").await?;
    let exists_b = build_exists_expr("b").await?;

    let filter = disjunction(vec![exists_a, exists_b]);

    let mut scan_a =
        LogicalPlanBuilder::scan("a", get_table().await?, None)?.filter(filter.unwrap())?;
    
    // this test case will pass if un-comment this line
    // scan_a = scan_a.project(vec![Expr::Column(Column::from_name("uid"))])?;
    
    let scan_b = LogicalPlanBuilder::scan("b", get_table().await?, None)?.build()?;

    let plan = scan_a
        .join(
            scan_b,
            JoinType::Left,
            (vec!["uid".to_string()], vec!["uid".to_string()]),
            None,
        )?
        .build()?;

    let optimizer = Optimizer::new();
    let config = OptimizerContext::new().with_max_passes(16);
    let ret = optimizer.optimize(plan, &config, observer);
    if ret.is_err() {
        println!("ret = {:#?}", ret);
    }

    fn observer(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
        println!(
            "\nAfter applying rule '{}':\n{}",
            rule.name(),
            plan.display_indent()
        )
    }

    ret
}


/// Checks that the reproducer plan from `build_plan` optimizes without error.
///
/// On failure, the optimizer's error text is included in the panic message
/// rather than being reduced to a bare `is_ok()` assertion.
#[tokio::test]
async fn test_build_plan() {
    if let Err(err) = build_plan().await {
        panic!("optimizing the reproducer plan failed: {err}");
    }
}

Expected behavior

This test case should pass.

Actual Behavior

ret = Err(
    Context(
        "Optimizer rule 'optimize_projections' failed",
        Context(
            "Check optimizer-specific invariants after optimizer rule: optimize_projections",
            Internal(
                "Failed due to a difference in schemas: original schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"uid\", data_type: Int32, nullable: true }, Field { name: \"val\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"b\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"uid\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"__correlated_sq_1\" }), Some(Bare { table: \"__correlated_sq_2\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }",
            ),
        ),
    ),
)

thread 'test_build_plan' (2129566) panicked at src/main.rs:86:5:
assertion failed: ret.is_ok()
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test_build_plan

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s


Additional context

The test case does not fail if a projection is added to `scan_a` before the join.

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions