diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java index d610a8f6ebb..50d731ed1c4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java @@ -27,12 +27,15 @@ final class Batch { private final List results; private final Key continueKey; private final long numBytes; + private final int duplicatesToSkip; - Batch(boolean skipContinueKey, List results, Key continueKey, long numBytes) { + Batch(boolean skipContinueKey, List results, Key continueKey, long numBytes, + int duplicatesToSkip) { this.skipContinueKey = skipContinueKey; this.results = results; this.continueKey = continueKey; this.numBytes = numBytes; + this.duplicatesToSkip = duplicatesToSkip; } public boolean isSkipContinueKey() { @@ -50,4 +53,8 @@ public Key getContinueKey() { public long getNumBytes() { return numBytes; } + + public int getDuplicatesToSkip() { + return duplicatesToSkip; + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 9e2a7c7af03..6289727c812 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -62,6 +62,7 @@ public class Scanner { private final AtomicBoolean interruptFlag; private boolean readInProgress = false; + private int duplicatesToSkip = 0; Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) { this.tablet = tablet; @@ -138,7 +139,8 @@ private Pair readInternal() throws IOException, Tablet iter = new SourceSwitchingIterator(dataSource, false); } - results = tablet.nextBatch(iter, range, scanParams); + results = tablet.nextBatch(iter, range, scanParams, duplicatesToSkip); + duplicatesToSkip = 0; if (results.getResults() == null) { range = null; @@ -148,6 +150,7 @@ private Pair readInternal() throws IOException, Tablet } else { range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive()); + duplicatesToSkip = results.getDuplicatesToSkip(); return new Pair<>(new ScanBatch(results.getResults(), true), dataSource); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 0ac8f6bd141..a4f80abf70d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -60,6 +60,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.accumulo.tserver.MemKey; import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; @@ -276,8 +277,8 @@ void recordScanTrace(Span span, List batch, ScanParameters scanParamete } } - Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams) - throws IOException { + Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams, + int duplicatesToSkip) throws IOException { // log.info("In nextBatch.."); @@ -299,7 +300,8 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM); Key continueKey = null; - boolean skipContinueKey = false; + boolean skipContinueKey = true; + boolean resumeOnSameKey = false; YieldCallback yield = new YieldCallback<>(); @@ -314,6 +316,15 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true); } + skipReturnedDuplicates(iter, duplicatesToSkip, range); + + Key rangeStartKey = range.getStartKey(); + Key currentKey = null; + boolean resumingOnSameKey = + iter.hasTop() && rangeStartKey != null && rangeStartKey.equals(iter.getTopKey()); + int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0; + int duplicatesReturnedForCurrentKey = 0; + while (iter.hasTop()) { if (yield.hasYielded()) { throw new IOException( @@ -321,17 +332,29 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } value = iter.getTopValue(); key = iter.getTopKey(); + if (currentKey == null || !key.equals(currentKey)) { + currentKey = copyResumeKey(key); + if (resumingOnSameKey && rangeStartKey != null && key.equals(rangeStartKey)) { + duplicatesReturnedForCurrentKey = previousDuplicates; + } else { + duplicatesReturnedForCurrentKey = 0; + resumingOnSameKey = false; + } + } KVEntry kvEntry = new KVEntry(key, value); // copies key and value results.add(kvEntry); resultSize += kvEntry.estimateMemoryUsed(); resultBytes += kvEntry.numBytes(); + duplicatesReturnedForCurrentKey++; + boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun; if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) { - continueKey = new Key(key); - skipContinueKey = true; + continueKey = copyResumeKey(key); + resumeOnSameKey = true; + skipContinueKey = false; break; } @@ -339,7 +362,8 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } if (yield.hasYielded()) { - continueKey = new Key(yield.getPositionAndReset()); + continueKey = copyResumeKey(yield.getPositionAndReset()); + resumeOnSameKey = false; skipContinueKey = true; if (!range.contains(continueKey)) { throw new IOException("Underlying iterator yielded to a position outside of its range: " @@ -362,7 +386,9 @@ Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParamet } } - return new Batch(skipContinueKey, results, continueKey, resultBytes); + int duplicatesToSkipForNextBatch = resumeOnSameKey ? duplicatesReturnedForCurrentKey : 0; + return new Batch(skipContinueKey, results, continueKey, resultBytes, + duplicatesToSkipForNextBatch); } private Tablet.LookupResult lookup(SortedKeyValueIterator mmfi, List ranges, @@ -515,7 +541,8 @@ private void handleTabletClosedDuringScan(List results, Tablet.LookupRe private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) { if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { - Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive()); + Key copy = copyResumeKey(key); + Range nlur = new Range(copy, false, range.getEndKey(), range.isEndKeyInclusive()); lookupResult.unfinishedRanges.add(nlur); } } @@ -526,4 +553,30 @@ public synchronized void updateQueryStats(int size, long numBytes) { this.queryResultBytes.addAndGet(numBytes); this.server.getScanMetrics().incrementQueryResultBytes(numBytes); } + + private Key copyResumeKey(Key key) { + if (key instanceof MemKey) { + MemKey memKey = (MemKey) key; + return new MemKey(memKey, memKey.getKVCount()); + } + return new Key(key); + } + + private void skipReturnedDuplicates(SortedKeyValueIterator iter, int duplicatesToSkip, + Range range) throws IOException { + if (duplicatesToSkip <= 0 || !range.isStartKeyInclusive()) { + return; + } + + Key startKey = range.getStartKey(); + if (startKey == null) { + return; + } + + int skipped = 0; + while (skipped < duplicatesToSkip && iter.hasTop() && iter.getTopKey().equals(startKey)) { + iter.next(); + skipped++; + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java b/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java new file mode 100644 index 00000000000..ce926cdd020 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/DuplicateKeyEdgeCasesIT.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.OfflineScanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Edge-case regression tests for duplicate-key scans. These are expected to fail until the noted + * issues are addressed. + */ +public class DuplicateKeyEdgeCasesIT extends AccumuloClusterHarness { + + private AccumuloClient client; + private String tableName; + SecureRandom random = new SecureRandom(); + + @BeforeEach + public void setupInstance() { + client = Accumulo.newClient().from(getClientProps()).build(); + tableName = getUniqueNames(1)[0]; + } + + @AfterEach + public void teardown() { + try { + client.tableOperations().delete(tableName); + } catch (Exception e) { + // ignore + } + client.close(); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(2); + } + + /** + * Test that identical keys with different values are not dropped. + */ + @Test + public void testDuplicateScanLosesKeys() throws Exception { + final int numRows = 100; + final int mutationsPerRow = 10; + final int expectedEntries = numRows * mutationsPerRow; + byte[] randomValue = new byte[8192]; + + client.tableOperations().create(tableName); + + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < mutationsPerRow; j++) { + Mutation m = new Mutation("row" + i); + random.nextBytes(randomValue); + m.put("cf" + i, "cq" + i, 100L, new Value(randomValue)); + bw.addMutation(m); + } + } + } + client.tableOperations().flush(tableName, null, null, true); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + long onlineCount; + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + onlineCount = scanner.stream().count(); + } + + assertEquals(expectedEntries, offlineValues.size()); + assertEquals(offlineValues.size(), onlineCount, + "Online scan lost keys compared to direct RFile scan"); + } + + /** + * Test that multiple entries with the same key but different values is properly read when the + * underlying files change midway through a scan + */ + @Test + public void duplicateOrderChangesAcrossFilesDropsValues() throws Exception { + final int totalWrites = 150; + final int writesPerFlush = 50; + + client.tableOperations().create(tableName); + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + // Write duplicates in three flushes to get multiple files + int written = 0; + int flushIndex = 0; + try (BatchWriter bw = client.createBatchWriter(tableName)) { + while (written < totalWrites) { + for (int i = 0; i < writesPerFlush && written < totalWrites; i++) { + String value = "v-" + flushIndex + "-" + written; + Mutation m = new Mutation("row"); + m.put("cf", "cq", 100L, new Value(value.getBytes(UTF_8))); + bw.addMutation(m); + written++; + } + bw.flush(); + client.tableOperations().flush(tableName, null, null, true); + flushIndex++; + } + } + assertEquals(totalWrites, written); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + List seen = new ArrayList<>(); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setBatchSize(8); + + var iter = scanner.iterator(); + for (int i = 0; i < 16 && iter.hasNext(); i++) { + seen.add(new String(iter.next().getValue().get(), UTF_8)); + } + + // Rewrite files to change the source ordering. + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + iter.forEachRemaining(e -> seen.add(new String(e.getValue().get(), UTF_8))); + } + + assertEquals(offlineValues.size(), seen.size()); + Collections.sort(offlineValues); + Collections.sort(seen); + assertEquals(offlineValues, seen); + } + + /** + * Test that if a tserver dies part way through a scan on a table with identical keys, nothing is + * lost when the scan completes after a tserver comes back uo. + */ + @Test + public void duplicateSkipStateLostOnTserverFailover() throws Exception { + final int totalWrites = 90; + final int writesPerFlush = 30; + + client.tableOperations().create(tableName); + client.tableOperations().modifyProperties(tableName, properties -> { + properties.remove("table.iterator.scan.vers"); + properties.remove("table.iterator.minc.vers"); + properties.remove("table.iterator.majc.vers"); + properties.put(Property.TABLE_SCAN_MAXMEM.getKey(), "32k"); + }); + + int written = 0; + int flushIndex = 0; + while (written < totalWrites) { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < writesPerFlush && written < totalWrites; i++) { + String value = "v-" + flushIndex + "-" + written; + Mutation m = new Mutation("row"); + m.put("cf", "cq", 100L, new Value(value.getBytes(UTF_8))); + bw.addMutation(m); + written++; + } + } + flushIndex++; + } + assertEquals(totalWrites, written); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + List offlineValues = getOfflineValues(); + + List seen = new ArrayList<>(); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setBatchSize(8); + + var iter = scanner.iterator(); + for (int i = 0; i < 12 && iter.hasNext(); i++) { + seen.add(new String(iter.next().getValue().get(), UTF_8)); + } + + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + Wait.waitFor(() -> { + System.out.println("waiting for tservers"); + return client.instanceOperations().getTabletServers().size() == 2; + }, 60_000); + + iter.forEachRemaining(e -> seen.add(new String(e.getValue().get(), UTF_8))); + } + + assertEquals(offlineValues.size(), seen.size()); + + Collections.sort(offlineValues); + Collections.sort(seen); + assertEquals(offlineValues, seen); + } + + /** + * Offline the test table, read all values with an OfflineScanner, then bring it back online and + * return the values. + */ + private List getOfflineValues() + throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + ClientContext context = (ClientContext) client; + client.tableOperations().offline(tableName, true); + List offlineValues; + try (OfflineScanner offlineScanner = + new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) { + offlineValues = offlineScanner.stream().map(e -> new String(e.getValue().get(), UTF_8)) + .collect(Collectors.toCollection(ArrayList::new)); + } + client.tableOperations().online(tableName, true); + return offlineValues; + } +}