@@ -51,6 +51,10 @@ public boolean contains(LockRange widenedRange) {
return range.contains(widenedRange.range);
}

public boolean contains(Text row) {
return range.contains(row);
}

public static LockRange of(String startRow, String endRow) {
return of(startRow == null ? null : new Text(startRow),
endRow == null ? null : new Text(endRow));
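A minimal sketch of the new contains(Text) overload, assuming a LockRange built with of(startRow, endRow) covers the rows after startRow up to and including endRow; the row values here are purely illustrative:

// Illustrative only: checking whether individual rows fall inside a lock range.
LockRange locked = LockRange.of("a", "f");
boolean inside = locked.contains(new Text("c"));   // expected true under the (a, f] assumption
boolean outside = locked.contains(new Text("q"));  // expected false under the same assumption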
@@ -124,7 +124,8 @@ private void resetSource() {

// ensure the previous tablet still exists in the metadata table
if (Iterators.size(iteratorFactory.apply(new Range(prevMetaRow))) == 0) {
throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating");
throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating",
prevTablet.getTableId(), prevTablet.getEndRow());
}

// start scanning at next possible row in metadata table
@@ -18,11 +18,34 @@
*/
package org.apache.accumulo.core.metadata.schema;

import java.util.Optional;

import org.apache.accumulo.core.data.TableId;
import org.apache.hadoop.io.Text;

public class TabletDeletedException extends RuntimeException {

private static final long serialVersionUID = 1L;
private final TableId tableId;
private final Text deletedEndRow;

public TabletDeletedException(String msg, TableId tableId, Text deletedEndRow) {
super(msg);
this.tableId = tableId;
this.deletedEndRow = deletedEndRow;
}

public TabletDeletedException(String msg) {
super(msg);
tableId = null;
deletedEndRow = null;
}

public Optional<TableId> getDeletedTableId() {
return Optional.ofNullable(tableId);
}

public Optional<Text> getDeletedEndRow() {
return Optional.ofNullable(deletedEndRow);
}
}
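A hedged sketch of how a caller might populate and inspect the new fields; the message, table id, and row below are illustrative, not taken from the change:

// Illustrative only: constructing with the new metadata and reading it back.
TabletDeletedException tde = new TabletDeletedException(
    "Tablet was deleted while iterating", TableId.of("1"), new Text("p"));
TableId deletedTable = tde.getDeletedTableId().orElse(null);  // table id 1
Text deletedEndRow = tde.getDeletedEndRow().orElse(null);     // row p
// The single-argument constructor leaves both getters empty.
boolean empty = new TabletDeletedException("msg").getDeletedTableId().isEmpty();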
@@ -47,6 +47,7 @@
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
import org.apache.accumulo.core.fate.zookeeper.LockRange;
import org.apache.accumulo.core.file.FilePrefix;
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.PeekingIterator;
@@ -131,7 +132,7 @@ static void validateLoadMapping(String tableId, LoadMappingIterator lmi,
PeekingIterator<KeyExtent> pi =
new PeekingIterator<>(tabletIterFactory.newTabletIter(startRow));

try {
try (tabletIterFactory) {
KeyExtent currTablet = pi.next();

var fileCounts = new HashMap<String,Integer>();
Expand Down Expand Up @@ -204,8 +205,6 @@ && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
+ new TreeMap<>(fileCounts));
}
}
} finally {
tabletIterFactory.close();
}
}

@@ -254,8 +253,42 @@ public void close() {

int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
.getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
maxFilesPerTablet, fateId, skip);
try {
validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
maxFilesPerTablet, fateId, skip);
} catch (TabletDeletedException tde) {
boolean sameTableId = tde.getDeletedTableId().map(bulkInfo.tableId::equals).orElse(false);
if (!sameTableId) {
throw tde;
}
var lockRange = LockRange.of(bulkInfo.firstSplit, bulkInfo.lastSplit);
boolean insideRange = tde.getDeletedEndRow().map(lockRange::contains).orElse(true);
if (insideRange) {
// The deleted split was inside the lock range or it is unknown. Splits should not be
// deleted concurrently inside the lock range.
throw tde;
}

// The bulk operation locks a portion of the table that it is importing to prevent merges.
// However, this code may scan past the range it locked and have a concurrent merge delete
// tablets while it is scanning. If this happens, report it as a concurrent merge.
//
// Below is an example of how this can happen.
//
// 1. Client sees splits c,d,e,m,p,x in table and creates bulk load into d,e,m
// 2. A merge deletes splits e,m from the table. This runs after the load plan was created and
// before the bulk load fate operation starts.
// 3. Bulk fate operation locks range (d,m]
// 4. Another merge operation starts over range (o,z] and deletes splits p,x. This merge can
// run concurrently with the bulk operation because they are locking different parts of the
// table.
// 5. The bulk operation scans past m because it no longer exists and, when scanning p,x, gets a
// TabletDeletedException because of the concurrent merge.
log.debug("{} discarding and translating exception ", fateId, tde);
throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_CONCURRENT_MERGE,
"Concurrent merge happened");
}
}
}
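As a hedged walk-through of the decision above, using the example from the comment (splits c,d,e,m,p,x, lock range (d,m], and a concurrent merge over (o,z]); the table id and rows are illustrative:

// Illustrative only: step 3 locks (d, m]; step 5 sees the tablet with end row p deleted.
LockRange locked = LockRange.of("d", "m");
TabletDeletedException tde = new TabletDeletedException(
    "Tablet was deleted while iterating", TableId.of("1"), new Text("p"));
boolean sameTableId = tde.getDeletedTableId().map(TableId.of("1")::equals).orElse(false); // true
boolean insideRange = tde.getDeletedEndRow().map(locked::contains).orElse(true);          // false
// Same table, deletion outside the locked range: the exception is translated to
// BULK_CONCURRENT_MERGE instead of being rethrown.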
