From 4409e3cecaea31ddc8d6f1455189d9efdf7c7d76 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Tue, 22 Jul 2025 11:30:09 -0400 Subject: [PATCH 1/2] Added new split option --- .../tserver/tablet/SplitPointUtil.java | 134 ++++++++++++++++++ .../accumulo/tserver/tablet/Tablet.java | 105 ++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java new file mode 100644 index 00000000000..4e08140eaff --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SplitPointUtil { + private static final Logger log = LoggerFactory.getLogger(SplitPointUtil.class); + private static final int DEFAULT_BATCH_SIZE = 200; + + public static Optional findSplitPoint(List files, int maxOpen, + FileSystem fs, Configuration hadoopConf, TableConfiguration tableConf) throws IOException { + + if (files.size() <= maxOpen) { + return computeMidpoint(files, fs, hadoopConf, tableConf); + } + + log.info("Tablet has {} files, reducing using batch size {}", files.size(), DEFAULT_BATCH_SIZE); + List reduced = + reduceFiles(files, DEFAULT_BATCH_SIZE, fs, hadoopConf, tableConf); + + return computeMidpoint(reduced, fs, hadoopConf, tableConf); + } + + private static Optional computeMidpoint(List files, FileSystem fs, + Configuration conf, TableConfiguration tableConf) throws IOException { + List allKeys = new ArrayList<>(); + + for (StoredTabletFile file : files) { + try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()).withTableConfiguration(tableConf) + .build()) { + + reader.seek(new Range(), Collections.emptyList(), false); + + while (reader.hasTop()) { + allKeys.add(new Key(reader.getTopKey())); + reader.next(); + } + } + } + + if (allKeys.isEmpty()) { + return Optional.empty(); + } + + Collections.sort(allKeys); + return Optional.of(allKeys.get(allKeys.size() / 2)); + } + + private static List reduceFiles(List files, int batchSize, + FileSystem fs, Configuration conf, TableConfiguration tableConf) throws IOException { + List reduced = new ArrayList<>(); + + for (int i = 0; i < files.size(); i += batchSize) { + List batch = files.subList(i, Math.min(i + batchSize, files.size())); + StoredTabletFile merged = mergeBatch(batch, fs, conf, tableConf); + reduced.add(merged); + } + + if (reduced.size() > batchSize) { + return reduceFiles(reduced, batchSize, fs, conf, tableConf); + } + + return reduced; + } + + private static StoredTabletFile mergeBatch(List batch, FileSystem fs, + Configuration conf, TableConfiguration tableConf) throws IOException { + Path tmpDir = new Path("/tmp/idxReduce_" + System.nanoTime()); + fs.mkdirs(tmpDir); + Path mergedPath = new Path(tmpDir, "merged_" + UUID.randomUUID() + ".rf"); + + Map props = new HashMap<>(); + tableConf.iterator().forEachRemaining(e -> props.put(e.getKey(), e.getValue())); + + try (RFileWriter writer = RFile.newWriter().to(mergedPath.toString()).withFileSystem(fs) + .withTableProperties(props).build()) { + + for (StoredTabletFile file : batch) { + try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()).withTableConfiguration(tableConf) + .build()) { + + reader.seek(new Range(), Collections.emptyList(), false); + while (reader.hasTop()) { + writer.append(reader.getTopKey(), reader.getTopValue()); + reader.next(); + } + } + } + + } + + return new StoredTabletFile(mergedPath.toString()); + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 4325be7d3ad..c38232e72d4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import static org.apache.accumulo.server.tablets.TabletNameGenerator.createTabletDirectoryName; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletsMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -87,6 +89,7 @@ import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; @@ -106,6 +109,7 @@ import org.apache.accumulo.tserver.scan.ScanParameters; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; @@ -409,6 +413,11 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil bringMinorCompactionOnline(tmpDatafile, newDatafile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, flushId, mincReason); + try { + maybeSplitTablet(); + } catch (IOException e) { + log.error("Error while attempting to split tablet after minor compaction", e); + } } catch (Exception e) { final ServiceLock tserverLock = tabletServer.getLock(); if (tserverLock == null || !tserverLock.verifyLockAtSource()) { @@ -1125,6 +1134,102 @@ public Map getDatafiles() { return getMetadata().getFilesMap(); } + public long estimateTabletSize() { + long size = 0L; + + for (DataFileValue sz : getDatafiles().values()) { + size += sz.getSize(); + } + + return size; + } + + private boolean isSplitPossible() { + + long splitThreshold = tableConfiguration.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + + return !extent.isRootTablet() && !isFindSplitsSuppressed() + && estimateTabletSize() > splitThreshold; + } + + public boolean needsSplit() { + var files = getDatafiles().keySet(); + return files.size() > 1 && !supressFindSplits; + } + + private boolean supressFindSplits = false; + private long timeOfLastMinCWhenFindSplitsWasSupressed = 0; + + private boolean isFindSplitsSuppressed() { + if (supressFindSplits) { + if (timeOfLastMinCWhenFindSplitsWasSupressed != lastMinorCompactionFinishTime) { + supressFindSplits = false; + } else { + // nothing changed, do not split + return true; + } + } + + return false; + } + + private void suppressFindSplits() { + supressFindSplits = true; + timeOfLastMinCWhenFindSplitsWasSupressed = lastMinorCompactionFinishTime; + } + + public void maybeSplitTablet() throws IOException { + if (!isSplitPossible() || !needsSplit()) { + return; + } + + Optional optSplit = computeSplitPoint(); + if (optSplit.isEmpty()) { + suppressFindSplits(); + return; + } + + Text midRow = optSplit + .orElseThrow(() -> new IllegalStateException("Split point should be present")).getRow(); + KeyExtent low = new KeyExtent(extent.tableId(), midRow, extent.prevEndRow()); + KeyExtent high = new KeyExtent(extent.tableId(), extent.endRow(), midRow); + + ServerContext context = tabletServer.getContext(); + Ample.TabletsMutator mutator = context.getAmple().mutateTablets(); + + String lowDir = createTabletDirectoryName(context, midRow); + String highDir = getMetadata().getDirName(); + MetadataTime time = tabletTime.getMetadataTime(); + + mutator.mutateTablet(low).putDirName(lowDir).putTime(time); + + mutator.mutateTablet(high).putDirName(highDir).putTime(time); + + mutator.close(); + + log.info("Split tablet {} at {} into {} and {}", extent, midRow, low, high); + } + + public Optional computeSplitPoint() { + try { + List files = new ArrayList<>(getDatafiles().keySet()); + if (files.isEmpty()) { + return Optional.empty(); + } + + var context = tabletServer.getContext(); + var fs = context.getVolumeManager().getFileSystemByPath(files.get(0).getPath()); + var conf = context.getHadoopConf(); + var maxOpen = tableConfiguration.getCount(Property.SPLIT_MAXOPEN); + + return SplitPointUtil.findSplitPoint(files, maxOpen, fs, conf, tableConfiguration); + + } catch (IOException e) { + log.warn("Error computing split point for tablet {}", extent, e); + return Optional.empty(); + } + } + @Override public void addToYieldMetric(int i) { getTabletServer().getScanMetrics().addYield(i); From 49a6dc62a77b69a96c1da9ec15e56afb67cf3df0 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Tue, 21 Oct 2025 10:51:37 -0400 Subject: [PATCH 2/2] removed splitting from Tablet and into SplitUtils --- .../apache/accumulo/core/conf/Property.java | 3 + .../metadata/schema/UnSplittableMetadata.java | 17 ++- .../metadata/schema/TabletMetadataTest.java | 17 +-- .../state/TabletManagementIterator.java | 12 +- .../accumulo/server/split/SplitUtils.java | 85 ++++++++++- .../constraints/MetadataConstraintsTest.java | 15 +- .../tableOps/merge/MergeTabletsTest.java | 2 +- .../tableOps/split/UpdateTabletsTest.java | 4 +- .../tserver/tablet/SplitPointUtil.java | 134 ------------------ .../accumulo/tserver/tablet/Tablet.java | 105 -------------- .../functional/AmpleConditionalWriterIT.java | 4 +- 11 files changed, 123 insertions(+), 275 deletions(-) delete mode 100644 server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 8139e31ff3b..d750ed2bf85 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -935,6 +935,9 @@ public enum Property { "1.3.5"), TABLE_SPLIT_THRESHOLD("table.split.threshold", "1G", PropertyType.BYTES, "A tablet is split when the combined size of RFiles exceeds this amount.", "1.3.5"), + TABLE_MAX_FILES_BEFORE_SPLIT("table.split.maxfiles", "1000", PropertyType.COUNT, + "The maximum number of files a tablet can have before split, regardless of size. this helps tablets with many small files that haven't reached the split threshold to split so they can be compacted. A value of 0 disables this check.", + "4.0.0"), TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10k", PropertyType.BYTES, "Maximum size of end row.", "1.7.0"), TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.age", "10m", PropertyType.TIMEDURATION, diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java index a63b09b29af..b73ff844bbf 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java @@ -40,8 +40,9 @@ public class UnSplittableMetadata { private final HashCode hashOfSplitParameters; private UnSplittableMetadata(KeyExtent keyExtent, long splitThreshold, long maxEndRowSize, - int maxFilesToOpen, Set files) { - this(calculateSplitParamsHash(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, files)); + int maxFilesToOpen, int maxFilesBeforeSplit, Set files) { + this(calculateSplitParamsHash(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, + maxFilesBeforeSplit, files)); } private UnSplittableMetadata(HashCode hashOfSplitParameters) { @@ -75,26 +76,30 @@ public String toBase64() { } private static HashCode calculateSplitParamsHash(KeyExtent keyExtent, long splitThreshold, - long maxEndRowSize, int maxFilesToOpen, Set files) { + long maxEndRowSize, int maxFilesToOpen, int maxFilesBeforeSplit, + Set files) { Preconditions.checkArgument(splitThreshold > 0, "splitThreshold must be greater than 0"); Preconditions.checkArgument(maxEndRowSize > 0, "maxEndRowSize must be greater than 0"); Preconditions.checkArgument(maxFilesToOpen > 0, "maxFilesToOpen must be greater than 0"); + Preconditions.checkArgument(maxFilesBeforeSplit > 0, + "maxFilesBeforeSplit must be greater than 0"); // Use static call to murmur3_128() so the seed is always the same // Hashing.goodFastHash will seed with the current time, and we need the seed to be // the same across restarts and instances var hasher = Hashing.murmur3_128().newHasher(); hasher.putBytes(serializeKeyExtent(keyExtent)).putLong(splitThreshold).putLong(maxEndRowSize) - .putInt(maxFilesToOpen); + .putInt(maxFilesToOpen).putInt(maxFilesBeforeSplit); files.stream().map(StoredTabletFile::getMetadata).sorted() .forEach(path -> hasher.putString(path, UTF_8)); return hasher.hash(); } public static UnSplittableMetadata toUnSplittable(KeyExtent keyExtent, long splitThreshold, - long maxEndRowSize, int maxFilesToOpen, Set files) { + long maxEndRowSize, int maxFilesToOpen, int maxFilesBeforeSplit, + Set files) { return new UnSplittableMetadata(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, - files); + maxFilesBeforeSplit, files); } public static UnSplittableMetadata toUnSplittable(String base64HashOfSplitParameters) { diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 4bbb4199ef7..c143ebd7476 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -150,7 +150,7 @@ public void testAllColumns() { FateId userCompactFateId = FateId.from(type, UUID.randomUUID()); mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, userCompactFateId.canonical(), ""); var unsplittableMeta = - UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2)); + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2)); SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta.toBase64())); SteadyTime suspensionTime = SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS); @@ -537,7 +537,7 @@ public void testUnsplittableColumn() { // Test with files var unsplittableMeta1 = - UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf3)); + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2, sf3)); Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta1.toBase64())); TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), @@ -545,7 +545,8 @@ public void testUnsplittableColumn() { assertUnsplittable(unsplittableMeta1, tm.getUnSplittable(), true); // Test empty file set - var unsplittableMeta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of()); + var unsplittableMeta2 = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of()); mutation = TabletColumnFamily.createPrevRowMutation(extent); SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta2.toBase64())); tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), @@ -558,7 +559,7 @@ public void testUnsplittableColumn() { // Test with ranges // use sf4 which includes sf4 instead of sf3 which has a range var unsplittableMeta3 = - UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf4)); + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2, sf4)); mutation = TabletColumnFamily.createPrevRowMutation(extent); SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta3.toBase64())); tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), @@ -592,9 +593,9 @@ public void testUnsplittableWithRange() { StoredTabletFile sf3 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"), new Range("a", false, "d", true)); - var meta1 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1)); - var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf2)); - var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf3)); + var meta1 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1)); + var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf2)); + var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf3)); // compare each against the others to make sure not equal assertUnsplittable(meta1, meta2, false); @@ -760,7 +761,7 @@ public void testBuilder() { SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, selFilesFateId, SteadyTime.from(100_000, TimeUnit.NANOSECONDS)); var unsplittableMeta = - UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2)); + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2)); TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) .putSuspension(ser1, SteadyTime.from(45L, TimeUnit.MILLISECONDS)) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index ca5b0bafbce..6ba900457ba 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -80,6 +80,7 @@ private static class SplitConfig { long splitThreshold; long maxEndRowSize; int maxFilesToOpen; + int maxFilesBeforeSplit; void update(TableId tableId, Configuration tableConfig) { if (!tableId.equals(this.tableId)) { @@ -90,6 +91,8 @@ void update(TableId tableId, Configuration tableConfig) { .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); maxFilesToOpen = (int) ConfigurationTypeHelper .getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey())); + maxFilesBeforeSplit = (int) ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_FILES_BEFORE_SPLIT.getKey())); } } } @@ -108,13 +111,14 @@ private static boolean shouldReturnDueToSplit(final TabletMetadata tm, // which gives a chance to clean up the marker and recheck. var unsplittable = tm.getUnSplittable(); if (unsplittable != null) { - return !unsplittable - .equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), splitConfig.splitThreshold, - splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, tm.getFiles())); + return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), + splitConfig.splitThreshold, splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, + splitConfig.maxFilesBeforeSplit, tm.getFiles())); } // If unsplittable is not set at all then check if over split threshold - final boolean shouldSplit = SplitUtils.needsSplit(splitConfig.splitThreshold, tm); + final boolean shouldSplit = + SplitUtils.needsSplit(splitConfig.splitThreshold, splitConfig.maxFilesBeforeSplit, tm); LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), tm.getFileSize(), splitConfig.splitThreshold, shouldSplit); return shouldSplit; diff --git a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java index ac4d03a53eb..8e07ea2de52 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java @@ -202,9 +202,10 @@ public static SortedSet findSplits(ServerContext context, TabletMetadata t } if (tabletMetadata.getFiles().size() >= maxFilesToOpen) { - log.warn("Tablet {} has {} files which exceeds the max to open for split, so can not split.", + log.warn("Tablet {} has {} files, using batched approach to compute splits.", tabletMetadata.getExtent(), tabletMetadata.getFiles().size()); - return new TreeSet<>(); + return findSplitsWithBatching(context, tableConf, tabletMetadata, estimatedSize, threshold, + maxEndRowSize, maxFilesToOpen); } try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(), @@ -225,6 +226,63 @@ public static SortedSet findSplits(ServerContext context, TabletMetadata t } } + private static SortedSet findSplitsWithBatching(ServerContext context, + TableConfiguration tableConf, TabletMetadata tabletMetadata, long estimatedSize, + long threshold, long maxEndRowSize, int maxFilesToOpen) { + + final int BATCH_SIZE = Math.min(200, maxFilesToOpen); + var allFiles = new ArrayList<>(tabletMetadata.getFiles()); + + log.info("Computing splits for {} with {} files using batch size {}", + tabletMetadata.getExtent(), allFiles.size(), BATCH_SIZE); + + try { + // Collect all index keys from batches + List allIndexKeys = new ArrayList<>(); + + for (int i = 0; i < allFiles.size(); i += BATCH_SIZE) { + int endIdx = Math.min(i + BATCH_SIZE, allFiles.size()); + var batch = allFiles.subList(i, endIdx); + + log.debug("Processing batch {}-{} of {} for tablet {}", i, endIdx, allFiles.size(), + tabletMetadata.getExtent()); + + // Read index keys from this batch + try (var indexIterable = new IndexIterable(context, tableConf, batch, + tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) { + + for (Key key : indexIterable) { + allIndexKeys.add(new Key(key)); // Create defensive copy + } + } + } + + log.info("Collected {} index keys from {} files for tablet {}", allIndexKeys.size(), + allFiles.size(), tabletMetadata.getExtent()); + + // Sort all collected keys + allIndexKeys.sort(Key::compareTo); + + // Now compute splits from the merged view + Predicate splitPredicate = splitCandidate -> { + if (splitCandidate.length() >= maxEndRowSize) { + log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(), + splitCandidate.length()); + return false; + } + return true; + }; + + int desiredSplits = calculateDesiredSplits(estimatedSize, threshold); + return findSplits(allIndexKeys, desiredSplits, splitPredicate); + + } catch (Exception e) { + log.error("Error computing splits with batching for tablet {}", tabletMetadata.getExtent(), + e); + return new TreeSet<>(); + } + } + private static int longestCommonLength(ByteSequence bs1, ByteSequence bs2) { int common = 0; while (common < bs1.length() && common < bs2.length() @@ -291,11 +349,22 @@ public static SortedSet findSplits(Iterable tabletIndexIterable, int public static boolean needsSplit(ServerContext context, TabletMetadata tabletMetadata) { var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); - return needsSplit(splitThreshold, tabletMetadata); + int maxFilesBeforeSplit = tableConf.getCount(Property.TABLE_MAX_FILES_BEFORE_SPLIT); + return needsSplit(splitThreshold, maxFilesBeforeSplit, tabletMetadata); } - public static boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) { - return tabletMetadata.getFileSize() > splitThreshold; + public static boolean needsSplit(long splitThreshold, int maxFilesBeforeSplit, + TabletMetadata tabletMetadata) { + if (tabletMetadata.getFileSize() > splitThreshold) { + return true; + } + int fileCount = tabletMetadata.getFiles().size(); + if (maxFilesBeforeSplit > 0 && fileCount > maxFilesBeforeSplit) { + log.info("Tablet {} needs split due to file count: {} > {}", tabletMetadata.getExtent(), + fileCount, maxFilesBeforeSplit); + return true; + } + return false; } public static UnSplittableMetadata toUnSplittable(ServerContext context, @@ -304,9 +373,11 @@ public static UnSplittableMetadata toUnSplittable(ServerContext context, var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE); int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN); + int maxFilesBeforeSplit = tableConf.getCount(Property.TABLE_MAX_FILES_BEFORE_SPLIT); - var unSplittableMetadata = UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(), - splitThreshold, maxEndRowSize, maxFilesToOpen, tabletMetadata.getFiles()); + var unSplittableMetadata = + UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(), splitThreshold, + maxEndRowSize, maxFilesToOpen, maxFilesBeforeSplit, tabletMetadata.getFiles()); log.trace( "Created unsplittable metadata for tablet {}. splitThreshold: {}, maxEndRowSize:{}, maxFilesToOpen: {}, hashCode: {}", diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 2ed44cbf4b1..24531cb579f 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -585,7 +585,7 @@ public void testUnsplittableColumn() { StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); var unsplittableMeta = UnSplittableMetadata - .toUnSplittable(KeyExtent.fromMetaRow(new Text("0;foo")), 100, 110, 120, Set.of(sf1)); + .toUnSplittable(KeyExtent.fromMetaRow(new Text("0;foo")), 100, 110, 120, 1000, Set.of(sf1)); m = new Mutation(new Text("0;foo")); SplitColumnFamily.UNSPLITTABLE_COLUMN.put(m, new Value(unsplittableMeta.toBase64())); @@ -603,17 +603,20 @@ public void testUnsplittableColumn() { // test invalid args KeyExtent extent = KeyExtent.fromMetaRow(new Text("0;foo")); assertThrows(IllegalArgumentException.class, - () -> UnSplittableMetadata.toUnSplittable(extent, -100, 110, 120, Set.of(sf1))); + () -> UnSplittableMetadata.toUnSplittable(extent, -100, 110, 120, 1000, Set.of(sf1))); assertThrows(IllegalArgumentException.class, - () -> UnSplittableMetadata.toUnSplittable(extent, 100, -110, 120, Set.of(sf1))); + () -> UnSplittableMetadata.toUnSplittable(extent, 100, -110, 120, 1000, Set.of(sf1))); assertThrows(IllegalArgumentException.class, - () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, -120, Set.of(sf1))); + () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, -120, 1000, Set.of(sf1))); assertThrows(NullPointerException.class, - () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, null)); + () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, null)); + assertThrows(IllegalArgumentException.class, + () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, -1000, Set.of(sf1))); // Test metadata constraints validate invalid hashcode m = new Mutation(new Text("0;foo")); - unsplittableMeta = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1)); + unsplittableMeta = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1)); // partial hashcode is invalid var invalidHashCode = unsplittableMeta.toBase64().substring(0, unsplittableMeta.toBase64().length() - 1); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 9e3037dbe01..425c2693fea 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -170,7 +170,7 @@ public void testManyColumns() throws Exception { var tabletFiles = Map.of(file1, dfv1, file2, dfv2); var unsplittableMeta = - UnSplittableMetadata.toUnSplittable(ke3, 1000, 1001, 10, tabletFiles.keySet()); + UnSplittableMetadata.toUnSplittable(ke3, 1000, 1001, 10, 1000, tabletFiles.keySet()); // Setup the metadata for the last tablet in the merge range, this is that tablet that merge // will modify. diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index aba27739fe2..b14c43fb950 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -279,8 +279,8 @@ public void testManyColumns() throws Exception { EasyMock.expect(tabletMeta.getHostingRequested()).andReturn(true).atLeastOnce(); EasyMock.expect(tabletMeta.getSuspend()).andReturn(suspendingTServer).atLeastOnce(); EasyMock.expect(tabletMeta.getLast()).andReturn(lastLocation).atLeastOnce(); - UnSplittableMetadata usm = - UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet()); + UnSplittableMetadata usm = UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, + 10000, tabletFiles.keySet()); EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce(); EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java deleted file mode 100644 index 4e08140eaff..00000000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitPointUtil.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.tserver.tablet; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -import org.apache.accumulo.core.client.rfile.RFile; -import org.apache.accumulo.core.client.rfile.RFileWriter; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SplitPointUtil { - private static final Logger log = LoggerFactory.getLogger(SplitPointUtil.class); - private static final int DEFAULT_BATCH_SIZE = 200; - - public static Optional findSplitPoint(List files, int maxOpen, - FileSystem fs, Configuration hadoopConf, TableConfiguration tableConf) throws IOException { - - if (files.size() <= maxOpen) { - return computeMidpoint(files, fs, hadoopConf, tableConf); - } - - log.info("Tablet has {} files, reducing using batch size {}", files.size(), DEFAULT_BATCH_SIZE); - List reduced = - reduceFiles(files, DEFAULT_BATCH_SIZE, fs, hadoopConf, tableConf); - - return computeMidpoint(reduced, fs, hadoopConf, tableConf); - } - - private static Optional computeMidpoint(List files, FileSystem fs, - Configuration conf, TableConfiguration tableConf) throws IOException { - List allKeys = new ArrayList<>(); - - for (StoredTabletFile file : files) { - try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file, fs, conf, tableConf.getCryptoService()).withTableConfiguration(tableConf) - .build()) { - - reader.seek(new Range(), Collections.emptyList(), false); - - while (reader.hasTop()) { - allKeys.add(new Key(reader.getTopKey())); - reader.next(); - } - } - } - - if (allKeys.isEmpty()) { - return Optional.empty(); - } - - Collections.sort(allKeys); - return Optional.of(allKeys.get(allKeys.size() / 2)); - } - - private static List reduceFiles(List files, int batchSize, - FileSystem fs, Configuration conf, TableConfiguration tableConf) throws IOException { - List reduced = new ArrayList<>(); - - for (int i = 0; i < files.size(); i += batchSize) { - List batch = files.subList(i, Math.min(i + batchSize, files.size())); - StoredTabletFile merged = mergeBatch(batch, fs, conf, tableConf); - reduced.add(merged); - } - - if (reduced.size() > batchSize) { - return reduceFiles(reduced, batchSize, fs, conf, tableConf); - } - - return reduced; - } - - private static StoredTabletFile mergeBatch(List batch, FileSystem fs, - Configuration conf, TableConfiguration tableConf) throws IOException { - Path tmpDir = new Path("/tmp/idxReduce_" + System.nanoTime()); - fs.mkdirs(tmpDir); - Path mergedPath = new Path(tmpDir, "merged_" + UUID.randomUUID() + ".rf"); - - Map props = new HashMap<>(); - tableConf.iterator().forEachRemaining(e -> props.put(e.getKey(), e.getValue())); - - try (RFileWriter writer = RFile.newWriter().to(mergedPath.toString()).withFileSystem(fs) - .withTableProperties(props).build()) { - - for (StoredTabletFile file : batch) { - try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file, fs, conf, tableConf.getCryptoService()).withTableConfiguration(tableConf) - .build()) { - - reader.seek(new Range(), Collections.emptyList(), false); - while (reader.hasTop()) { - writer.append(reader.getTopKey(), reader.getTopValue()); - reader.next(); - } - } - } - - } - - return new StoredTabletFile(mergedPath.toString()); - } -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index c38232e72d4..4325be7d3ad 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import static org.apache.accumulo.server.tablets.TabletNameGenerator.createTabletDirectoryName; import java.io.FileNotFoundException; import java.io.IOException; @@ -76,7 +75,6 @@ import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletsMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -89,7 +87,6 @@ import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; @@ -109,7 +106,6 @@ import org.apache.accumulo.tserver.scan.ScanParameters; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; @@ -413,11 +409,6 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil bringMinorCompactionOnline(tmpDatafile, newDatafile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, flushId, mincReason); - try { - maybeSplitTablet(); - } catch (IOException e) { - log.error("Error while attempting to split tablet after minor compaction", e); - } } catch (Exception e) { final ServiceLock tserverLock = tabletServer.getLock(); if (tserverLock == null || !tserverLock.verifyLockAtSource()) { @@ -1134,102 +1125,6 @@ public Map getDatafiles() { return getMetadata().getFilesMap(); } - public long estimateTabletSize() { - long size = 0L; - - for (DataFileValue sz : getDatafiles().values()) { - size += sz.getSize(); - } - - return size; - } - - private boolean isSplitPossible() { - - long splitThreshold = tableConfiguration.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); - - return !extent.isRootTablet() && !isFindSplitsSuppressed() - && estimateTabletSize() > splitThreshold; - } - - public boolean needsSplit() { - var files = getDatafiles().keySet(); - return files.size() > 1 && !supressFindSplits; - } - - private boolean supressFindSplits = false; - private long timeOfLastMinCWhenFindSplitsWasSupressed = 0; - - private boolean isFindSplitsSuppressed() { - if (supressFindSplits) { - if (timeOfLastMinCWhenFindSplitsWasSupressed != lastMinorCompactionFinishTime) { - supressFindSplits = false; - } else { - // nothing changed, do not split - return true; - } - } - - return false; - } - - private void suppressFindSplits() { - supressFindSplits = true; - timeOfLastMinCWhenFindSplitsWasSupressed = lastMinorCompactionFinishTime; - } - - public void maybeSplitTablet() throws IOException { - if (!isSplitPossible() || !needsSplit()) { - return; - } - - Optional optSplit = computeSplitPoint(); - if (optSplit.isEmpty()) { - suppressFindSplits(); - return; - } - - Text midRow = optSplit - .orElseThrow(() -> new IllegalStateException("Split point should be present")).getRow(); - KeyExtent low = new KeyExtent(extent.tableId(), midRow, extent.prevEndRow()); - KeyExtent high = new KeyExtent(extent.tableId(), extent.endRow(), midRow); - - ServerContext context = tabletServer.getContext(); - Ample.TabletsMutator mutator = context.getAmple().mutateTablets(); - - String lowDir = createTabletDirectoryName(context, midRow); - String highDir = getMetadata().getDirName(); - MetadataTime time = tabletTime.getMetadataTime(); - - mutator.mutateTablet(low).putDirName(lowDir).putTime(time); - - mutator.mutateTablet(high).putDirName(highDir).putTime(time); - - mutator.close(); - - log.info("Split tablet {} at {} into {} and {}", extent, midRow, low, high); - } - - public Optional computeSplitPoint() { - try { - List files = new ArrayList<>(getDatafiles().keySet()); - if (files.isEmpty()) { - return Optional.empty(); - } - - var context = tabletServer.getContext(); - var fs = context.getVolumeManager().getFileSystemByPath(files.get(0).getPath()); - var conf = context.getHadoopConf(); - var maxOpen = tableConfiguration.getCount(Property.SPLIT_MAXOPEN); - - return SplitPointUtil.findSplitPoint(files, maxOpen, fs, conf, tableConfiguration); - - } catch (IOException e) { - log.warn("Error computing split point for tablet {}", extent, e); - return Optional.empty(); - } - } - @Override public void addToYieldMetric(int i) { getTabletServer().getScanMetrics().addYield(i); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index ee6771cc90f..0dc40edb340 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1569,7 +1569,7 @@ public void testUnsplittable() { var tabletMeta1 = TabletMetadata.builder(e1).build(UNSPLITTABLE); // require the UNSPLITTABLE column to be absent when it is absent - var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32, Set.of()); + var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32, 1000, Set.of()); try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, UNSPLITTABLE) @@ -1587,7 +1587,7 @@ public void testUnsplittable() { } assertEquals(usm1.toBase64(), context.getAmple().readTablet(e1).getUnSplittable().toBase64()); - var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33, Set.of()); + var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33, 1000, Set.of()); var tabletMeta3 = TabletMetadata.builder(e1).setUnSplittable(usm2).build(UNSPLITTABLE); // require the UNSPLITTABLE column to be usm2 when it is actually usm1 try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {