Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {
let predicate = self
.predicate
.as_ref()
.map(|p| logical2physical(p, &table_schema));
.map(|p| logical2physical(p, Arc::clone(&table_schema)));

let mut source = ParquetSource::new(table_schema);
if let Some(predicate) = predicate {
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,17 +1054,15 @@ impl DefaultPhysicalPlanner {
let (left, left_col_keys, left_projected) =
wrap_projection_for_join_if_necessary(
&left_keys,
original_left.as_ref().clone(),
Arc::clone(original_left),
)?;
let (right, right_col_keys, right_projected) =
wrap_projection_for_join_if_necessary(
&right_keys,
original_right.as_ref().clone(),
Arc::clone(original_right),
)?;
let column_on = (left_col_keys, right_col_keys);

let left = Arc::new(left);
let right = Arc::new(right);
let column_on = (left_col_keys, right_col_keys);
let (new_join, requalified) = Join::try_new_with_project_input(
node,
Arc::clone(&left),
Expand Down
36 changes: 20 additions & 16 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ mod test {

// A filter on "a" should not exclude any rows even if it matches the data
let expr = col("a").eq(lit(1));
let predicate = logical2physical(&expr, &schema);
let predicate = logical2physical(&expr, Arc::clone(&schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1359,7 +1359,7 @@ mod test {

// A filter on `b = 5.0` should exclude all rows
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
let predicate = logical2physical(&expr, &schema);
let predicate = logical2physical(&expr, Arc::clone(&schema));
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1405,7 +1405,8 @@ mod test {
let expr = col("part").eq(lit(1));
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1416,7 +1417,7 @@ mod test {
let expr = col("part").eq(lit(2));
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1472,7 +1473,7 @@ mod test {

// Filter should match the partition value and file statistics
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1481,7 +1482,7 @@ mod test {

// Should prune based on partition value but not file statistics
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1490,7 +1491,7 @@ mod test {

// Should prune based on file statistics but not partition value
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1499,7 +1500,7 @@ mod test {

// Should prune based on both partition value and file statistics
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1545,7 +1546,7 @@ mod test {

// Filter should match the partition value and data value
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1554,7 +1555,7 @@ mod test {

// Filter should match the partition value but not the data value
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1563,7 +1564,7 @@ mod test {

// Filter should not match the partition value but match the data value
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1572,7 +1573,7 @@ mod test {

// Filter should not match the partition value or the data value
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1625,7 +1626,7 @@ mod test {
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
// (the assumption is this happened already at planning time)
let expr = col("a").eq(lit(42));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1634,7 +1635,8 @@ mod test {

// If we make the filter dynamic, it should prune.
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1644,7 +1646,8 @@ mod test {
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
let expr = col("part").eq(lit(2));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1653,7 +1656,8 @@ mod test {

// Similarly a filter that combines partition and data columns should prune even if there are no stats.
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down
34 changes: 16 additions & 18 deletions datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,15 +487,13 @@ mod test {

let metadata = reader.metadata();

let table_schema =
let table_schema = Arc::new(
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema");
.expect("parsing schema"),
);

let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);

let table_schema = Arc::new(table_schema.clone());

let expr = logical2physical(&expr, Arc::clone(&table_schema));
let candidate = FilterCandidateBuilder::new(expr, table_schema)
.build(metadata)
.expect("building candidate");
Expand All @@ -516,23 +514,23 @@ mod test {

// This is the schema we would like to coerce to,
// which is different from the physical schema of the file.
let table_schema = Schema::new(vec![Field::new(
let table_schema = Arc::new(Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);
)]));

// Test all should fail
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = logical2physical(&expr, Arc::clone(&table_schema));
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");
Expand Down Expand Up @@ -565,10 +563,10 @@ mod test {
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = logical2physical(&expr, Arc::clone(&table_schema));
// Rewrite the expression to add CastExpr for type coercion
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema), Arc::clone(&file_schema))
.create(table_schema, Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema)
Expand All @@ -594,7 +592,7 @@ mod test {
let table_schema = Arc::new(get_lists_table_schema());

let expr = col("utf8_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
let expr = logical2physical(&expr, Arc::clone(&table_schema));
check_expression_can_evaluate_against_schema(&expr, &table_schema);

assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
Expand All @@ -612,22 +610,22 @@ mod test {

#[test]
fn basic_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let table_schema = Arc::new(get_basic_table_schema());

let expr = col("string_col").is_null();
let expr = logical2physical(&expr, &table_schema);
let expr = logical2physical(&expr, Arc::clone(&table_schema));

assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

#[test]
fn complex_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let table_schema = Arc::new(get_basic_table_schema());

let expr = col("string_col")
.is_not_null()
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
let expr = logical2physical(&expr, &table_schema);
let expr = logical2physical(&expr, Arc::clone(&table_schema));

assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
Expand Down
Loading