From de8bbe205e7bae48d242ab15f32d644e554d1577 Mon Sep 17 00:00:00 2001
From: Lietong Liu
Date: Thu, 5 Feb 2026 20:49:20 +0800
Subject: [PATCH] [core] Fix LookupMergeFunction to use sequence.field for picking high level records

When sequence.field is configured, LookupMergeFunction.pickHighLevel()
should select the high-level record that is latest according to the
sequence field (the highest sequence value under the default ascending
sort order) instead of the record with the lowest level number. This
ensures correct behavior when out-of-order data arrives.

Previously, pickHighLevel() only compared level numbers, which could lead
to incorrect results in a situation such as:
- L1 has sequence=7 (older)
- L2 has sequence=8 (newer)
- L0 has sequence=6 (oldest, out-of-order arrival)

The old logic would pick L1 (level 1 < level 2), but the correct behavior
is to pick L2 (sequence 8 > 7).

This fix:
1. Adds a sequenceComparator field to LookupMergeFunction
2. Modifies pickHighLevel() to use the sequence comparator when available
3. Modifies getResult() to sort records by sequence before adding them to
   the merge function, so that the latest record is added last
4. Only sets sequenceComparator when a user-defined sequence field is
   configured, preserving the original behavior when sequence.field is not set
5. 
Adds test cases to verify the fix, backward compatibility, and descending sort order Co-Authored-By: Claude Opus 4.5 --- .../LookupChangelogMergeFunctionWrapper.java | 8 + .../compact/LookupMergeFunction.java | 37 +++- ...okupChangelogMergeFunctionWrapperTest.java | 186 ++++++++++++++++++ 3 files changed, 225 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index 7283a3030d01..04d9141602ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -88,6 +88,14 @@ public LookupChangelogMergeFunctionWrapper( this.lookupStrategy = lookupStrategy; this.deletionVectorsMaintainer = deletionVectorsMaintainer; this.comparator = createSequenceComparator(userDefinedSeqComparator); + // Only set sequence comparator when user-defined sequence field is configured + // to preserve original behavior (pick by level) when sequence.field is not set. + // Note: We use the same comparator for both insertInto and pickHighLevel. + // The comparator's semantics (ascending/descending) are already handled correctly + // by UserDefinedSeqComparator based on sequence.field.sort-order configuration. 
+ if (userDefinedSeqComparator != null) { + this.mergeFunction.setSequenceComparator(this.comparator); + } } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java index 1bd9aaa84339..8d01bb4956a7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java @@ -27,7 +27,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; /** * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record, @@ -41,6 +43,7 @@ public class LookupMergeFunction implements MergeFunction { private final KeyValueBuffer candidates; private boolean containLevel0; private InternalRow currentKey; + @Nullable private Comparator sequenceComparator; public LookupMergeFunction( MergeFunction mergeFunction, @@ -52,6 +55,11 @@ public LookupMergeFunction( this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType, valueType, ioManager); } + /** Set the sequence comparator for picking high level records. 
*/ + public void setSequenceComparator(@Nullable Comparator sequenceComparator) { + this.sequenceComparator = sequenceComparator; + } + @Override public void reset() { candidates.reset(); @@ -83,9 +91,16 @@ public KeyValue pickHighLevel() { if (kv.level() <= 0) { continue; } - // For high-level comparison logic (not involving Level 0), only the value of the - // minimum Level should be selected - if (highLevel == null || kv.level() < highLevel.level()) { + if (highLevel == null) { + highLevel = kv; + } else if (sequenceComparator != null) { + // When sequence comparator is set, use it to pick the record with highest + // sequence value, which represents the latest record + if (sequenceComparator.compare(kv, highLevel) > 0) { + highLevel = kv; + } + } else if (kv.level() < highLevel.level()) { + // Without sequence comparator, fall back to picking the minimum level highLevel = kv; } } @@ -107,18 +122,28 @@ public void insertInto(KeyValue highLevel, Comparator comparator) { public KeyValue getResult() { mergeFunction.reset(); KeyValue highLevel = pickHighLevel(); + + // Collect records to merge: level-0 records and the picked high level record + List toMerge = new ArrayList<>(); try (CloseableIterator iterator = candidates.iterator()) { while (iterator.hasNext()) { KeyValue kv = iterator.next(); - // records that has not been stored on the disk yet, such as the data in the write - // buffer being at level -1 if (kv.level() <= 0 || kv == highLevel) { - mergeFunction.add(kv); + toMerge.add(kv); } } } catch (Exception e) { throw new RuntimeException(e); } + + // When sequence comparator is set, sort by sequence so highest sequence is added last + if (sequenceComparator != null) { + toMerge.sort(sequenceComparator); + } + + for (KeyValue kv : toMerge) { + mergeFunction.add(kv); + } return mergeFunction.getResult(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index 57d99557ca5f..470e84525ab0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -556,4 +556,190 @@ public void testKeepLowestHighLevel() { kv = result.result(); assertThat(kv.value().getInt(0)).isEqualTo(3); } + + /** + * Test that sequence.field is correctly used to pick the high level record with the highest + * sequence value, even when it's at a higher level number. + * + *

Scenario: L1 has older sequence (7), L2 has newer sequence (8), L0 has oldest (6). The + * correct behavior should pick L2 (sequence=8) as the high level record. + */ + @Test + public void testSequenceFieldWithMultipleLevels() { + // Define value type with sequence field as the second column + RowType valueType = + RowType.builder() + .fields( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, + new String[] {"value", "sequence"}) + .build(); + + // Create user-defined sequence comparator on the second field + UserDefinedSeqComparator userDefinedSeqComparator = + UserDefinedSeqComparator.create( + valueType, + CoreOptions.fromMap(ImmutableMap.of("sequence.field", "sequence"))); + assertThat(userDefinedSeqComparator).isNotNull(); + + Map highLevel = new HashMap<>(); + + LookupChangelogMergeFunctionWrapper function = + new LookupChangelogMergeFunctionWrapper( + LookupMergeFunction.wrap( + DeduplicateMergeFunction.factory(), null, null, null), + highLevel::get, + null, + LookupStrategy.from(false, true, false, false), + null, + userDefinedSeqComparator); + + // Test scenario: + // L1: (key=1, value=100, sequence=7) <- Level 1, but older sequence + // L2: (key=1, value=200, sequence=8) <- Level 2, but newer sequence (should be picked!) 
+ // L0: (key=1, value=50, sequence=6) <- Level 0, oldest sequence + + function.reset(); + function.add( + new KeyValue() + .replace(row(1), 1, INSERT, row(100, 7)) + .setLevel(1)); // Level 1, seq=7 + function.add( + new KeyValue() + .replace(row(1), 1, INSERT, row(200, 8)) + .setLevel(2)); // Level 2, seq=8 + function.add( + new KeyValue() + .replace(row(1), 2, INSERT, row(50, 6)) + .setLevel(0)); // Level 0, seq=6 + + ChangelogResult result = function.getResult(); + assertThat(result).isNotNull(); + + KeyValue kv = result.result(); + assertThat(kv).isNotNull(); + + // Should return the record with highest sequence (seq=8 from L2) + int actualSequence = kv.value().getInt(1); + int actualValue = kv.value().getInt(0); + + assertThat(actualSequence) + .as("Should return record with highest sequence field (8)") + .isEqualTo(8); + assertThat(actualValue).isEqualTo(200); + + // Verify changelog: before should be L2 (seq=8), after should be merged result + List changelogs = result.changelogs(); + assertThat(changelogs).hasSize(2); + assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE); + assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(8); // before is L2 + assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER); + } + + /** + * Test that without sequence.field, the original behavior is preserved: pick the record with + * the lowest level number. 
+ */ + @Test + public void testWithoutSequenceFieldPreservesOriginalBehavior() { + Map highLevel = new HashMap<>(); + + // No userDefinedSeqComparator (null) + LookupChangelogMergeFunctionWrapper function = + new LookupChangelogMergeFunctionWrapper( + LookupMergeFunction.wrap( + DeduplicateMergeFunction.factory(), null, null, null), + highLevel::get, + null, + LookupStrategy.from(false, true, false, false), + null, + null); // No sequence comparator + + // L1: value=100, L2: value=200 + // Without sequence.field, should pick L1 (level 1 < level 2) + function.reset(); + function.add(new KeyValue().replace(row(1), 1, INSERT, row(100)).setLevel(1)); + function.add(new KeyValue().replace(row(1), 1, INSERT, row(200)).setLevel(2)); + function.add(new KeyValue().replace(row(1), 2, INSERT, row(50)).setLevel(0)); + + ChangelogResult result = function.getResult(); + assertThat(result).isNotNull(); + + // Without sequence.field, L1 is picked as highLevel, and L0 is the latest + // So the result should be L0's value (50) since DeduplicateMergeFunction keeps the last + KeyValue kv = result.result(); + assertThat(kv).isNotNull(); + assertThat(kv.value().getInt(0)).isEqualTo(50); + + // Changelog: before=L1(100), after=L0(50) + List changelogs = result.changelogs(); + assertThat(changelogs).hasSize(2); + assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(100); // before is L1 + assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(50); // after is L0 + } + + /** + * Test sequence.field with descending sort order. When sort-order=descending, smaller sequence + * values are considered "newer". + * + *

Note: We use a 3-field schema with sequence at index 2 to avoid cache collision with other + * tests, because CodeGenUtils caches comparators by field types and indices without considering + * sort order. + */ + @Test + public void testSequenceFieldWithDescendingSortOrder() { + RowType valueType = + RowType.builder() + .fields( + new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()}, + new String[] {"value", "extra", "sequence"}) + .build(); + + // Create comparator with descending order + UserDefinedSeqComparator userDefinedSeqComparator = + UserDefinedSeqComparator.create( + valueType, + CoreOptions.fromMap( + ImmutableMap.of( + "sequence.field", "sequence", + "sequence.field.sort-order", "descending"))); + assertThat(userDefinedSeqComparator).isNotNull(); + + Map highLevel = new HashMap<>(); + + LookupChangelogMergeFunctionWrapper function = + new LookupChangelogMergeFunctionWrapper( + LookupMergeFunction.wrap( + DeduplicateMergeFunction.factory(), null, null, null), + highLevel::get, + null, + LookupStrategy.from(false, true, false, false), + null, + userDefinedSeqComparator); + + // With descending order, smaller sequence = newer + // L1: (key=1, value=100, extra=0, sequence=7) <- Level 1, newer (7 < 8) + // L2: (key=1, value=200, extra=0, sequence=8) <- Level 2, older (8 > 7) + // L0: (key=1, value=50, extra=0, sequence=9) <- Level 0, oldest (9 > 8 > 7) + + function.reset(); + function.add(new KeyValue().replace(row(1), 1, INSERT, row(100, 0, 7)).setLevel(1)); + function.add(new KeyValue().replace(row(1), 1, INSERT, row(200, 0, 8)).setLevel(2)); + function.add(new KeyValue().replace(row(1), 2, INSERT, row(50, 0, 9)).setLevel(0)); + + ChangelogResult result = function.getResult(); + assertThat(result).isNotNull(); + + KeyValue kv = result.result(); + assertThat(kv).isNotNull(); + + int actualSequence = kv.value().getInt(2); + int actualValue = kv.value().getInt(0); + + // With descending order, L1 (seq=7) is the newest high-level record + // The 
result should be L1's value (100) since it's the newest + assertThat(actualSequence) + .as("With descending order, should return record with smallest sequence (7)") + .isEqualTo(7); + assertThat(actualValue).isEqualTo(100); + } }