diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/LockRange.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/LockRange.java
index a7105d0d16d..5342e5087f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/LockRange.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/LockRange.java
@@ -51,6 +51,10 @@ public boolean contains(LockRange widenedRange) {
     return range.contains(widenedRange.range);
   }
 
+  public boolean contains(Text row) {
+    return range.contains(row);
+  }
+
   public static LockRange of(String startRow, String endRow) {
     return of(startRow == null ? null : new Text(startRow),
         endRow == null ? null : new Text(endRow));
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java
index 2358d32f61c..2260543813c 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java
@@ -124,7 +124,8 @@ private void resetSource() {
 
       // ensure the previous tablet still exists in the metadata table
       if (Iterators.size(iteratorFactory.apply(new Range(prevMetaRow))) == 0) {
-        throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating");
+        throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating",
+            prevTablet.getTableId(), prevTablet.getEndRow());
       }
 
       // start scanning at next possible row in metadata table
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java
index 1064757d589..e173b89f579 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java
@@ -18,11 +18,34 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
+import java.util.Optional;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.hadoop.io.Text;
+
 public class TabletDeletedException extends RuntimeException {
 
   private static final long serialVersionUID = 1L;
+  private final TableId tableId;
+  private final Text deletedEndRow;
+
+  public TabletDeletedException(String msg, TableId tableId, Text deletedEndRow) {
+    super(msg);
+    this.tableId = tableId;
+    this.deletedEndRow = deletedEndRow;
+  }
 
   public TabletDeletedException(String msg) {
     super(msg);
+    tableId = null;
+    deletedEndRow = null;
+  }
+
+  public Optional<TableId> getDeletedTableId() {
+    return Optional.ofNullable(tableId);
+  }
+
+  public Optional<Text> getDeletedEndRow() {
+    return Optional.ofNullable(deletedEndRow);
   }
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 1cfd946e45c..44fc5973af0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -47,6 +47,7 @@
 import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
 import org.apache.accumulo.core.fate.zookeeper.LockRange;
 import org.apache.accumulo.core.file.FilePrefix;
+import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.util.PeekingIterator;
@@ -131,7 +132,7 @@ static void validateLoadMapping(String tableId, LoadMappingIterator lmi,
     PeekingIterator<KeyExtent> pi =
         new PeekingIterator<>(tabletIterFactory.newTabletIter(startRow));
 
-    try {
+    try (tabletIterFactory) {
       KeyExtent currTablet = pi.next();
 
       var fileCounts = new HashMap();
@@ -204,8 +205,6 @@ && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
                 + new TreeMap<>(fileCounts));
         }
       }
-    } finally {
-      tabletIterFactory.close();
     }
   }
 
@@ -254,8 +253,42 @@ public void close() {
     int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
         .getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
 
-    validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
-        maxFilesPerTablet, fateId, skip);
+    try {
+      validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
+          maxFilesPerTablet, fateId, skip);
+    } catch (TabletDeletedException tde) {
+      boolean sameTableId = tde.getDeletedTableId().map(bulkInfo.tableId::equals).orElse(false);
+      if (!sameTableId) {
+        throw tde;
+      }
+      var lockRange = LockRange.of(bulkInfo.firstSplit, bulkInfo.lastSplit);
+      boolean insideRange = tde.getDeletedEndRow().map(lockRange::contains).orElse(true);
+      if (insideRange) {
+        // The deleted split was inside the lock range or it's unknown. Splits should not be
+        // deleted concurrently inside the lock range.
+        throw tde;
+      }
+
+      // The bulk operation locks a portion of the table that it is importing to prevent merges.
+      // However, this code may scan past the range it locked and have a concurrent merge delete
+      // tablets while it's scanning. If this happens, report it as a concurrent merge.
+      //
+      // Below is an example of how this can happen.
+      //
+      // 1. Client sees splits c,d,e,m,p,x in table and creates bulk load into d,e,m
+      // 2. A merge deletes splits e,m from the table. This runs after the load plan was created
+      // and before the bulk load fate operation starts.
+      // 3. Bulk fate operation locks range (d,m]
+      // 4. Another merge operation starts over range (o,z] and deletes splits p,x. This merge can
+      // run concurrently with the bulk operation because they are locking different parts of the
+      // table.
+      // 5. The bulk operation scans past m because it does not exist and when scanning p,x gets a
+      // TabletDeletedException because of the concurrent merge.
+      log.debug("{} discarding and translating exception ", fateId, tde);
+      throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
+          TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_CONCURRENT_MERGE,
+          "Concurrent merge happened");
+    }
   }
 }
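
Not part of the patch above: a minimal standalone sketch of how the new TabletDeletedException metadata and LockRange.contains(Text) combine, mirroring the decision the catch block in PrepBulkImport makes. The class name ConcurrentMergeCheckSketch, the table id "42", and the splits "d"/"m"/"p" are made up for illustration; the three-argument constructor, the Optional getters, and LockRange.of/contains are the ones added by this patch.

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.LockRange;
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.hadoop.io.Text;

public class ConcurrentMergeCheckSketch {

  // Returns true when the deletion can be attributed to a concurrent merge outside the locked
  // range: the exception names the same table and the deleted end row is known to fall outside
  // the range locked by the bulk import. Returns false when the exception should simply be
  // rethrown (different or unknown table, unknown end row, or a deletion inside the range).
  static boolean attributableToConcurrentMerge(TabletDeletedException tde, TableId bulkTableId,
      String firstSplit, String lastSplit) {
    boolean sameTableId = tde.getDeletedTableId().map(bulkTableId::equals).orElse(false);
    if (!sameTableId) {
      return false;
    }
    LockRange lockRange = LockRange.of(firstSplit, lastSplit);
    // an unknown end row is treated as "inside" so the caller stays on the safe side and rethrows
    boolean insideRange = tde.getDeletedEndRow().map(lockRange::contains).orElse(true);
    return !insideRange;
  }

  public static void main(String[] args) {
    TableId tableId = TableId.of("42");
    // mirrors steps 4 and 5 of the example in the patch comments: split p was deleted by a merge
    // outside the locked range (d,m]
    TabletDeletedException tde = new TabletDeletedException(
        "Tablet 42;p was deleted while iterating", tableId, new Text("p"));
    // should print true, assuming LockRange.of("d", "m") locks the range (d,m]
    System.out.println(attributableToConcurrentMerge(tde, tableId, "d", "m"));
  }
}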