From 88784f96da43df13bb1cc4949ac6243e5c3fa392 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 18 Nov 2025 19:48:23 +0000 Subject: [PATCH 1/4] adds function to compute load plans when splits are unknown fixes #5971 --- .../apache/accumulo/core/data/LoadPlan.java | 45 +++++++++++++++++++ .../core/client/rfile/RFileClientTest.java | 41 +++++++++++++++++ .../accumulo/core/crypto/CryptoTest.java | 21 +++++++++ 3 files changed, 107 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 2d1fd04e453..5e89b68aed6 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -38,7 +38,13 @@ import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; +import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -459,6 +465,7 @@ static SplitResolver from(SortedSet splits) { * Computes a load plan for a given rfile. This will open the rfile and find every * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. * + * @return a load plan of type {@link RangeType#TABLE} * @since 2.1.4 */ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { @@ -475,6 +482,7 @@ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOE * * @param properties used when opening the rfile, see * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * @return a load plan of type {@link RangeType#TABLE} * @since 2.1.4 */ public static LoadPlan compute(URI file, Map properties, @@ -510,4 +518,41 @@ public static LoadPlan compute(URI file, Map properties, return builder.build(); } } + + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file, Map properties) throws IOException { + var path = new Path(file); + var conf = new Configuration(); + var fs = FileSystem.get(path.toUri(), conf); + CryptoService cs = + CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); + CachableBlockFile.CachableBuilder cb = + new CachableBlockFile.CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); + try (var reader = new org.apache.accumulo.core.file.rfile.RFile.Reader(cb)) { + var firstRow = reader.getFirstKey().getRow(); + var lastRow = reader.getLastKey().getRow(); + return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow) + .build(); + } + } + + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file) throws IOException { + return compute(file, Map.of()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 58e56abf0a6..ae3dccaecb2 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -948,6 +948,47 @@ public void testLoadPlanLocalityGroupsNoSplits() throws Exception { var expectedLoadPlan = LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); + + // put the first row in the default LG and last row in the first LG + testFile = createTmpTestFile(); + var writer2 = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer2) { + writer2.startNewLocalityGroup("LG1", "F1"); + writer2.append(new Key("007", "F1"), "V1"); + writer2.append(new Key("009", "F1"), "V2"); + writer2.startNewLocalityGroup("LG2", "F3"); + writer2.append(new Key("003", "F3"), "V3"); + writer2.append(new Key("004", "F3"), "V4"); + writer2.startDefaultLocalityGroup(); + writer2.append(new Key("002", "F4"), "V5"); + writer2.append(new Key("008", "F4"), "V6"); + } + + filename = new Path(testFile).getName(); + loadPlan = writer2.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "002", "009").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); + + // create a file w/ a single LG + testFile = createTmpTestFile(); + var writer3 = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer3) { + writer3.startDefaultLocalityGroup(); + writer3.append(new Key("003", "F4"), "V5"); + writer3.append(new Key("008", "F4"), "V6"); + } + + filename = new Path(testFile).getName(); + loadPlan = writer3.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "003", "008").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson()); } @Test diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java index 08ef7f660fd..a27e99a2276 100644 --- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java +++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java @@ -48,8 +48,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; @@ -67,6 +70,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.spi.crypto.AESCryptoService; @@ -85,6 +89,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -357,6 +362,22 @@ public void testRFileClientEncryption() throws Exception { assertEquals(1, summary.getStatistics().size()); assertEquals(0, summary.getFileStatistics().getInaccurate()); assertEquals(1, summary.getFileStatistics().getTotal()); + + // test computing load plan for encrypted files + var absUri = new Path(file).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri(); + var loadPlan = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties()); + var expectedLoadPlan = + LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.FILE, "a", "a3").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + + var splits = + Stream.of("a", "b", "c").map(Text::new).collect(Collectors.toCollection(TreeSet::new)); + var resolver = LoadPlan.SplitResolver.from(splits); + var loadPlan2 = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties(), resolver); + var expectedLoadPlan2 = + LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, null, "a") + .loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, "a", "b").build(); + assertEquals(expectedLoadPlan2.toJson(), loadPlan2.toJson()); } @Test From f15be8f6b0936ed4748b081e8d79d0a11246ee9f Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 18 Nov 2025 20:26:09 +0000 Subject: [PATCH 2/4] update docs --- .../org/apache/accumulo/core/data/LoadPlan.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 5e89b68aed6..fecd27397c6 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -96,13 +96,19 @@ public enum RangeType { * row and end row can be null. The start row is exclusive and the end row is inclusive (like * Accumulo tablets). A common use case for this would be when files were partitioned using a * table's splits. When using this range type, the start and end row must exist as splits in the - * table or an exception will be thrown at load time. + * table or an exception will be thrown at load time. This RangeType is the most efficient for + * accumulo to load, and it enables only loading files to tablets that overlap data in the file. */ TABLE, /** - * Range that correspond to known rows in a file. For this range type, the start row and end row - * must be non-null. The start row and end row are both considered inclusive. At load time, - * these data ranges will be mapped to table ranges. + * Range that corresponds to the minimum and maximum rows in a file. For this range type, the + * start row and end row must be non-null. The start row and end row are both considered + * inclusive. At load time, these data ranges will be mapped to table ranges. For this RangeType + * accumulo has to do more work at load to map the file range to tablets. Also, this will map a + * file to all tablets in the range even if the file has no data for that tablet. For example if + * a range overlapped 10 tablets but the file only had data for 8 of those tablets, the file + * would still be loaded to all 10. This will not cause problems for scans or compactions other + * than the unnecessary work of opening a file and finding it has no data for the tablet. */ FILE } From ea9f8cd08f8873a03b80fcdff2e0dc231de35ef4 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 4 Dec 2025 08:51:19 -0800 Subject: [PATCH 3/4] Update core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java Co-authored-by: Dave Marion --- core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index fecd27397c6..1ab35bc3b03 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -104,7 +104,7 @@ public enum RangeType { * Range that corresponds to the minimum and maximum rows in a file. For this range type, the * start row and end row must be non-null. The start row and end row are both considered * inclusive. At load time, these data ranges will be mapped to table ranges. For this RangeType - * accumulo has to do more work at load to map the file range to tablets. Also, this will map a + * Accumulo has to do more work at load to map the file range to tablets. Also, this will map a * file to all tablets in the range even if the file has no data for that tablet. For example if * a range overlapped 10 tablets but the file only had data for 8 of those tablets, the file * would still be loaded to all 10. This will not cause problems for scans or compactions other From a10cd12aabfd613936633f67b87b26988f8cc000 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 4 Dec 2025 17:44:35 +0000 Subject: [PATCH 4/4] code review update --- .../java/org/apache/accumulo/core/data/LoadPlan.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 1ab35bc3b03..a874ae952e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -38,9 +38,10 @@ import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.hadoop.conf.Configuration; @@ -541,9 +542,9 @@ public static LoadPlan compute(URI file, Map properties) throws I var fs = FileSystem.get(path.toUri(), conf); CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); - CachableBlockFile.CachableBuilder cb = - new CachableBlockFile.CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); - try (var reader = new org.apache.accumulo.core.file.rfile.RFile.Reader(cb)) { + var tableConf = SiteConfiguration.empty().withOverrides(properties).build(); + try (var reader = FileOperations.getInstance().newReaderBuilder() + .forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) { var firstRow = reader.getFirstKey().getRow(); var lastRow = reader.getLastKey().getRow(); return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow)