From 842dfb777ea86fb9b66c857d746ac343f04a0cb8 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 20 Oct 2024 19:55:39 -0700 Subject: [PATCH 01/14] add a draft of column renaming feature to ParquetRewriter --- .../hadoop/rewrite/ParquetRewriter.java | 104 ++++++++-- .../hadoop/rewrite/RewriteOptions.java | 40 +++- .../hadoop/rewrite/ParquetRewriterTest.java | 184 +++++++++++++----- 3 files changed, 264 insertions(+), 64 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 2ff9c0ea34..8fb4794428 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -109,7 +109,7 @@ * Please note the schema of all inputFiles must be the same, otherwise the rewrite will fail. *
 *
 * Applying column transformations
 *
- * Some supported column transformations: pruning, masking, encrypting, changing a codec. + * Some supported column transformations: pruning, masking, renaming, encrypting, changing a codec. * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list with description. *
 *
 * Joining with extra files with a different schema
 *
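A minimal usage sketch of the renaming option this series adds, assuming the existing single-input RewriteOptions.Builder(Configuration, Path, Path) constructor and purely illustrative file paths; only the renameColumns(...) call comes from this change set.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameColumnExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("/tmp/input.parquet");   // hypothetical input file
    Path output = new Path("/tmp/output.parquet"); // hypothetical output file

    RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
        // keys are existing top-level column names, values are the new names
        .renameColumns(Collections.singletonMap("Name", "NameRenamed"))
        .build();

    // ParquetRewriter implements Closeable, so try-with-resources finalizes the output file
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}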
@@ -145,15 +145,18 @@ public class ParquetRewriter implements Closeable { private final Queue inputFiles = new LinkedList<>(); private final Queue inputFilesToJoin = new LinkedList<>(); private final MessageType outSchema; + private final MessageType outSchemaWithRenamedColumns; // The index cache strategy private final IndexCache.CacheStrategy indexCacheStrategy; private final boolean overwriteInputWithJoinColumns; private final InternalFileEncryptor nullColumnEncryptor; + private final Map renamedColumns; public ParquetRewriter(RewriteOptions options) throws IOException { this.newCodecName = options.getNewCodecName(); this.indexCacheStrategy = options.getIndexCacheStrategy(); this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns(); + this.renamedColumns = options.gerRenameColumns(); ParquetConfiguration conf = options.getParquetConfiguration(); OutputFile out = options.getParquetOutputFile(); inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); @@ -169,6 +172,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { out); this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); + // TODO check a requirement that all renamed column should be present in outSchema + this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema); this.extraMetaData = getExtraMetadata(options); if (options.getMaskColumns() != null) { @@ -186,7 +191,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; writer = new ParquetFileWriter( out, - outSchema, + outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema, writerMode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, @@ -222,6 +227,7 @@ public ParquetRewriter( MaskMode maskMode) { this.writer = writer; this.outSchema = outSchema; + this.outSchemaWithRenamedColumns = null; this.newCodecName = codecName; extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); extraMetaData.put( @@ -239,6 +245,7 @@ public ParquetRewriter( this.indexCacheStrategy = IndexCache.CacheStrategy.NONE; this.overwriteInputWithJoinColumns = false; this.nullColumnEncryptor = null; + this.renamedColumns = new HashMap<>(); } private MessageType getSchema() { @@ -266,6 +273,27 @@ private MessageType getSchema() { } } + private MessageType getSchemaWithRenamedColumns(MessageType schema) { + List fields = schema.getFields().stream() + .map(type -> { + if (renamedColumns == null || !renamedColumns.containsKey(type.getName())) { + return type; + } else if (type.isPrimitive()) { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.get(type.getName())); + } else { + return new GroupType( + type.getRepetition(), + renamedColumns.get(type.getName()), + type.asGroupType().getFields()); + } + }) + .collect(Collectors.toList()); + return new MessageType(schema.getName(), fields); + } + private Map getExtraMetadata(RewriteOptions options) { List allFiles; if (options.getIgnoreJoinFilesMetadata()) { @@ -421,6 +449,27 @@ public void processBlocks() throws IOException { if (readerToJoin != null) readerToJoin.close(); } + private ColumnPath renameFieldsInPath(ColumnPath path) { + if (renamedColumns == null) { + return path; + } else { + String[] pathArray = path.toArray(); + pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]); + return ColumnPath.get(pathArray); + } + } + + private PrimitiveType 
renameNameInType(PrimitiveType type) { + if (renamedColumns == null) { + return type; + } else { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.getOrDefault(type.getName(), type.getName())); + } + } + private void processBlock( TransParquetFileReader reader, int blockIdx, @@ -431,7 +480,27 @@ private void processBlock( if (chunk.isEncrypted()) { throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted"); } - ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx); + + ColumnChunkMetaData chunkColumnsRenamed = chunk; + if (renamedColumns != null && !renamedColumns.isEmpty()) { + chunkColumnsRenamed = ColumnChunkMetaData.get( + renameFieldsInPath(chunk.getPath()), + renameNameInType(chunk.getPrimitiveType()), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + chunk.getFirstDataPageOffset(), + chunk.getDictionaryPageOffset(), + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize(), + chunk.getSizeStatistics()); + } + + ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); + ColumnDescriptor descriptorRenamed = + outSchemaWithRenamedColumns.getColumns().get(outColumnIdx); BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx); String originalCreatedBy = reader.getFileMetaData().getCreatedBy(); @@ -443,13 +512,21 @@ private void processBlock( // Mask column and compress it again. MaskMode maskMode = maskColumns.get(chunk.getPath()); if (maskMode.equals(MaskMode.NULLIFY)) { - Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition(); + Type.Repetition repetition = + descriptorOriginal.getPrimitiveType().getRepetition(); if (repetition.equals(Type.Repetition.REQUIRED)) { - throw new IOException( - "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified"); + throw new IOException("Required column [" + + descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified"); } nullifyColumn( - reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy); + reader, + blockIdx, + descriptorOriginal, + chunk, + writer, + newCodecName, + encryptColumn, + originalCreatedBy); } else { throw new UnsupportedOperationException("Only nullify is supported for now"); } @@ -462,7 +539,7 @@ private void processBlock( } // Translate compression and/or encryption - writer.startColumn(descriptor, chunk.getValueCount(), newCodecName); + writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName); processChunk( reader, blockMetaData.getRowCount(), @@ -480,7 +557,8 @@ private void processBlock( BloomFilter bloomFilter = indexCache.getBloomFilter(chunk); ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); - writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex); + writer.appendColumnChunk( + descriptorRenamed, reader.getStream(), chunkColumnsRenamed, bloomFilter, columnIndex, offsetIndex); } } @@ -522,7 +600,7 @@ private void processChunk( } if (bloomFilter != null) { - writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter); + writer.addBloomFilter(renameFieldsInPath(chunk.getPath()).toDotString(), bloomFilter); } reader.setStreamPosition(chunk.getStartingPos()); @@ -580,7 +658,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( 
originalCreatedBy, - chunk.getPrimitiveType(), + renameNameInType(chunk.getPrimitiveType()), headerV1.getStatistics(), columnIndex, pageOrdinal, @@ -648,7 +726,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - chunk.getPrimitiveType(), + renameNameInType(chunk.getPrimitiveType()), headerV2.getStatistics(), columnIndex, pageOrdinal, @@ -887,7 +965,7 @@ private void nullifyColumn( CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName); // Create new schema that only has the current column - MessageType newSchema = newSchema(outSchema, descriptor); + MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor)); ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore( compressor, newSchema, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index a69403f464..710109f6c2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -49,6 +49,7 @@ public class RewriteOptions { private final List pruneColumns; private final CompressionCodecName newCodecName; private final Map maskColumns; + private final Map renameColumns; private final List encryptColumns; private final FileEncryptionProperties fileEncryptionProperties; private final IndexCache.CacheStrategy indexCacheStrategy; @@ -63,6 +64,7 @@ private RewriteOptions( List pruneColumns, CompressionCodecName newCodecName, Map maskColumns, + Map renameColumns, List encryptColumns, FileEncryptionProperties fileEncryptionProperties, IndexCache.CacheStrategy indexCacheStrategy, @@ -75,6 +77,7 @@ private RewriteOptions( this.pruneColumns = pruneColumns; this.newCodecName = newCodecName; this.maskColumns = maskColumns; + this.renameColumns = renameColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; this.indexCacheStrategy = indexCacheStrategy; @@ -192,6 +195,10 @@ public Map getMaskColumns() { return maskColumns; } + public Map gerRenameColumns() { + return renameColumns; + } + public List getEncryptColumns() { return encryptColumns; } @@ -221,6 +228,7 @@ public static class Builder { private List pruneColumns; private CompressionCodecName newCodecName; private Map maskColumns; + private Map renameColumns; private List encryptColumns; private FileEncryptionProperties fileEncryptionProperties; private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE; @@ -432,6 +440,19 @@ public Builder mask(Map maskColumns) { return this; } + /** + * Set the columns to be renamed. + *

+ * Note that nested columns can't be renamed, in case of GroupType column only top level column can be renamed. + * + * @param renameColumns map where keys are original names and values are new names + * @return self + */ + public Builder renameColumns(Map renameColumns) { + this.renameColumns = renameColumns; + return this; + } + /** * Set the columns to encrypt. *

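To make the "top level only" rule concrete, here is a small sketch of the schema transformation using the same fixture shape as the tests; the class name is illustrative. Renaming "Name" to "NameRenamed" changes only the outer field, while nested paths such as Links.Forward keep their names.

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

public class RenamedSchemaSketch {
  public static void main(String[] args) {
    // Input schema of the files being rewritten (same shape as the test fixtures).
    MessageType before = new MessageType(
        "schema",
        new PrimitiveType(REQUIRED, BINARY, "Name"),
        new GroupType(OPTIONAL, "Links",
            new PrimitiveType(REPEATED, BINARY, "Forward")));

    // With renameColumns = {"Name" -> "NameRenamed"} the rewriter emits this schema:
    // only the top-level field name changes, nested fields like Links.Forward are untouched.
    MessageType after = new MessageType(
        "schema",
        new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
        new GroupType(OPTIONAL, "Links",
            new PrimitiveType(REPEATED, BINARY, "Forward")));

    System.out.println(before);
    System.out.println(after);
  }
}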
@@ -561,13 +582,29 @@ public RewriteOptions build() { !maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column"); } } - if (encryptColumns != null) { for (String pruneColumn : pruneColumns) { Preconditions.checkArgument( !encryptColumns.contains(pruneColumn), "Cannot prune and encrypt same column"); } } + if (renameColumns != null) { + for (Map.Entry entry : renameColumns.entrySet()) { + Preconditions.checkArgument( + !encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column"); + } + } + } + + if (renameColumns != null && !renameColumns.isEmpty()) { + for (Map.Entry entry : renameColumns.entrySet()) { + Preconditions.checkArgument( + entry.getValue() != null && !entry.getValue().trim().isEmpty(), + "Renamed column target name can't be empty"); + Preconditions.checkArgument( + !entry.getKey().contains(".") && !entry.getValue().contains("."), + "Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed"); + } } if (encryptColumns != null && !encryptColumns.isEmpty()) { @@ -590,6 +627,7 @@ public RewriteOptions build() { pruneColumns, newCodecName, maskColumns, + renameColumns, encryptColumns, fileEncryptionProperties, indexCacheStrategy, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 34c90a4641..40c525339b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.rewrite; +import static java.util.Collections.emptyMap; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; @@ -181,10 +182,10 @@ private void testPruneSingleColumnTranslateCodec(List inputPaths) throws E null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, emptyMap()); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -199,7 +200,7 @@ public void setUp() { @Test public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -210,8 +211,8 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { @Test public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -252,10 +253,10 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false, emptyMap()); // Verify 
the page index - validatePageIndex(ImmutableSet.of("Links.Forward"), false); + validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -264,7 +265,7 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except @Test public void testPruneNullifyTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -276,8 +277,8 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception { @Test public void testPruneNullifyTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -327,7 +328,8 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except fileDecryptionProperties); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false); + validateColumnData( + new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, emptyMap()); // Verify column encryption ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -349,7 +351,7 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except @Test public void testPruneEncryptTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -361,8 +363,8 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception { @Test public void testPruneEncryptTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -488,10 +490,10 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception // Verify the data are not changed for non-encrypted and non-masked columns. // Also make sure the masked column is nullified. - validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false); + validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false, emptyMap()); // Verify the page index - validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false); + validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false, emptyMap()); // Verify the column is encrypted ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -511,7 +513,7 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception @Test public void testNullifyEncryptSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -523,8 +525,8 @@ public void testNullifyEncryptSingleFile() throws Exception { @Test public void testNullifyEncryptTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -537,8 +539,8 @@ public void testNullifyEncryptTwoFiles() throws Exception { @Test public void testMergeTwoFilesOnly() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); // Only merge two files but do not change anything. 
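As background for the rename assertions added to these tests (validateColumnData and validatePageIndex now take a renameColumns map): after a rewrite that renames "Name" to "NameRenamed", readers can only reach the data under the new name. A hedged read-back sketch with illustrative paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ReadRenamedColumn {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical output of a rewrite that renamed "Name" to "NameRenamed".
    try (ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path("/tmp/output.parquet"))
        .withConf(conf)
        .build()) {
      Group row = reader.read();
      // The value originally written under "Name" is now only reachable as "NameRenamed";
      // asking for "Name" fails because that field no longer exists in the output schema.
      System.out.println(row.getBinary("NameRenamed", 0).toStringUsingUTF8());
    }
  }
}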
List inputPaths = new ArrayList<>(); @@ -571,10 +573,59 @@ public void testMergeTwoFilesOnly() throws Exception { null); // Verify the merged data are not changed - validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false); + validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, emptyMap()); + + // Verify the page index + validatePageIndex(new HashSet<>(), false, emptyMap()); + + // Verify original.created.by is preserved + validateCreatedBy(); + validateRowGroupRowCount(); + } + + @Test + public void testMergeTwoFilesOnlyRenameColumn() throws Exception { + addGzipInputFile(); + addUncompressedInputFile(); + + // Only merge two files but do not change anything. + List inputPaths = new ArrayList<>(); + for (EncryptionTestFile inputFile : inputFiles) { + inputPaths.add(new Path(inputFile.getFileName())); + } + Map renameColumns = ImmutableMap.of("Name", "NameRenamed"); + RewriteOptions.Builder builder = createBuilder(inputPaths); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) + .renameColumns(ImmutableMap.of("Name", "NameRenamed")) + .build(); + + rewriter = new ParquetRewriter(options); + rewriter.processBlocks(); + rewriter.close(); + + // Verify the schema is not changed + ParquetMetadata pmd = + ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); + MessageType schema = pmd.getFileMetaData().getSchema(); + MessageType expectSchema = createSchemaWithRenamed(); + assertEquals(expectSchema, schema); + + // Verify codec has not been translated + verifyCodec( + outputFile, + new HashSet() { + { + add(CompressionCodecName.GZIP); + add(CompressionCodecName.UNCOMPRESSED); + } + }, + null); + + // Verify the merged data are not changed + validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, renameColumns); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, renameColumns); // Verify original.created.by is preserved validateCreatedBy(); @@ -648,7 +699,7 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput @Test public void testRewriteFileWithMultipleBlocks() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -823,12 +874,13 @@ public void testOneInputFileManyInputFilesToJoinSetup(boolean joinColumnsOverwri new HashSet<>(pruneColumns), maskColumns.keySet(), fileDecryptionProperties, - joinColumnsOverwrite); // Verify data + joinColumnsOverwrite, + emptyMap()); // Verify data validateSchemaWithGenderColumnPruned(true); // Verify schema validateCreatedBy(); // Verify original.created.by assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom filters verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD), fileDecryptionProperties); // Verify codec - validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite); + validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite, emptyMap()); } private void testOneInputFileManyInputFilesToJoinSetup() throws IOException { @@ -884,11 +936,27 @@ private MessageType createSchemaToJoin() { new PrimitiveType(REPEATED, BINARY, "Forward"))); } + private MessageType createSchemaWithRenamed() { + return new MessageType( + "schema", + new PrimitiveType(OPTIONAL, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "NameRenamed"), + new PrimitiveType(OPTIONAL, BINARY, "Gender"), + new PrimitiveType(REPEATED, FLOAT, 
"FloatFraction"), + new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"), + new GroupType( + OPTIONAL, + "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + } + private void validateColumnData( Set prunePaths, Set nullifiedPaths, FileDecryptionProperties fileDecryptionProperties, - Boolean joinColumnsOverwrite) + Boolean joinColumnsOverwrite, + Map renameColumns) throws IOException { ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) @@ -901,7 +969,7 @@ private void validateColumnData( List filesJoined = inputFilesToJoin.stream() .flatMap(x -> Arrays.stream(x.getFileContent())) .collect(Collectors.toList()); - BiFunction groups = (name, rowIdx) -> { + BiFunction groupsExpected = (name, rowIdx) -> { if (!filesMain.get(0).getType().containsField(name) || joinColumnsOverwrite && !filesJoined.isEmpty() @@ -915,50 +983,53 @@ private void validateColumnData( int totalRows = inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum(); for (int i = 0; i < totalRows; i++) { - Group group = reader.read(); - assertNotNull(group); + Group groupActual = reader.read(); + assertNotNull(groupActual); if (!prunePaths.contains("DocId")) { if (nullifiedPaths.contains("DocId")) { - assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0)); + assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0)); } else { assertEquals( - group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0)); + groupActual.getLong("DocId", 0), + groupsExpected.apply("DocId", i).getLong("DocId", 0)); } } if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) { + String colName = renameColumns.getOrDefault("Name", "Name"); assertArrayEquals( - group.getBinary("Name", 0).getBytes(), - groups.apply("Name", i).getBinary("Name", 0).getBytes()); + groupActual.getBinary(colName, 0).getBytes(), + groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes()); } if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) { assertArrayEquals( - group.getBinary("Gender", 0).getBytes(), - groups.apply("Gender", i).getBinary("Gender", 0).getBytes()); + groupActual.getBinary("Gender", 0).getBytes(), + groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes()); } if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) { assertEquals( - group.getFloat("FloatFraction", 0), - groups.apply("FloatFraction", i).getFloat("FloatFraction", 0), + groupActual.getFloat("FloatFraction", 0), + groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0), 0); } if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) { assertEquals( - group.getDouble("DoubleFraction", 0), - groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0), + groupActual.getDouble("DoubleFraction", 0), + groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0), 0); } - Group subGroup = group.getGroup("Links", 0); + Group subGroup = groupActual.getGroup("Links", 0); if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) { assertArrayEquals( subGroup.getBinary("Backward", 0).getBytes(), - groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Backward", 0) .getBytes()); @@ -970,7 +1041,8 @@ private void validateColumnData( } else { assertArrayEquals( subGroup.getBinary("Forward", 0).getBytes(), - 
groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Forward", 0) .getBytes()); @@ -1014,13 +1086,22 @@ interface CheckedFunction { R apply(T t) throws IOException; } + private ColumnPath renameFieldsInPath(ColumnPath path, Map renameColumns) { + String[] pathArray = path.toArray(); + if (renameColumns != null) { + pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]); + } + return ColumnPath.get(pathArray); + } + /** * Verify the page index is correct. * * @param exclude the columns to exclude from comparison, for example because they were nullified. * @param joinColumnsOverwrite whether a join columns overwrote existing overlapping columns. */ - private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite) throws Exception { + private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite, Map renameColumns) + throws Exception { class BlockMeta { final TransParquetFileReader reader; final BlockMetaData blockMeta; @@ -1058,6 +1139,8 @@ class BlockMeta { List inBlocksJoined = blockMetaExtractor.apply( inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList())); List outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile)); + Map renameColumnsInverted = + renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) { BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta; TransParquetFileReader outReader = outBlocks.get(blockIdx).reader; @@ -1066,17 +1149,18 @@ class BlockMeta { TransParquetFileReader inReader; BlockMetaData inBlockMeta; ColumnChunkMetaData inChunk; - if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath()) + ColumnPath colPath = renameFieldsInPath(outChunk.getPath(), renameColumnsInverted); + if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath) || joinColumnsOverwrite && !inBlocksJoined.isEmpty() - && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) { + && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) { inReader = inBlocksJoined.get(blockIdx).reader; inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta; - inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath); } else { inReader = inBlocksMain.get(blockIdx).reader; inBlockMeta = inBlocksMain.get(blockIdx).blockMeta; - inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath); } ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); @@ -1284,13 +1368,13 @@ conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER) assertEquals(expectSchema, actualSchema); } - private void ensureContainsGzipFile() { + private void addGzipInputFile() { if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) { inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn); } } - private void ensureContainsUncompressedFile() { + private void addUncompressedInputFile() { if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) { inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn); } From a482ca1d606d9d9ddc2ba1d7e08529720e39cb6b Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 20 Oct 2024 20:59:06 -0700 Subject: [PATCH 02/14] fix outSchemaWithRenamedColumns assignment that 
led to NPE in some tests --- .../java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 8fb4794428..e32672ea06 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -227,7 +227,7 @@ public ParquetRewriter( MaskMode maskMode) { this.writer = writer; this.outSchema = outSchema; - this.outSchemaWithRenamedColumns = null; + this.outSchemaWithRenamedColumns = outSchema; this.newCodecName = codecName; extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); extraMetaData.put( From 36d5a04d3b3bd554057ad2841e02e26b8c5563a9 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 15:05:24 -0700 Subject: [PATCH 03/14] add ensureRenamedColumnsCorrectness() to ParquetRewriter --- .../hadoop/rewrite/ParquetRewriter.java | 25 +++++++++++++++---- .../hadoop/rewrite/RewriteOptions.java | 7 +++++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index e32672ea06..5535699072 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -172,7 +172,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { out); this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); - // TODO check a requirement that all renamed column should be present in outSchema + ensureRenamedColumnsCorrectness(outSchema, renamedColumns); this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema); this.extraMetaData = getExtraMetadata(options); @@ -276,7 +276,7 @@ private MessageType getSchema() { private MessageType getSchemaWithRenamedColumns(MessageType schema) { List fields = schema.getFields().stream() .map(type -> { - if (renamedColumns == null || !renamedColumns.containsKey(type.getName())) { + if (!renamedColumns.containsKey(type.getName())) { return type; } else if (type.isPrimitive()) { return new PrimitiveType( @@ -366,6 +366,21 @@ private void ensureSameSchema(Queue inputFileReaders) { } } + private void ensureRenamedColumnsCorrectness(MessageType schema, Map renameMap) { + Set columns = schema.getFields().stream().map(Type::getName).collect(Collectors.toSet()); + renameMap.forEach((src, dst) -> { + if (!columns.contains(src)) { + String msg = String.format("Column to rename '%s' is not found in input files schema", src); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } else if (columns.contains(dst)) { + String msg = String.format("Renamed column target name '%s' is already present in a schema", dst); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + }); + } + @Override public void close() throws IOException { writer.end(extraMetaData); @@ -450,7 +465,7 @@ public void processBlocks() throws IOException { } private ColumnPath renameFieldsInPath(ColumnPath path) { - if (renamedColumns == null) { + if (renamedColumns.isEmpty()) { return path; } else { String[] pathArray = path.toArray(); @@ -460,7 +475,7 @@ private 
ColumnPath renameFieldsInPath(ColumnPath path) { } private PrimitiveType renameNameInType(PrimitiveType type) { - if (renamedColumns == null) { + if (renamedColumns.isEmpty()) { return type; } else { return new PrimitiveType( @@ -482,7 +497,7 @@ private void processBlock( } ColumnChunkMetaData chunkColumnsRenamed = chunk; - if (renamedColumns != null && !renamedColumns.isEmpty()) { + if (!renamedColumns.isEmpty()) { chunkColumnsRenamed = ColumnChunkMetaData.get( renameFieldsInPath(chunk.getPath()), renameNameInType(chunk.getPrimitiveType()), diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index 710109f6c2..922b345c59 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.curator.shaded.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; @@ -627,7 +628,11 @@ public RewriteOptions build() { pruneColumns, newCodecName, maskColumns, - renameColumns, + (renameColumns == null + ? ImmutableMap.of() + : renameColumns.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, x -> x.getValue().trim()))), encryptColumns, fileEncryptionProperties, indexCacheStrategy, From 82020221db48ed3b8bfb442c70565b841105d714 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 15:12:26 -0700 Subject: [PATCH 04/14] change rename to normalize in method names in ParquetRewriter --- .../hadoop/rewrite/ParquetRewriter.java | 25 +++++++++++-------- .../hadoop/rewrite/ParquetRewriterTest.java | 4 +-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 5535699072..c9586c37c6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -464,7 +464,7 @@ public void processBlocks() throws IOException { if (readerToJoin != null) readerToJoin.close(); } - private ColumnPath renameFieldsInPath(ColumnPath path) { + private ColumnPath normalizeFieldsInPath(ColumnPath path) { if (renamedColumns.isEmpty()) { return path; } else { @@ -474,7 +474,7 @@ private ColumnPath renameFieldsInPath(ColumnPath path) { } } - private PrimitiveType renameNameInType(PrimitiveType type) { + private PrimitiveType normalizeNameInType(PrimitiveType type) { if (renamedColumns.isEmpty()) { return type; } else { @@ -496,11 +496,11 @@ private void processBlock( throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted"); } - ColumnChunkMetaData chunkColumnsRenamed = chunk; + ColumnChunkMetaData chunkColumnsNormalized = chunk; if (!renamedColumns.isEmpty()) { - chunkColumnsRenamed = ColumnChunkMetaData.get( - renameFieldsInPath(chunk.getPath()), - renameNameInType(chunk.getPrimitiveType()), + chunkColumnsNormalized = ColumnChunkMetaData.get( + normalizeFieldsInPath(chunk.getPath()), + normalizeNameInType(chunk.getPrimitiveType()), chunk.getCodec(), chunk.getEncodingStats(), chunk.getEncodings(), @@ -573,7 
+573,12 @@ private void processBlock( ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); writer.appendColumnChunk( - descriptorRenamed, reader.getStream(), chunkColumnsRenamed, bloomFilter, columnIndex, offsetIndex); + descriptorRenamed, + reader.getStream(), + chunkColumnsNormalized, + bloomFilter, + columnIndex, + offsetIndex); } } @@ -615,7 +620,7 @@ private void processChunk( } if (bloomFilter != null) { - writer.addBloomFilter(renameFieldsInPath(chunk.getPath()).toDotString(), bloomFilter); + writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(), bloomFilter); } reader.setStreamPosition(chunk.getStartingPos()); @@ -673,7 +678,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - renameNameInType(chunk.getPrimitiveType()), + normalizeNameInType(chunk.getPrimitiveType()), headerV1.getStatistics(), columnIndex, pageOrdinal, @@ -741,7 +746,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - renameNameInType(chunk.getPrimitiveType()), + normalizeNameInType(chunk.getPrimitiveType()), headerV2.getStatistics(), columnIndex, pageOrdinal, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 40c525339b..fc79a47b58 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -1086,7 +1086,7 @@ interface CheckedFunction { R apply(T t) throws IOException; } - private ColumnPath renameFieldsInPath(ColumnPath path, Map renameColumns) { + private ColumnPath normalizeFieldsInPath(ColumnPath path, Map renameColumns) { String[] pathArray = path.toArray(); if (renameColumns != null) { pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]); @@ -1149,7 +1149,7 @@ class BlockMeta { TransParquetFileReader inReader; BlockMetaData inBlockMeta; ColumnChunkMetaData inChunk; - ColumnPath colPath = renameFieldsInPath(outChunk.getPath(), renameColumnsInverted); + ColumnPath colPath = normalizeFieldsInPath(outChunk.getPath(), renameColumnsInverted); if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath) || joinColumnsOverwrite && !inBlocksJoined.isEmpty() From f5d3baf06cd8783660a166abb205cfd777237e9a Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 15:28:01 -0700 Subject: [PATCH 05/14] add copy() method into ColumnChunkMetaData --- .../hadoop/metadata/ColumnChunkMetaData.java | 23 +++++++++++++++++++ .../hadoop/rewrite/ParquetRewriter.java | 15 ++---------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 14a949b0e0..7174f17974 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -624,6 +624,29 @@ public Statistics getStatistics() { public SizeStatistics getSizeStatistics() { return sizeStatistics; } + + /** + * Copies this ColumnChunkMetaData with chunk's path and type changed to provided ones. 
+ * + * @param path ColumnPath of resulting chunk + * @param type PrimitiveType of resulting chunk + * @return resulting chunk + */ + public ColumnChunkMetaData copy(ColumnPath path, PrimitiveType type) { + return ColumnChunkMetaData.get( + path, + type, + this.getCodec(), + this.getEncodingStats(), + this.getEncodings(), + this.getStatistics(), + this.getFirstDataPageOffset(), + this.getDictionaryPageOffset(), + this.getValueCount(), + this.getTotalSize(), + this.getTotalUncompressedSize(), + this.getSizeStatistics()); + } } class LongColumnChunkMetaData extends ColumnChunkMetaData { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index c9586c37c6..95203945ed 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -498,19 +498,8 @@ private void processBlock( ColumnChunkMetaData chunkColumnsNormalized = chunk; if (!renamedColumns.isEmpty()) { - chunkColumnsNormalized = ColumnChunkMetaData.get( - normalizeFieldsInPath(chunk.getPath()), - normalizeNameInType(chunk.getPrimitiveType()), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - chunk.getFirstDataPageOffset(), - chunk.getDictionaryPageOffset(), - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize(), - chunk.getSizeStatistics()); + chunkColumnsNormalized = + chunk.copy(normalizeFieldsInPath(chunk.getPath()), normalizeNameInType(chunk.getPrimitiveType())); } ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); From 347d90d13847707c4e04580aa195cae17d3629fc Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 15:30:19 -0700 Subject: [PATCH 06/14] add copy() method into ColumnChunkMetaData --- .../hadoop/metadata/ColumnChunkMetaData.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 7174f17974..6c0967ad3e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -504,6 +504,29 @@ public boolean hasDictionaryPage() { public boolean isEncrypted() { return false; } + + /** + * Copies this ColumnChunkMetaData with path and type changed to provided ones. + * + * @param path a new ColumnPath of a chunk + * @param type a new PrimitiveType of a chunk + * @return resulting chunk + */ + public ColumnChunkMetaData copy(ColumnPath path, PrimitiveType type) { + return ColumnChunkMetaData.get( + path, + type, + this.getCodec(), + this.getEncodingStats(), + this.getEncodings(), + this.getStatistics(), + this.getFirstDataPageOffset(), + this.getDictionaryPageOffset(), + this.getValueCount(), + this.getTotalSize(), + this.getTotalUncompressedSize(), + this.getSizeStatistics()); + } } class IntColumnChunkMetaData extends ColumnChunkMetaData { @@ -624,29 +647,6 @@ public Statistics getStatistics() { public SizeStatistics getSizeStatistics() { return sizeStatistics; } - - /** - * Copies this ColumnChunkMetaData with chunk's path and type changed to provided ones. 
- * - * @param path ColumnPath of resulting chunk - * @param type PrimitiveType of resulting chunk - * @return resulting chunk - */ - public ColumnChunkMetaData copy(ColumnPath path, PrimitiveType type) { - return ColumnChunkMetaData.get( - path, - type, - this.getCodec(), - this.getEncodingStats(), - this.getEncodings(), - this.getStatistics(), - this.getFirstDataPageOffset(), - this.getDictionaryPageOffset(), - this.getValueCount(), - this.getTotalSize(), - this.getTotalUncompressedSize(), - this.getSizeStatistics()); - } } class LongColumnChunkMetaData extends ColumnChunkMetaData { From cff9f7f22389d8cd1fb8c28f134475b8ce345a74 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 16:08:01 -0700 Subject: [PATCH 07/14] change imports --- .../org/apache/parquet/hadoop/rewrite/RewriteOptions.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index 922b345c59..f66062c986 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -18,12 +18,12 @@ */ package org.apache.parquet.hadoop.rewrite; -import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.ArrayList; import java.util.Map; +import java.util.HashMap; import java.util.stream.Collectors; -import org.apache.curator.shaded.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; @@ -629,7 +629,7 @@ public RewriteOptions build() { newCodecName, maskColumns, (renameColumns == null - ? ImmutableMap.of() + ? 
new HashMap<>() : renameColumns.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, x -> x.getValue().trim()))), From 2aec62420ffad41c7975e96fca87419ecf285cf9 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 27 Oct 2024 16:10:20 -0700 Subject: [PATCH 08/14] refactor RewriteOptions' builder --- .../parquet/hadoop/rewrite/RewriteOptions.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index f66062c986..66c68286d8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -18,11 +18,11 @@ */ package org.apache.parquet.hadoop.rewrite; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.ArrayList; import java.util.Map; -import java.util.HashMap; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -589,15 +589,15 @@ public RewriteOptions build() { !encryptColumns.contains(pruneColumn), "Cannot prune and encrypt same column"); } } - if (renameColumns != null) { + } + + if (renameColumns != null && !renameColumns.isEmpty()) { + if (encryptColumns != null && !encryptColumns.isEmpty()) { for (Map.Entry entry : renameColumns.entrySet()) { Preconditions.checkArgument( !encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column"); } } - } - - if (renameColumns != null && !renameColumns.isEmpty()) { for (Map.Entry entry : renameColumns.entrySet()) { Preconditions.checkArgument( entry.getValue() != null && !entry.getValue().trim().isEmpty(), From 169af05949721f2854ee16c03cbb9b6625165af0 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Thu, 31 Oct 2024 15:30:31 -0700 Subject: [PATCH 09/14] [GH-3035][ParquetRewriter] replace renamed schema variable with dynamic extraction --- .../hadoop/rewrite/ParquetRewriter.java | 20 ++++----- .../hadoop/rewrite/RewriteOptions.java | 41 ++++++++++--------- .../hadoop/rewrite/ParquetRewriterTest.java | 12 +++--- 3 files changed, 35 insertions(+), 38 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 95203945ed..41242359c6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -145,7 +145,6 @@ public class ParquetRewriter implements Closeable { private final Queue inputFiles = new LinkedList<>(); private final Queue inputFilesToJoin = new LinkedList<>(); private final MessageType outSchema; - private final MessageType outSchemaWithRenamedColumns; // The index cache strategy private final IndexCache.CacheStrategy indexCacheStrategy; private final boolean overwriteInputWithJoinColumns; @@ -158,23 +157,21 @@ public ParquetRewriter(RewriteOptions options) throws IOException { this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns(); this.renamedColumns = options.gerRenameColumns(); ParquetConfiguration conf = options.getParquetConfiguration(); - OutputFile out = options.getParquetOutputFile(); inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), 
conf)); inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf)); + this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); + this.extraMetaData = getExtraMetadata(options); ensureSameSchema(inputFiles); ensureSameSchema(inputFilesToJoin); ensureRowCount(); + ensureRenamingCorrectness(outSchema, renamedColumns); + OutputFile out = options.getParquetOutputFile(); LOG.info( "Start rewriting {} input file(s) {} to {}", inputFiles.size() + inputFilesToJoin.size(), Stream.concat(options.getParquetInputFiles().stream(), options.getParquetInputFilesToJoin().stream()) .collect(Collectors.toList()), - out); - - this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); - ensureRenamedColumnsCorrectness(outSchema, renamedColumns); - this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema); - this.extraMetaData = getExtraMetadata(options); + options.getParquetOutputFile()); if (options.getMaskColumns() != null) { this.maskColumns = new HashMap<>(); @@ -191,7 +188,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; writer = new ParquetFileWriter( out, - outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema, + renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema), writerMode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, @@ -227,7 +224,6 @@ public ParquetRewriter( MaskMode maskMode) { this.writer = writer; this.outSchema = outSchema; - this.outSchemaWithRenamedColumns = outSchema; this.newCodecName = codecName; extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); extraMetaData.put( @@ -366,7 +362,7 @@ private void ensureSameSchema(Queue inputFileReaders) { } } - private void ensureRenamedColumnsCorrectness(MessageType schema, Map renameMap) { + private void ensureRenamingCorrectness(MessageType schema, Map renameMap) { Set columns = schema.getFields().stream().map(Type::getName).collect(Collectors.toSet()); renameMap.forEach((src, dst) -> { if (!columns.contains(src)) { @@ -504,7 +500,7 @@ private void processBlock( ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); ColumnDescriptor descriptorRenamed = - outSchemaWithRenamedColumns.getColumns().get(outColumnIdx); + getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx); BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx); String originalCreatedBy = reader.getFileMetaData().getCreatedBy(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index 66c68286d8..2a39b0eaeb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -573,6 +573,28 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) { * @return a RewriterOptions */ public RewriteOptions build() { + checkPreconditions(); + return new RewriteOptions( + conf, + inputFiles, + (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()), + outputFile, + pruneColumns, + newCodecName, + maskColumns, + (renameColumns == null + ? 
new HashMap<>() + : renameColumns.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, x -> x.getValue().trim()))), + encryptColumns, + fileEncryptionProperties, + indexCacheStrategy, + overwriteInputWithJoinColumns, + ignoreJoinFilesMetadata); + } + + private void checkPreconditions() { Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required"); Preconditions.checkArgument(outputFile != null, "Output file is required"); @@ -619,25 +641,6 @@ public RewriteOptions build() { encryptColumns != null && !encryptColumns.isEmpty(), "Encrypt columns is required when FileEncryptionProperties is set"); } - - return new RewriteOptions( - conf, - inputFiles, - (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()), - outputFile, - pruneColumns, - newCodecName, - maskColumns, - (renameColumns == null - ? new HashMap<>() - : renameColumns.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, x -> x.getValue().trim()))), - encryptColumns, - fileEncryptionProperties, - indexCacheStrategy, - overwriteInputWithJoinColumns, - ignoreJoinFilesMetadata); } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index fc79a47b58..dca7d42762 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -588,15 +588,14 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception { addGzipInputFile(); addUncompressedInputFile(); - // Only merge two files but do not change anything. - List inputPaths = new ArrayList<>(); - for (EncryptionTestFile inputFile : inputFiles) { - inputPaths.add(new Path(inputFile.getFileName())); - } Map renameColumns = ImmutableMap.of("Name", "NameRenamed"); + List pruneColumns = ImmutableList.of("Gender"); + List inputPaths = + inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()); RewriteOptions.Builder builder = createBuilder(inputPaths); RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) .renameColumns(ImmutableMap.of("Name", "NameRenamed")) + .prune(pruneColumns) .build(); rewriter = new ParquetRewriter(options); @@ -622,7 +621,7 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception { null); // Verify the merged data are not changed - validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, renameColumns); + validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns); // Verify the page index validatePageIndex(new HashSet<>(), false, renameColumns); @@ -941,7 +940,6 @@ private MessageType createSchemaWithRenamed() { "schema", new PrimitiveType(OPTIONAL, INT64, "DocId"), new PrimitiveType(REQUIRED, BINARY, "NameRenamed"), - new PrimitiveType(OPTIONAL, BINARY, "Gender"), new PrimitiveType(REPEATED, FLOAT, "FloatFraction"), new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"), new GroupType( From d78132dcb82160ea5c7366673e5a4e6154fbde53 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Tue, 5 Nov 2024 23:37:08 -0800 Subject: [PATCH 10/14] [GH-3035][ParquetRewriter] extend tests for renaming feature --- .../hadoop/rewrite/ParquetRewriter.java | 24 ++-- .../hadoop/rewrite/RewriteOptions.java | 37 +++--- .../hadoop/rewrite/ParquetRewriterTest.java | 105 ++++++++++++------ 3 files changed, 100 
insertions(+), 66 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 41242359c6..e6c9d8e70a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -157,8 +157,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns(); this.renamedColumns = options.gerRenameColumns(); ParquetConfiguration conf = options.getParquetConfiguration(); - inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); - inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf)); + this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); + this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf)); this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); this.extraMetaData = getExtraMetadata(options); ensureSameSchema(inputFiles); @@ -186,7 +186,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { } ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; - writer = new ParquetFileWriter( + this.writer = new ParquetFileWriter( out, renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema), writerMode, @@ -202,7 +202,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { this.nullColumnEncryptor = null; } else { this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties()); - List columns = outSchema.getColumns(); + List columns = + getSchemaWithRenamedColumns(this.outSchema).getColumns(); for (int i = 0; i < columns.size(); i++) { writer.getEncryptor() .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i); @@ -225,8 +226,8 @@ public ParquetRewriter( this.writer = writer; this.outSchema = outSchema; this.newCodecName = codecName; - extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); - extraMetaData.put( + this.extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); + this.extraMetaData.put( ORIGINAL_CREATED_BY_KEY, originalCreatedBy != null ? 
originalCreatedBy @@ -492,9 +493,9 @@ private void processBlock( throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted"); } - ColumnChunkMetaData chunkColumnsNormalized = chunk; + ColumnChunkMetaData chunkNormalized = chunk; if (!renamedColumns.isEmpty()) { - chunkColumnsNormalized = + chunkNormalized = chunk.copy(normalizeFieldsInPath(chunk.getPath()), normalizeNameInType(chunk.getPrimitiveType())); } @@ -558,12 +559,7 @@ private void processBlock( ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); writer.appendColumnChunk( - descriptorRenamed, - reader.getStream(), - chunkColumnsNormalized, - bloomFilter, - columnIndex, - offsetIndex); + descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index 2a39b0eaeb..0dc76b8a2a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -23,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.apache.curator.shaded.com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; @@ -582,11 +584,11 @@ public RewriteOptions build() { pruneColumns, newCodecName, maskColumns, - (renameColumns == null + renameColumns == null ? new HashMap<>() : renameColumns.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, x -> x.getValue().trim()))), + .collect(Collectors.toMap(x -> x.getKey().trim(), x -> x.getValue() + .trim())), encryptColumns, fileEncryptionProperties, indexCacheStrategy, @@ -613,21 +615,24 @@ private void checkPreconditions() { } } - if (renameColumns != null && !renameColumns.isEmpty()) { - if (encryptColumns != null && !encryptColumns.isEmpty()) { - for (Map.Entry entry : renameColumns.entrySet()) { - Preconditions.checkArgument( - !encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column"); - } - } - for (Map.Entry entry : renameColumns.entrySet()) { + if (renameColumns != null) { + Set nullifiedColumns = maskColumns == null + ? 
ImmutableSet.of() + : maskColumns.entrySet().stream() + .filter(x -> x.getValue() == MaskMode.NULLIFY) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + renameColumns.forEach((colSrc, colDst) -> { Preconditions.checkArgument( - entry.getValue() != null && !entry.getValue().trim().isEmpty(), - "Renamed column target name can't be empty"); + colSrc != null && !colSrc.trim().isEmpty(), "Renamed column source name can't be empty"); Preconditions.checkArgument( - !entry.getKey().contains(".") && !entry.getValue().contains("."), - "Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed"); - } + colDst != null && !colDst.trim().isEmpty(), "Renamed column target name can't be empty"); + Preconditions.checkArgument( + !nullifiedColumns.contains(colSrc), "Cannot nullify and rename the same column"); + Preconditions.checkArgument( + !colSrc.contains(".") && !colDst.contains("."), + "Renamed column can't be nested, in case of GroupType column only a top level column can be renamed"); + }); } if (encryptColumns != null && !encryptColumns.isEmpty()) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index dca7d42762..c1da97c403 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -590,18 +590,26 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception { Map renameColumns = ImmutableMap.of("Name", "NameRenamed"); List pruneColumns = ImmutableList.of("Gender"); + String[] encryptColumns = {"DocId"}; + FileEncryptionProperties fileEncryptionProperties = + EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false); List inputPaths = inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()); RewriteOptions.Builder builder = createBuilder(inputPaths); RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) .renameColumns(ImmutableMap.of("Name", "NameRenamed")) .prune(pruneColumns) + .transform(CompressionCodecName.SNAPPY) + .encrypt(Arrays.asList(encryptColumns)) + .encryptionProperties(fileEncryptionProperties) .build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); rewriter.close(); + FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties(); + // Verify the schema is not changed ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); @@ -609,39 +617,59 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception { MessageType expectSchema = createSchemaWithRenamed(); assertEquals(expectSchema, schema); - // Verify codec has not been translated - verifyCodec( - outputFile, - new HashSet() { - { - add(CompressionCodecName.GZIP); - add(CompressionCodecName.UNCOMPRESSED); - } - }, - null); - + verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), fileDecryptionProperties); // Verify codec // Verify the merged data are not changed - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns); - - // Verify the page index - validatePageIndex(new HashSet<>(), false, renameColumns); - - // Verify original.created.by is preserved - validateCreatedBy(); + validateColumnData( + new 
HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, renameColumns); + validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // Verify the page index + validateCreatedBy(); // Verify original.created.by is preserved validateRowGroupRowCount(); + + ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); + assertFalse(metaData.getBlocks().isEmpty()); + Set encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns)); + for (BlockMetaData blockMetaData : metaData.getBlocks()) { + List columns = blockMetaData.getColumns(); + for (ColumnChunkMetaData column : columns) { + if (encryptedColumns.contains(column.getPath().toDotString())) { + assertTrue(column.isEncrypted()); + } else { + assertFalse(column.isEncrypted()); + } + } + } } @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesWithDifferentSchema() throws Exception { - testMergeTwoFilesWithDifferentSchemaSetup(true); + testMergeTwoFilesWithDifferentSchemaSetup(true, null, null); } @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception { - testMergeTwoFilesWithDifferentSchemaSetup(false); + testMergeTwoFilesWithDifferentSchemaSetup(false, null, null); } - public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInputFile) throws Exception { + @Test(expected = IllegalArgumentException.class) + public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup( + null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", "DocId"), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup( + null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", MaskMode.NULLIFY)); + } + + public void testMergeTwoFilesWithDifferentSchemaSetup( + Boolean wrongSchemaInInputFile, Map renameColumns, Map maskColumns) + throws Exception { MessageType schema1 = new MessageType( "schema", new PrimitiveType(OPTIONAL, INT64, "DocId"), @@ -670,27 +698,32 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) .withWriterVersion(writerVersion) .build()); - if (wrongSchemaInInputFile) { - inputFiles.add(new TestFileBuilder(conf, schema2) - .withNumRecord(numRecord) - .withCodec("UNCOMPRESSED") - .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) - .withWriterVersion(writerVersion) - .build()); - } else { - inputFilesToJoin.add(new TestFileBuilder(conf, schema2) - .withNumRecord(numRecord) - .withCodec("UNCOMPRESSED") - .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) - .withWriterVersion(writerVersion) - .build()); + if (wrongSchemaInInputFile != null) { + if (wrongSchemaInInputFile) { + inputFiles.add(new TestFileBuilder(conf, schema2) + .withNumRecord(numRecord) + .withCodec("UNCOMPRESSED") + .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) + .withWriterVersion(writerVersion) + .build()); + } else { + inputFilesToJoin.add(new TestFileBuilder(conf, schema2) + .withNumRecord(numRecord) + .withCodec("UNCOMPRESSED") + .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) + 
.withWriterVersion(writerVersion) + .build()); + } } RewriteOptions.Builder builder = createBuilder( inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()), inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()), false); - RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build(); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) + .renameColumns(renameColumns) + .mask(maskColumns) + .build(); // This should throw an exception because the schemas are different rewriter = new ParquetRewriter(options); From a4eccc640cf92b59c81091fe73b1b1de49fad163 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Tue, 5 Nov 2024 23:48:06 -0800 Subject: [PATCH 11/14] [GH-3035][ParquetRewriter] extend tests for renaming feature --- .../org/apache/parquet/hadoop/rewrite/RewriteOptions.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index 0dc76b8a2a..a2a1c18e32 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -21,11 +21,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.apache.curator.shaded.com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; @@ -617,7 +617,7 @@ private void checkPreconditions() { if (renameColumns != null) { Set nullifiedColumns = maskColumns == null - ? ImmutableSet.of() + ? new HashSet<>() : maskColumns.entrySet().stream() .filter(x -> x.getValue() == MaskMode.NULLIFY) .map(Map.Entry::getKey) From a5b48e21ab1ac232d11efc477ecbe222267c912e Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Mon, 11 Nov 2024 14:08:20 -0800 Subject: [PATCH 12/14] [GH-3035][ParquetRewriter] removed ColumnChunkMetaData.get(path, type) --- .../hadoop/metadata/ColumnChunkMetaData.java | 23 ------------------- .../hadoop/rewrite/ParquetRewriter.java | 17 +++++++++++--- .../hadoop/rewrite/RewriteOptions.java | 2 +- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 6c0967ad3e..14a949b0e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -504,29 +504,6 @@ public boolean hasDictionaryPage() { public boolean isEncrypted() { return false; } - - /** - * Copies this ColumnChunkMetaData with path and type changed to provided ones. 
-     *
-     * @param path a new ColumnPath of a chunk
-     * @param type a new PrimitiveType of a chunk
-     * @return resulting chunk
-     */
-    public ColumnChunkMetaData copy(ColumnPath path, PrimitiveType type) {
-        return ColumnChunkMetaData.get(
-                path,
-                type,
-                this.getCodec(),
-                this.getEncodingStats(),
-                this.getEncodings(),
-                this.getStatistics(),
-                this.getFirstDataPageOffset(),
-                this.getDictionaryPageOffset(),
-                this.getValueCount(),
-                this.getTotalSize(),
-                this.getTotalUncompressedSize(),
-                this.getSizeStatistics());
-    }
 }
 
 class IntColumnChunkMetaData extends ColumnChunkMetaData {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e6c9d8e70a..865ec0fce6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -155,7 +155,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
         this.newCodecName = options.getNewCodecName();
         this.indexCacheStrategy = options.getIndexCacheStrategy();
         this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
-        this.renamedColumns = options.gerRenameColumns();
+        this.renamedColumns = options.getRenameColumns();
         ParquetConfiguration conf = options.getParquetConfiguration();
         this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
         this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
@@ -495,8 +495,19 @@ private void processBlock(
 
         ColumnChunkMetaData chunkNormalized = chunk;
         if (!renamedColumns.isEmpty()) {
-            chunkNormalized =
-                    chunk.copy(normalizeFieldsInPath(chunk.getPath()), normalizeNameInType(chunk.getPrimitiveType()));
+            chunkNormalized = ColumnChunkMetaData.get(
+                    normalizeFieldsInPath(chunk.getPath()),
+                    normalizeNameInType(chunk.getPrimitiveType()),
+                    chunk.getCodec(),
+                    chunk.getEncodingStats(),
+                    chunk.getEncodings(),
+                    chunk.getStatistics(),
+                    chunk.getFirstDataPageOffset(),
+                    chunk.getDictionaryPageOffset(),
+                    chunk.getValueCount(),
+                    chunk.getTotalSize(),
+                    chunk.getTotalUncompressedSize(),
+                    chunk.getSizeStatistics());
         }
 
         ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a2a1c18e32..f85b65ea3d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -198,7 +198,7 @@ public Map<String, MaskMode> getMaskColumns() {
         return maskColumns;
     }
 
-    public Map<String, String> gerRenameColumns() {
+    public Map<String, String> getRenameColumns() {
         return renameColumns;
     }
 
From 4d7ff10589aec8e3f578fe97c4c3585f19f03361 Mon Sep 17 00:00:00 2001
From: maxim_konstantinov
Date: Mon, 11 Nov 2024 14:17:20 -0800
Subject: [PATCH 13/14] [GH-3035][ParquetRewriter] nitpick refactoring

---
 .../java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 865ec0fce6..99570d4d6c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -171,7 +171,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
                 inputFiles.size() + inputFilesToJoin.size(),
                 Stream.concat(options.getParquetInputFiles().stream(), options.getParquetInputFilesToJoin().stream())
                         .collect(Collectors.toList()),
-                options.getParquetOutputFile());
+                out);
 
         if (options.getMaskColumns() != null) {
             this.maskColumns = new HashMap<>();

From 6d66baa145f1504a59951bfe1938247a08c7a0c6 Mon Sep 17 00:00:00 2001
From: maxim_konstantinov
Date: Mon, 11 Nov 2024 16:52:01 -0800
Subject: [PATCH 14/14] [GH-3035][ParquetRewriter] comment

---
 .../java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 99570d4d6c..9535b4335d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -495,6 +495,7 @@ private void processBlock(
 
         ColumnChunkMetaData chunkNormalized = chunk;
         if (!renamedColumns.isEmpty()) {
+            // Keep an eye on whether this gets stale because of ColumnChunkMetaData changes
             chunkNormalized = ColumnChunkMetaData.get(
                     normalizeFieldsInPath(chunk.getPath()),
                     normalizeNameInType(chunk.getPrimitiveType()),
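A minimal usage sketch of the column-renaming feature added by this series, based on the RewriteOptions.Builder calls exercised in the tests above (renameColumns, prune, build) and on the ParquetRewriter processBlocks/close lifecycle; the Builder(Configuration, List<Path>, Path) constructor and the file paths are assumptions for illustration, not part of the patch:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameColumnsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Hypothetical locations; any Parquet inputs sharing the same schema with a top-level "Name" column would do.
        List<Path> inputs = Arrays.asList(new Path("/tmp/in-part-0.parquet"), new Path("/tmp/in-part-1.parquet"));
        Path output = new Path("/tmp/out.parquet");

        // Only top-level columns can be renamed and the target name must be non-empty
        // (see the precondition checks added in this series).
        Map<String, String> renames = new HashMap<>();
        renames.put("Name", "NameRenamed");

        RewriteOptions options = new RewriteOptions.Builder(conf, inputs, output)
                .renameColumns(renames)
                .prune(Collections.singletonList("Gender")) // optional: drop a column in the same pass
                .build();

        // ParquetRewriter is Closeable; processBlocks() copies the row groups while applying the transformations.
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
            rewriter.processBlocks();
        }
    }
}

Renaming combines with the other rewrite transformations (pruning, masking, codec change, encryption), subject to the precondition that a column is not both nullified and renamed.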