Skip to content

Commit f4a3e8b

Browse files
GH-3078: Use Hadoop FileSystem.openFile() to open files (#3079)
1 parent 0ddffb2 commit f4a3e8b

File tree

4 files changed

+443
-1
lines changed

4 files changed

+443
-1
lines changed

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@
1919

2020
package org.apache.parquet.hadoop.util;
2121

22+
import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
23+
2224
import java.io.IOException;
25+
import java.io.InterruptedIOException;
26+
import java.util.concurrent.CompletableFuture;
2327
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.fs.FSDataInputStream;
2429
import org.apache.hadoop.fs.FileStatus;
2530
import org.apache.hadoop.fs.FileSystem;
2631
import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@
2934

3035
public class HadoopInputFile implements InputFile {
3136

37+
/**
38+
* openFile() option name for setting the read policy: {@value}.
39+
*/
40+
private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";
41+
42+
/**
43+
* Read policy when opening parquet files: {@value}.
44+
* <p>Policy-aware stores pick the first policy they recognize in the list.
45+
 * Everything recognizes "random";
46+
 * "vector" came in with 3.4.0, while "parquet" came with Hadoop 3.4.1.
47+
* parquet means "this is a Parquet file, so be clever about footers, prefetch,
48+
* and expect vector and/or random IO".
49+
* <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the
50+
* S3A connector, but as the ABFS and GCS connectors do footer caching, they
51+
* may use it as a hint to say "fetch the footer and keep it in memory"
52+
*/
53+
private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";
54+
3255
private final FileSystem fs;
3356
private final FileStatus stat;
3457
private final Configuration conf;
@@ -70,9 +93,45 @@ public long getLength() {
7093
return stat.getLen();
7194
}
7295

96+
/**
97+
* Open the file.
98+
* <p>Uses {@code FileSystem.openFile()} so that
99+
 * the existing FileStatus can be passed down: this saves a HEAD request on cloud storage
100+
 * and is ignored everywhere else.
101+
*
102+
* @return the input stream.
103+
*
104+
* @throws InterruptedIOException future was interrupted
105+
* @throws IOException if something went wrong
106+
* @throws RuntimeException any nested RTE thrown
107+
*/
73108
@Override
74109
public SeekableInputStream newStream() throws IOException {
75-
return HadoopStreams.wrap(fs.open(stat.getPath()));
110+
FSDataInputStream stream;
111+
try {
112+
 // this method is async so that implementations may do async HEAD
113+
// requests, such as S3A/ABFS when a file status is passed down.
114+
final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
115+
.withFileStatus(stat)
116+
.opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
117+
.build();
118+
stream = awaitFuture(future);
119+
} catch (RuntimeException e) {
120+
 // S3A < 3.3.5 would raise an illegal path exception if the openFile path didn't
121+
// equal the path in the FileStatus; Hive virtual FS could create this condition.
122+
// As the path to open is derived from stat.getPath(), this condition seems
123+
 // near-impossible to create — but is handled here for due diligence.
124+
try {
125+
stream = fs.open(stat.getPath());
126+
} catch (IOException | RuntimeException ex) {
127+
// failure on this attempt attaches the failure of the openFile() call
128+
// so the stack trace is preserved.
129+
ex.addSuppressed(e);
130+
throw ex;
131+
}
132+
}
133+
134+
return HadoopStreams.wrap(stream);
76135
}
77136

78137
@Override

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,29 @@ public static <T> T awaitFuture(final Future<T> future, final long timeout, fina
7070
}
7171
}
7272

73+
/**
74+
* Given a future, evaluate it.
75+
* <p>
76+
* Any exception generated in the future is
77+
* extracted and rethrown.
78+
* </p>
79+
* @param future future to evaluate
80+
* @param <T> type of the result.
81+
* @return the result, if all went well.
82+
* @throws InterruptedIOException future was interrupted
83+
* @throws IOException if something went wrong
84+
* @throws RuntimeException any nested RTE thrown
85+
*/
86+
public static <T> T awaitFuture(final Future<T> future)
87+
throws InterruptedIOException, IOException, RuntimeException {
88+
try {
89+
return future.get();
90+
} catch (InterruptedException e) {
91+
throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
92+
} catch (ExecutionException e) {
93+
throw unwrapInnerException(e);
94+
}
95+
}
7396
/**
7497
* From the inner cause of an execution exception, extract the inner cause
7598
* to an IOException, raising Errors immediately.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs;
20+
21+
import java.io.IOException;
22+
import java.io.UncheckedIOException;
23+
import java.net.URI;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.security.UserGroupInformation;
26+
27+
/**
28+
* Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
29+
 * this class exports the package-private {@code FileSystem}
30+
* methods which can be used to push FS instances into the
31+
* map of URI -> fs instance.
32+
* <p>
33+
* This makes it easy to add instances of Mocked filesystems
34+
* to the map, which will then be picked up by any
35+
* code retrieving an FS instance for that URI
36+
* <p>
37+
* The API is stable and used elsewhere. What is important
38+
* is to remove FS instances after each test case.
39+
* {@link #cleanFilesystemCache()} cleans the entire cache
40+
* and should be used in teardown methods.
41+
*/
42+
public final class FileSystemTestBinder {
43+
44+
/**
45+
* Empty configuration.
46+
* Part of the FileSystem method signatures, but not used behind them.
47+
*/
48+
public static final Configuration CONF = new Configuration(false);
49+
50+
/**
51+
* Inject a filesystem into the cache.
52+
* @param uri filesystem URI
53+
* @param fs filesystem to inject
54+
* @throws UncheckedIOException Hadoop UGI problems.
55+
*/
56+
public static void addFileSystemForTesting(URI uri, FileSystem fs) {
57+
try {
58+
FileSystem.addFileSystemForTesting(uri, CONF, fs);
59+
} catch (IOException e) {
60+
throw new UncheckedIOException(e);
61+
}
62+
}
63+
64+
/**
65+
* Clean up the filesystem cache.
66+
* This swallows any IOE nominally raised in the process, to ensure
67+
 * this can safely be invoked in teardowns.
68+
*/
69+
public static void cleanFilesystemCache() {
70+
try {
71+
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
72+
} catch (IOException ignored) {
73+
// Ignore the exception as if getCurrentUser() fails then it'll
74+
// have been impossible to add mock instances to a per-user cache.
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)