
Commit f03fcbe

Author: liujia10 (committed)

[FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

1 parent 2e5cac1 · commit f03fcbe

File tree: 16 files changed, +410 −541 lines


flink-formats/flink-orc-nohive/pom.xml

Lines changed: 12 additions & 0 deletions
@@ -82,6 +82,18 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Tests -->
 
         <dependency>

flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveBulkWriterFactory.java

Lines changed: 8 additions & 2 deletions
@@ -20,7 +20,8 @@
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.orc.nohive.writer.NoHivePhysicalWriterImpl;
+import org.apache.flink.orc.writer.EncryptionProvider;
+import org.apache.flink.orc.writer.HadoopNoCloseStream;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.WriterImpl;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -65,7 +67,11 @@ public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
         OrcFile.WriterOptions opts = OrcFile.writerOptions(new Properties(), conf);
         TypeDescription description = TypeDescription.fromString(schema);
         opts.setSchema(description);
-        opts.physicalWriter(new NoHivePhysicalWriterImpl(out, opts));
+
+        HadoopNoCloseStream hadoopOutputStream = new HadoopNoCloseStream(out, null);
+        EncryptionProvider provider = new EncryptionProvider(opts);
+        opts.physicalWriter(
+                new PhysicalFsWriter(hadoopOutputStream, opts, provider.getEncryptionVariants()));
         WriterImpl writer = new WriterImpl(null, new Path("."), opts);
 
         VectorizedRowBatch rowBatch = description.createRowBatch();
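
For orientation, the new wiring above can be read as the following self-contained sketch (not part of the commit). The schema string, column names, and the empty Hadoop Configuration are illustrative assumptions; with no orc.encrypt setting the EncryptionProvider yields an empty variant array, so the file is written unencrypted, as before.

import java.io.IOException;
import java.util.Properties;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.orc.writer.EncryptionProvider;
import org.apache.flink.orc.writer.HadoopNoCloseStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.WriterImpl;

class NoHiveWriterWiringSketch {

    // `out` is the Flink stream handed to the factory; the _colN names follow the
    // convention the updated tests use and are only an example.
    static WriterImpl openOrcWriter(FSDataOutputStream out, Configuration conf, String schema)
            throws IOException {
        OrcFile.WriterOptions opts = OrcFile.writerOptions(new Properties(), conf);
        TypeDescription description = TypeDescription.fromString(schema);
        opts.setSchema(description);

        // Wrap the Flink stream so ORC cannot close it underneath a checkpoint.
        HadoopNoCloseStream hadoopOutputStream = new HadoopNoCloseStream(out, null);

        // Without an orc.encrypt setting the provider returns an empty variant array.
        EncryptionProvider provider = new EncryptionProvider(opts);
        opts.physicalWriter(
                new PhysicalFsWriter(hadoopOutputStream, opts, provider.getEncryptionVariants()));

        // The path argument is unused because a dedicated physical writer is supplied.
        return new WriterImpl(null, new Path("."), opts);
    }
}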

flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java

Lines changed: 8 additions & 6 deletions
@@ -47,11 +47,11 @@ protected void prepareReadFileWithTypes(String file, int rowSize) throws IOException {
         TypeDescription schema =
                 TypeDescription.fromString(
                         "struct<"
-                                + "f0:float,"
-                                + "f1:double,"
-                                + "f2:timestamp,"
-                                + "f3:tinyint,"
-                                + "f4:smallint"
+                                + "_col0:float,"
+                                + "_col1:double,"
+                                + "_col2:timestamp,"
+                                + "_col3:tinyint,"
+                                + "_col4:smallint"
                                 + ">");
 
         org.apache.hadoop.fs.Path filePath = new org.apache.hadoop.fs.Path(file);
@@ -105,7 +105,9 @@ protected OrcColumnarRowSplitReader createReader(
             throws IOException {
         return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                 new Configuration(),
-                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
+                IntStream.range(0, fullTypes.length)
+                        .mapToObj(i -> "_col" + i)
+                        .toArray(String[]::new),
                 fullTypes,
                 partitionSpec,
                 selectedFields,
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/EncryptionProvider.java

Lines changed: 134 additions & 0 deletions (new file)
@@ -0,0 +1,134 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.orc.writer;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.CryptoUtils;
import org.apache.orc.impl.HadoopShims;
import org.apache.orc.impl.KeyProvider;
import org.apache.orc.impl.writer.WriterEncryptionKey;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Copy encryption variants generation code from org.apache.orc:orc-core:1.7.2 {@link
 * org.apache.orc.impl.WriterImpl}. It's used to get encryption variants which are same as {@link
 * org.apache.orc.impl.WriterImpl} generated.
 *
 * <p>NOTE: This class will be removed after ORC-1200 is merged.
 */
public class EncryptionProvider {

    private final SortedMap<String, WriterEncryptionKey> keys = new TreeMap<>();

    private WriterEncryptionVariant[] encryptionVariants;

    public EncryptionProvider(OrcFile.WriterOptions opts) throws IOException {
        TypeDescription schema = opts.getSchema();
        schema.annotateEncryption(opts.getEncryption(), opts.getMasks());
        this.encryptionVariants =
                setupEncryption(opts.getKeyProvider(), schema, opts.getKeyOverrides());
    }

    /**
     * Iterate through the encryption options given by the user and set up our data structures.
     *
     * @param provider the KeyProvider to use to generate keys
     * @param schema the type tree that we search for annotations
     * @param keyOverrides user specified key overrides
     */
    private WriterEncryptionVariant[] setupEncryption(
            KeyProvider provider,
            TypeDescription schema,
            Map<String, HadoopShims.KeyMetadata> keyOverrides)
            throws IOException {
        KeyProvider keyProvider =
                provider != null
                        ? provider
                        : CryptoUtils.getKeyProvider(new Configuration(), new SecureRandom());
        // Load the overrides into the cache so that we use the required key versions.
        for (HadoopShims.KeyMetadata key : keyOverrides.values()) {
            keys.put(key.getKeyName(), new WriterEncryptionKey(key));
        }
        int variantCount = visitTypeTree(schema, false, keyProvider);

        // Now that we have de-duped the keys and maskDescriptions, make the arrays
        int nextId = 0;
        int nextVariantId = 0;
        WriterEncryptionVariant[] result = new WriterEncryptionVariant[variantCount];
        for (WriterEncryptionKey key : keys.values()) {
            key.setId(nextId++);
            key.sortRoots();
            for (WriterEncryptionVariant variant : key.getEncryptionRoots()) {
                result[nextVariantId] = variant;
                variant.setId(nextVariantId++);
            }
        }
        return result;
    }

    private int visitTypeTree(TypeDescription schema, boolean encrypted, KeyProvider provider)
            throws IOException {
        int result = 0;
        String keyName = schema.getAttributeValue(TypeDescription.ENCRYPT_ATTRIBUTE);
        if (keyName != null) {
            if (provider == null) {
                throw new IllegalArgumentException("Encryption requires a KeyProvider.");
            }
            if (encrypted) {
                throw new IllegalArgumentException("Nested encryption type: " + schema);
            }
            encrypted = true;
            result += 1;
            WriterEncryptionKey key = getKey(keyName, provider);
            HadoopShims.KeyMetadata metadata = key.getMetadata();
            WriterEncryptionVariant variant =
                    new WriterEncryptionVariant(key, schema, provider.createLocalKey(metadata));
            key.addRoot(variant);
        }
        List<TypeDescription> children = schema.getChildren();
        if (children != null) {
            for (TypeDescription child : children) {
                result += visitTypeTree(child, encrypted, provider);
            }
        }
        return result;
    }

    private WriterEncryptionKey getKey(String keyName, KeyProvider provider) throws IOException {
        WriterEncryptionKey result = keys.get(keyName);
        if (result == null) {
            result = new WriterEncryptionKey(provider.getCurrentKeyVersion(keyName));
            keys.put(keyName, result);
        }
        return result;
    }

    public WriterEncryptionVariant[] getEncryptionVariants() {
        return encryptionVariants;
    }
}
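
A hedged usage sketch (not from the commit) of how these variants can be produced outside the factories. It assumes ORC's InMemoryKeystore test key provider and the WriterOptions setKeyProvider/encrypt hooks behave as in orc-core 1.7; the key alias "pii", the key material, the schema, and the column names are made up.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.orc.writer.EncryptionProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.InMemoryKeystore;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

class EncryptionProviderSketch {
    public static void main(String[] args) throws IOException {
        // In-memory key provider standing in for a real Hadoop KMS; "pii" and the
        // 16-byte key material are purely illustrative.
        InMemoryKeystore keystore = new InMemoryKeystore();
        keystore.addKey(
                "pii",
                EncryptionAlgorithm.AES_CTR_128,
                "0123456789abcdef".getBytes(StandardCharsets.UTF_8));

        OrcFile.WriterOptions opts =
                OrcFile.writerOptions(new Configuration())
                        .setSchema(TypeDescription.fromString("struct<_col0:int,_col1:string>"))
                        .setKeyProvider(keystore)
                        .encrypt("pii:_col1"); // encrypt only _col1 with key "pii"

        // One encryption root (_col1) -> one variant, mirroring what ORC's WriterImpl computes.
        WriterEncryptionVariant[] variants = new EncryptionProvider(opts).getEncryptionVariants();
        System.out.println(variants.length); // 1
    }
}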
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/HadoopNoCloseStream.java

Lines changed: 43 additions & 0 deletions (new file)
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.orc.writer;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This class is designed to not close the underlying flink stream to avoid exceptions when
 * checkpointing.
 */
public class HadoopNoCloseStream extends FSDataOutputStream {

    public HadoopNoCloseStream(OutputStream out, FileSystem.Statistics stats) throws IOException {
        super(out, stats);
    }

    @Override
    public void close() throws IOException {
        // Don't close the internal stream here to avoid
        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
        // noop
    }
}
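
A small sketch (not from the commit) of the behaviour this class guarantees: closing the wrapper is a no-op, so the wrapped stream — here a plain ByteArrayOutputStream standing in for Flink's output stream — stays usable afterwards.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.orc.writer.HadoopNoCloseStream;

class NoCloseStreamSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream flinkSideStream = new ByteArrayOutputStream();

        HadoopNoCloseStream orcSideStream = new HadoopNoCloseStream(flinkSideStream, null);
        orcSideStream.write("orc bytes".getBytes(StandardCharsets.UTF_8));
        orcSideStream.close(); // no-op: the wrapped stream is NOT closed

        // The underlying stream remains writable; in the Flink sink it is the
        // framework, not ORC, that finally closes (or persists) the stream.
        flinkSideStream.write('!');
        System.out.println(flinkSideStream.size()); // prints 10
    }
}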

flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java

Lines changed: 18 additions & 3 deletions
@@ -26,7 +26,9 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.WriterImpl;
 
 import java.io.IOException;
@@ -73,6 +75,13 @@ public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration)
         this(vectorizer, null, configuration);
     }
 
+    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
+        this.vectorizer = vectorizer;
+        this.writerOptions = writerOptions;
+        this.writerProperties = null;
+        this.confMap = new HashMap<>();
+    }
+
     /**
      * Creates a new OrcBulkWriterFactory using the provided Vectorizer, Hadoop Configuration, ORC
      * writer properties.
@@ -96,7 +105,10 @@ public OrcBulkWriterFactory(
     @Override
     public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
         OrcFile.WriterOptions opts = getWriterOptions();
-        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+        HadoopNoCloseStream hadoopOutputStream = new HadoopNoCloseStream(out, null);
+        EncryptionProvider provider = new EncryptionProvider(opts);
+        opts.physicalWriter(
+                new PhysicalFsWriter(hadoopOutputStream, opts, provider.getEncryptionVariants()));
 
         // The path of the Writer is not used to indicate the destination file
         // in this case since we have used a dedicated physical writer to write
@@ -115,9 +127,12 @@ protected OrcFile.WriterOptions getWriterOptions() {
             }
 
             writerOptions = OrcFile.writerOptions(writerProperties, conf);
-            writerOptions.setSchema(this.vectorizer.getSchema());
-        }
 
+            // Column encryption configuration
+            writerOptions.encrypt(OrcConf.ENCRYPTION.getString(writerProperties, conf));
+            writerOptions.masks(OrcConf.DATA_MASK.getString(writerProperties, conf));
+        }
+        writerOptions.setSchema(this.vectorizer.getSchema());
         return writerOptions;
     }
 }
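
A hedged configuration sketch (not part of the commit) showing how the new encrypt/masks plumbing in getWriterOptions() can be driven from writer properties. It assumes OrcConf.ENCRYPTION and OrcConf.DATA_MASK map to the "orc.encrypt" and "orc.mask" keys, and that a KeyProvider (for example a Hadoop KMS reachable through the Configuration) can resolve the made-up "pii" key; the schema, column names, and mask are likewise illustrative.

import java.util.Properties;

import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

class EncryptedOrcFactorySketch {
    static OrcBulkWriterFactory<RowData> buildFactory() {
        LogicalType[] fieldTypes = {new IntType(), new VarCharType()};
        RowDataVectorizer vectorizer =
                new RowDataVectorizer("struct<_col0:int,_col1:string>", fieldTypes);

        // getWriterOptions() forwards these to WriterOptions#encrypt / #masks via OrcConf.
        Properties writerProperties = new Properties();
        writerProperties.setProperty("orc.encrypt", "pii:_col1"); // encrypt _col1 with key "pii"
        writerProperties.setProperty("orc.mask", "nullify:_col1"); // readers without the key see nulls

        return new OrcBulkWriterFactory<>(vectorizer, writerProperties, new Configuration());
    }
}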
