Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ final class Batch {
private final List<KVEntry> results;
private final Key continueKey;
private final long numBytes;
private final int duplicatesToSkip;

Batch(boolean skipContinueKey, List<KVEntry> results, Key continueKey, long numBytes) {
Batch(boolean skipContinueKey, List<KVEntry> results, Key continueKey, long numBytes,
int duplicatesToSkip) {
this.skipContinueKey = skipContinueKey;
this.results = results;
this.continueKey = continueKey;
this.numBytes = numBytes;
this.duplicatesToSkip = duplicatesToSkip;
}

public boolean isSkipContinueKey() {
Expand All @@ -50,4 +53,8 @@ public Key getContinueKey() {
public long getNumBytes() {
return numBytes;
}

public int getDuplicatesToSkip() {
return duplicatesToSkip;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Scanner {
private final AtomicBoolean interruptFlag;

private boolean readInProgress = false;
private int duplicatesToSkip = 0;

Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) {
this.tablet = tablet;
Expand Down Expand Up @@ -138,7 +139,8 @@ private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException, Tablet
iter = new SourceSwitchingIterator(dataSource, false);
}

results = tablet.nextBatch(iter, range, scanParams);
results = tablet.nextBatch(iter, range, scanParams, duplicatesToSkip);
duplicatesToSkip = 0;

if (results.getResults() == null) {
range = null;
Expand All @@ -148,6 +150,7 @@ private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException, Tablet
} else {
range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(),
range.isEndKeyInclusive());
duplicatesToSkip = results.getDuplicatesToSkip();
return new Pair<>(new ScanBatch(results.getResults(), true), dataSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.TooManyFilesException;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.MemKey;
import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.TabletServerResourceManager;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
Expand Down Expand Up @@ -276,8 +277,8 @@ void recordScanTrace(Span span, List<KVEntry> batch, ScanParameters scanParamete
}
}

Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
throws IOException {
Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams,
int duplicatesToSkip) throws IOException {

// log.info("In nextBatch..");

Expand All @@ -299,7 +300,8 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM);

Key continueKey = null;
boolean skipContinueKey = false;
boolean skipContinueKey = true;
boolean resumeOnSameKey = false;

YieldCallback<Key> yield = new YieldCallback<>();

Expand All @@ -314,32 +316,54 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true);
}

skipReturnedDuplicates(iter, duplicatesToSkip, range);

Key rangeStartKey = range.getStartKey();
Key currentKey = null;
boolean resumingOnSameKey =
iter.hasTop() && rangeStartKey != null && rangeStartKey.equals(iter.getTopKey());
int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
int duplicatesReturnedForCurrentKey = 0;

while (iter.hasTop()) {
if (yield.hasYielded()) {
throw new IOException(
"Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
}
value = iter.getTopValue();
key = iter.getTopKey();
if (currentKey == null || !key.equals(currentKey)) {
currentKey = copyResumeKey(key);
if (resumingOnSameKey && rangeStartKey != null && key.equals(rangeStartKey)) {
duplicatesReturnedForCurrentKey = previousDuplicates;
} else {
duplicatesReturnedForCurrentKey = 0;
resumingOnSameKey = false;
}
}

KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();

duplicatesReturnedForCurrentKey++;

boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;

if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
continueKey = new Key(key);
skipContinueKey = true;
continueKey = copyResumeKey(key);
resumeOnSameKey = true;
skipContinueKey = false;
break;
}

iter.next();
}

if (yield.hasYielded()) {
continueKey = new Key(yield.getPositionAndReset());
continueKey = copyResumeKey(yield.getPositionAndReset());
resumeOnSameKey = false;
skipContinueKey = true;
if (!range.contains(continueKey)) {
throw new IOException("Underlying iterator yielded to a position outside of its range: "
Expand All @@ -362,7 +386,9 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
}
}

return new Batch(skipContinueKey, results, continueKey, resultBytes);
int duplicatesToSkipForNextBatch = resumeOnSameKey ? duplicatesReturnedForCurrentKey : 0;
return new Batch(skipContinueKey, results, continueKey, resultBytes,
duplicatesToSkipForNextBatch);
}

private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
Expand Down Expand Up @@ -515,7 +541,8 @@ private void handleTabletClosedDuringScan(List<KVEntry> results, Tablet.LookupRe

private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) {
if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive());
Key copy = copyResumeKey(key);
Range nlur = new Range(copy, false, range.getEndKey(), range.isEndKeyInclusive());
lookupResult.unfinishedRanges.add(nlur);
}
}
Expand All @@ -526,4 +553,30 @@ public synchronized void updateQueryStats(int size, long numBytes) {
this.queryResultBytes.addAndGet(numBytes);
this.server.getScanMetrics().incrementQueryResultBytes(numBytes);
}

private Key copyResumeKey(Key key) {
if (key instanceof MemKey) {
MemKey memKey = (MemKey) key;
return new MemKey(memKey, memKey.getKVCount());
}
return new Key(key);
}

private void skipReturnedDuplicates(SortedKeyValueIterator<Key,Value> iter, int duplicatesToSkip,
Range range) throws IOException {
if (duplicatesToSkip <= 0 || !range.isStartKeyInclusive()) {
return;
}

Key startKey = range.getStartKey();
if (startKey == null) {
return;
}

int skipped = 0;
while (skipped < duplicatesToSkip && iter.hasTop() && iter.getTopKey().equals(startKey)) {
iter.next();
skipped++;
}
}
}
Loading