Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,9 @@ public enum Property {
"1.3.5"),
TABLE_SPLIT_THRESHOLD("table.split.threshold", "1G", PropertyType.BYTES,
"A tablet is split when the combined size of RFiles exceeds this amount.", "1.3.5"),
TABLE_MAX_FILES_BEFORE_SPLIT("table.split.maxfiles", "1000", PropertyType.COUNT,
"The maximum number of files a tablet can have before split, regardless of size. this helps tablets with many small files that haven't reached the split threshold to split so they can be compacted. A value of 0 disables this check.",
"4.0.0"),
TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10k", PropertyType.BYTES,
"Maximum size of end row.", "1.7.0"),
TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.age", "10m", PropertyType.TIMEDURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public class UnSplittableMetadata {
private final HashCode hashOfSplitParameters;

private UnSplittableMetadata(KeyExtent keyExtent, long splitThreshold, long maxEndRowSize,
int maxFilesToOpen, Set<StoredTabletFile> files) {
this(calculateSplitParamsHash(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, files));
int maxFilesToOpen, int maxFilesBeforeSplit, Set<StoredTabletFile> files) {
this(calculateSplitParamsHash(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen,
maxFilesBeforeSplit, files));
}

private UnSplittableMetadata(HashCode hashOfSplitParameters) {
Expand Down Expand Up @@ -75,26 +76,30 @@ public String toBase64() {
}

private static HashCode calculateSplitParamsHash(KeyExtent keyExtent, long splitThreshold,
long maxEndRowSize, int maxFilesToOpen, Set<StoredTabletFile> files) {
long maxEndRowSize, int maxFilesToOpen, int maxFilesBeforeSplit,
Set<StoredTabletFile> files) {
Preconditions.checkArgument(splitThreshold > 0, "splitThreshold must be greater than 0");
Preconditions.checkArgument(maxEndRowSize > 0, "maxEndRowSize must be greater than 0");
Preconditions.checkArgument(maxFilesToOpen > 0, "maxFilesToOpen must be greater than 0");
Preconditions.checkArgument(maxFilesBeforeSplit > 0,
"maxFilesBeforeSplit must be greater than 0");

// Use static call to murmur3_128() so the seed is always the same
// Hashing.goodFastHash will seed with the current time, and we need the seed to be
// the same across restarts and instances
var hasher = Hashing.murmur3_128().newHasher();
hasher.putBytes(serializeKeyExtent(keyExtent)).putLong(splitThreshold).putLong(maxEndRowSize)
.putInt(maxFilesToOpen);
.putInt(maxFilesToOpen).putInt(maxFilesBeforeSplit);
files.stream().map(StoredTabletFile::getMetadata).sorted()
.forEach(path -> hasher.putString(path, UTF_8));
return hasher.hash();
}

public static UnSplittableMetadata toUnSplittable(KeyExtent keyExtent, long splitThreshold,
long maxEndRowSize, int maxFilesToOpen, Set<StoredTabletFile> files) {
long maxEndRowSize, int maxFilesToOpen, int maxFilesBeforeSplit,
Set<StoredTabletFile> files) {
return new UnSplittableMetadata(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen,
files);
maxFilesBeforeSplit, files);
}

public static UnSplittableMetadata toUnSplittable(String base64HashOfSplitParameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testAllColumns() {
FateId userCompactFateId = FateId.from(type, UUID.randomUUID());
mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, userCompactFateId.canonical(), "");
var unsplittableMeta =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2));
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2));
SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta.toBase64()));

SteadyTime suspensionTime = SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -537,15 +537,16 @@ public void testUnsplittableColumn() {

// Test with files
var unsplittableMeta1 =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf3));
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2, sf3));
Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta1.toBase64()));
TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
EnumSet.of(UNSPLITTABLE), true, false);
assertUnsplittable(unsplittableMeta1, tm.getUnSplittable(), true);

// Test empty file set
var unsplittableMeta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of());
var unsplittableMeta2 =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of());
mutation = TabletColumnFamily.createPrevRowMutation(extent);
SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta2.toBase64()));
tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
Expand All @@ -558,7 +559,7 @@ public void testUnsplittableColumn() {
// Test with ranges
// use sf4 which includes sf4 instead of sf3 which has a range
var unsplittableMeta3 =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf4));
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2, sf4));
mutation = TabletColumnFamily.createPrevRowMutation(extent);
SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta3.toBase64()));
tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
Expand Down Expand Up @@ -592,9 +593,9 @@ public void testUnsplittableWithRange() {
StoredTabletFile sf3 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"),
new Range("a", false, "d", true));

var meta1 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1));
var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf2));
var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf3));
var meta1 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1));
var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf2));
var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf3));

// compare each against the others to make sure not equal
assertUnsplittable(meta1, meta2, false);
Expand Down Expand Up @@ -760,7 +761,7 @@ public void testBuilder() {
SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, selFilesFateId,
SteadyTime.from(100_000, TimeUnit.NANOSECONDS));
var unsplittableMeta =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2));
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1, sf2));

TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm)
.putSuspension(ser1, SteadyTime.from(45L, TimeUnit.MILLISECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private static class SplitConfig {
long splitThreshold;
long maxEndRowSize;
int maxFilesToOpen;
int maxFilesBeforeSplit;

void update(TableId tableId, Configuration tableConfig) {
if (!tableId.equals(this.tableId)) {
Expand All @@ -90,6 +91,8 @@ void update(TableId tableId, Configuration tableConfig) {
.getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey()));
maxFilesToOpen = (int) ConfigurationTypeHelper
.getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey()));
maxFilesBeforeSplit = (int) ConfigurationTypeHelper
.getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_FILES_BEFORE_SPLIT.getKey()));
}
}
}
Expand All @@ -108,13 +111,14 @@ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
// which gives a chance to clean up the marker and recheck.
var unsplittable = tm.getUnSplittable();
if (unsplittable != null) {
return !unsplittable
.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), splitConfig.splitThreshold,
splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, tm.getFiles()));
return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(),
splitConfig.splitThreshold, splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen,
splitConfig.maxFilesBeforeSplit, tm.getFiles()));
}

// If unsplittable is not set at all then check if over split threshold
final boolean shouldSplit = SplitUtils.needsSplit(splitConfig.splitThreshold, tm);
final boolean shouldSplit =
SplitUtils.needsSplit(splitConfig.splitThreshold, splitConfig.maxFilesBeforeSplit, tm);
LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(),
tm.getFileSize(), splitConfig.splitThreshold, shouldSplit);
return shouldSplit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata t
}

if (tabletMetadata.getFiles().size() >= maxFilesToOpen) {
log.warn("Tablet {} has {} files which exceeds the max to open for split, so can not split.",
log.warn("Tablet {} has {} files, using batched approach to compute splits.",
tabletMetadata.getExtent(), tabletMetadata.getFiles().size());
return new TreeSet<>();
return findSplitsWithBatching(context, tableConf, tabletMetadata, estimatedSize, threshold,
maxEndRowSize, maxFilesToOpen);
}

try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(),
Expand All @@ -225,6 +226,63 @@ public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata t
}
}

private static SortedSet<Text> findSplitsWithBatching(ServerContext context,
TableConfiguration tableConf, TabletMetadata tabletMetadata, long estimatedSize,
long threshold, long maxEndRowSize, int maxFilesToOpen) {

final int BATCH_SIZE = Math.min(200, maxFilesToOpen);
var allFiles = new ArrayList<>(tabletMetadata.getFiles());

log.info("Computing splits for {} with {} files using batch size {}",
tabletMetadata.getExtent(), allFiles.size(), BATCH_SIZE);

try {
// Collect all index keys from batches
List<Key> allIndexKeys = new ArrayList<>();

for (int i = 0; i < allFiles.size(); i += BATCH_SIZE) {
int endIdx = Math.min(i + BATCH_SIZE, allFiles.size());
var batch = allFiles.subList(i, endIdx);

log.debug("Processing batch {}-{} of {} for tablet {}", i, endIdx, allFiles.size(),
tabletMetadata.getExtent());

// Read index keys from this batch
try (var indexIterable = new IndexIterable(context, tableConf, batch,
tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) {

for (Key key : indexIterable) {
allIndexKeys.add(new Key(key)); // Create defensive copy
}
}
}

log.info("Collected {} index keys from {} files for tablet {}", allIndexKeys.size(),
allFiles.size(), tabletMetadata.getExtent());

// Sort all collected keys
allIndexKeys.sort(Key::compareTo);

// Now compute splits from the merged view
Predicate<ByteSequence> splitPredicate = splitCandidate -> {
if (splitCandidate.length() >= maxEndRowSize) {
log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(),
splitCandidate.length());
return false;
}
return true;
};

int desiredSplits = calculateDesiredSplits(estimatedSize, threshold);
return findSplits(allIndexKeys, desiredSplits, splitPredicate);

} catch (Exception e) {
log.error("Error computing splits with batching for tablet {}", tabletMetadata.getExtent(),
e);
return new TreeSet<>();
}
}

private static int longestCommonLength(ByteSequence bs1, ByteSequence bs2) {
int common = 0;
while (common < bs1.length() && common < bs2.length()
Expand Down Expand Up @@ -291,11 +349,22 @@ public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterable, int
public static boolean needsSplit(ServerContext context, TabletMetadata tabletMetadata) {
var tableConf = context.getTableConfiguration(tabletMetadata.getTableId());
var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
return needsSplit(splitThreshold, tabletMetadata);
int maxFilesBeforeSplit = tableConf.getCount(Property.TABLE_MAX_FILES_BEFORE_SPLIT);
return needsSplit(splitThreshold, maxFilesBeforeSplit, tabletMetadata);
}

public static boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) {
return tabletMetadata.getFileSize() > splitThreshold;
public static boolean needsSplit(long splitThreshold, int maxFilesBeforeSplit,
TabletMetadata tabletMetadata) {
if (tabletMetadata.getFileSize() > splitThreshold) {
return true;
}
int fileCount = tabletMetadata.getFiles().size();
if (maxFilesBeforeSplit > 0 && fileCount > maxFilesBeforeSplit) {
log.info("Tablet {} needs split due to file count: {} > {}", tabletMetadata.getExtent(),
fileCount, maxFilesBeforeSplit);
return true;
}
return false;
}

public static UnSplittableMetadata toUnSplittable(ServerContext context,
Expand All @@ -304,9 +373,11 @@ public static UnSplittableMetadata toUnSplittable(ServerContext context,
var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE);
int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN);
int maxFilesBeforeSplit = tableConf.getCount(Property.TABLE_MAX_FILES_BEFORE_SPLIT);

var unSplittableMetadata = UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(),
splitThreshold, maxEndRowSize, maxFilesToOpen, tabletMetadata.getFiles());
var unSplittableMetadata =
UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(), splitThreshold,
maxEndRowSize, maxFilesToOpen, maxFilesBeforeSplit, tabletMetadata.getFiles());

log.trace(
"Created unsplittable metadata for tablet {}. splitThreshold: {}, maxEndRowSize:{}, maxFilesToOpen: {}, hashCode: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void testUnsplittableColumn() {

StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"));
var unsplittableMeta = UnSplittableMetadata
.toUnSplittable(KeyExtent.fromMetaRow(new Text("0;foo")), 100, 110, 120, Set.of(sf1));
.toUnSplittable(KeyExtent.fromMetaRow(new Text("0;foo")), 100, 110, 120, 1000, Set.of(sf1));

m = new Mutation(new Text("0;foo"));
SplitColumnFamily.UNSPLITTABLE_COLUMN.put(m, new Value(unsplittableMeta.toBase64()));
Expand All @@ -603,17 +603,20 @@ public void testUnsplittableColumn() {
// test invalid args
KeyExtent extent = KeyExtent.fromMetaRow(new Text("0;foo"));
assertThrows(IllegalArgumentException.class,
() -> UnSplittableMetadata.toUnSplittable(extent, -100, 110, 120, Set.of(sf1)));
() -> UnSplittableMetadata.toUnSplittable(extent, -100, 110, 120, 1000, Set.of(sf1)));
assertThrows(IllegalArgumentException.class,
() -> UnSplittableMetadata.toUnSplittable(extent, 100, -110, 120, Set.of(sf1)));
() -> UnSplittableMetadata.toUnSplittable(extent, 100, -110, 120, 1000, Set.of(sf1)));
assertThrows(IllegalArgumentException.class,
() -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, -120, Set.of(sf1)));
() -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, -120, 1000, Set.of(sf1)));
assertThrows(NullPointerException.class,
() -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, null));
() -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, null));
assertThrows(IllegalArgumentException.class,
() -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, -1000, Set.of(sf1)));

// Test metadata constraints validate invalid hashcode
m = new Mutation(new Text("0;foo"));
unsplittableMeta = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1));
unsplittableMeta =
UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 1000, Set.of(sf1));
// partial hashcode is invalid
var invalidHashCode =
unsplittableMeta.toBase64().substring(0, unsplittableMeta.toBase64().length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testManyColumns() throws Exception {
var tabletFiles = Map.of(file1, dfv1, file2, dfv2);

var unsplittableMeta =
UnSplittableMetadata.toUnSplittable(ke3, 1000, 1001, 10, tabletFiles.keySet());
UnSplittableMetadata.toUnSplittable(ke3, 1000, 1001, 10, 1000, tabletFiles.keySet());

// Setup the metadata for the last tablet in the merge range, this is that tablet that merge
// will modify.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ public void testManyColumns() throws Exception {
EasyMock.expect(tabletMeta.getHostingRequested()).andReturn(true).atLeastOnce();
EasyMock.expect(tabletMeta.getSuspend()).andReturn(suspendingTServer).atLeastOnce();
EasyMock.expect(tabletMeta.getLast()).andReturn(lastLocation).atLeastOnce();
UnSplittableMetadata usm =
UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet());
UnSplittableMetadata usm = UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002,
10000, tabletFiles.keySet());
EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce();
EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ public void testUnsplittable() {

var tabletMeta1 = TabletMetadata.builder(e1).build(UNSPLITTABLE);
// require the UNSPLITTABLE column to be absent when it is absent
var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32, Set.of());
var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32, 1000, Set.of());

try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, UNSPLITTABLE)
Expand All @@ -1587,7 +1587,7 @@ public void testUnsplittable() {
}
assertEquals(usm1.toBase64(), context.getAmple().readTablet(e1).getUnSplittable().toBase64());

var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33, Set.of());
var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33, 1000, Set.of());
var tabletMeta3 = TabletMetadata.builder(e1).setUnSplittable(usm2).build(UNSPLITTABLE);
// require the UNSPLITTABLE column to be usm2 when it is actually usm1
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
Expand Down