From 6f474ad2380f6a3a7e03dac916569a34eacc5a14 Mon Sep 17 00:00:00 2001
From: rockyyin
Date: Thu, 5 Feb 2026 13:16:42 +0800
Subject: [PATCH] feat(paimon-core): Add test cases for TableScan partition,
 bucket, level filters and list APIs

---
 .../paimon/table/source/TableScanTest.java    | 330 ++++++++++++++++++
 1 file changed, 330 insertions(+)

diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 32f604130715..90b772604845 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -36,7 +38,9 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
@@ -539,4 +543,330 @@ public void testPushDownTopNOnlyNull() throws Exception {
         assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
                 .isNull();
     }
+
+    @Test
+    public void testPartitionFilter() throws Exception {
+        // Test partition filter functionality
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to multiple partitions
+        write.write(rowData(1, 10, 100L)); // partition pt=1
+        write.write(rowData(1, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(2, 30, 300L)); // partition pt=2
+        write.write(rowData(2, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(3, 50, 500L)); // partition pt=3
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // Without partition filter - should return all data
+        TableScan.Plan planAll = table.newScan().plan();
+        List<String> resultAll = getResult(table.newRead(), planAll.splits());
+        assertThat(resultAll.size()).isEqualTo(5);
+
+        // Specify partition filter using a Map
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("pt", "1");
+        TableScan.Plan plan1 = table.newScan().withPartitionFilter(partitionSpec).plan();
+        List<String> result1 = getResult(table.newRead(), plan1.splits());
+        assertThat(result1.size()).isEqualTo(2);
+        assertThat(result1).allMatch(s -> s.contains("1|"));
+
+        // Specify partition filter using BinaryRow
+        TableScan.Plan plan2 =
+                table.newScan().withPartitionFilter(Collections.singletonList(binaryRow(2))).plan();
+        List<String> result2 = getResult(table.newRead(), plan2.splits());
+        assertThat(result2.size()).isEqualTo(2);
+        assertThat(result2).allMatch(s -> s.contains("2|"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testBucketFilter() throws Exception {
+        // Create an append-only table with multiple buckets directly
+        Options conf = new Options();
+        conf.set(org.apache.paimon.CoreOptions.BUCKET, 3);
+        conf.set(org.apache.paimon.CoreOptions.BUCKET_KEY, "a");
+
+        // Use a new path to avoid schema conflicts with the default primary key table
+        java.nio.file.Path newTempDir = java.nio.file.Files.createTempDirectory("junit");
+        tablePath = new org.apache.paimon.fs.Path(
+                org.apache.paimon.utils.TraceableFileIO.SCHEME + "://" + newTempDir.toString());
+        fileIO = org.apache.paimon.fs.FileIOFinder.find(tablePath);
+        table = createFileStoreTable(false, conf, tablePath);
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to different buckets
+        for (int i = 0; i < 10; i++) {
+            write.write(rowData(1, i, (long) i * 100));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+
+        // Without bucket filter - should return all data
+        TableScan.Plan planAll = table.newScan().plan();
+        assertThat(planAll.splits().size()).isEqualTo(10);
+
+        // Use withBucket - only return data from the specified bucket
+        TableScan.Plan planBucket0 = table.newScan().withBucket(0).plan();
+        assertThat(planBucket0.splits()).allMatch(split -> ((DataSplit) split).bucket() == 0);
+
+        // Use withBucketFilter - keep only specific buckets
+        TableScan.Plan planBucketFilter =
+                table.newScan().withBucketFilter(bucket -> bucket == 1 || bucket == 2).plan();
+        assertThat(planBucketFilter.splits())
+                .allMatch(
+                        split -> {
+                            int bucket = ((DataSplit) split).bucket();
+                            return bucket == 1 || bucket == 2;
+                        });
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testLevelFilter() throws Exception {
+        // Test level filter for the primary key table
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to trigger compaction and produce files at different levels
+        for (int i = 0; i < 10; i++) {
+            write.write(rowData(1, i, (long) i * 100));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+
+        // Without level filter
+        TableScan.Plan planAll = table.newScan().plan();
+        assertThat(planAll.splits().size()).isGreaterThan(0);
+
+        // Use level filter - only return level 0 data
+        TableScan.Plan planLevel0 = table.newScan().withLevelFilter(level -> level == 0).plan();
+        for (Split split : planLevel0.splits()) {
+            DataSplit dataSplit = (DataSplit) split;
+            assertThat(dataSplit.dataFiles()).allMatch(file -> file.level() == 0);
+        }
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testListPartitionEntries() throws Exception {
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to multiple partitions
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(2, 20, 200L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(3, 30, 300L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // Test listPartitionEntries
+        List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
+        assertThat(partitionEntries.size()).isEqualTo(3);
+
+        // Verify partition values
+        List<Integer> partitionValues =
+                partitionEntries.stream()
+                        .map(entry -> entry.partition().getInt(0))
+                        .sorted()
+                        .collect(java.util.stream.Collectors.toList());
+        assertThat(partitionValues).containsExactly(1, 2, 3);
+
+        // Test listPartitions (convenience method)
+        List<BinaryRow> partitions = table.newScan().listPartitions();
+        assertThat(partitions.size()).isEqualTo(3);
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testPrimaryKeyTableScan() throws Exception {
+        // Use the existing primary key table (the default table is a primary key table)
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // Update data (the primary key is pt, a)
+        write.write(rowData(1, 10, 101L)); // Update data for (1, 10)
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // Verify scan result - should only contain the latest values
+        TableScan.Plan plan = table.newScan().plan();
+        List<String> result = getResult(table.newRead(), plan.splits());
+        assertThat(result.size()).isEqualTo(2);
+        assertThat(result).containsExactlyInAnyOrder("+I 1|10|101", "+I 1|20|200");
+
+        // Delete data
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 200L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // Verify result after deletion
+        TableScan.Plan planAfterDelete = table.newScan().plan();
+        List<String> resultAfterDelete = getResult(table.newRead(), planAfterDelete.splits());
+        assertThat(resultAfterDelete.size()).isEqualTo(1);
+        assertThat(resultAfterDelete).containsExactly("+I 1|10|101");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testEmptyTableScan() throws Exception {
+        // Scanning an empty table should produce no splits
+        TableScan.Plan plan = table.newScan().plan();
+        assertThat(plan.splits()).isEmpty();
+
+        // Partition list of an empty table should be empty as well
+        List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
+        assertThat(partitionEntries).isEmpty();
+    }
+
+    @Test
+    public void testScanWithMultipleFilters() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write test data
+        for (int pt = 1; pt <= 3; pt++) {
+            for (int a = 1; a <= 10; a++) {
+                write.write(rowData(pt, a * 10, (long) pt * 1000 + a * 100));
+                commit.commit(pt * 100 + a, write.prepareCommit(true, pt * 100 + a));
+            }
+        }
+
+        // Combine partition filter and column filter
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("pt", "2");
+
+        Predicate filter =
+                new PredicateBuilder(table.schema().logicalRowType())
+                        .greaterOrEqual(1, 50); // a >= 50
+
+        TableScan.Plan plan =
+                table.newScan().withPartitionFilter(partitionSpec).withFilter(filter).plan();
+
+        List<String> result = getResult(table.newRead(), plan.splits());
+
+        // Verify result: only data with pt=2 and a >= 50
+        assertThat(result).allMatch(s -> s.contains("2|"));
+        for (String r : result) {
+            String[] parts = r.split("\\|");
+            int aValue = Integer.parseInt(parts[1].trim());
+            assertThat(aValue).isGreaterThanOrEqualTo(50);
+        }
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testLimitWithPartitionFilter() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to different partitions
+        for (int pt = 1; pt <= 3; pt++) {
+            for (int i = 0; i < 10; i++) {
+                write.write(rowData(pt, i, (long) pt * 1000 + i * 100));
+                commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
+            }
+        }
+
+        // Use partition filter + limit
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("pt", "2");
+
+        TableScan.Plan plan =
+                table.newScan().withPartitionFilter(partitionSpec).withLimit(5).plan();
+
+        // Should return at most 5 splits (one row per split)
+        assertThat(plan.splits().size()).isLessThanOrEqualTo(5);
+
+        // All data should come from partition 2
+        List<String> result = getResult(table.newRead(), plan.splits());
+        assertThat(result).allMatch(s -> s.contains("2|"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testScanAfterCompaction() throws Exception {
+        // Test scan after compaction for the primary key table
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data with the same primary key multiple times to trigger compaction
+        for (int i = 0; i < 5; i++) {
+            write.write(rowData(1, 10, 100L + i));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+
+        // Scan result should only contain the latest value
+        TableScan.Plan plan = table.newScan().plan();
+        List<String> result = getResult(table.newRead(), plan.splits());
+        assertThat(result.size()).isEqualTo(1);
+        assertThat(result).containsExactly("+I 1|10|104"); // latest value
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testTopNWithPartitionFilter() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write data to different partitions
+        for (int pt = 1; pt <= 2; pt++) {
+            for (int i = 1; i <= 5; i++) {
+                write.write(rowData(pt, i * 10, (long) pt * 1000 + i * 100));
+                commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
+            }
+        }
+
+        // Combine partition filter and TopN
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("pt", "1");
+
+        DataField field = table.schema().fields().get(1);
+        FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+
+        TableScan.Plan plan =
+                table.newScan()
+                        .withPartitionFilter(partitionSpec)
+                        .withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 2))
+                        .plan();
+
+        // Verify result: only pt=1 data, and at most the top 2 splits
+        List<Split> splits = plan.splits();
+        assertThat(splits.size()).isLessThanOrEqualTo(2);
+
+        write.close();
+        commit.close();
+    }
 }