From 2fe1a3e212e51910f1487ec91af5cc93118d2749 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 7 Oct 2025 14:31:05 -0400 Subject: [PATCH 1/5] Add test case that proves bug --- .../accumulo/test/ClientSideIteratorIT.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java index 824284ee981..66cce8a8902 100644 --- a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -39,8 +40,10 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.OfflineScanner; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.PartialKey; @@ -263,4 +266,55 @@ public void testPluginEnv() throws Exception { client.tableOperations().removeProperty(tableName, "table.custom.testRegex"); runPluginEnvTest(rows); } + + @Test + public void testDuplicateTimestampScanLosesKeys() throws Exception { + ClientContext context = (ClientContext) client; + final int numRows = 100; + final int mutationsPerRow = 10; + final int expectedEntries = numRows * mutationsPerRow; + SecureRandom random = new SecureRandom(); + byte[] randomValue = new byte[8192]; + + client.tableOperations().create(tableName); + + 
client.tableOperations().modifyProperties(tableName, properties -> { + // remove the versioning iterators + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + + // make the scan returns cut more often + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < mutationsPerRow; j++) { + Mutation m = new Mutation("row" + i); + random.nextBytes(randomValue); + m.put("cf" + i, "cq" + i, 100L, new Value(randomValue)); + bw.addMutation(m); + } + } + } + client.tableOperations().flush(tableName, null, null, true); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + client.tableOperations().offline(tableName, true); + long offlineCount; + try (OfflineScanner offlineScanner = + new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) { + offlineCount = offlineScanner.stream().count(); + } + + client.tableOperations().online(tableName, true); + long onlineCount; + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + onlineCount = scanner.stream().count(); + } + + assertEquals(expectedEntries, offlineCount); + assertEquals(offlineCount, onlineCount, "Online scan lost keys compared to direct RFile scan"); + } } From ccbea5232e11d4367e43ef1a3ff55997590c2447 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Fri, 10 Oct 2025 15:18:46 -0400 Subject: [PATCH 2/5] Preserve duplicate keys for scan resumptions --- .../apache/accumulo/tserver/tablet/Batch.java | 9 ++- .../accumulo/tserver/tablet/Scanner.java | 5 +- .../accumulo/tserver/tablet/TabletBase.java | 61 ++++++++++++++++--- .../accumulo/test/ClientSideIteratorIT.java | 2 +- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java index d610a8f6ebb..50d731ed1c4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java @@ -27,12 +27,15 @@ final class Batch { private final List results; private final Key continueKey; private final long numBytes; + private final int duplicatesToSkip; - Batch(boolean skipContinueKey, List results, Key continueKey, long numBytes) { + Batch(boolean skipContinueKey, List results, Key continueKey, long numBytes, + int duplicatesToSkip) { this.skipContinueKey = skipContinueKey; this.results = results; this.continueKey = continueKey; this.numBytes = numBytes; + this.duplicatesToSkip = duplicatesToSkip; } public boolean isSkipContinueKey() { @@ -50,4 +53,8 @@ public Key getContinueKey() { public long getNumBytes() { return numBytes; } + + public int getDuplicatesToSkip() { + return duplicatesToSkip; + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 2b89a2005b1..606bfeb4205 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -58,6 +58,7 @@ public class Scanner { private final AtomicBoolean interruptFlag; private boolean readInProgress = false; + private int duplicatesToSkip = 0; Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) { this.tablet = tablet; @@ -117,7 +118,8 @@ public ScanBatch read() throws IOException, TabletClosedException { iter = new SourceSwitchingIterator(dataSource, false); } - results = tablet.nextBatch(iter, range, scanParams); + results = tablet.nextBatch(iter, range, scanParams, duplicatesToSkip); + duplicatesToSkip = 0; if (results.getResults() == null) { range = 
null; @@ -127,6 +129,7 @@ public ScanBatch read() throws IOException, TabletClosedException { } else { range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive()); + duplicatesToSkip = results.getDuplicatesToSkip(); return new ScanBatch(results.getResults(), true); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index dcc54e9da71..9b8830f2eb6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -55,6 +55,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.accumulo.tserver.MemKey; import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; @@ -236,8 +237,8 @@ public Tablet.LookupResult lookup(List ranges, List results, } } - Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams) - throws IOException { + Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams, + int duplicatesToSkip) throws IOException { // log.info("In nextBatch.."); @@ -259,7 +260,8 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM); Key continueKey = null; - boolean skipContinueKey = false; + boolean skipContinueKey = true; + boolean resumeOnSameKey = false; YieldCallback yield = new YieldCallback<>(); @@ -274,6 +276,11 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), 
true); } + skipReturnedDuplicates(iter, duplicatesToSkip, range); + + Key lastKey = null; + int duplicatesSeenForLastKey = 0; + while (iter.hasTop()) { if (yield.hasYielded()) { throw new IOException( @@ -287,11 +294,19 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet resultSize += kvEntry.estimateMemoryUsed(); resultBytes += kvEntry.numBytes(); + if (key.equals(lastKey)) { + duplicatesSeenForLastKey++; + } else { + lastKey = copyResumeKey(key); + duplicatesSeenForLastKey = 1; + } + boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun; if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) { - continueKey = new Key(key); - skipContinueKey = true; + continueKey = copyResumeKey(key); + resumeOnSameKey = true; + skipContinueKey = false; break; } @@ -299,7 +314,8 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } if (yield.hasYielded()) { - continueKey = new Key(yield.getPositionAndReset()); + continueKey = copyResumeKey(yield.getPositionAndReset()); + resumeOnSameKey = false; skipContinueKey = true; if (!range.contains(continueKey)) { throw new IOException("Underlying iterator yielded to a position outside of its range: " @@ -322,7 +338,9 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } } - return new Batch(skipContinueKey, results, continueKey, resultBytes); + int duplicatesToSkipForNextBatch = resumeOnSameKey ? 
1 : 0; + return new Batch(skipContinueKey, results, continueKey, resultBytes, + duplicatesToSkipForNextBatch); } private Tablet.LookupResult lookup(SortedKeyValueIterator mmfi, List ranges, @@ -475,7 +493,8 @@ private void handleTabletClosedDuringScan(List results, Tablet.LookupRe private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) { if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { - Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive()); + Key copy = copyResumeKey(key); + Range nlur = new Range(copy, false, range.getEndKey(), range.isEndKeyInclusive()); lookupResult.unfinishedRanges.add(nlur); } } @@ -486,4 +505,30 @@ public synchronized void updateQueryStats(int size, long numBytes) { this.queryResultBytes.addAndGet(numBytes); this.server.getScanMetrics().incrementQueryResultBytes(numBytes); } + + private Key copyResumeKey(Key key) { + if (key instanceof MemKey) { + MemKey memKey = (MemKey) key; + return new MemKey(memKey, memKey.getKVCount()); + } + return new Key(key); + } + + private void skipReturnedDuplicates(SortedKeyValueIterator iter, int duplicatesToSkip, + Range range) throws IOException { + if (duplicatesToSkip <= 0 || !range.isStartKeyInclusive()) { + return; + } + + Key startKey = range.getStartKey(); + if (startKey == null) { + return; + } + + int skipped = 0; + while (skipped < duplicatesToSkip && iter.hasTop() && iter.getTopKey().equals(startKey)) { + iter.next(); + skipped++; + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java index 66cce8a8902..a112f3b1428 100644 --- a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java @@ -268,7 +268,7 @@ public void testPluginEnv() throws Exception { } @Test - public void testDuplicateTimestampScanLosesKeys() 
throws Exception { + public void testDuplicateScanLosesKeys() throws Exception { ClientContext context = (ClientContext) client; final int numRows = 100; final int mutationsPerRow = 10; From fc8b6c96e69ab0f1a131e2c726d5f57f2a5f8bf2 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Wed, 3 Dec 2025 15:07:58 -0500 Subject: [PATCH 3/5] Fix hanging bug, keep dup keys across boundaries --- .../ConfigurableScanServerHostSelector.java | 7 +++-- .../accumulo/tserver/tablet/TabletBase.java | 29 +++++++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java index fef92c80430..cfc21b018fe 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java @@ -155,13 +155,14 @@ private List getServersForHostAttempt(int hostAttempt, TabletId tablet, int selectServers(SelectorParameters params, Profile profile, RendezvousHasher rhasher, Map serversToUse) { - int maxHostAttempt = 0; + int maxAttempts = 0; for (TabletId tablet : params.getTablets()) { + int attempts = params.getAttempts(tablet).size(); + maxAttempts = Math.max(attempts, maxAttempts); Map> prevFailures = computeFailuresByHost(tablet, params); for (int hostAttempt = 0; hostAttempt < profile.getAttemptPlans().size(); hostAttempt++) { - maxHostAttempt = Math.max(hostAttempt, maxHostAttempt); List scanServers = getServersForHostAttempt(hostAttempt, tablet, profile, rhasher, prevFailures); if (!scanServers.isEmpty()) { @@ -183,6 +184,6 @@ int selectServers(SelectorParameters params, Profile profile, RendezvousHasher r } } - return maxHostAttempt; + return maxAttempts; } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 9b8830f2eb6..4f4c78a2118 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -278,8 +278,12 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet skipReturnedDuplicates(iter, duplicatesToSkip, range); - Key lastKey = null; - int duplicatesSeenForLastKey = 0; + Key rangeStartKey = range.getStartKey(); + + Key startKey = null; + boolean resumingOnSameKey = + iter.hasTop() && rangeStartKey != null && rangeStartKey.equals(iter.getTopKey()); + int duplicatesReturnedForStartKey = resumingOnSameKey ? duplicatesToSkip : 0; while (iter.hasTop()) { if (yield.hasYielded()) { @@ -288,18 +292,25 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } value = iter.getTopValue(); key = iter.getTopKey(); + if (startKey == null) { + startKey = copyResumeKey(key); + } + + if (!key.equals(startKey)) { + // Batch limit reached before finishing duplicates for start key + // resume on the start key and skip what we have already returned + continueKey = copyResumeKey(startKey); + resumeOnSameKey = true; + skipContinueKey = false; + break; + } KVEntry kvEntry = new KVEntry(key, value); // copies key and value results.add(kvEntry); resultSize += kvEntry.estimateMemoryUsed(); resultBytes += kvEntry.numBytes(); - if (key.equals(lastKey)) { - duplicatesSeenForLastKey++; - } else { - lastKey = copyResumeKey(key); - duplicatesSeenForLastKey = 1; - } + duplicatesReturnedForStartKey++; boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun; @@ -338,7 +349,7 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } } - int duplicatesToSkipForNextBatch = resumeOnSameKey ? 1 : 0; + int duplicatesToSkipForNextBatch = resumeOnSameKey ? 
duplicatesReturnedForStartKey : 0; return new Batch(skipContinueKey, results, continueKey, resultBytes, duplicatesToSkipForNextBatch); } From 0f01109d41a4b67d7d161743958a0662b5b24ff0 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 4 Dec 2025 13:32:12 -0500 Subject: [PATCH 4/5] Revert unrelated change --- .../core/spi/scan/ConfigurableScanServerHostSelector.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java index cfc21b018fe..fef92c80430 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java @@ -155,14 +155,13 @@ private List getServersForHostAttempt(int hostAttempt, TabletId tablet, int selectServers(SelectorParameters params, Profile profile, RendezvousHasher rhasher, Map serversToUse) { - int maxAttempts = 0; + int maxHostAttempt = 0; for (TabletId tablet : params.getTablets()) { - int attempts = params.getAttempts(tablet).size(); - maxAttempts = Math.max(attempts, maxAttempts); Map> prevFailures = computeFailuresByHost(tablet, params); for (int hostAttempt = 0; hostAttempt < profile.getAttemptPlans().size(); hostAttempt++) { + maxHostAttempt = Math.max(hostAttempt, maxHostAttempt); List scanServers = getServersForHostAttempt(hostAttempt, tablet, profile, rhasher, prevFailures); if (!scanServers.isEmpty()) { @@ -184,6 +183,6 @@ int selectServers(SelectorParameters params, Profile profile, RendezvousHasher r } } - return maxAttempts; + return maxHostAttempt; } } From dd3b3d3a8891b89511b1c614a2fc62eff5249ebd Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Wed, 10 Dec 2025 13:48:03 -0500 Subject: [PATCH 5/5] add new test class --- .../accumulo/test/ClientSideIteratorIT.java | 54 ---- 
.../test/DuplicateKeyEdgeCasesIT.java | 270 ++++++++++++++++++ 2 files changed, 270 insertions(+), 54 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java index a112f3b1428..824284ee981 100644 --- a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java @@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -40,10 +39,8 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.OfflineScanner; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.PartialKey; @@ -266,55 +263,4 @@ public void testPluginEnv() throws Exception { client.tableOperations().removeProperty(tableName, "table.custom.testRegex"); runPluginEnvTest(rows); } - - @Test - public void testDuplicateScanLosesKeys() throws Exception { - ClientContext context = (ClientContext) client; - final int numRows = 100; - final int mutationsPerRow = 10; - final int expectedEntries = numRows * mutationsPerRow; - SecureRandom random = new SecureRandom(); - byte[] randomValue = new byte[8192]; - - client.tableOperations().create(tableName); - - client.tableOperations().modifyProperties(tableName, properties -> { - // remove the versioning iterators - 
properties.remove("table.iterator.scan.vers"); - properties.remove("table.iterator.minc.vers"); - properties.remove("table.iterator.majc.vers"); - - // make the scan returns cut more often - properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); - }); - - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < mutationsPerRow; j++) { - Mutation m = new Mutation("row" + i); - random.nextBytes(randomValue); - m.put("cf" + i, "cq" + i, 100L, new Value(randomValue)); - bw.addMutation(m); - } - } - } - client.tableOperations().flush(tableName, null, null, true); - client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - - client.tableOperations().offline(tableName, true); - long offlineCount; - try (OfflineScanner offlineScanner = - new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) { - offlineCount = offlineScanner.stream().count(); - } - - client.tableOperations().online(tableName, true); - long onlineCount; - try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { - onlineCount = scanner.stream().count(); - } - - assertEquals(expectedEntries, offlineCount); - assertEquals(offlineCount, onlineCount, "Online scan lost keys compared to direct RFile scan"); - } } diff --git a/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java b/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java new file mode 100644 index 00000000000..ce926cdd020 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.OfflineScanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; +import 
org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Edge-case regression tests for duplicate-key scans. These are expected to fail until the noted + * issues are addressed. + */ +public class DuplicateKeyEdgeCasesIT extends AccumuloClusterHarness { + + private AccumuloClient client; + private String tableName; + SecureRandom random = new SecureRandom(); + + @BeforeEach + public void setupInstance() { + client = Accumulo.newClient().from(getClientProps()).build(); + tableName = getUniqueNames(1)[0]; + } + + @AfterEach + public void teardown() { + try { + client.tableOperations().delete(tableName); + } catch (Exception e) { + // ignore + } + client.close(); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(2); + } + + /** + * Test that identical keys with different values are not dropped. + */ + @Test + public void testDuplicateScanLosesKeys() throws Exception { + final int numRows = 100; + final int mutationsPerRow = 10; + final int expectedEntries = numRows * mutationsPerRow; + byte[] randomValue = new byte[8192]; + + client.tableOperations().create(tableName); + + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < mutationsPerRow; j++) { + Mutation m = new Mutation("row" + i); + random.nextBytes(randomValue); + m.put("cf" + i, "cq" + i, 100L, new Value(randomValue)); + bw.addMutation(m); + } + } + } + client.tableOperations().flush(tableName, null, null, true); + client.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + long onlineCount; + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + onlineCount = scanner.stream().count(); + } + + assertEquals(expectedEntries, offlineValues.size()); + assertEquals(offlineValues.size(), onlineCount, + "Online scan lost keys compared to direct RFile scan"); + } + + /** + * Test that multiple entries with the same key but different values are properly read when the + * underlying files change midway through a scan. + */ + @Test + public void duplicateOrderChangesAcrossFilesDropsValues() throws Exception { + final int totalWrites = 150; + final int writesPerFlush = 50; + + client.tableOperations().create(tableName); + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + // Write duplicates in three flushes to get multiple files + int written = 0; + int flushIndex = 0; + try (BatchWriter bw = client.createBatchWriter(tableName)) { + while (written < totalWrites) { + for (int i = 0; i < writesPerFlush && written < totalWrites; i++) { + String value = "v-" + flushIndex + "-" + written; + Mutation m = new Mutation("row"); + m.put("cf", "cq", 100L, new Value(value.getBytes(UTF_8))); + bw.addMutation(m); + written++; + } + bw.flush(); + client.tableOperations().flush(tableName, null, null, true); + flushIndex++; + } + } + assertEquals(totalWrites, written); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + List seen = new ArrayList<>(); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setBatchSize(8); + + var iter = scanner.iterator(); + for (int i = 0;
i < 16 && iter.hasNext(); i++) { + seen.add(new String(iter.next().getValue().get(), UTF_8)); + } + + // Rewrite files to change the source ordering. + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + iter.forEachRemaining(e -> seen.add(new String(e.getValue().get(), UTF_8))); + } + + assertEquals(offlineValues.size(), seen.size()); + Collections.sort(offlineValues); + Collections.sort(seen); + assertEquals(offlineValues, seen); + } + + /** + * Test that if a tserver dies part way through a scan on a table with identical keys, nothing is + * lost when the scan completes after a tserver comes back up. + */ + @Test + public void duplicateSkipStateLostOnTserverFailover() throws Exception { + final int totalWrites = 90; + final int writesPerFlush = 30; + + client.tableOperations().create(tableName); + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + int written = 0; + int flushIndex = 0; + while (written < totalWrites) { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < writesPerFlush && written < totalWrites; i++) { + String value = "v-" + flushIndex + "-" + written; + Mutation m = new Mutation("row"); + m.put("cf", "cq", 100L, new Value(value.getBytes(UTF_8))); + bw.addMutation(m); + written++; + } + } + flushIndex++; + } + assertEquals(totalWrites, written); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + List seen = new ArrayList<>(); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setBatchSize(8); + + var iter = scanner.iterator(); + for (int i = 0; i < 12 && iter.hasNext(); i++) { + seen.add(new
String(iter.next().getValue().get(), UTF_8)); + } + + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + Wait.waitFor(() -> { + System.out.println("waiting for tservers"); + return client.instanceOperations().getTabletServers().size() == 2; + }, 60_000); + + iter.forEachRemaining(e -> seen.add(new String(e.getValue().get(), UTF_8))); + } + + assertEquals(offlineValues.size(), seen.size()); + + Collections.sort(offlineValues); + Collections.sort(seen); + assertEquals(offlineValues, seen); + } + + /** + * Offline the test table, read all values with an OfflineScanner, then bring it back online and + * return the values. + */ + private List getOfflineValues() + throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + ClientContext context = (ClientContext) client; + client.tableOperations().offline(tableName, true); + List offlineValues; + try (OfflineScanner offlineScanner = + new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) { + offlineValues = offlineScanner.stream().map(e -> new String(e.getValue().get(), UTF_8)) + .collect(Collectors.toCollection(ArrayList::new)); + } + client.tableOperations().online(tableName, true); + return offlineValues; + } +}