-
Notifications
You must be signed in to change notification settings - Fork 2k
Labels
bug: Something isn't working
Description
Describe the bug
Optimization fails for a plan that contains a filter of the form `EXISTS (<subquery>) OR EXISTS (<subquery>)`. The `optimize_projections` rule reports a schema mismatch.
To Reproduce
use datafusion::logical_expr::expr::Exists;
use datafusion::logical_expr::utils::disjunction;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableSource};
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
use datafusion::prelude::{Expr, lit};
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datasource::{DefaultTableSource, MemTable},
};
use datafusion_common::{Column, JoinType};
use std::sync::Arc;
/// Builds a table source backed by an empty in-memory table with two
/// non-nullable `Int32` columns, `uid` and `val`.
///
/// # Errors
///
/// Propagates any error returned by `MemTable::try_new`.
async fn get_table() -> datafusion_common::Result<Arc<dyn TableSource>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("uid", DataType::Int32, false),
        Field::new("val", DataType::Int32, false),
    ]));
    // A single partition with no record batches: the repro only builds and
    // optimizes plans, so no data is needed. `schema` is not used again, so
    // it is moved rather than cloned.
    let provider = MemTable::try_new(schema, vec![vec![]])?;
    let table = Arc::new(DefaultTableSource::new(Arc::new(provider)));
    Ok(table)
}
/// Builds an uncorrelated `EXISTS (SELECT 1 FROM <table_name>)` expression.
///
/// The subquery scans a fresh table from `get_table`, registered under
/// `table_name`, and projects the literal `1`.
///
/// # Errors
///
/// Propagates any error from building the table source or the subquery plan.
async fn build_exists_expr(table_name: &str) -> datafusion_common::Result<Expr> {
    let table = get_table().await?;
    // `table` is only needed here, so it is moved into the scan instead of cloned.
    let subquery_plan = LogicalPlanBuilder::scan(table_name, table, None)?
        .project(vec![lit(1)])?
        .build()?;
    let subquery = datafusion::logical_expr::Subquery {
        subquery: Arc::new(subquery_plan),
        // No outer references: the subquery is uncorrelated.
        outer_ref_columns: vec![],
        spans: Default::default(),
    };
    // `false` = not negated, i.e. `EXISTS` rather than `NOT EXISTS`.
    Ok(Expr::Exists(Exists::new(subquery, false)))
}
/// Reproduces the optimizer failure.
///
/// Builds `a LEFT JOIN b USING (uid)`, where `a` is filtered by
/// `EXISTS (SELECT 1 FROM a) OR EXISTS (SELECT 1 FROM b)`. It then runs the
/// default optimizer (at most 16 passes), printing the plan after every rule.
/// If optimization fails, the full error is printed before being returned.
///
/// # Errors
///
/// Returns any error from building the plan or from the optimizer.
async fn build_plan() -> datafusion_common::Result<LogicalPlan> {
    /// Prints the plan produced by each optimizer rule so the failing rule is visible.
    fn observer(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
        println!(
            "\nAfter applying rule '{}':\n{}",
            rule.name(),
            plan.display_indent()
        )
    }

    let exists_a = build_exists_expr("a").await?;
    let exists_b = build_exists_expr("b").await?;
    let filter = disjunction(vec![exists_a, exists_b])
        .expect("disjunction of a non-empty list of expressions is always Some");

    let scan_a = LogicalPlanBuilder::scan("a", get_table().await?, None)?.filter(filter)?;
    // Workaround: the test passes if `scan_a` is projected before the join.
    // Shadowing keeps `scan_a` immutable, so no `mut` (and no unused_mut warning) is needed.
    // let scan_a = scan_a.project(vec![Expr::Column(Column::from_name("uid"))])?;
    let scan_b = LogicalPlanBuilder::scan("b", get_table().await?, None)?.build()?;
    let plan = scan_a
        .join(
            scan_b,
            JoinType::Left,
            (vec!["uid".to_string()], vec!["uid".to_string()]),
            None,
        )?
        .build()?;

    let optimizer = Optimizer::new();
    let config = OptimizerContext::new().with_max_passes(16);
    let ret = optimizer.optimize(plan, &config, observer);
    if ret.is_err() {
        println!("ret = {:#?}", ret);
    }
    ret
}
/// Regression test: optimizing the plan produced by `build_plan` must succeed.
///
/// On failure, the DataFusion error is included in the panic message instead
/// of the bare `assertion failed: ret.is_ok()`.
#[tokio::test]
async fn test_build_plan() {
    if let Err(err) = build_plan().await {
        panic!("optimizing the EXISTS ... OR EXISTS ... plan failed: {}", err);
    }
}
Expected behavior
The test case should pass.
Actual Behavior
ret = Err(
Context(
"Optimizer rule 'optimize_projections' failed",
Context(
"Check optimizer-specific invariants after optimizer rule: optimize_projections",
Internal(
"Failed due to a difference in schemas: original schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"uid\", data_type: Int32, nullable: true }, Field { name: \"val\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"b\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"uid\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"__correlated_sq_1\" }), Some(Bare { table: \"__correlated_sq_2\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }",
),
),
),
)
thread 'test_build_plan' (2129566) panicked at src/main.rs:86:5:
assertion failed: ret.is_ok()
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
test_build_plan
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s
Additional context
The test passes if a projection is added on top of `scan_a` before the join (the commented-out line in `build_plan`).
No response
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
bug: Something isn't working