From 7bdc0f6759986cc2d0e9d891cc5a237a2a27a63c Mon Sep 17 00:00:00 2001 From: dailiang Date: Tue, 13 Jan 2026 20:35:51 +0800 Subject: [PATCH] Fix wrong merge order of increment diff split read due to concurrent write of a key with non-deterministic sequence number. --- .../splitread/IncrementalDiffSplitRead.java | 6 ++--- .../paimon/flink/BatchFileStoreITCase.java | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index ec4408fb9737..9390a25e1080 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -213,10 +213,8 @@ public KeyValue getResult() { } else if (kvs.size() == 2) { KeyValue before = kvs.get(0); KeyValue after = kvs.get(1); - if (after.level() == AFTER_LEVEL) { - if (!valueAndRowKindEquals(before, after)) { - toReturn = after; - } + if (!valueAndRowKindEquals(before, after)) { + toReturn = after.level() == AFTER_LEVEL ? after : before; } } else { throw new IllegalArgumentException("Illegal kv number: " + kvs.size()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 22271c8a7ebe..9e254812371b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -1019,6 +1019,32 @@ public void testIncrementScanMode() throws Exception { Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), Row.of("+I", 3, "C")); } + @Test + public void testIncrementScanModeWithInsertOverwrite() throws Exception { + + sql("CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, v STRING)"); + + // snapshot 1 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'A'), (1, 'B'), (1, 'C')"); + // snapshot 2 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'C'), (1, 'D')"); + + List result = + sql( + "SELECT * FROM `test_scan_mode$audit_log` " + + "/*+ OPTIONS('incremental-between'='1,2','incremental-between-scan-mode'='diff') */"); + assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 1, "D")); + + // snapshot 3 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'D')"); + + result = + sql( + "SELECT * FROM `test_scan_mode$audit_log` " + + "/*+ OPTIONS('incremental-between'='2,2','incremental-between-scan-mode'='diff') */"); + assertThat(result).isEmpty(); + } + @Test public void testAuditLogTableWithComputedColumn() throws Exception { sql("CREATE TABLE test_table (a int, b int, c AS a + b);");