core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java (56 additions, 4 deletions)
@@ -38,7 +38,14 @@
import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

@@ -90,13 +97,19 @@ public enum RangeType {
* row and end row can be null. The start row is exclusive and the end row is inclusive (like
* Accumulo tablets). A common use case for this would be when files were partitioned using a
* table's splits. When using this range type, the start and end row must exist as splits in the
- * table or an exception will be thrown at load time.
+ * table or an exception will be thrown at load time. This RangeType is the most efficient for
+ * Accumulo to load, and it ensures files are only loaded to tablets that overlap data in the
+ * file.
*/
TABLE,
/**
- * Range that correspond to known rows in a file. For this range type, the start row and end row
- * must be non-null. The start row and end row are both considered inclusive. At load time,
- * these data ranges will be mapped to table ranges.
+ * Range that corresponds to the minimum and maximum rows in a file. For this range type, the
+ * start row and end row must be non-null. The start row and end row are both considered
+ * inclusive. At load time, these data ranges will be mapped to table ranges. For this
+ * RangeType, Accumulo has to do more work at load time to map the file range to tablets. Also,
+ * this will map a file to all tablets in the range even if the file has no data for some of
+ * them. For example, if a range overlaps 10 tablets but the file only has data for 8 of those
+ * tablets, the file will still be loaded to all 10. This causes no problems for scans or
+ * compactions beyond the unnecessary work of opening a file and finding it has no data for the
+ * tablet.
*/
FILE
}
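To make that tradeoff concrete, here is a minimal sketch of building one plan of each RangeType by hand via the builder API used elsewhere in this PR; the file name, rows, and split point are hypothetical:

```java
// Assume the table has a single split "m", so its tablets are (null, "m"] and ("m", null).

// TABLE: the start row (exclusive) and end row (inclusive) must be existing splits,
// so the file is loaded only to the one tablet that overlaps its data.
LoadPlan tablePlan = LoadPlan.builder()
    .loadFileTo("f1.rf", LoadPlan.RangeType.TABLE, null, "m").build();

// FILE: the start and end rows are the file's own minimum and maximum rows (both
// inclusive); at load time Accumulo maps this range to tablets and loads the file
// to every tablet the range overlaps, even tablets it holds no data for.
LoadPlan filePlan = LoadPlan.builder()
    .loadFileTo("f1.rf", LoadPlan.RangeType.FILE, "b", "k").build();
```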
@@ -459,6 +472,7 @@ static SplitResolver from(SortedSet<Text> splits) {
* Computes a load plan for a given rfile. This will open the rfile and find every
* {@link TableSplits} that overlaps rows in the file and add those to the returned load plan.
*
* @return a load plan of type {@link RangeType#TABLE}
* @since 2.1.4
*/
public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException {
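For instance, the resolver could be derived from a table's current splits. A sketch, assuming an AccumuloClient named `client` and a hypothetical table name and file URI:

```java
// Resolve the file's rows against the table's current split points.
SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits("mytable"));
LoadPlan.SplitResolver resolver = LoadPlan.SplitResolver.from(splits);

URI file = URI.create("hdfs://namenode:8020/bulk/f1.rf");
LoadPlan plan = LoadPlan.compute(file, resolver); // yields a RangeType.TABLE plan
```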
@@ -475,6 +489,7 @@
*
* @param properties used when opening the rfile, see
* {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
* @return a load plan of type {@link RangeType#TABLE}
* @since 2.1.4
*/
public static LoadPlan compute(URI file, Map<String,String> properties,
    SplitResolver splitResolver) throws IOException {
@@ -510,4 +525,41 @@
return builder.build();
}
}
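When the rfile was written with non-default table settings, such as the client-side encryption exercised in CryptoTest below, the overload taking properties lets the reader open the file. A sketch, reusing the `file` and `resolver` from the previous sketch; the helper here is hypothetical:

```java
// e.g. crypto properties, as in CryptoTest's cryptoOnConf.getAllCryptoProperties()
Map<String,String> tableProps = getTablePropertiesSomehow(); // hypothetical helper
LoadPlan plan = LoadPlan.compute(file, tableProps, resolver);
```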

/**
* Computes a load plan for an rfile based on the minimum and maximum rows present across all
* locality groups.
*
* @param properties used when opening the rfile, see
* {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
*
* @return a load plan of type {@link RangeType#FILE}
* @since 2.1.5
*/
public static LoadPlan compute(URI file, Map<String,String> properties) throws IOException {
var path = new Path(file);
var conf = new Configuration();
var fs = FileSystem.get(path.toUri(), conf);
CryptoService cs =
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties);
var tableConf = SiteConfiguration.empty().withOverrides(properties).build();
try (var reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build()) {
var firstRow = reader.getFirstKey().getRow();
var lastRow = reader.getLastKey().getRow();
return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow)
.build();
}
}

/**
* Computes a load plan for an rfile based on the minimum and maximum rows present across all
* locality groups.
*
* @return a load plan of type {@link RangeType#FILE}
* @since 2.1.5
*/
public static LoadPlan compute(URI file) throws IOException {
return compute(file, Map.of());
}
}
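Putting the new 2.1.5 overloads to use, a bulk import could compute a FILE-type plan per rfile and hand it to importDirectory via the ImportMappingOptions hook imported at the top of this file. A sketch with hypothetical paths and table name:

```java
URI file = URI.create("hdfs://namenode:8020/bulk/f1.rf");

// Plan from the file's minimum and maximum rows; use the overload taking table
// properties instead if the file needs them to be opened (e.g. encryption).
LoadPlan plan = LoadPlan.compute(file);

client.tableOperations().importDirectory("hdfs://namenode:8020/bulk")
    .to("mytable").plan(plan).load();
```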
@@ -948,6 +948,47 @@ public void testLoadPlanLocalityGroupsNoSplits() throws Exception {
var expectedLoadPlan =
LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build();
assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson());

// put the first row in the default LG and last row in the first LG
testFile = createTmpTestFile();
var writer2 = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
try (writer2) {
writer2.startNewLocalityGroup("LG1", "F1");
writer2.append(new Key("007", "F1"), "V1");
writer2.append(new Key("009", "F1"), "V2");
writer2.startNewLocalityGroup("LG2", "F3");
writer2.append(new Key("003", "F3"), "V3");
writer2.append(new Key("004", "F3"), "V4");
writer2.startDefaultLocalityGroup();
writer2.append(new Key("002", "F4"), "V5");
writer2.append(new Key("008", "F4"), "V6");
}

filename = new Path(testFile).getName();
loadPlan = writer2.getLoadPlan(filename);
assertEquals(1, loadPlan.getDestinations().size());
expectedLoadPlan =
LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "002", "009").build();
assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson());

// create a file w/ a single LG
testFile = createTmpTestFile();
var writer3 = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
try (writer3) {
writer3.startDefaultLocalityGroup();
writer3.append(new Key("003", "F4"), "V5");
writer3.append(new Key("008", "F4"), "V6");
}

filename = new Path(testFile).getName();
loadPlan = writer3.getLoadPlan(filename);
assertEquals(1, loadPlan.getDestinations().size());
expectedLoadPlan =
LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "003", "008").build();
assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new URI(testFile)).toJson());
}

@Test
core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java (21 additions, 0 deletions)
@@ -48,8 +48,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
@@ -67,6 +70,7 @@
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.spi.crypto.AESCryptoService;
@@ -85,6 +89,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

@@ -357,6 +362,22 @@ public void testRFileClientEncryption() throws Exception {
assertEquals(1, summary.getStatistics().size());
assertEquals(0, summary.getFileStatistics().getInaccurate());
assertEquals(1, summary.getFileStatistics().getTotal());

// test computing load plan for encrypted files
var absUri = new Path(file).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri();
var loadPlan = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties());
var expectedLoadPlan =
LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.FILE, "a", "a3").build();
assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());

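// also verify the SplitResolver overload can open the encrypted file and map rows to splits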
var splits =
Stream.of("a", "b", "c").map(Text::new).collect(Collectors.toCollection(TreeSet::new));
var resolver = LoadPlan.SplitResolver.from(splits);
var loadPlan2 = LoadPlan.compute(absUri, cryptoOnConf.getAllCryptoProperties(), resolver);
var expectedLoadPlan2 =
LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, null, "a")
.loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, "a", "b").build();
assertEquals(expectedLoadPlan2.toJson(), loadPlan2.toJson());
}

@Test