Skip to content

Commit ebade2d

Browse files
committed
GH-494: [Flight] Improve handling of unreachable locations in JDBC
- Expose gRPC for the client builder - Cache failed locations and try them last - Allow configuring the connect timeout Fixes #494.
1 parent 3440633 commit ebade2d

File tree

15 files changed

+813
-167
lines changed

15 files changed

+813
-167
lines changed

flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java

Lines changed: 18 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,21 @@
2323
import io.grpc.ManagedChannel;
2424
import io.grpc.MethodDescriptor;
2525
import io.grpc.StatusRuntimeException;
26-
import io.grpc.netty.GrpcSslContexts;
2726
import io.grpc.netty.NettyChannelBuilder;
2827
import io.grpc.stub.ClientCallStreamObserver;
2928
import io.grpc.stub.ClientCalls;
3029
import io.grpc.stub.ClientResponseObserver;
3130
import io.grpc.stub.StreamObserver;
32-
import io.netty.channel.EventLoopGroup;
33-
import io.netty.channel.ServerChannel;
34-
import io.netty.handler.ssl.SslContextBuilder;
35-
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
3631
import java.io.IOException;
3732
import java.io.InputStream;
38-
import java.lang.reflect.InvocationTargetException;
3933
import java.net.URISyntaxException;
4034
import java.nio.ByteBuffer;
41-
import java.util.ArrayList;
4235
import java.util.Iterator;
4336
import java.util.List;
4437
import java.util.Optional;
4538
import java.util.concurrent.ExecutionException;
4639
import java.util.concurrent.TimeUnit;
4740
import java.util.function.BooleanSupplier;
48-
import javax.net.ssl.SSLException;
4941
import org.apache.arrow.flight.FlightProducer.StreamListener;
5042
import org.apache.arrow.flight.auth.BasicClientAuthHandler;
5143
import org.apache.arrow.flight.auth.ClientAuthHandler;
@@ -57,6 +49,7 @@
5749
import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
5850
import org.apache.arrow.flight.grpc.ClientInterceptorAdapter;
5951
import org.apache.arrow.flight.grpc.CredentialCallOption;
52+
import org.apache.arrow.flight.grpc.NettyClientBuilder;
6053
import org.apache.arrow.flight.grpc.StatusUtils;
6154
import org.apache.arrow.flight.impl.Flight;
6255
import org.apache.arrow.flight.impl.Flight.Empty;
@@ -72,11 +65,6 @@
7265
/** Client for Flight services. */
7366
public class FlightClient implements AutoCloseable {
7467
private static final int PENDING_REQUESTS = 5;
75-
/**
76-
* The maximum number of trace events to keep on the gRPC Channel. This value disables channel
77-
* tracing.
78-
*/
79-
private static final int MAX_CHANNEL_TRACE_EVENTS = 0;
8068

8169
private final BufferAllocator allocator;
8270
private final ManagedChannel channel;
@@ -771,176 +759,71 @@ public static Builder builder(BufferAllocator allocator, Location location) {
771759

772760
/** A builder for Flight clients. */
773761
public static final class Builder {
774-
private BufferAllocator allocator;
775-
private Location location;
776-
private boolean forceTls = false;
777-
private int maxInboundMessageSize = FlightServer.MAX_GRPC_MESSAGE_SIZE;
778-
private InputStream trustedCertificates = null;
779-
private InputStream clientCertificate = null;
780-
private InputStream clientKey = null;
781-
private String overrideHostname = null;
782-
private List<FlightClientMiddleware.Factory> middleware = new ArrayList<>();
783-
private boolean verifyServer = true;
784-
785-
private Builder() {}
762+
private final NettyClientBuilder builder;
763+
764+
private Builder() {
765+
this.builder = new NettyClientBuilder();
766+
}
786767

787768
private Builder(BufferAllocator allocator, Location location) {
788-
this.allocator = Preconditions.checkNotNull(allocator);
789-
this.location = Preconditions.checkNotNull(location);
769+
this.builder = new NettyClientBuilder(allocator, location);
790770
}
791771

792772
/** Force the client to connect over TLS. */
793773
public Builder useTls() {
794-
this.forceTls = true;
774+
builder.useTls();
795775
return this;
796776
}
797777

798778
/** Override the hostname checked for TLS. Use with caution in production. */
799779
public Builder overrideHostname(final String hostname) {
800-
this.overrideHostname = hostname;
780+
builder.overrideHostname(hostname);
801781
return this;
802782
}
803783

804784
/** Set the maximum inbound message size. */
805785
public Builder maxInboundMessageSize(int maxSize) {
806-
Preconditions.checkArgument(maxSize > 0);
807-
this.maxInboundMessageSize = maxSize;
786+
builder.maxInboundMessageSize(maxSize);
808787
return this;
809788
}
810789

811790
/** Set the trusted TLS certificates. */
812791
public Builder trustedCertificates(final InputStream stream) {
813-
this.trustedCertificates = Preconditions.checkNotNull(stream);
792+
builder.trustedCertificates(stream);
814793
return this;
815794
}
816795

817796
/** Set the trusted TLS certificates. */
818797
public Builder clientCertificate(
819798
final InputStream clientCertificate, final InputStream clientKey) {
820-
Preconditions.checkNotNull(clientKey);
821-
this.clientCertificate = Preconditions.checkNotNull(clientCertificate);
822-
this.clientKey = Preconditions.checkNotNull(clientKey);
799+
builder.clientCertificate(clientCertificate, clientKey);
823800
return this;
824801
}
825802

826803
public Builder allocator(BufferAllocator allocator) {
827-
this.allocator = Preconditions.checkNotNull(allocator);
804+
builder.allocator(allocator);
828805
return this;
829806
}
830807

831808
public Builder location(Location location) {
832-
this.location = Preconditions.checkNotNull(location);
809+
builder.location(location);
833810
return this;
834811
}
835812

836813
public Builder intercept(FlightClientMiddleware.Factory factory) {
837-
middleware.add(factory);
814+
builder.intercept(factory);
838815
return this;
839816
}
840817

841818
public Builder verifyServer(boolean verifyServer) {
842-
this.verifyServer = verifyServer;
819+
builder.verifyServer(verifyServer);
843820
return this;
844821
}
845822

846823
/** Create the client from this builder. */
847824
public FlightClient build() {
848-
final NettyChannelBuilder builder;
849-
850-
switch (location.getUri().getScheme()) {
851-
case LocationSchemes.GRPC:
852-
case LocationSchemes.GRPC_INSECURE:
853-
case LocationSchemes.GRPC_TLS:
854-
{
855-
builder = NettyChannelBuilder.forAddress(location.toSocketAddress());
856-
break;
857-
}
858-
case LocationSchemes.GRPC_DOMAIN_SOCKET:
859-
{
860-
// The implementation is platform-specific, so we have to find the classes at runtime
861-
builder = NettyChannelBuilder.forAddress(location.toSocketAddress());
862-
try {
863-
try {
864-
// Linux
865-
builder.channelType(
866-
Class.forName("io.netty.channel.epoll.EpollDomainSocketChannel")
867-
.asSubclass(ServerChannel.class));
868-
final EventLoopGroup elg =
869-
Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
870-
.asSubclass(EventLoopGroup.class)
871-
.getDeclaredConstructor()
872-
.newInstance();
873-
builder.eventLoopGroup(elg);
874-
} catch (ClassNotFoundException e) {
875-
// BSD
876-
builder.channelType(
877-
Class.forName("io.netty.channel.kqueue.KQueueDomainSocketChannel")
878-
.asSubclass(ServerChannel.class));
879-
final EventLoopGroup elg =
880-
Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
881-
.asSubclass(EventLoopGroup.class)
882-
.getDeclaredConstructor()
883-
.newInstance();
884-
builder.eventLoopGroup(elg);
885-
}
886-
} catch (ClassNotFoundException
887-
| InstantiationException
888-
| IllegalAccessException
889-
| NoSuchMethodException
890-
| InvocationTargetException e) {
891-
throw new UnsupportedOperationException(
892-
"Could not find suitable Netty native transport implementation for domain socket address.");
893-
}
894-
break;
895-
}
896-
default:
897-
throw new IllegalArgumentException(
898-
"Scheme is not supported: " + location.getUri().getScheme());
899-
}
900-
901-
if (this.forceTls || LocationSchemes.GRPC_TLS.equals(location.getUri().getScheme())) {
902-
builder.useTransportSecurity();
903-
904-
final boolean hasTrustedCerts = this.trustedCertificates != null;
905-
final boolean hasKeyCertPair = this.clientCertificate != null && this.clientKey != null;
906-
if (!this.verifyServer && (hasTrustedCerts || hasKeyCertPair)) {
907-
throw new IllegalArgumentException(
908-
"FlightClient has been configured to disable server verification, "
909-
+ "but certificate options have been specified.");
910-
}
911-
912-
final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
913-
914-
if (!this.verifyServer) {
915-
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
916-
} else if (this.trustedCertificates != null
917-
|| this.clientCertificate != null
918-
|| this.clientKey != null) {
919-
if (this.trustedCertificates != null) {
920-
sslContextBuilder.trustManager(this.trustedCertificates);
921-
}
922-
if (this.clientCertificate != null && this.clientKey != null) {
923-
sslContextBuilder.keyManager(this.clientCertificate, this.clientKey);
924-
}
925-
}
926-
try {
927-
builder.sslContext(sslContextBuilder.build());
928-
} catch (SSLException e) {
929-
throw new RuntimeException(e);
930-
}
931-
932-
if (this.overrideHostname != null) {
933-
builder.overrideAuthority(this.overrideHostname);
934-
}
935-
} else {
936-
builder.usePlaintext();
937-
}
938-
939-
builder
940-
.maxTraceEvents(MAX_CHANNEL_TRACE_EVENTS)
941-
.maxInboundMessageSize(maxInboundMessageSize)
942-
.maxInboundMetadataSize(maxInboundMessageSize);
943-
return new FlightClient(allocator, builder.build(), middleware);
825+
final NettyChannelBuilder channelBuilder = builder.build();
826+
return new FlightClient(builder.allocator(), channelBuilder.build(), builder.middleware());
944827
}
945828
}
946829

flight/flight-core/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.ManagedChannel;
2424
import io.grpc.MethodDescriptor;
2525
import java.util.Collections;
26+
import java.util.List;
2627
import java.util.concurrent.ExecutorService;
2728
import java.util.concurrent.TimeUnit;
2829
import org.apache.arrow.flight.auth.ServerAuthHandler;
@@ -151,6 +152,19 @@ public static FlightClient createFlightClient(
151152
return new FlightClient(incomingAllocator, channel, Collections.emptyList());
152153
}
153154

155+
/**
156+
* Creates a Flight client.
157+
*
158+
* @param incomingAllocator Memory allocator
159+
* @param channel provides a connection to a gRPC server.
160+
*/
161+
public static FlightClient createFlightClient(
162+
BufferAllocator incomingAllocator,
163+
ManagedChannel channel,
164+
List<FlightClientMiddleware.Factory> middleware) {
165+
return new FlightClient(incomingAllocator, channel, middleware);
166+
}
167+
154168
/**
155169
* Creates a Flight client.
156170
*

0 commit comments

Comments
 (0)