@@ -18,6 +18,8 @@

package org.apache.paimon.table.source;

import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -36,7 +38,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
@@ -539,4 +543,330 @@ public void testPushDownTopNOnlyNull() throws Exception {
assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
.isNull();
}

@Test
public void testPartitionFilter() throws Exception {
// Test partition filter functionality
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to multiple partitions
write.write(rowData(1, 10, 100L)); // partition pt=1
write.write(rowData(1, 20, 200L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(2, 30, 300L)); // partition pt=2
write.write(rowData(2, 40, 400L));
commit.commit(1, write.prepareCommit(true, 1));

write.write(rowData(3, 50, 500L)); // partition pt=3
commit.commit(2, write.prepareCommit(true, 2));

// Without partition filter - should return all data
TableScan.Plan planAll = table.newScan().plan();
List<String> resultAll = getResult(table.newRead(), planAll.splits());
assertThat(resultAll.size()).isEqualTo(5);

// Specify partition filter using Map
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("pt", "1");
TableScan.Plan plan1 = table.newScan().withPartitionFilter(partitionSpec).plan();
List<String> result1 = getResult(table.newRead(), plan1.splits());
assertThat(result1.size()).isEqualTo(2);
assertThat(result1).allMatch(s -> s.contains("1|"));

// Specify partition filter using BinaryRow
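// binaryRow(2) is the test helper that builds the partition row for pt = 2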
TableScan.Plan plan2 =
table.newScan().withPartitionFilter(Collections.singletonList(binaryRow(2))).plan();
List<String> result2 = getResult(table.newRead(), plan2.splits());
assertThat(result2.size()).isEqualTo(2);
assertThat(result2).allMatch(s -> s.contains("2|"));

write.close();
commit.close();
}

@Test
public void testBucketFilter() throws Exception {
// Create append-only table with multiple buckets directly
Options conf = new Options();
conf.set(org.apache.paimon.CoreOptions.BUCKET, 3);
conf.set(org.apache.paimon.CoreOptions.BUCKET_KEY, "a");

// Use a new path to avoid schema conflict with the default primary key table
java.nio.file.Path newTempDir = java.nio.file.Files.createTempDirectory("junit");
tablePath = new org.apache.paimon.fs.Path(
org.apache.paimon.utils.TraceableFileIO.SCHEME + "://" + newTempDir.toString());
fileIO = org.apache.paimon.fs.FileIOFinder.find(tablePath);
table = createFileStoreTable(false, conf, tablePath);

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to different buckets
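// bucket-key is column 'a', so each row is assigned to hash(a) % 3 of the configured buckets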
for (int i = 0; i < 10; i++) {
write.write(rowData(1, i, (long) i * 100));
commit.commit(i, write.prepareCommit(true, i));
}

// Without bucket filter - should return all data (count rows rather than splits,
// since small files in the same bucket may be packed into a single split)
TableScan.Plan planAll = table.newScan().plan();
List<String> resultAll = getResult(table.newRead(), planAll.splits());
assertThat(resultAll.size()).isEqualTo(10);

// Use bucket filter - only return data from specified bucket
TableScan.Plan planBucket0 = table.newScan().withBucket(0).plan();
assertThat(planBucket0.splits()).allMatch(split -> ((DataSplit) split).bucket() == 0);

// Use bucketFilter - keep only the specified buckets
TableScan.Plan planBucketFilter =
table.newScan().withBucketFilter(bucket -> bucket == 1 || bucket == 2).plan();
assertThat(planBucketFilter.splits())
.allMatch(
split -> {
int bucket = ((DataSplit) split).bucket();
return bucket == 1 || bucket == 2;
});

write.close();
commit.close();
}

@Test
public void testLevelFilter() throws Exception {
// Test level filter for primary key table
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to trigger compaction and produce files at different levels
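// (the first argument of prepareCommit requests waiting for compaction before committing)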
for (int i = 0; i < 10; i++) {
write.write(rowData(1, i, (long) i * 100));
commit.commit(i, write.prepareCommit(true, i));
}

// Without level filter
TableScan.Plan planAll = table.newScan().plan();
assertThat(planAll.splits().size()).isGreaterThan(0);

// Use level filter - only return level 0 data
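// (in the LSM structure, newly flushed files sit at level 0; compaction rewrites them into higher levels)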
TableScan.Plan planLevel0 = table.newScan().withLevelFilter(level -> level == 0).plan();
for (Split split : planLevel0.splits()) {
DataSplit dataSplit = (DataSplit) split;
assertThat(dataSplit.dataFiles()).allMatch(file -> file.level() == 0);
}

write.close();
commit.close();
}

@Test
public void testListPartitionEntries() throws Exception {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to multiple partitions
write.write(rowData(1, 10, 100L));
commit.commit(0, write.prepareCommit(true, 0));

write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(true, 1));

write.write(rowData(3, 30, 300L));
commit.commit(2, write.prepareCommit(true, 2));

// Test listPartitionEntries
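// each PartitionEntry carries the partition row plus statistics such as record count and file count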
List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
assertThat(partitionEntries.size()).isEqualTo(3);

// Verify partition values
List<Integer> partitionValues =
partitionEntries.stream()
.map(entry -> entry.partition().getInt(0))
.sorted()
.collect(java.util.stream.Collectors.toList());
assertThat(partitionValues).containsExactly(1, 2, 3);

// Test listPartitions (convenience method)
List<org.apache.paimon.data.BinaryRow> partitions = table.newScan().listPartitions();
assertThat(partitions.size()).isEqualTo(3);

write.close();
commit.close();
}

@Test
public void testPrimaryKeyTableScan() throws Exception {
// Use existing primary key table (default table is primary key table)
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 20, 200L));
commit.commit(0, write.prepareCommit(true, 0));

// Update data (primary key is pt, a)
write.write(rowData(1, 10, 101L)); // Update data for (1, 10)
commit.commit(1, write.prepareCommit(true, 1));

// Verify scan result - should only have the latest values
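// (the default deduplicate merge engine keeps only the most recent row per primary key)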
TableScan.Plan plan = table.newScan().plan();
List<String> result = getResult(table.newRead(), plan.splits());
assertThat(result.size()).isEqualTo(2);
assertThat(result).containsExactlyInAnyOrder("+I 1|10|101", "+I 1|20|200");

// Delete data
write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 200L));
commit.commit(2, write.prepareCommit(true, 2));

// Verify result after deletion
TableScan.Plan planAfterDelete = table.newScan().plan();
List<String> resultAfterDelete = getResult(table.newRead(), planAfterDelete.splits());
assertThat(resultAfterDelete.size()).isEqualTo(1);
assertThat(resultAfterDelete).containsExactly("+I 1|10|101");

write.close();
commit.close();
}

@Test
public void testEmptyTableScan() throws Exception {
// Test empty table scan
TableScan.Plan plan = table.newScan().plan();
assertThat(plan.splits()).isEmpty();

// Partition list for empty table
List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
assertThat(partitionEntries).isEmpty();
}

@Test
public void testScanWithMultipleFilters() throws Exception {
createAppendOnlyTable();

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write test data
for (int pt = 1; pt <= 3; pt++) {
for (int a = 1; a <= 10; a++) {
write.write(rowData(pt, a * 10, (long) pt * 1000 + a * 100));
commit.commit(pt * 100 + a, write.prepareCommit(true, pt * 100 + a));
}
}

// Combine partition filter and column filter
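// the scan prunes data files by partition and by field min/max statistics; since every
// commit above wrote a single-row file, all rows with a < 50 are pruned at planning time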
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("pt", "2");

Predicate filter =
new PredicateBuilder(table.schema().logicalRowType())
.greaterOrEqual(1, 50); // a >= 50

TableScan.Plan plan =
table.newScan().withPartitionFilter(partitionSpec).withFilter(filter).plan();

List<String> result = getResult(table.newRead(), plan.splits());

// Verify result: only data with pt=2 and a >= 50
assertThat(result).allMatch(s -> s.contains("2|"));
for (String r : result) {
String[] parts = r.split("\\|");
int aValue = Integer.parseInt(parts[1].trim());
assertThat(aValue).isGreaterThanOrEqualTo(50);
}

write.close();
commit.close();
}

@Test
public void testLimitWithPartitionFilter() throws Exception {
createAppendOnlyTable();

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to different partitions
for (int pt = 1; pt <= 3; pt++) {
for (int i = 0; i < 10; i++) {
write.write(rowData(pt, i, (long) pt * 1000 + i * 100));
commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
}
}

// Use partition filter + limit
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("pt", "2");

TableScan.Plan plan =
table.newScan().withPartitionFilter(partitionSpec).withLimit(5).plan();

// Each commit wrote a single row, so with limit 5 the plan should contain at most 5 splits
assertThat(plan.splits().size()).isLessThanOrEqualTo(5);

// All data should come from partition 2
List<String> result = getResult(table.newRead(), plan.splits());
assertThat(result).allMatch(s -> s.contains("2|"));

write.close();
commit.close();
}

@Test
public void testScanAfterCompaction() throws Exception {
// Test scan after compaction for primary key table
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data with same primary key multiple times to trigger compaction
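// whether or not compaction has already merged the files, the merge-on-read scan returns one row per key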
for (int i = 0; i < 5; i++) {
write.write(rowData(1, 10, 100L + i));
commit.commit(i, write.prepareCommit(true, i));
}

// Scan result should only have the latest value
TableScan.Plan plan = table.newScan().plan();
List<String> result = getResult(table.newRead(), plan.splits());
assertThat(result.size()).isEqualTo(1);
assertThat(result).containsExactly("+I 1|10|104"); // latest value

write.close();
commit.close();
}

@Test
public void testTopNWithPartitionFilter() throws Exception {
createAppendOnlyTable();

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// Write data to different partitions
for (int pt = 1; pt <= 2; pt++) {
for (int i = 1; i <= 5; i++) {
write.write(rowData(pt, i * 10, (long) pt * 1000 + i * 100));
commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
}
}

// Combine partition filter and TopN
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("pt", "1");

DataField field = table.schema().fields().get(1);
FieldRef ref = new FieldRef(field.id(), field.name(), field.type());

TableScan.Plan plan =
table.newScan()
.withPartitionFilter(partitionSpec)
.withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 2))
.plan();

// Verify result: only pt=1 data, and top 2
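// (TopN push-down uses per-split min/max statistics to keep only splits that can contain the top rows)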
List<Split> splits = plan.splits();
assertThat(splits.size()).isLessThanOrEqualTo(2);

write.close();
commit.close();
}
}