diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 2d1fd04e453..a874ae952e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -38,7 +38,14 @@ import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; +import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -90,13 +97,19 @@ public enum RangeType { * row and end row can be null. The start row is exclusive and the end row is inclusive (like * Accumulo tablets). A common use case for this would be when files were partitioned using a * table's splits. When using this range type, the start and end row must exist as splits in the - * table or an exception will be thrown at load time. + * table or an exception will be thrown at load time. This RangeType is the most efficient for + * accumulo to load, and it enables only loading files to tablets that overlap data in the file. */ TABLE, /** - * Range that correspond to known rows in a file. For this range type, the start row and end row - * must be non-null. The start row and end row are both considered inclusive. At load time, - * these data ranges will be mapped to table ranges. + * Range that corresponds to the minimum and maximum rows in a file. For this range type, the + * start row and end row must be non-null. The start row and end row are both considered + * inclusive. At load time, these data ranges will be mapped to table ranges. For this RangeType + * Accumulo has to do more work at load to map the file range to tablets. Also, this will map a + * file to all tablets in the range even if the file has no data for that tablet. For example if + * a range overlapped 10 tablets but the file only had data for 8 of those tablets, the file + * would still be loaded to all 10. This will not cause problems for scans or compactions other + * than the unnecessary work of opening a file and finding it has no data for the tablet. */ FILE } @@ -459,6 +472,7 @@ static SplitResolver from(SortedSet splits) { * Computes a load plan for a given rfile. This will open the rfile and find every * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. * + * @return a load plan of type {@link RangeType#TABLE} * @since 2.1.4 */ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { @@ -475,6 +489,7 @@ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOE * * @param properties used when opening the rfile, see * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * @return a load plan of type {@link RangeType#TABLE} * @since 2.1.4 */ public static LoadPlan compute(URI file, Map properties, @@ -510,4 +525,41 @@ public static LoadPlan compute(URI file, Map properties, return builder.build(); } } + + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file, Map properties) throws IOException { + var path = new Path(file); + var conf = new Configuration(); + var fs = FileSystem.get(path.toUri(), conf); + CryptoService cs = + CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); + var tableConf = SiteConfiguration.empty().withOverrides(properties).build(); + try (var reader = FileOperations.getInstance().newReaderBuilder() + .forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) { + var firstRow = reader.getFirstKey().getRow(); + var lastRow = reader.getLastKey().getRow(); + return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow) + .build(); + } + } + + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file) throws IOException { + return compute(file, Map.of()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 58e56abf0a6..ae3dccaecb2 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -948,6 +948,47 @@ public void testLoadPlanLocalityGroupsNoSplits() throws Exception { var expectedLoadPlan = LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); + + // put the first row in the default LG and last row in the first LG + testFile = createTmpTestFile(); + var writer2 = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer2) { + writer2.startNewLocalityGroup("LG1", "F1"); + writer2.append(new Key("007", "F1"), "V1"); + writer2.append(new Key("009", "F1"), "V2"); + writer2.startNewLocalityGroup("LG2", "F3"); + writer2.append(new Key("003", "F3"), "V3"); + writer2.append(new Key("004", "F3"), "V4"); + writer2.startDefaultLocalityGroup(); + writer2.append(new Key("002", "F4"), "V5"); + writer2.append(new Key("008", "F4"), "V6"); + } + + filename = new Path(testFile).getName(); + loadPlan = writer2.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "002", "009").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); + + // create a file w/ a single LG + testFile = createTmpTestFile(); + var writer3 = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer3) { + writer3.startDefaultLocalityGroup(); + writer3.append(new Key("003", "F4"), "V5"); + writer3.append(new Key("008", "F4"), "V6"); + } + + filename = new Path(testFile).getName(); + loadPlan = writer3.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "003", "008").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); } @Test diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java index 08ef7f660fd..a27e99a2276 100644 --- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java +++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java @@ -48,8 +48,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; @@ -67,6 +70,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.spi.crypto.AESCryptoService; @@ -85,6 +89,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -357,6 +362,22 @@ public void testRFileClientEncryption() throws Exception { assertEquals(1, summary.getStatistics().size()); assertEquals(0, summary.getFileStatistics().getInaccurate()); assertEquals(1, summary.getFileStatistics().getTotal()); + + // test computing load plan for encrypted files + var absUri = new Path(file).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri(); + var loadPlan = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties()); + var expectedLoadPlan = + LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.FILE, "a", "a3").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + + var splits = + Stream.of("a", "b", "c").map(Text::new).collect(Collectors.toCollection(TreeSet::new)); + var resolver = LoadPlan.SplitResolver.from(splits); + var loadPlan2 = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties(), resolver); + var expectedLoadPlan2 = + LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, null, "a") + .loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, "a", "b").build(); + assertEquals(expectedLoadPlan2.toJson(), loadPlan2.toJson()); } @Test