
Commit cd8bbb4

Don't clone the schema in logical2physical
1 parent 12aba71 commit cd8bbb4

File tree: 9 files changed (+197, -156 lines)

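All of the call-site updates below follow one pattern: the test helper `logical2physical` now takes the schema as an owned `Arc<Schema>` (`SchemaRef`) rather than a `&Schema`, so it no longer needs to clone the schema data; callers bump the reference count with `Arc::clone(&schema)` while the schema is still needed and move the `Arc` on its last use. The following is a minimal sketch of that signature change, not the code in this commit; the helper body, the `DFSchema::try_from(SchemaRef)` conversion, and the crate paths are assumptions.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{col, lit, Expr};
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};

// Hypothetical shape of the test helper after this commit.
//
// Assumed old signature: fn logical2physical(expr: &Expr, schema: &Schema),
// which forced a clone of the Schema to build a DFSchema.
//
// Assumed new signature: taking SchemaRef by value hands the Arc straight to
// DFSchema, so only the reference count changes.
fn logical2physical(expr: &Expr, schema: SchemaRef) -> Arc<dyn PhysicalExpr> {
    // Assumed conversion: DFSchema can be built from a SchemaRef without
    // copying the underlying schema.
    let df_schema = DFSchema::try_from(schema).unwrap();
    create_physical_expr(expr, &df_schema, &ExecutionProps::new()).unwrap()
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Int32,
        false,
    )]));

    // Intermediate call sites bump the Arc's reference count...
    let _p1 = logical2physical(&col("c1").gt(lit(15)), Arc::clone(&schema));
    // ...and the last call site moves the Arc instead of cloning anything.
    let _p2 = logical2physical(&col("c1").lt(lit(5)), schema);
}
```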

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ mod tests {
         let predicate = self
             .predicate
             .as_ref()
-            .map(|p| logical2physical(p, &table_schema));
+            .map(|p| logical2physical(p, Arc::clone(&table_schema)));

         let mut source = ParquetSource::new(table_schema);
         if let Some(predicate) = predicate {

datafusion/datasource-parquet/src/opener.rs

Lines changed: 17 additions & 15 deletions
@@ -903,7 +903,7 @@ mod test {

         // A filter on "a" should not exclude any rows even if it matches the data
         let expr = col("a").eq(lit(1));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -912,7 +912,7 @@

         // A filter on `b = 5.0` should exclude all rows
         let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -978,7 +978,8 @@
         let expr = col("part").eq(lit(1));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -989,7 +990,7 @@
         let expr = col("part").eq(lit(2));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1065,7 +1066,7 @@

         // Filter should match the partition value and file statistics
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1074,7 +1075,7 @@

         // Should prune based on partition value but not file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1083,7 +1084,7 @@

         // Should prune based on file statistics but not partition value
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1092,7 +1093,7 @@

         // Should prune based on both partition value and file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1157,7 +1158,7 @@

         // Filter should match the partition value and data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1166,7 +1167,7 @@

         // Filter should match the partition value but not the data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1175,7 +1176,7 @@

         // Filter should not match the partition value but match the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1184,7 +1185,7 @@

         // Filter should not match the partition value or the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1249,15 +1250,15 @@

         // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
         let expr = col("part").eq(lit(2));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
         assert_eq!(num_batches, 1);
         assert_eq!(num_rows, 3);

         // If we make the filter dynamic, it should prune
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1396,7 +1397,8 @@
             max_predicate_cache_size: None,
         };

-        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
+        let predicate =
+            logical2physical(&col("a").eq(lit(1u64)), Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let batches = collect_batches(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 16 additions & 18 deletions
@@ -512,19 +512,18 @@ mod test {

         let metadata = reader.metadata();

-        let table_schema =
+        let table_schema = Arc::new(
             parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-                .expect("parsing schema");
+                .expect("parsing schema"),
+        );

         let expr = col("int64_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
-        let table_schema = Arc::new(table_schema.clone());
-
         let candidate = FilterCandidateBuilder::new(
             expr,
-            table_schema.clone(),
+            Arc::clone(&table_schema),
             table_schema,
             schema_adapter_factory,
         )
@@ -547,24 +546,23 @@ mod test {

         // This is the schema we would like to coerce to,
         // which is different from the physical schema of the file.
-        let table_schema = Schema::new(vec![Field::new(
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
             "timestamp_col",
             DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
             false,
-        )]);
+        )]));

         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
-        let table_schema = Arc::new(table_schema.clone());
         let candidate = FilterCandidateBuilder::new(
             expr,
-            file_schema.clone(),
-            table_schema.clone(),
+            Arc::clone(&file_schema),
+            Arc::clone(&table_schema),
             schema_adapter_factory,
         )
         .build(&metadata)
@@ -599,7 +597,7 @@ mod test {
             ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let candidate = FilterCandidateBuilder::new(
             expr,
@@ -629,7 +627,7 @@ mod test {
         let table_schema = Arc::new(get_lists_table_schema());

         let expr = col("utf8_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         check_expression_can_evaluate_against_schema(&expr, &table_schema);

         assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -647,22 +645,22 @@ mod test {

     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());

         let expr = col("string_col").is_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }

     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());

         let expr = col("string_col")
             .is_not_null()
             .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 22 additions & 19 deletions
@@ -518,7 +518,7 @@ mod tests {
         let schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
@@ -563,7 +563,7 @@ mod tests {
         let schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
@@ -606,7 +606,7 @@ mod tests {
             Field::new("c2", DataType::Int32, false),
         ]));
         let expr = col("c1").gt(lit(15)).and(col("c2").rem(lit(2)).eq(lit(0)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let schema_descr = get_test_schema_descr(vec![
@@ -645,7 +645,7 @@ mod tests {
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         // if conditions in predicate are joined with OR and an unsupported expression is used
@@ -671,7 +671,7 @@ mod tests {
             Field::new("c2", DataType::Int32, false),
         ]));
         let expr = col("c1").gt(lit(0));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let pruning_predicate =
             PruningPredicate::try_new(expr, table_schema.clone()).unwrap();

@@ -749,7 +749,7 @@ mod tests {
         ]));
         let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();

@@ -780,7 +780,7 @@ mod tests {
         let expr = col("c1")
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();

@@ -818,7 +818,7 @@ mod tests {
             .with_precision(9);
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -889,7 +889,7 @@ mod tests {
             lit(ScalarValue::Decimal128(Some(500), 5, 2)),
             Decimal128(11, 2),
         ));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -981,7 +981,7 @@ mod tests {
             .with_precision(18);
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1042,7 +1042,7 @@ mod tests {
         // cast the type of c1 to decimal(28,3)
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
@@ -1120,7 +1120,7 @@ mod tests {
         // cast the type of c1 to decimal(28,3)
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
@@ -1283,17 +1283,20 @@ mod tests {
         let data = bytes::Bytes::from(std::fs::read(path).unwrap());

         // generate pruning predicate
-        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "String",
+            DataType::Utf8,
+            false,
+        )]));

         let expr = col(r#""String""#).in_list(
             (1..25)
                 .map(|i| lit(format!("Hello_Not_Exists{i}")))
                 .collect::<Vec<_>>(),
             false,
         );
-        let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let expr = logical2physical(&expr, Arc::clone(&schema));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();

         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             file_name,
@@ -1511,9 +1514,9 @@ mod tests {
         let path = format!("{testdata}/{file_name}");
         let data = bytes::Bytes::from(std::fs::read(path).unwrap());

-        let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let schema = Arc::new(schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();

         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             &file_name,
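A note on the call-site pattern visible throughout these hunks: every intermediate use of the schema passes `Arc::clone(&schema)` (only a reference-count increment), and the final use moves the `Arc` into the call (for example `PruningPredicate::try_new(expr, schema)`), so the underlying `Schema` data is never deep-copied along these paths.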
