
Commit 00fbe5f

Don't clone the schema in logical2physical
1 parent b1acfb5 commit 00fbe5f
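
The mechanical change across all of these files is the same: the `logical2physical` test helper now takes an owned `SchemaRef` (`Arc<Schema>`) instead of `&Schema`, so callers pass `Arc::clone(&schema)` (a cheap reference-count bump) or move the `Arc` at its last use, instead of the helper deep-cloning the schema internally. The helper's body is not part of this diff; the following is a minimal sketch of the assumed before/after shape, built on `create_physical_expr`:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DFSchema, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{col, lit, Expr};
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};

// Before (assumed shape): the helper borrowed the Schema and had to deep-clone
// it to build the owned DFSchema that create_physical_expr needs.
fn logical2physical_old(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
    let df_schema = schema.clone().to_dfschema().unwrap(); // full Schema clone
    create_physical_expr(expr, &df_schema, &ExecutionProps::new()).unwrap()
}

// After (assumed shape): the helper takes the Arc by value; DFSchema::try_from
// shares the Arc, so no deep copy of the Schema is made.
fn logical2physical(expr: &Expr, schema: SchemaRef) -> Arc<dyn PhysicalExpr> {
    let df_schema = DFSchema::try_from(schema).unwrap(); // refcount bump only
    create_physical_expr(expr, &df_schema, &ExecutionProps::new()).unwrap()
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let expr = col("a").eq(lit(1));
    // Callers now pass Arc::clone(&schema) (a pointer copy), or move the Arc
    // outright at its last use, as the hunks below do.
    let predicate = logical2physical(&expr, Arc::clone(&schema));
    println!("{predicate}");
}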

10 files changed, +204 −161 lines changed


datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ mod tests {
         let predicate = self
             .predicate
             .as_ref()
-            .map(|p| logical2physical(p, &table_schema));
+            .map(|p| logical2physical(p, Arc::clone(&table_schema)));
 
         let mut source = ParquetSource::new(table_schema);
         if let Some(predicate) = predicate {

datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs

Lines changed: 1 addition & 1 deletion
@@ -285,7 +285,7 @@ impl ParquetTestCase {
 
         if let Some(predicate) = &self.predicate {
             let filter_expr =
-                logical2physical(predicate, self.table_schema.table_schema());
+                logical2physical(predicate, Arc::clone(self.table_schema.table_schema()));
             let mut config = ConfigOptions::default();
             config.execution.parquet.pushdown_filters = self.push_down_filters;
             let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?;

datafusion/datasource-parquet/src/opener.rs

Lines changed: 20 additions & 16 deletions
@@ -961,7 +961,7 @@ mod test {
 
         // A filter on "a" should not exclude any rows even if it matches the data
         let expr = col("a").eq(lit(1));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -970,7 +970,7 @@ mod test {
 
         // A filter on `b = 5.0` should exclude all rows
         let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1034,7 +1034,8 @@ mod test {
         let expr = col("part").eq(lit(1));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1045,7 +1046,7 @@ mod test {
         let expr = col("part").eq(lit(2));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1119,7 +1120,7 @@ mod test {
 
         // Filter should match the partition value and file statistics
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1128,7 +1129,7 @@ mod test {
 
         // Should prune based on partition value but not file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1137,7 +1138,7 @@ mod test {
 
         // Should prune based on file statistics but not partition value
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1146,7 +1147,7 @@ mod test {
 
         // Should prune based on both partition value and file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1209,7 +1210,7 @@ mod test {
 
         // Filter should match the partition value and data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1218,7 +1219,7 @@ mod test {
 
         // Filter should match the partition value but not the data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1227,7 +1228,7 @@ mod test {
 
         // Filter should not match the partition value but match the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1236,7 +1237,7 @@ mod test {
 
         // Filter should not match the partition value or the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1308,7 +1309,7 @@ mod test {
         // This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
         // (the assumption is this happened already at planning time)
         let expr = col("a").eq(lit(42));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1317,7 +1318,8 @@ mod test {
 
         // If we make the filter dynamic, it should prune.
         // This allows dynamic filters to prune partitions/files even if they are populated late into execution.
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1327,7 +1329,8 @@ mod test {
         // If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
         file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
         let expr = col("part").eq(lit(2));
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1336,7 +1339,8 @@ mod test {
 
         // Similarly a filter that combines partition and data columns should prune even if there are no stats.
         let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 16 additions & 18 deletions
@@ -487,15 +487,13 @@ mod test {
 
         let metadata = reader.metadata();
 
-        let table_schema =
+        let table_schema = Arc::new(
             parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-                .expect("parsing schema");
+                .expect("parsing schema"),
+        );
 
         let expr = col("int64_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
-
-        let table_schema = Arc::new(table_schema.clone());
-
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let candidate = FilterCandidateBuilder::new(expr, table_schema)
             .build(metadata)
             .expect("building candidate");
@@ -516,23 +514,23 @@ mod test {
 
         // This is the schema we would like to coerce to,
         // which is different from the physical schema of the file.
-        let table_schema = Schema::new(vec![Field::new(
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
             "timestamp_col",
             DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
             false,
-        )]);
+        )]));
 
         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let expr = DefaultPhysicalExprAdapterFactory {}
-            .create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
+            .create(Arc::clone(&table_schema), Arc::clone(&file_schema))
             .rewrite(expr)
             .expect("rewriting expression");
-        let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
+        let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
            .build(&metadata)
            .expect("building candidate")
            .expect("candidate expected");
@@ -565,10 +563,10 @@ mod test {
             ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         // Rewrite the expression to add CastExpr for type coercion
         let expr = DefaultPhysicalExprAdapterFactory {}
-            .create(Arc::new(table_schema), Arc::clone(&file_schema))
+            .create(table_schema, Arc::clone(&file_schema))
             .rewrite(expr)
             .expect("rewriting expression");
         let candidate = FilterCandidateBuilder::new(expr, file_schema)
@@ -594,7 +592,7 @@ mod test {
         let table_schema = Arc::new(get_lists_table_schema());
 
         let expr = col("utf8_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         check_expression_can_evaluate_against_schema(&expr, &table_schema);
 
         assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -612,22 +610,22 @@ mod test {
 
     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());
 
         let expr = col("string_col").is_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
 
         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }
 
     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());
 
         let expr = col("string_col")
             .is_not_null()
             .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
 
         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }
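
A recurring detail in the hunks above: intermediate call sites pass `Arc::clone(&table_schema)`, while the final use in each test moves `table_schema` by value, avoiding even the refcount bump. A small self-contained illustration of that convention (the hypothetical `use_schema` stands in for `logical2physical`):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Hypothetical stand-in for logical2physical: any function taking SchemaRef by value.
fn use_schema(schema: SchemaRef) -> usize {
    schema.fields().len()
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "part",
        DataType::Int32,
        false,
    )]));
    let n = use_schema(Arc::clone(&schema)); // intermediate use: clone the Arc
    let m = use_schema(schema); // last use: move the Arc, no clone at all
    assert_eq!(n, m);
}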
