diff --git a/examples/src/main/java/com/influxdb/v3/ProxyExample.java b/examples/src/main/java/com/influxdb/v3/ProxyExample.java index 76a3204d..37cd9ace 100644 --- a/examples/src/main/java/com/influxdb/v3/ProxyExample.java +++ b/examples/src/main/java/com/influxdb/v3/ProxyExample.java @@ -21,13 +21,23 @@ */ package com.influxdb.v3; +import java.net.InetSocketAddress; +import java.net.URI; import java.util.UUID; +import java.util.function.Supplier; import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.ProxyDetector; +import io.netty.handler.proxy.HttpProxyHandler; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.config.NettyHttpClientConfig; public final class ProxyExample { @@ -35,38 +45,72 @@ private ProxyExample() { } public static void main(final String[] args) throws Exception { - // Run docker-compose.yml file to start Envoy proxy + // Run the docker-compose.yml file to start Envoy proxy, + // or start envoy proxy directly with the command `envoy-c envoy.yaml` String proxyUrl = "http://localhost:10000"; - String sslRootsFilePath = "src/test/java/com/influxdb/v3/client/testdata/influxdb-certificate.pem"; + String targetUrl = "http://localhost:8086"; + String username = "username"; + String password = "password"; + + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + + // Set proxy for write api + Supplier writeApiProxy = () -> + new HttpProxyHandler(new InetSocketAddress("localhost", 10000), username, password); + nettyHttpClientConfig.configureChannelProxy(writeApiProxy); + + // Set proxy for query api + ProxyDetector proxyDetector = createProxyDetector(targetUrl, proxyUrl, username, password); + nettyHttpClientConfig.configureManagedChannelProxy(proxyDetector); + ClientConfig clientConfig = new ClientConfig.Builder() .host(System.getenv("INFLUXDB_URL")) .token(System.getenv("INFLUXDB_TOKEN").toCharArray()) .database(System.getenv("INFLUXDB_DATABASE")) - .proxyUrl(proxyUrl) - .sslRootsFilePath(sslRootsFilePath) + .nettyHttpClientConfig(nettyHttpClientConfig) .build(); - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - String testId = UUID.randomUUID().toString(); - Point point = Point.measurement("My_Home") - .setTag("room", "Kitchen") - .setField("temp", 12.7) - .setField("hum", 37) - .setField("testId", testId); - influxDBClient.writePoint(point); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + String testId = UUID.randomUUID().toString(); + Point point = Point.measurement("My_Home") + .setTag("room", "Kitchen") + .setField("temp", 12.7) + .setField("hum", 37) + .setField("testId", testId); + influxDBClient.writePoint(point); - String query = String.format("SELECT * FROM \"My_Home\" WHERE \"testId\" = '%s'", testId); - try (Stream stream = influxDBClient.queryPoints(query)) { - stream.findFirst().ifPresent(values -> { - assert values.getTimestamp() != null; - System.out.printf("room[%s]: %s, temp: %3.2f, hum: %d", - new java.util.Date(values.getTimestamp().longValue() / 1000000), - values.getTag("room"), - (Double) values.getField("temp"), - (Long) values.getField("hum")); - }); + String query = String.format("SELECT * FROM \"My_Home\" WHERE \"testId\" = '%s'", testId); + try (Stream stream = influxDBClient.queryPoints(query)) { + stream.findFirst().ifPresent(values -> { + assert values.getTimestamp() != null; + System.out.printf("room[%s]: %s, temp: %3.2f, hum: %d", + new java.util.Date(values.getTimestamp().longValue() / 1000000), + values.getTag("room"), + (Double) values.getField("temp"), + (Long) values.getField("hum")); + }); + } } } + + public static ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull final String proxyUrl, + @Nullable final String username, @Nullable final String password) { + URI targetUri = URI.create(targetUrl); + URI proxyUri = URI.create(proxyUrl); + return (targetServerAddress) -> { + InetSocketAddress targetAddress = (InetSocketAddress) targetServerAddress; + if (targetUri.getHost().equals(targetAddress.getHostString()) + && targetUri.getPort() == targetAddress.getPort()) { + return HttpConnectProxiedSocketAddress.newBuilder() + .setProxyAddress(new InetSocketAddress(proxyUri.getHost(), proxyUri.getPort())) + .setTargetAddress(targetAddress) + .setUsername(username) + .setPassword(password) + .build(); + } + return null; + }; + } } diff --git a/examples/src/main/java/com/influxdb/v3/durable/DurableExample.java b/examples/src/main/java/com/influxdb/v3/durable/DurableExample.java index 55766170..e1e4bb52 100644 --- a/examples/src/main/java/com/influxdb/v3/durable/DurableExample.java +++ b/examples/src/main/java/com/influxdb/v3/durable/DurableExample.java @@ -21,6 +21,7 @@ */ package com.influxdb.v3.durable; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -30,6 +31,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.logging.Logger; import java.util.stream.Stream; +import javax.net.ssl.SSLException; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; @@ -124,8 +126,13 @@ public static void main(final String[] args) { } // borrow then return a client - InfluxDBClient client = clientPool.borrowClient(); - try { + InfluxDBClient client = null; + try { + client = clientPool.borrowClient(); + } catch (URISyntaxException | SSLException e) { + throw new RuntimeException(e); + } + try { logger.info(" [writeTaskPointsOK " + count + "] Writing " + points.size() + " points with client " + client.hashCode()); client.writePoints(points); @@ -159,8 +166,13 @@ public static void main(final String[] args) { } } // borrow a client from the pool - InfluxDBClient client = clientPool.borrowClient(); - try { + InfluxDBClient client = null; + try { + client = clientPool.borrowClient(); + } catch (URISyntaxException | SSLException e) { + throw new RuntimeException(e); + } + try { logger.info("[writeErrorRecover " + count + "] Writing " + lps.size() + " lps with client " + client.hashCode()); client.writeRecords(lps); @@ -183,9 +195,14 @@ public static void main(final String[] args) { int count = 0; while (!shutdownAll.get()) { // borrow a client from the pool - InfluxDBClient client = clientPool.borrowClient(); + InfluxDBClient client = null; + try { + client = clientPool.borrowClient(); + } catch (URISyntaxException | SSLException e) { + throw new RuntimeException(e); + } - // initiate the query and process the results + // initiate the query and process the results try (Stream pvs = client.queryPoints(query)) { logger.info("[queryOK " + count + "] with client " + client.hashCode() + ": query returned " + pvs.toArray().length + " records"); @@ -208,8 +225,13 @@ public static void main(final String[] args) { int count = 0; while (!shutdownAll.get()) { // borrow a client from the pool - InfluxDBClient client = clientPool.borrowClient(); - // every third query attempt results in an error + InfluxDBClient client = null; + try { + client = clientPool.borrowClient(); + } catch (URISyntaxException | SSLException e) { + throw new RuntimeException(e); + } + // every third query attempt results in an error String effectiveQuery = count > 0 && count % 3 == 0 ? badQuery : query; // attempt to execute the query and process the results diff --git a/examples/src/main/java/com/influxdb/v3/durable/InfluxClientPool.java b/examples/src/main/java/com/influxdb/v3/durable/InfluxClientPool.java index 159f3772..f5246e66 100644 --- a/examples/src/main/java/com/influxdb/v3/durable/InfluxClientPool.java +++ b/examples/src/main/java/com/influxdb/v3/durable/InfluxClientPool.java @@ -21,10 +21,12 @@ */ package com.influxdb.v3.durable; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Stack; import java.util.logging.Logger; +import javax.net.ssl.SSLException; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.config.ClientConfig; @@ -75,7 +77,7 @@ public InfluxClientPool(final ClientConfig clientConfig, final int maxSize) { * * @return - An InfluxDBClient ready for use. */ - public synchronized InfluxDBClient borrowClient() { + public synchronized InfluxDBClient borrowClient() throws URISyntaxException, SSLException { InfluxDBClient client; if (idlers.isEmpty()) { client = InfluxDBClient.getInstance(clientConfig); diff --git a/pom.xml b/pom.xml index b3b3066a..1039db58 100644 --- a/pom.xml +++ b/pom.xml @@ -165,6 +165,10 @@ com.google.code.gson gson + + io.netty + netty-handler-proxy + @@ -174,6 +178,12 @@ ${netty-handler.version} + + io.netty + netty-handler-proxy + ${netty-handler.version} + + io.netty netty-buffer @@ -417,7 +427,7 @@ **/target/**, **/*.jar, **/.git/**, **/.*, **/*.png, **/*.iml, **/*.bolt, .idea/**, **/*nightly*/**, **/.m2/**, LICENSE, **/*.md, **/.github/**, license_header.txt, release.properties/, **/pom.xml.releaseBackup, **/pom.xml.tag, **/semantic.yml, - .circleci/config.yml, **/*.pem + .circleci/config.yml, **/*.pem, **/*.p12 diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBApiNettyException.java b/src/main/java/com/influxdb/v3/client/InfluxDBApiNettyException.java new file mode 100644 index 00000000..2f304817 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/InfluxDBApiNettyException.java @@ -0,0 +1,104 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.util.List; +import javax.annotation.Nullable; + +import io.netty.handler.codec.http.HttpHeaders; + +/** + * The InfluxDBApiNettyException gets thrown whenever an error status is returned + * in the HTTP response. It facilitates recovering from such errors whenever possible. + */ +public class InfluxDBApiNettyException extends InfluxDBApiException { + + /** + * The HTTP headers associated with the error. + */ + HttpHeaders headers; + /** + * The HTTP status code associated with the error. + */ + int statusCode; + + /** + * Construct a new InfluxDBApiNettyException with statusCode and headers. + * + * @param message the detail message. + * @param headers headers returned in the response. + * @param statusCode statusCode of the response. + */ + public InfluxDBApiNettyException( + @Nullable final String message, + @Nullable final HttpHeaders headers, + final int statusCode) { + super(message); + this.headers = headers; + this.statusCode = statusCode; + } + + /** + * Construct a new InfluxDBApiNettyException with statusCode and headers. + * + * @param cause root cause of the exception. + * @param headers headers returned in the response. + * @param statusCode status code of the response. + */ + public InfluxDBApiNettyException( + @Nullable final Throwable cause, + @Nullable final HttpHeaders headers, + final int statusCode) { + super(cause); + this.headers = headers; + this.statusCode = statusCode; + } + + /** + * Gets the HTTP headers property associated with the error. + * + * @return - the headers object. + */ + public HttpHeaders headers() { + return headers; + } + + /** + * Helper method to simplify retrieval of specific headers. + * + * @param name - name of the header. + * @return - value matching the header key, or null if the key does not exist. + */ + public List getHeader(final String name) { + return headers.getAll(name); + } + + /** + * Gets the HTTP statusCode associated with the error. + * + * @return - the HTTP statusCode. + */ + public int statusCode() { + return statusCode; + } + +} diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index 4ea1ea20..4f1d03ad 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -22,12 +22,16 @@ package com.influxdb.v3.client; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.arrow.vector.VectorSchemaRoot; import com.influxdb.v3.client.config.ClientConfig; @@ -76,9 +80,9 @@ public interface InfluxDBClient extends AutoCloseable { * Write a {@link Point} to the InfluxDB server. * * @param point the {@link Point} to write, can be null - *

- * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch - * by NanosecondConverter helper class + *

+ * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch + * by NanosecondConverter helper class */ void writePoint(@Nullable final Point point); @@ -87,9 +91,9 @@ public interface InfluxDBClient extends AutoCloseable { * * @param point the {@link Point} to write, can be null * @param options the options for writing data to InfluxDB - *

- * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch - * by NanosecondConverter helper class + *

+ * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch + * by NanosecondConverter helper class */ void writePoint(@Nullable final Point point, @Nonnull final WriteOptions options); @@ -97,9 +101,9 @@ public interface InfluxDBClient extends AutoCloseable { * Write a list of {@link Point} to the InfluxDB server. * * @param points the list of {@link Point} to write, cannot be null - *

- * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch - * by NanosecondConverter helper class + *

+ * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch + * by NanosecondConverter helper class */ void writePoints(@Nonnull final List points); @@ -108,9 +112,9 @@ public interface InfluxDBClient extends AutoCloseable { * * @param points the list of {@link Point} to write, cannot be null * @param options the options for writing data to InfluxDB - *

- * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch - * by NanosecondConverter helper class + *

+ * Note: the timestamp passed will be converted to nanoseconds since the Unix epoch + * by NanosecondConverter helper class */ void writePoints(@Nonnull final List points, @Nonnull final WriteOptions options); @@ -456,7 +460,8 @@ Stream queryBatches(@Nonnull final String query, * Returns null if the server version can't be determined. */ @Nullable - String getServerVersion(); + String getServerVersion() + throws RuntimeException, ExecutionException, InterruptedException, JsonProcessingException; /** * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying @@ -470,7 +475,7 @@ Stream queryBatches(@Nonnull final String query, @Nonnull static InfluxDBClient getInstance(@Nonnull final String host, @Nullable final char[] token, - @Nullable final String database) { + @Nullable final String database) throws URISyntaxException, SSLException { ClientConfig config = new ClientConfig.Builder() .host(host) .token(token) @@ -494,7 +499,8 @@ static InfluxDBClient getInstance(@Nonnull final String host, static InfluxDBClient getInstance(@Nonnull final String host, @Nullable final char[] token, @Nullable final String database, - @Nullable Map defaultTags) { + @Nullable Map defaultTags) + throws URISyntaxException, SSLException { ClientConfig config = new ClientConfig.Builder() .host(host) @@ -515,7 +521,7 @@ static InfluxDBClient getInstance(@Nonnull final String host, * @return new instance of the {@link InfluxDBClient} */ @Nonnull - static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { + static InfluxDBClient getInstance(@Nonnull final ClientConfig config) throws URISyntaxException, SSLException { return new InfluxDBClientImpl(config); } @@ -545,7 +551,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { static InfluxDBClient getInstance(@Nonnull final String connectionString) { try { return getInstance(new ClientConfig.Builder().build(connectionString)); - } catch (MalformedURLException e) { + } catch (MalformedURLException | URISyntaxException | SSLException e) { throw new IllegalArgumentException(e); // same exception as ClientConfig.validate() } } @@ -585,7 +591,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { * @return instance of {@link InfluxDBClient} */ @Nonnull - static InfluxDBClient getInstance() { + static InfluxDBClient getInstance() throws URISyntaxException, SSLException { return getInstance(new ClientConfig.Builder().build(System.getenv(), System.getProperties())); } } diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index 7e0e8ac7..2040ed5c 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -21,9 +21,7 @@ */ package com.influxdb.v3.client.config; -import java.net.Authenticator; import java.net.MalformedURLException; -import java.net.ProxySelector; import java.net.URL; import java.time.Duration; import java.util.Arrays; @@ -60,13 +58,7 @@ *

  • queryTimeout - timeout used to calculate a default gRPC deadline when querying InfluxDB. * Can be null, in which case queries can potentially run forever.
  • *
  • allowHttpRedirects - allow redirects for InfluxDB connections
  • - *
  • disableServerCertificateValidation - - * disable server certificate validation for HTTPS connections - *
  • - *
  • proxyUrl - proxy url for query api and write api
  • - *
  • authenticator - HTTP proxy authenticator
  • *
  • headers - headers to be added to requests
  • - *
  • sslRootsFilePath - path to the stored certificates file in PEM format
  • *
  • disableGRPCCompression - disables the default gRPC compression header
  • * *

    @@ -81,7 +73,6 @@ * .writePrecision(WritePrecision.S) * .gzipThreshold(4096) * .writeNoSync(true) - * .proxyUrl("http://localhost:10000") * .build(); * * try (InfluxDBClient client = InfluxDBClient.getInstance(config)) { @@ -109,18 +100,9 @@ public final class ClientConfig { private final Duration writeTimeout; private final Duration queryTimeout; private final Boolean allowHttpRedirects; - private final Boolean disableServerCertificateValidation; - private final String proxyUrl; - private final Authenticator authenticator; private final Map headers; - private final String sslRootsFilePath; private final boolean disableGRPCCompression; - - /** - * Deprecated use {@link #proxyUrl}. - */ - @Deprecated - private final ProxySelector proxy; + private final NettyHttpClientConfig nettyHttpClientConfig; /** * Gets URL of the InfluxDB server. @@ -205,6 +187,7 @@ public Boolean getWriteNoSync() { /** * Gets default tags used when writing points. + * * @return default tags */ public Map getDefaultTags() { @@ -258,58 +241,6 @@ public Boolean getAllowHttpRedirects() { return allowHttpRedirects; } - /** - * Gets the disable server SSL certificate validation. Default to 'false'. - * - * @return the disable server SSL certificate validation - */ - @Nonnull - public Boolean getDisableServerCertificateValidation() { - return disableServerCertificateValidation; - } - - /** - * Gets the proxy. - * - * @return the proxy, may be null - * Deprecated use {@link #proxyUrl} - */ - @Nullable - @Deprecated - public ProxySelector getProxy() { - return proxy; - } - - /** - * Gets the proxy url. - * - * @return the proxy url, may be null - */ - @Nullable - public String getProxyUrl() { - return proxyUrl; - } - - /** - * Gets certificates file path. - * - * @return the certificates file path, may be null - */ - @Nullable - public String sslRootsFilePath() { - return sslRootsFilePath; - } - - /** - * Gets the (proxy) authenticator. - * - * @return the (proxy) authenticator - */ - @Nullable - public Authenticator getAuthenticator() { - return authenticator; - } - /** * Gets custom headers for requests. * @@ -329,6 +260,11 @@ public boolean getDisableGRPCCompression() { return disableGRPCCompression; } + //fixme comments + public NettyHttpClientConfig getNettyHttpClientConfig() { + return nettyHttpClientConfig; + } + /** * Validates the configuration properties. */ @@ -360,12 +296,7 @@ public boolean equals(final Object o) { && Objects.equals(writeTimeout, that.writeTimeout) && Objects.equals(queryTimeout, that.queryTimeout) && Objects.equals(allowHttpRedirects, that.allowHttpRedirects) - && Objects.equals(disableServerCertificateValidation, that.disableServerCertificateValidation) - && Objects.equals(proxy, that.proxy) - && Objects.equals(proxyUrl, that.proxyUrl) - && Objects.equals(authenticator, that.authenticator) && Objects.equals(headers, that.headers) - && Objects.equals(sslRootsFilePath, that.sslRootsFilePath) && disableGRPCCompression == that.disableGRPCCompression; } @@ -373,9 +304,8 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, database, writePrecision, gzipThreshold, writeNoSync, - timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, - proxy, proxyUrl, authenticator, headers, - defaultTags, sslRootsFilePath, disableGRPCCompression); + timeout, writeTimeout, queryTimeout, allowHttpRedirects, + headers, defaultTags, disableGRPCCompression); } @Override @@ -391,13 +321,8 @@ public String toString() { .add("writeTimeout=" + writeTimeout) .add("queryTimeout=" + queryTimeout) .add("allowHttpRedirects=" + allowHttpRedirects) - .add("disableServerCertificateValidation=" + disableServerCertificateValidation) - .add("proxy=" + proxy) - .add("proxyUrl=" + proxyUrl) - .add("authenticator=" + authenticator) .add("headers=" + headers) .add("defaultTags=" + defaultTags) - .add("sslRootsFilePath=" + sslRootsFilePath) .add("disableGRPCCompression=" + disableGRPCCompression) .toString(); } @@ -422,13 +347,9 @@ public static final class Builder { private Duration writeTimeout; private Duration queryTimeout; private Boolean allowHttpRedirects; - private Boolean disableServerCertificateValidation; - private ProxySelector proxy; - private String proxyUrl; - private Authenticator authenticator; private Map headers; - private String sslRootsFilePath; private boolean disableGRPCCompression; + private NettyHttpClientConfig nettyHttpClientConfig; /** * Sets the URL of the InfluxDB server. @@ -557,9 +478,8 @@ public Builder defaultTags(@Nullable final Map defaultTags) { * Deprecated in v1.4.0. This setter is superseded by the clearer writeTimeout(). * * @param timeout default timeout to use for Write API calls. Default to - * ''{@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds'. + * ''{@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds'. * @return this - * * @see #writeTimeout(Duration writeTimeout) */ @Deprecated @@ -571,18 +491,18 @@ public Builder timeout(@Nullable final Duration timeout) { } /** - * Sets the default writeTimeout to use for Write API calls in the REST client. - * Default is {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} + * Sets the default writeTimeout to use for Write API calls in the REST client. + * Default is {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} * * @param writeTimeout default timeout to use for REST API write calls. Default is - * {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} + * {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} * @return - this */ @Nonnull public Builder writeTimeout(@Nullable final Duration writeTimeout) { - this.writeTimeout = writeTimeout; - return this; + this.writeTimeout = writeTimeout; + return this; } /** @@ -596,8 +516,8 @@ public Builder writeTimeout(@Nullable final Duration writeTimeout) { */ @Nonnull public Builder queryTimeout(@Nullable final Duration queryTimeout) { - this.queryTimeout = queryTimeout; - return this; + this.queryTimeout = queryTimeout; + return this; } /** @@ -613,59 +533,6 @@ public Builder allowHttpRedirects(@Nullable final Boolean allowHttpRedirects) { return this; } - /** - * Sets the disable server SSL certificate validation. Default to 'false'. - * - * @param disableServerCertificateValidation Disable server SSL certificate validation. Default to 'false'. - * @return this - */ - @Nonnull - public Builder disableServerCertificateValidation(@Nullable final Boolean disableServerCertificateValidation) { - - this.disableServerCertificateValidation = disableServerCertificateValidation; - return this; - } - - /** - * Sets the proxy selector. Default is 'null'. - * - * @param proxy Proxy selector. - * @return this - * Deprecated use {@link #proxyUrl} - */ - @Nonnull - public Builder proxy(@Nullable final ProxySelector proxy) { - - this.proxy = proxy; - return this; - } - - /** - * Sets the proxy url. Default is 'null'. - * - * @param proxyUrl Proxy url. - * @return this - */ - @Nonnull - public Builder proxyUrl(@Nullable final String proxyUrl) { - - this.proxyUrl = proxyUrl; - return this; - } - - /** - * Sets the proxy authenticator. Default is 'null'. - * - * @param authenticator Proxy authenticator. Ignored if 'proxy' is not set. - * @return this - */ - @Nonnull - public Builder authenticator(@Nullable final Authenticator authenticator) { - - this.authenticator = authenticator; - return this; - } - /** * Sets the custom headers that will be added to requests. This is useful for adding custom headers to requests, * such as tracing headers. To add custom headers use following code: @@ -698,19 +565,6 @@ public Builder headers(@Nullable final Map headers) { return this; } - /** - * Sets certificate file path. Default is 'null'. - * - * @param sslRootsFilePath The certificate file path - * @return this - */ - @Nonnull - public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) { - - this.sslRootsFilePath = sslRootsFilePath; - return this; - } - /** * Sets whether to disable gRPC compression. Default is 'false'. * @@ -723,6 +577,12 @@ public Builder disableGRPCCompression(final boolean disableGRPCCompression) { return this; } + @Nonnull + public Builder nettyHttpClientConfig(final NettyHttpClientConfig nettyHttpClientConfig) { + this.nettyHttpClientConfig = nettyHttpClientConfig; + return this; + } + /** * Build an instance of {@code ClientConfig}. * @@ -837,7 +697,7 @@ public ClientConfig build(@Nonnull final Map env, final Properti this.queryTimeout(Duration.ofSeconds(to)); } final String disableGRPCCompression = get.apply("INFLUX_DISABLE_GRPC_COMPRESSION", - "influx.disableGRPCCompression"); + "influx.disableGRPCCompression"); if (disableGRPCCompression != null) { this.disableGRPCCompression(Boolean.parseBoolean(disableGRPCCompression)); } @@ -885,17 +745,12 @@ private ClientConfig(@Nonnull final Builder builder) { defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); writeTimeout = builder.writeTimeout != null - ? builder.writeTimeout : builder.timeout != null - ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); + ? builder.writeTimeout : builder.timeout != null + ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); queryTimeout = builder.queryTimeout; allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false; - disableServerCertificateValidation = builder.disableServerCertificateValidation != null - ? builder.disableServerCertificateValidation : false; - proxy = builder.proxy; - proxyUrl = builder.proxyUrl; - authenticator = builder.authenticator; headers = builder.headers; - sslRootsFilePath = builder.sslRootsFilePath; disableGRPCCompression = builder.disableGRPCCompression; + nettyHttpClientConfig = builder.nettyHttpClientConfig; } } diff --git a/src/main/java/com/influxdb/v3/client/config/NettyHttpClientConfig.java b/src/main/java/com/influxdb/v3/client/config/NettyHttpClientConfig.java new file mode 100644 index 00000000..0cc17c1f --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/config/NettyHttpClientConfig.java @@ -0,0 +1,67 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.config; + +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +import io.grpc.ProxyDetector; +import io.netty.handler.proxy.HttpProxyHandler; +import io.netty.handler.ssl.SslContext; + +// fixme refactor +public class NettyHttpClientConfig { + + private SslContext sslContext; + + private HttpProxyHandler httpProxyHandler; + + private ProxyDetector proxyDetector; + + public NettyHttpClientConfig() { + } + + public void configureSsl(@Nonnull final Supplier configureSsl) { + this.sslContext = configureSsl.get(); + } + + public void configureChannelProxy(@Nonnull final Supplier configureHttpProxyHandler) { + this.httpProxyHandler = configureHttpProxyHandler.get(); + } + + public void configureManagedChannelProxy(@Nonnull final ProxyDetector configureManagedChannelProxy) { + this.proxyDetector = configureManagedChannelProxy; + } + + public SslContext getSslContext() { + return sslContext; + } + + public ProxyDetector getProxyDetector() { + return proxyDetector; + } + + public HttpProxyHandler getHttpProxyHandler() { + return httpProxyHandler; + } + +} diff --git a/src/main/java/com/influxdb/v3/client/internal/ClientChannelInitializer.java b/src/main/java/com/influxdb/v3/client/internal/ClientChannelInitializer.java new file mode 100644 index 00000000..ef261ed8 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/ClientChannelInitializer.java @@ -0,0 +1,83 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.oio.OioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.proxy.HttpProxyHandler; +import io.netty.handler.proxy.ProxyHandler; +import io.netty.handler.ssl.SslContext; + +public class ClientChannelInitializer extends ChannelInitializer { + + private final SslContext sslCtx; + + private final String host; + + private final Integer port; + + private final ProxyHandler proxyHandler; + + private ChannelHandler[] h; + + public ClientChannelInitializer(@Nonnull final String host, + @Nonnull final Integer port, + @Nullable final SslContext sslCtx, + @Nullable final HttpProxyHandler proxyHandler, + final ChannelHandler... handlers + + ) { + this.sslCtx = sslCtx; + this.host = host; + this.port = port; + this.proxyHandler = proxyHandler; + this.h = handlers; + } + + @Override + public void initChannel(final OioSocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + if (proxyHandler != null) { + p.addLast(proxyHandler); + } + if (sslCtx != null) { + p.addLast("ssl", sslCtx.newHandler(ch.alloc(), host, port)); + } + p.addLast(new HttpClientCodec()); + p.addLast(new HttpObjectAggregator(1048576)); + if (h != null) { + for (ChannelHandler handler : h) { + p.addLast(handler); + } + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/ClientHandler.java b/src/main/java/com/influxdb/v3/client/internal/ClientHandler.java new file mode 100644 index 00000000..d89bc3c2 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/ClientHandler.java @@ -0,0 +1,56 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.util.concurrent.CompletableFuture; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; + +@ChannelHandler.Sharable +public class ClientHandler extends SimpleChannelInboundHandler { + + private final CompletableFuture responseFuture = new CompletableFuture<>(); + + public ClientHandler() { + + } + + @Override + public void channelRead0(final ChannelHandlerContext ctx, final FullHttpResponse msg) { + this.responseFuture.complete(msg.retain()); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + this.responseFuture.completeExceptionally(cause); + cause.printStackTrace(); + ctx.close(); + } + + public CompletableFuture getResponseFuture() { + return responseFuture; + } + +} diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index d80c7e87..6e50cacd 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -21,8 +21,6 @@ */ package com.influxdb.v3.client.internal; -import java.io.FileInputStream; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -38,19 +36,16 @@ import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.net.ssl.SSLException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.grpc.Codec; import io.grpc.DecompressorRegistry; -import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.Metadata; -import io.grpc.ProxyDetector; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightGrpcUtils; @@ -78,7 +73,7 @@ final class FlightSqlClient implements AutoCloseable { private final Map defaultHeaders = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); - FlightSqlClient(@Nonnull final ClientConfig config) { + FlightSqlClient(@Nonnull final ClientConfig config) throws SSLException { this(config, null); } @@ -88,7 +83,7 @@ final class FlightSqlClient implements AutoCloseable { * @param config the client configuration * @param client the flight client, if null a new client will be created */ - FlightSqlClient(@Nonnull final ClientConfig config, @Nullable final FlightClient client) { + FlightSqlClient(@Nonnull final ClientConfig config, @Nullable final FlightClient client) throws SSLException { Arguments.checkNotNull(config, "config"); if (config.getToken() != null && config.getToken().length > 0) { @@ -144,7 +139,7 @@ public void close() throws Exception { } @Nonnull - private FlightClient createFlightClient(@Nonnull final ClientConfig config) { + private FlightClient createFlightClient(@Nonnull final ClientConfig config) throws SSLException { URI uri = createLocation(config).getUri(); final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()); @@ -152,20 +147,16 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { if (LocationSchemes.GRPC_TLS.equals(uri.getScheme())) { nettyChannelBuilder.useTransportSecurity(); - - SslContext nettySslContext = createNettySslContext(config); - nettyChannelBuilder.sslContext(nettySslContext); + SslContext sslContext = config.getNettyHttpClientConfig() != null + && config.getNettyHttpClientConfig().getSslContext() != null + ? config.getNettyHttpClientConfig().getSslContext() : GrpcSslContexts.forClient().build(); + nettyChannelBuilder.sslContext(sslContext); } else { nettyChannelBuilder.usePlaintext(); } - if (config.getProxyUrl() != null) { - ProxyDetector proxyDetector = createProxyDetector(config.getHost(), config.getProxyUrl()); - nettyChannelBuilder.proxyDetector(proxyDetector); - } - - if (config.getProxy() != null) { - LOG.warn("proxy property in ClientConfig will not work in query api, use proxyUrl property instead"); + if (config.getNettyHttpClientConfig() != null && config.getNettyHttpClientConfig().getProxyDetector() != null) { + nettyChannelBuilder.proxyDetector(config.getNettyHttpClientConfig().getProxyDetector()); } nettyChannelBuilder.maxTraceEvents(0) @@ -179,24 +170,6 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build()); } - @Nonnull - SslContext createNettySslContext(@Nonnull final ClientConfig config) { - try { - SslContextBuilder sslContextBuilder; - sslContextBuilder = GrpcSslContexts.forClient(); - if (config.getDisableServerCertificateValidation()) { - sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); - } else if (config.sslRootsFilePath() != null) { - try (FileInputStream fileInputStream = new FileInputStream(config.sslRootsFilePath())) { - sslContextBuilder.trustManager(fileInputStream); - } - } - return sslContextBuilder.build(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - @Nonnull private Location createLocation(@Nonnull final ClientConfig config) { try { @@ -226,22 +199,6 @@ private HeaderCallOption metadataHeader(@Nonnull final Map reque return new HeaderCallOption(metadata); } - ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull final String proxyUrl) { - URI targetUri = URI.create(targetUrl); - URI proxyUri = URI.create(proxyUrl); - return (targetServerAddress) -> { - InetSocketAddress targetAddress = (InetSocketAddress) targetServerAddress; - if (targetUri.getHost().equals(targetAddress.getHostString()) - && targetUri.getPort() == targetAddress.getPort()) { - return HttpConnectProxiedSocketAddress.newBuilder() - .setProxyAddress(new InetSocketAddress(proxyUri.getHost(), proxyUri.getPort())) - .setTargetAddress(targetAddress) - .build(); - } - return null; - }; - } - private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 50db570a..527b5426 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -23,12 +23,14 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.logging.Logger; @@ -38,6 +40,7 @@ import java.util.zip.GZIPOutputStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.net.ssl.SSLException; import io.grpc.Deadline; import io.netty.handler.codec.http.HttpMethod; @@ -47,7 +50,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import com.influxdb.v3.client.InfluxDBApiException; -import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBApiNettyException; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; @@ -93,7 +96,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient { * * @param config the client config. */ - public InfluxDBClientImpl(@Nonnull final ClientConfig config) { + public InfluxDBClientImpl(@Nonnull final ClientConfig config) throws URISyntaxException, SSLException { this(config, null, null); } @@ -106,13 +109,13 @@ public InfluxDBClientImpl(@Nonnull final ClientConfig config) { */ InfluxDBClientImpl(@Nonnull final ClientConfig config, @Nullable final RestClient restClient, - @Nullable final FlightSqlClient flightSqlClient) { + @Nullable final FlightSqlClient flightSqlClient) throws URISyntaxException, SSLException { Arguments.checkNotNull(config, "config"); config.validate(); this.config = config; - this.restClient = restClient != null ? restClient : new RestClient(config); + this.restClient = new RestClient(config); this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config); this.emptyWriteOptions = new WriteOptions(null); } @@ -279,7 +282,7 @@ public Stream queryBatches(@Nonnull final String query, } @Override - public String getServerVersion() { + public String getServerVersion() throws RuntimeException, ExecutionException, InterruptedException { return this.restClient.getServerVersion(); } @@ -364,14 +367,16 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti headers.putAll(options.headersSafe()); try { - restClient.request(path, HttpMethod.POST, body, queryParams, headers); - } catch (InfluxDBApiHttpException e) { + restClient.request(HttpMethod.POST, path, headers, body, queryParams); + } catch (InfluxDBApiNettyException e) { if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { // Server does not support the v3 write API, can't use the NoSync option. - throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " + throw new InfluxDBApiNettyException("Server doesn't support write with NoSync=true " + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); } throw e; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } } @@ -414,14 +419,14 @@ private Stream queryData(@Nonnull final String query, }); GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder() - .fromGrpcCallOptions(options.grpcCallOptions()); + .fromGrpcCallOptions(options.grpcCallOptions()); if (config.getQueryTimeout() == null) { if (options.grpcCallOptions().getDeadline() != null - && options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { + && options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { LOG.warning("Query timeout " - + options.grpcCallOptions().getDeadline() - + " is 0 or negative and will be ignored."); + + options.grpcCallOptions().getDeadline() + + " is 0 or negative and will be ignored."); builder.withoutDeadline(); } } else { @@ -429,10 +434,10 @@ private Stream queryData(@Nonnull final String query, builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); } else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { LOG.warning("Query timeout " - + options.grpcCallOptions().getDeadline() - + " is 0 or negative. Using config.queryTimeout " - + config.getQueryTimeout() - + " instead."); + + options.grpcCallOptions().getDeadline() + + " is 0 or negative. Using config.queryTimeout " + + config.getQueryTimeout() + + " instead."); builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); } } diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 0453e662..7920c66e 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -21,73 +21,90 @@ */ package com.influxdb.v3.client.internal; -import java.io.FileInputStream; -import java.net.InetSocketAddress; -import java.net.ProxySelector; import java.net.URI; import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.security.KeyStore; -import java.security.SecureRandom; -import java.security.cert.Certificate; -import java.security.cert.CertificateFactory; -import java.security.cert.X509Certificate; -import java.util.ArrayList; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Stream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509TrustManager; +import javax.net.ssl.SSLException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.oio.OioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.proxy.HttpProxyHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.influxdb.v3.client.InfluxDBApiException; -import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBApiNettyException; import com.influxdb.v3.client.config.ClientConfig; final class RestClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RestClient.class); - private static final TrustManager[] TRUST_ALL_CERTS = new TrustManager[]{ - new X509TrustManager() { - public X509Certificate[] getAcceptedIssuers() { - return null; - } - - public void checkClientTrusted( - final X509Certificate[] certs, final String authType) { - } - - public void checkServerTrusted( - final X509Certificate[] certs, final String authType) { - } - } - }; +// private static final TrustManager[] TRUST_ALL_CERTS = new TrustManager[]{ +// new X509TrustManager() { +// public X509Certificate[] getAcceptedIssuers() { +// return null; +// } +// +// public void checkClientTrusted( +// final X509Certificate[] certs, final String authType) { +// } +// +// public void checkServerTrusted( +// final X509Certificate[] certs, final String authType) { +// } +// } +// }; final String baseUrl; final String userAgent; - final HttpClient client; + private final Integer port; + private final String host; + private SslContext sslContext; + final Duration timeout; + Channel channel; + + private final EventLoopGroup eventLoopGroup; + + private final ClientHandler clientHandler; + private HttpProxyHandler proxyHandler; + private final ClientConfig config; private final Map defaultHeaders; private final ObjectMapper objectMapper = new ObjectMapper(); - RestClient(@Nonnull final ClientConfig config) { + RestClient(@Nonnull final ClientConfig config) throws SSLException, URISyntaxException { Arguments.checkNotNull(config, "config"); this.config = config; @@ -95,60 +112,68 @@ public void checkServerTrusted( // user agent version this.userAgent = Identity.getUserAgent(); - // URL String host = config.getHost(); this.baseUrl = host.endsWith("/") ? host : String.format("%s/", host); - // timeout and redirects - HttpClient.Builder builder = HttpClient.newBuilder() - .connectTimeout(config.getWriteTimeout()) - .followRedirects(config.getAllowHttpRedirects() - ? HttpClient.Redirect.NORMAL : HttpClient.Redirect.NEVER); + URI uri = new URI(config.getHost()); + String scheme = uri.getScheme() == null ? "http" : uri.getScheme(); + int port = uri.getPort(); + if (port == -1) { + if ("http".equalsIgnoreCase(scheme)) { + port = 80; + } else if ("https".equalsIgnoreCase(scheme)) { + port = 443; + } + } + this.port = port; + this.host = uri.getHost(); - // default headers - this.defaultHeaders = config.getHeaders() != null ? Map.copyOf(config.getHeaders()) : null; + this.timeout = config.getWriteTimeout(); - if (config.getProxyUrl() != null) { - URI proxyUri = URI.create(config.getProxyUrl()); - ProxySelector proxy = ProxySelector.of(new InetSocketAddress(proxyUri.getHost(), proxyUri.getPort())); - builder.proxy(proxy); - if (config.getAuthenticator() != null) { - builder.authenticator(config.getAuthenticator()); - } - } else if (config.getProxy() != null) { - builder.proxy(config.getProxy()); - if (config.getAuthenticator() != null) { - builder.authenticator(config.getAuthenticator()); + if ("https".equalsIgnoreCase(scheme)) { + if (config.getNettyHttpClientConfig() != null + && config.getNettyHttpClientConfig().getSslContext() != null) { + this.sslContext = config.getNettyHttpClientConfig().getSslContext(); + } else { + this.sslContext = SslContextBuilder.forClient().build(); } } - if (baseUrl.startsWith("https")) { - try { - SSLContext sslContext = SSLContext.getInstance("TLS"); - if (config.getDisableServerCertificateValidation()) { - sslContext.init(null, TRUST_ALL_CERTS, new SecureRandom()); - } else if (config.sslRootsFilePath() != null) { - X509TrustManager x509TrustManager = getX509TrustManagerFromFile(config.sslRootsFilePath()); - sslContext.init(null, new X509TrustManager[]{x509TrustManager}, new SecureRandom()); - } else { - sslContext.init(null, null, new SecureRandom()); - } - builder.sslContext(sslContext); - } catch (Exception e) { - throw new RuntimeException(e); - } + this.eventLoopGroup = new OioEventLoopGroup(); + + //fixme redirects ??? + + // default headers + this.defaultHeaders = config.getHeaders() != null ? Map.copyOf(config.getHeaders()) : null; + + this.clientHandler = new ClientHandler(); + + if (this.config.getNettyHttpClientConfig() != null + && this.config.getNettyHttpClientConfig().getHttpProxyHandler() != null) { + this.proxyHandler = this.config.getNettyHttpClientConfig().getHttpProxyHandler(); } - this.client = builder.build(); } - public String getServerVersion() { + public Bootstrap getBootstrap() { + Bootstrap b = new Bootstrap(); + b.group(this.eventLoopGroup) + .channel(OioSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .handler(new ClientChannelInitializer(this.host, this.port, this.sslContext, this.proxyHandler, + this.clientHandler)) + .remoteAddress(this.host, this.port); + return b; + } + + public String getServerVersion() throws ExecutionException, InterruptedException { String influxdbVersion; - HttpResponse response = request("ping", HttpMethod.GET, null, null, null); + FullHttpResponse response = this.request(HttpMethod.GET, "/ping"); try { - influxdbVersion = response.headers().firstValue("X-Influxdb-Version").orElse(null); + influxdbVersion = response.headers().get("x-influxdb-version"); if (influxdbVersion == null) { - JsonNode jsonNode = objectMapper.readTree(response.body()); + var str = response.content().toString(CharsetUtil.UTF_8); + JsonNode jsonNode = objectMapper.readTree(str); influxdbVersion = Optional.ofNullable(jsonNode.get("version")).map(JsonNode::asText).orElse(null); } } catch (JsonProcessingException e) { @@ -158,138 +183,139 @@ public String getServerVersion() { return influxdbVersion; } - HttpResponse request(@Nonnull final String path, - @Nonnull final HttpMethod method, - @Nullable final byte[] data, - @Nullable final Map queryParams, - @Nullable final Map headers) { - - QueryStringEncoder uriEncoder = new QueryStringEncoder(String.format("%s%s", baseUrl, path)); - if (queryParams != null) { - queryParams.forEach((name, value) -> { - if (value != null && !value.isEmpty()) { - uriEncoder.addParam(name, value); - } - }); - } + public FullHttpResponse request(@Nonnull final HttpMethod method, @Nonnull final String path, + @Nonnull final Map headers) + throws RuntimeException, InterruptedException, ExecutionException { + return request(method, path, headers, null, null); + } - HttpRequest.Builder request = HttpRequest.newBuilder(); + public FullHttpResponse request(@Nonnull final HttpMethod method, @Nonnull final String path) + throws RuntimeException, InterruptedException, ExecutionException { + return request(method, path, null, null, null); + } - // uri + public FullHttpResponse request(@Nonnull final HttpMethod method, + @Nonnull final String path, + @Nullable final Map headers, + @Nullable final byte[] body, + @Nullable final Map queryParams) + throws RuntimeException, InterruptedException, ExecutionException { + FullHttpResponse fullHttpResponse = null; try { - request.uri(uriEncoder.toUri()); - } catch (URISyntaxException e) { - throw new InfluxDBApiException(e); - } + var content = Unpooled.EMPTY_BUFFER; + if (body != null) { + content = Unpooled.copiedBuffer(body); + } + + String uri = path.startsWith("/") ? path : "/" + path; + if (queryParams != null) { + QueryStringEncoder queryStringEncoder = new QueryStringEncoder("/" + path); + queryParams.forEach((key, value) -> { + if (value != null) { + queryStringEncoder.addParam(key, value); + } + }); + uri = queryStringEncoder.toString(); + } - // method and body - request.method(method.name(), data == null - ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(data)); - // headers - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - request.header(entry.getKey(), entry.getValue()); + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, content); + + if (this.defaultHeaders != null) { + this.defaultHeaders.forEach((s, s2) -> request.headers().set(s, s2)); } - } - if (defaultHeaders != null) { - for (Map.Entry entry : defaultHeaders.entrySet()) { - if (headers == null || !headers.containsKey(entry.getKey())) { - request.header(entry.getKey(), entry.getValue()); + + request.headers().add("user-agent", this.userAgent); + request.headers().add("host", String.format("%s:%d", this.host, this.port)); + request.headers().add("content-length", body == null ? "0" : body.length + ""); + if (this.config.getToken() != null && this.config.getToken().length > 0) { + String authScheme = config.getAuthScheme(); + if (authScheme == null) { + authScheme = "Token"; } + request.headers() + .add("authorization", String.format("%s %s", authScheme, new String(this.config.getToken()))); } - } - request.header("User-Agent", userAgent); - if (config.getToken() != null && config.getToken().length > 0) { - String authScheme = config.getAuthScheme(); - if (authScheme == null) { - authScheme = "Token"; + + request.headers().add("accept", "*/*"); + + if (headers != null) { + headers.forEach(request.headers()::set); } - request.header("Authorization", String.format("%s %s", authScheme, new String(config.getToken()))); - } - HttpResponse response; - try { - response = client.send(request.build(), HttpResponse.BodyHandlers.ofString()); - } catch (Exception e) { - throw new InfluxDBApiException(e); - } - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - String reason = ""; - String body = response.body(); - if (!body.isEmpty()) { - try { - final JsonNode root = objectMapper.readTree(body); - final List possibilities = List.of("message", "error_message", "error"); - for (final String field : possibilities) { - final JsonNode node = root.findValue(field); - if (node != null) { - reason = node.asText(); + if (this.channel == null || !this.channel.isOpen()) { + ChannelFuture channelFuture = getBootstrap().connect(); + if (!channelFuture.await(this.timeout.toMillis(), TimeUnit.MILLISECONDS)) { + throw new InfluxDBApiException(new ConnectTimeoutException()); + } + this.channel = channelFuture.channel(); + } + this.channel.writeAndFlush(request).sync(); + + fullHttpResponse = this.clientHandler.getResponseFuture().get(); + + HttpHeaders responseHeaders = new DefaultHttpHeaders(); + fullHttpResponse.headers().forEach(entry -> responseHeaders.add(entry.getKey(), entry.getValue())); + int statusCode = fullHttpResponse.status().code(); + if (statusCode < 200 || statusCode >= 300) { + String reason = ""; + var strContent = fullHttpResponse.content().toString(CharsetUtil.UTF_8); + if (!strContent.isEmpty()) { + try { + final JsonNode root = objectMapper.readTree(strContent); + final List possibilities = List.of("message", "error_message", "error"); + for (final String field : possibilities) { + final JsonNode node = root.findValue(field); + if (node != null) { + reason = node.asText(); + break; + } + } + } catch (JsonProcessingException e) { + LOG.debug("Can't parse msg from response {}", fullHttpResponse); + } + } + + if (reason.isEmpty()) { + for (String s : List.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error")) { + if (responseHeaders.contains(s.toLowerCase())) { + reason = responseHeaders.get(s.toLowerCase()); break; } } - } catch (JsonProcessingException e) { - LOG.debug("Can't parse msg from response {}", response); } - } - if (reason.isEmpty()) { - reason = Stream.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error") - .map(name -> response.headers().firstValue(name).orElse(null)) - .filter(message -> message != null && !message.isEmpty()).findFirst() - .orElse(""); - } + if (reason.isEmpty()) { + reason = strContent; + } - if (reason.isEmpty()) { - reason = body; - } + if (reason.isEmpty()) { + reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase(); + } - if (reason.isEmpty()) { - reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase(); - } - String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); - throw new InfluxDBApiHttpException(message, response.headers(), response.statusCode()); + String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); + + throw new InfluxDBApiNettyException(message, responseHeaders, statusCode); + } + } finally { + //fixme Should we close it after every request? + closeChannel(); } - return response; + return fullHttpResponse; } - private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { - try { - KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - trustStore.load(null); - - FileInputStream fis = new FileInputStream(filePath); - List certificates = new ArrayList( - CertificateFactory.getInstance("X.509") - .generateCertificates(fis) - ); - - for (int i = 0; i < certificates.size(); i++) { - Certificate cert = certificates.get(i); - trustStore.setCertificateEntry("alias" + i, cert); - } - - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( - TrustManagerFactory.getDefaultAlgorithm() - ); - trustManagerFactory.init(trustStore); - X509TrustManager x509TrustManager = null; - for (TrustManager trustManager : trustManagerFactory.getTrustManagers()) { - if (trustManager instanceof X509TrustManager) { - x509TrustManager = (X509TrustManager) trustManager; - } - } - return x509TrustManager; - } catch (Exception e) { - throw new RuntimeException(e); + private void closeChannel() throws InterruptedException { + if (this.channel != null && this.channel.isOpen()) { + this.channel.close().sync(); } } @Override - public void close() { + public void close() throws InterruptedException { + this.eventLoopGroup.shutdownGracefully(); + this.closeChannel(); } } diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index d9bf4dd2..81a3c693 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.math.BigInteger; +import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -34,6 +35,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; +import javax.net.ssl.SSLException; import io.grpc.Deadline; import org.apache.arrow.flight.CallStatus; @@ -69,7 +71,7 @@ void closeClient() throws Exception { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void queryWrite() { + void queryWrite() throws URISyntaxException, SSLException { client = getInstance(); String measurement = "integration_test"; @@ -114,7 +116,7 @@ void queryWrite() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void queryBatches() { + void queryBatches() throws URISyntaxException, SSLException { client = getInstance(); try (Stream batches = client.queryBatches("SELECT * FROM integration_test")) { @@ -129,24 +131,24 @@ void queryBatches() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void queryWriteGzip() { - client = InfluxDBClient.getInstance(new ClientConfig.Builder() + void queryWriteGzip() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) .database(System.getenv("TESTING_INFLUXDB_DATABASE")) .gzipThreshold(1) - .build()); + .build());) { + String measurement = "integration_test"; + long testId = System.currentTimeMillis(); + client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId); - String measurement = "integration_test"; - long testId = System.currentTimeMillis(); - client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId); - - String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId); - try (Stream stream = client.query(sql)) { - stream.forEach(row -> { - Assertions.assertThat(row).hasSize(1); - Assertions.assertThat(row[0]).isEqualTo(123.0); - }); + String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId); + try (Stream stream = client.query(sql)) { + stream.forEach(row -> { + Assertions.assertThat(row).hasSize(1); + Assertions.assertThat(row[0]).isEqualTo(123.0); + }); + } } } @@ -154,7 +156,7 @@ void queryWriteGzip() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void queryWriteParameters() { + void queryWriteParameters() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -186,7 +188,7 @@ void queryWriteParameters() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void iteratingMoreVectorSchemaRoots() { + void iteratingMoreVectorSchemaRoots() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -207,7 +209,7 @@ void iteratingMoreVectorSchemaRoots() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void pointValues() { + void pointValues() throws URISyntaxException, SSLException { Instant timestamp = Instant.now().minus(1, ChronoUnit.DAYS); @@ -243,7 +245,7 @@ void pointValues() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void handleFlightRuntimeException() throws IOException { + public void handleFlightRuntimeException() throws IOException, URISyntaxException { Instant now = Instant.now(); String measurement = "/influxdb3-java/test/ITQueryWrite/handleFlightRuntimeException"; @@ -312,7 +314,7 @@ public void handleFlightRuntimeException() throws IOException { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void queryTimeoutExceededTest() { + public void queryTimeoutExceededTest() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -342,7 +344,7 @@ public void queryTimeoutExceededTest() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void queryTimeoutOKTest() { + public void queryTimeoutOKTest() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -370,7 +372,7 @@ public void queryTimeoutOKTest() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void queryTimeoutOtherGrpcOptUnaffectedTest() { + public void queryTimeoutOtherGrpcOptUnaffectedTest() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -406,7 +408,7 @@ public void queryTimeoutOtherGrpcOptUnaffectedTest() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void queryTimeoutSuperceededByGrpcOptTest() { + public void queryTimeoutSuperceededByGrpcOptTest() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) @@ -423,7 +425,7 @@ public void queryTimeoutSuperceededByGrpcOptTest() { QueryOptions queryOptions = QueryOptions.defaultQueryOptions(); queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder() - .withDeadline(Deadline.after(500, TimeUnit.MILLISECONDS)) + .withDeadline(Deadline.after(900, TimeUnit.MILLISECONDS)) .build() ); @@ -441,8 +443,8 @@ public void queryTimeoutSuperceededByGrpcOptTest() { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void repeatQueryWithTimeoutTest() { - long timeout = 1000; + public void repeatQueryWithTimeoutTest() throws URISyntaxException, SSLException { + long timeout = 2000; client = InfluxDBClient.getInstance(new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) @@ -471,7 +473,7 @@ public void repeatQueryWithTimeoutTest() { @Test @Disabled("Runs across issue 12109 in Grpc-Java library ") - public void queryGrpcMaxOutSizeTest() { + public void queryGrpcMaxOutSizeTest() throws URISyntaxException, SSLException { // See Grpc-java issue 12109 https://github.com/grpc/grpc-java/issues/12109 // TODO - re-enable after 12109 has a fix and dependencies are updated client = InfluxDBClient.getInstance(new ClientConfig.Builder() @@ -506,7 +508,7 @@ public void queryGrpcMaxOutSizeTest() { } @Nonnull - private static InfluxDBClient getInstance() { + private static InfluxDBClient getInstance() throws URISyntaxException, SSLException { return InfluxDBClient.getInstance( System.getenv("TESTING_INFLUXDB_URL"), System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 4cb1ccb4..a7b74d14 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -27,28 +27,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import com.influxdb.v3.client.config.ClientConfig; - public class InfluxDBClientTest { - @Test - void withProxyUrl() { - String proxyUrl = "http://localhost:10000"; - ClientConfig.Builder builder = new ClientConfig.Builder(); - builder.proxyUrl(proxyUrl); - ClientConfig clientConfig = builder.build(); - Assertions.assertThat(clientConfig.getProxyUrl()).isEqualTo(proxyUrl); - } - - @Test - void withSslRootsFilePath() { - String path = "/path/to/cert"; - ClientConfig.Builder builder = new ClientConfig.Builder(); - builder.sslRootsFilePath(path); - ClientConfig clientConfig = builder.build(); - Assertions.assertThat(clientConfig.sslRootsFilePath()).isEqualTo(path); - } - @Test void requiredHost() { diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 4d3a2cba..130966d1 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -21,6 +21,7 @@ */ package com.influxdb.v3.client; +import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nonnull; +import javax.net.ssl.SSLException; import io.netty.handler.codec.http.HttpResponseStatus; import mockwebserver3.RecordedRequest; @@ -49,7 +51,7 @@ class InfluxDBClientWriteTest extends AbstractMockServerTest { private InfluxDBClient client; @BeforeEach - void initClient() { + void initClient() throws URISyntaxException, SSLException { client = InfluxDBClient.getInstance(baseURL, "my-token".toCharArray(), "my-database"); } @@ -228,7 +230,7 @@ void writeNoSyncTrueUsesV3API() throws InterruptedException { void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); - InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, + InfluxDBApiNettyException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiNettyException.class, () -> client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) ); @@ -527,8 +529,8 @@ public void retryHandled429Test() { Throwable thrown = catchThrowable(() -> client.writePoint(point)); assertThat(thrown).isNotNull(); - assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); - InfluxDBApiHttpException he = (InfluxDBApiHttpException) thrown; + assertThat(thrown).isInstanceOf(InfluxDBApiNettyException.class); + InfluxDBApiNettyException he = (InfluxDBApiNettyException) thrown; assertThat(he.headers()).isNotNull(); assertThat(he.getHeader("retry-after").get(0)) .isNotNull().isEqualTo("42"); @@ -564,7 +566,7 @@ public void timeoutExceededTest() { }); assertThat(thrown).isNotNull(); assertThat(thrown).isInstanceOf(InfluxDBApiException.class); - assertThat(thrown.getMessage()).contains("java.net.http.HttpConnectTimeoutException"); + assertThat(thrown.getMessage()).contains("io.netty.channel.ConnectTimeoutException"); } catch (Exception e) { throw new RuntimeException(e); @@ -594,7 +596,7 @@ public void writeTimeoutExceededTest() { }); assertThat(thrown).isNotNull(); assertThat(thrown).isInstanceOf(InfluxDBApiException.class); - assertThat(thrown.getMessage()).contains("java.net.http.HttpConnectTimeoutException"); + assertThat(thrown.getMessage()).contains("io.netty.channel.ConnectTimeoutException"); } catch (Exception e) { throw new RuntimeException(e); @@ -605,7 +607,7 @@ public void writeTimeoutExceededTest() { public void writeTimeoutOKTest() { mockServer.enqueue(createResponse(200)); - Duration testDuration = Duration.ofMillis(2000); + Duration testDuration = Duration.ofMillis(2700); ClientConfig config = new ClientConfig.Builder() .host(baseURL) diff --git a/src/test/java/com/influxdb/v3/client/TestUtils.java b/src/test/java/com/influxdb/v3/client/TestUtils.java index d7f2d1c9..6ac58f6c 100644 --- a/src/test/java/com/influxdb/v3/client/TestUtils.java +++ b/src/test/java/com/influxdb/v3/client/TestUtils.java @@ -21,12 +21,36 @@ */ package com.influxdb.v3.client; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.List; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.ProxyDetector; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.JdkSslContext; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import mockwebserver3.Dispatcher; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.RecordedRequest; +import okhttp3.Headers; import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; @@ -39,6 +63,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import org.jetbrains.annotations.NotNull; public final class TestUtils { @@ -48,7 +73,7 @@ private TestUtils() { public static FlightServer simpleFlightServer(@Nonnull final URI uri, @Nonnull final BufferAllocator allocator, - @Nonnull final NoOpFlightProducer producer) throws Exception { + @Nonnull final NoOpFlightProducer producer) { Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort()); return FlightServer.builder(allocator, location, producer).build(); } @@ -88,5 +113,101 @@ public static VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, fi return vectorSchemaRoot; } + + // Create SslContext for mTLS only + public static SslContext createNettySslContext(final boolean isServer, final String format, final String password, + final String keyFilePath, + final String trustFilePath, final boolean isDisableKeystore, + final boolean isJdkSslContext) + throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException, + UnrecoverableKeyException { + KeyManagerFactory keyManagerFactory = getKeyManagerFactory(format, password, keyFilePath); + + TrustManagerFactory trustManagerFactory = getTrustManagerFactory(format, password, trustFilePath); + + SslContextBuilder sslContextBuilder; + if (isServer) { + sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory).clientAuth(ClientAuth.REQUIRE); + } else { + sslContextBuilder = SslContextBuilder.forClient(); + } + + sslContextBuilder.trustManager(trustManagerFactory); + + if (isJdkSslContext) { + sslContextBuilder.sslProvider(SslProvider.JDK); + } + + if (!isDisableKeystore) { + sslContextBuilder.keyManager(keyManagerFactory); + } + + return sslContextBuilder.build(); + } + + public static ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull final String proxyUrl, + @Nullable final String username, @Nullable final String password) { + URI targetUri = URI.create(targetUrl); + URI proxyUri = URI.create(proxyUrl); + return (targetServerAddress) -> { + InetSocketAddress targetAddress = (InetSocketAddress) targetServerAddress; + if (targetUri.getHost().equals(targetAddress.getHostString()) + && targetUri.getPort() == targetAddress.getPort()) { + return HttpConnectProxiedSocketAddress.newBuilder() + .setProxyAddress(new InetSocketAddress(proxyUri.getHost(), proxyUri.getPort())) + .setTargetAddress(targetAddress) + .setUsername(username) + .setPassword(password) + .build(); + } + return null; + }; + } + + public static MockWebServer customDispatchServer(@NotNull final String host, @NotNull final Integer port, + @Nullable final MockResponse mockResponse, + @Nullable final SslContext sslContext, + final boolean requireClientAuth) throws IOException { + MockWebServer server = new MockWebServer(); + server.setDispatcher(new Dispatcher() { + @NotNull + @Override + public MockResponse dispatch(@NotNull final RecordedRequest recordedRequest) { + return mockResponse != null ? mockResponse : new MockResponse(200, Headers.EMPTY, "Success"); + } + }); + if (sslContext instanceof JdkSslContext) { + server.useHttps(((JdkSslContext) sslContext).context().getSocketFactory()); + if (requireClientAuth) { + server.requireClientAuth(); + } + } + server.start(InetAddress.getByName(host), port); + return server; + } + + @NotNull + private static TrustManagerFactory getTrustManagerFactory(final String format, final String password, + final String trustFilePath) + throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException { + TrustManagerFactory trustManagerFactory = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyStore trustStore = KeyStore.getInstance(format); + trustStore.load(new FileInputStream(trustFilePath), password.toCharArray()); + trustManagerFactory.init(trustStore); + return trustManagerFactory; + } + + @NotNull + private static KeyManagerFactory getKeyManagerFactory(final String format, final String password, + final String keyFilePath) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, + UnrecoverableKeyException { + KeyStore keyStore = KeyStore.getInstance(format); + keyStore.load(new FileInputStream(keyFilePath), password.toCharArray()); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, password.toCharArray()); + return keyManagerFactory; + } } diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index f5ce2e8f..fc0916a4 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -45,7 +45,6 @@ class ClientConfigTest { .writeTimeout(Duration.ofSeconds(35)) .queryTimeout(Duration.ofSeconds(120)) .allowHttpRedirects(true) - .disableServerCertificateValidation(true) .headers(Map.of("X-device", "ab-01")) .disableGRPCCompression(true); diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 63c95afa..c8cab968 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -23,6 +23,8 @@ import java.math.BigInteger; import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.time.Instant; @@ -31,11 +33,15 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Supplier; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; +import javax.net.ssl.SSLException; +import io.grpc.ProxyDetector; +import io.netty.handler.proxy.HttpProxyHandler; import org.apache.arrow.flight.FlightRuntimeException; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -44,7 +50,9 @@ import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; +import com.influxdb.v3.client.TestUtils; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.config.NettyHttpClientConfig; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -59,8 +67,11 @@ public class E2ETest { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void testQueryWithProxy() { + void testQueryWithProxy() throws URISyntaxException, SSLException { String proxyUrl = "http://localhost:10000"; + String targetUrl = "http://localhost:8086"; + String username = "username"; + String password = "password"; try { // Continue to run this test only if Envoy proxy is running on this address http://localhost:10000 @@ -75,11 +86,22 @@ void testQueryWithProxy() { } } + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + + // Set proxy for write api + Supplier writeApiProxy = () -> + new HttpProxyHandler(new InetSocketAddress("localhost", 10000), username, password); + nettyHttpClientConfig.configureChannelProxy(writeApiProxy); + + // Set proxy for query api + ProxyDetector proxyDetector = TestUtils.createProxyDetector(targetUrl, proxyUrl, username, password); + nettyHttpClientConfig.configureManagedChannelProxy(proxyDetector); + ClientConfig clientConfig = new ClientConfig.Builder() .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) .database(System.getenv("TESTING_INFLUXDB_DATABASE")) - .proxyUrl(proxyUrl) + .nettyHttpClientConfig(nettyHttpClientConfig) .build(); InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); @@ -96,44 +118,6 @@ void testQueryWithProxy() { } } - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") - @Test - void correctSslCertificates() throws Exception { - // This is real certificate downloaded from https://cloud2.influxdata.com - String influxDBcertificateFile = "src/test/java/com/influxdb/v3/client/testdata/influxdb-certificate.pem"; - - ClientConfig clientConfig = new ClientConfig.Builder() - .host(System.getenv("TESTING_INFLUXDB_URL")) - .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) - .database(System.getenv("TESTING_INFLUXDB_DATABASE")) - .sslRootsFilePath(influxDBcertificateFile) - .build(); - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); - } - - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") - @Test - void disableServerCertificateValidation() { - String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem"; - - ClientConfig clientConfig = new ClientConfig.Builder() - .host(System.getenv("TESTING_INFLUXDB_URL")) - .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) - .database(System.getenv("TESTING_INFLUXDB_DATABASE")) - .disableServerCertificateValidation(true) - .sslRootsFilePath(wrongCertificateFile) - .build(); - - // Test succeeded with wrong certificate file because disableServerCertificateValidation is true - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); - } - @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index a6873c56..d9b3a6f9 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -21,7 +21,9 @@ */ package com.influxdb.v3.client.internal; +import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -50,6 +52,7 @@ import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.TestUtils; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.config.NettyHttpClientConfig; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.query.QueryType; @@ -57,7 +60,7 @@ public class FlightSqlClientTest { private static final String LOCALHOST = "localhost"; private static final FlightServerMiddleware.Key HEADER_CAPTURE_KEY = - FlightServerMiddleware.Key.of("header-capture"); + FlightServerMiddleware.Key.of("header-capture"); private final Location grpcLocation = Location.forGrpcInsecure(LOCALHOST, 0); private final HeaderCaptureMiddlewareFactory headerFactory = new HeaderCaptureMiddlewareFactory(); private final int rowCount = 10; @@ -70,38 +73,38 @@ public class FlightSqlClientTest { @Test public void invalidHost() { ClientConfig clientConfig = new ClientConfig.Builder() - .host("xyz://a bc") - .token("my-token".toCharArray()) - .build(); + .host("xyz://a bc") + .token("my-token".toCharArray()) + .build(); Assertions.assertThatThrownBy(() -> { - try (FlightSqlClient ignored = new FlightSqlClient(clientConfig)) { - Assertions.fail("Should not be here"); - } - }) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(URISyntaxException.class) - .hasMessageContaining("xyz://a bc"); + try (FlightSqlClient ignored = new FlightSqlClient(clientConfig)) { + Assertions.fail("Should not be here"); + } + }) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(URISyntaxException.class) + .hasMessageContaining("xyz://a bc"); } @Test public void callHeaders() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).contains( - "authorization", - "user-agent", - GrpcUtil.MESSAGE_ACCEPT_ENCODING + "authorization", + "user-agent", + GrpcUtil.MESSAGE_ACCEPT_ENCODING ); Assertions.assertThat(receivedHeaders.get("authorization")).isEqualTo("Bearer my-token"); Assertions.assertThat(receivedHeaders.get("user-agent")).startsWith(Identity.getUserAgent()); @@ -111,20 +114,20 @@ public void callHeaders() throws Exception { @Test public void callHeadersWithoutToken() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .build(); + .host(server.getLocation().getUri().toString()) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).containsOnly( - "user-agent", - GrpcUtil.MESSAGE_ACCEPT_ENCODING, - "content-type" + "user-agent", + GrpcUtil.MESSAGE_ACCEPT_ENCODING, + "content-type" ); Assertions.assertThat(receivedHeaders.get("authorization")).isNull(); } @@ -133,20 +136,20 @@ public void callHeadersWithoutToken() throws Exception { @Test public void callHeadersEmptyToken() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("".toCharArray()) - .build(); + .host(server.getLocation().getUri().toString()) + .token("".toCharArray()) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).containsOnly( - "user-agent", - GrpcUtil.MESSAGE_ACCEPT_ENCODING, - "content-type" + "user-agent", + GrpcUtil.MESSAGE_ACCEPT_ENCODING, + "content-type" ); Assertions.assertThat(receivedHeaders.get("authorization")).isNull(); } @@ -155,23 +158,23 @@ public void callHeadersEmptyToken() throws Exception { @Test public void callHeadersCustomHeader() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .headers(Map.of("X-Tracing-Id", "123")) - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .headers(Map.of("X-Tracing-Id", "123")) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).contains( - "authorization", - "user-agent", - "x-tracing-id", - GrpcUtil.MESSAGE_ACCEPT_ENCODING + "authorization", + "user-agent", + "x-tracing-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING ); Assertions.assertThat(receivedHeaders.get("x-tracing-id")).isEqualTo("123"); } @@ -180,24 +183,24 @@ public void callHeadersCustomHeader() throws Exception { @Test public void customHeaderForRequest() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .headers(Map.of("X-Tracing-Id", "123")) - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .headers(Map.of("X-Tracing-Id", "123")) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of("X-Invoice-Id", "456"))) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of("X-Invoice-Id", "456"))) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).contains( - "authorization", - "user-agent", - "x-tracing-id", - "x-invoice-id", - GrpcUtil.MESSAGE_ACCEPT_ENCODING + "authorization", + "user-agent", + "x-tracing-id", + "x-invoice-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING ); Assertions.assertThat(receivedHeaders.get("x-invoice-id")).isEqualTo("456"); } @@ -206,23 +209,23 @@ public void customHeaderForRequest() throws Exception { @Test public void customHeaderForRequestOverrideConfig() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .headers(Map.of("X-Tracing-Id", "123")) - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .headers(Map.of("X-Tracing-Id", "123")) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of("X-Tracing-Id", "456"))) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of("X-Tracing-Id", "456"))) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).contains( - "authorization", - "user-agent", - "x-tracing-id", - GrpcUtil.MESSAGE_ACCEPT_ENCODING + "authorization", + "user-agent", + "x-tracing-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING ); Assertions.assertThat(receivedHeaders.get("x-tracing-id")).isEqualTo("456"); } @@ -231,26 +234,26 @@ public void customHeaderForRequestOverrideConfig() throws Exception { @Test public void useParamsFromQueryConfig() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .database("mydb") - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .database("mydb") + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - InfluxDBClient influxDBClient = new InfluxDBClientImpl(clientConfig, null, flightSqlClient); - Stream data = influxDBClient.query( - "select * from cpu", - new QueryOptions(Map.of("X-Tracing-Id", "987")))) { + InfluxDBClient influxDBClient = new InfluxDBClientImpl(clientConfig, null, flightSqlClient); + Stream data = influxDBClient.query( + "select * from cpu", + new QueryOptions(Map.of("X-Tracing-Id", "987")))) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).contains( - "authorization", - "x-tracing-id", - "user-agent", - GrpcUtil.MESSAGE_ACCEPT_ENCODING + "authorization", + "x-tracing-id", + "user-agent", + GrpcUtil.MESSAGE_ACCEPT_ENCODING ); Assertions.assertThat(receivedHeaders.get("x-tracing-id")).isEqualTo("987"); } @@ -259,23 +262,23 @@ public void useParamsFromQueryConfig() throws Exception { @Test public void disableGRPCCompression() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .disableGRPCCompression(true) - .build(); + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .disableGRPCCompression(true) + .build(); var qopts = new GrpcCallOptions.Builder().withCompressorName("identity").build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); - var data = executeQuery(flightSqlClient, Map.of(), Map.of(), qopts.getCallOptions())) { + var data = executeQuery(flightSqlClient, Map.of(), Map.of(), qopts.getCallOptions())) { Assertions.assertThat(data.count()).isEqualTo(rowCount); final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); Assertions.assertThat(receivedHeaders.keySet()).containsOnly( - "authorization", - "user-agent", - "content-type" + "authorization", + "user-agent", + "content-type" ); Assertions.assertThat(receivedHeaders.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING)).isNull(); } @@ -286,27 +289,27 @@ private Stream> executeQuery(final FlightSqlClient flightSql final Map headers, final CallOption... callOptions) { return flightSqlClient.execute( - "select * from cpu", - "mydb", - QueryType.SQL, - queryParameters, - headers, - callOptions) - .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> - VectorSchemaRootConverter.INSTANCE - .getMapFromVectorSchemaRoot( - vector, - rowNumber - ))); + "select * from cpu", + "mydb", + QueryType.SQL, + queryParameters, + headers, + callOptions) + .flatMap(vector -> IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> + VectorSchemaRootConverter.INSTANCE + .getMapFromVectorSchemaRoot( + vector, + rowNumber + ))); } @BeforeEach void setUp() throws Exception { allocator = new RootAllocator(Long.MAX_VALUE); server = FlightServer.builder(allocator, grpcLocation, TestUtils.simpleProducer(vectorSchemaRoot)).middleware( - HEADER_CAPTURE_KEY, - headerFactory).build().start(); + HEADER_CAPTURE_KEY, + headerFactory).build().start(); } @AfterEach @@ -324,9 +327,9 @@ void tearDown() throws Exception { void flightSqlClient() throws Exception { String correctHost = "grpc+unix://tmp/dummy.sock"; ClientConfig clientConfig = new ClientConfig.Builder() - .host(correctHost) - .token("Token".toCharArray()) - .build(); + .host(correctHost) + .token("Token".toCharArray()) + .build(); try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { Assertions.assertThat(flightSqlClient).isNotNull(); } @@ -339,35 +342,45 @@ void flightSqlClient() throws Exception { var inCorrectHost = "grpc+unix://///tmp/dummy.sock"; ClientConfig clientConfig1 = new ClientConfig.Builder() - .host(inCorrectHost) - .token("Token".toCharArray()) - .build(); + .host(inCorrectHost) + .token("Token".toCharArray()) + .build(); Assertions.assertThatThrownBy(() -> new FlightSqlClient(clientConfig1)); } @Test - void createProxyDetector() { - String targetUrl = "https://localhost:80"; + void createProxyDetector() throws IOException, URISyntaxException { + String target = "https://localhost:80"; + String proxy = "http://localhost:10000"; + + var targetUrl = new URI(target); + var proxyUrl = new URI(proxy); + String username = "username"; + String password = "password"; + + ProxyDetector proxyDetector = TestUtils.createProxyDetector(target, proxy, username, password); + + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + nettyHttpClientConfig.configureManagedChannelProxy(proxyDetector); + ClientConfig clientConfig = new ClientConfig.Builder() - .host(targetUrl) - .build(); - try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { - String proxyUrl = "http://localhost:10000"; - ProxyDetector proxyDetector = flightSqlClient.createProxyDetector(targetUrl, proxyUrl); - Assertions.assertThat(proxyDetector.proxyFor( - new InetSocketAddress("localhost", 80) - )).isEqualTo(HttpConnectProxiedSocketAddress.newBuilder() - .setProxyAddress(new InetSocketAddress("localhost", 10000)) - .setTargetAddress(new InetSocketAddress("localhost", 80)) + .host(target) + .nettyHttpClientConfig(nettyHttpClientConfig) + .build(); + + Assertions.assertThat(clientConfig.getNettyHttpClientConfig().getProxyDetector().proxyFor( + new InetSocketAddress(targetUrl.getHost(), targetUrl.getPort()) + )).isEqualTo(HttpConnectProxiedSocketAddress.newBuilder() + .setTargetAddress(new InetSocketAddress(targetUrl.getHost(), targetUrl.getPort())) + .setProxyAddress(new InetSocketAddress(proxyUrl.getHost(), proxyUrl.getPort())) + .setUsername(username) + .setPassword(password) .build()); - // Return null case - Assertions.assertThat(proxyDetector.proxyFor( + // Return null case + Assertions.assertThat(proxyDetector.proxyFor( new InetSocketAddress("123.2.3.1", 80) - )).isNull(); - } catch (Exception e) { - throw new RuntimeException(e); - } + )).isNull(); } static class HeaderCaptureMiddleware implements FlightServerMiddleware { diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 10cbd110..fb528ac2 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -21,22 +21,29 @@ */ package com.influxdb.v3.client.internal; -import java.net.Authenticator; -import java.net.InetSocketAddress; -import java.net.PasswordAuthentication; -import java.net.ProxySelector; -import java.net.http.HttpClient; -import java.net.http.HttpHeaders; +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.Base64; -import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import javax.net.ssl.SSLException; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.proxy.HttpProxyHandler; +import io.netty.handler.ssl.JdkSslContext; +import io.netty.handler.ssl.SslContext; import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; import mockwebserver3.RecordedRequest; import okhttp3.Headers; import org.assertj.core.api.Assertions; @@ -45,9 +52,11 @@ import com.influxdb.v3.client.AbstractMockServerTest; import com.influxdb.v3.client.InfluxDBApiException; -import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBApiNettyException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.TestUtils; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.config.NettyHttpClientConfig; import com.influxdb.v3.client.write.WriteOptions; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; @@ -57,14 +66,14 @@ public class RestClientTest extends AbstractMockServerTest { private RestClient restClient; @AfterEach - void tearDown() { + void tearDown() throws InterruptedException { if (restClient != null) { restClient.close(); } } @Test - public void baseUrl() { + public void baseUrl() throws URISyntaxException, SSLException { restClient = new RestClient(new ClientConfig.Builder().host("http://localhost:8086").build()); Assertions .assertThat(restClient.baseUrl) @@ -72,7 +81,7 @@ public void baseUrl() { } @Test - public void baseUrlSlashEnd() { + public void baseUrlSlashEnd() throws URISyntaxException, SSLException { restClient = new RestClient(new ClientConfig.Builder().host("http://localhost:8086/").build()); Assertions .assertThat(restClient.baseUrl) @@ -80,30 +89,32 @@ public void baseUrlSlashEnd() { } @Test - public void responseTimeout() { + public void responseTimeout() throws URISyntaxException, SSLException { restClient = new RestClient(new ClientConfig.Builder() .host("http://localhost:8086") .timeout(Duration.ofSeconds(13)) .build()); - Optional connectTimeout = restClient.client.connectTimeout(); + Optional connectTimeout = Optional.of(restClient.timeout); Assertions.assertThat(connectTimeout).isPresent(); Assertions.assertThat(connectTimeout.get()).isEqualTo(Duration.ofSeconds(13)); } - @Test - public void allowHttpRedirectsDefaults() { - restClient = new RestClient(new ClientConfig.Builder() - .host("http://localhost:8086") - .build()); - - HttpClient.Redirect redirect = restClient.client.followRedirects(); - Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NEVER); - } + //fixme how to handle redirect +// @Test +// public void allowHttpRedirectsDefaults() throws URISyntaxException, SSLException { +// restClient = new RestClient(new ClientConfig.Builder() +// .host("http://localhost:8086") +// .build()); +// +// HttpClient.Redirect redirect = restClient.client.followRedirects(); +// Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NEVER); +// } @Test - public void authenticationHeader() throws InterruptedException { + public void authenticationHeader() + throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() @@ -111,7 +122,7 @@ public void authenticationHeader() throws InterruptedException { .token("my-token".toCharArray()) .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -120,7 +131,8 @@ public void authenticationHeader() throws InterruptedException { } @Test - public void authenticationHeaderCustomAuthScheme() throws InterruptedException { + public void authenticationHeaderCustomAuthScheme() + throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() @@ -129,7 +141,7 @@ public void authenticationHeaderCustomAuthScheme() throws InterruptedException { .authScheme("my-auth-scheme") .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -138,14 +150,15 @@ public void authenticationHeaderCustomAuthScheme() throws InterruptedException { } @Test - public void authenticationHeaderNotDefined() throws InterruptedException { + public void authenticationHeaderNotDefined() + throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -154,14 +167,14 @@ public void authenticationHeaderNotDefined() throws InterruptedException { } @Test - public void userAgent() throws InterruptedException { + public void userAgent() throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -170,7 +183,7 @@ public void userAgent() throws InterruptedException { } @Test - public void customHeader() throws InterruptedException { + public void customHeader() throws InterruptedException, ExecutionException, URISyntaxException, SSLException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() @@ -179,7 +192,7 @@ public void customHeader() throws InterruptedException { .headers(Map.of("X-device", "ab-01")) .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -188,7 +201,8 @@ public void customHeader() throws InterruptedException { } @Test - public void customHeaderRequest() throws InterruptedException { + public void customHeaderRequest() + throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() @@ -197,7 +211,7 @@ public void customHeaderRequest() throws InterruptedException { .headers(Map.of("X-device", "ab-01")) .build()); - restClient.request("ping", HttpMethod.GET, null, null, Map.of("X-Request-Trace-Id", "123")); + restClient.request(HttpMethod.GET, "ping", Map.of("X-Request-Trace-Id", "123")); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -208,7 +222,8 @@ public void customHeaderRequest() throws InterruptedException { } @Test - public void useCustomHeaderFromRequest() throws InterruptedException { + public void useCustomHeaderFromRequest() + throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() @@ -217,7 +232,7 @@ public void useCustomHeaderFromRequest() throws InterruptedException { .headers(Map.of("X-device", "ab-01")) .build()); - restClient.request("ping", HttpMethod.GET, null, null, Map.of("X-device", "ab-02")); + restClient.request(HttpMethod.GET, "ping", Map.of("X-device", "ab-02")); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -249,14 +264,14 @@ public void useParamsFromWriteConfig() throws Exception { } @Test - public void uri() throws InterruptedException { + public void uri() throws InterruptedException, URISyntaxException, SSLException, ExecutionException { mockServer.enqueue(createResponse(200)); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "ping"); RecordedRequest recordedRequest = mockServer.takeRequest(); @@ -264,98 +279,47 @@ public void uri() throws InterruptedException { Assertions.assertThat(recordedRequest.getUrl().toString()).isEqualTo(baseURL + "ping"); } - @Test - public void allowHttpRedirects() { - restClient = new RestClient(new ClientConfig.Builder() - .host("http://localhost:8086") - .allowHttpRedirects(true) - .build()); - - HttpClient.Redirect redirect = restClient.client.followRedirects(); - Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NORMAL); - } + //fixme how to handle redirect??? +// @Test +// public void allowHttpRedirects() throws URISyntaxException, SSLException { +// restClient = new RestClient(new ClientConfig.Builder() +// .host("http://localhost:8086") +// .allowHttpRedirects(true) +// .build()); +// +// HttpClient.Redirect redirect = restClient.client.followRedirects(); +// Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NORMAL); +// } @Test - public void proxy() throws InterruptedException { - mockServer.enqueue(createResponse(200)); + public void proxyUrl() throws InterruptedException, URISyntaxException, IOException, ExecutionException { - restClient = new RestClient(new ClientConfig.Builder() - .host("http://foo.com:8086") - .proxy(ProxySelector.of((InetSocketAddress) mockServer.getProxyAddress().address())) - .build()); - restClient.request("ping", HttpMethod.GET, null, null, null); + try (MockWebServer proxyServer = TestUtils.customDispatchServer("localhost", 10000, + new MockResponse(200, Headers.EMPTY, ""), null, false);) { - RecordedRequest recordedRequest = mockServer.takeRequest(); - - Assertions.assertThat(recordedRequest.getUrl()).isNotNull(); - // with mockwebserver3 getUrl() returns target URL not proxy URL - // successful return implies proxy was used correctly. - Assertions.assertThat(recordedRequest.getUrl().toString()) - .isEqualTo("http://foo.com:8086/ping"); // server is used as proxy - Assertions.assertThat(recordedRequest.getRequestLine()) - .isEqualTo("GET http://foo.com:8086/ping HTTP/1.1"); - } + Supplier proxy = () -> new HttpProxyHandler(proxyServer.getSocketAddress()); + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + nettyHttpClientConfig.configureChannelProxy(proxy); + restClient = new RestClient(new ClientConfig.Builder() + .host(String.format("http://%s:%d", mockServer.getHostName(), mockServer.getPort())) + .nettyHttpClientConfig(nettyHttpClientConfig) + .build()); - @Test - public void proxyUrl() throws InterruptedException { - mockServer.enqueue(createResponse(200)); - - restClient = new RestClient(new ClientConfig.Builder() - .host("http://foo.com:8086") - .proxyUrl(String.format("http://%s:%d", mockServer.getHostName(), mockServer.getPort())) - .build()); - - restClient.request("ping", HttpMethod.GET, null, null, null); - - RecordedRequest recordedRequest = mockServer.takeRequest(); - - Assertions.assertThat(recordedRequest.getUrl()).isNotNull(); - // with mockwebserver3 getUrl() returns target URL not proxy URL - // successful return implies proxy was used correctly. - Assertions.assertThat(recordedRequest.getUrl().toString()) - .isEqualTo("http://foo.com:8086/ping"); // server is used as proxy - Assertions.assertThat(recordedRequest.getRequestLine()) - .isEqualTo("GET http://foo.com:8086/ping HTTP/1.1"); - } - - - @Test - public void proxyWithAuthentication() throws InterruptedException { - mockServer.enqueue(createResponse(407, Map.of("Proxy-Authenticate", "Basic"), null)); - mockServer.enqueue(createResponse(200)); - - restClient = new RestClient(new ClientConfig.Builder() - .host("http://foo.com:8086") - .proxyUrl(String.format("http://%s:%d", mockServer.getHostName(), mockServer.getPort())) - .authenticator(new Authenticator() { - @Override - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication("john", "secret".toCharArray()); - } - }) - .build()); - - restClient.request("ping", HttpMethod.GET, null, null, null); + restClient.request(HttpMethod.GET, "/"); - RecordedRequest recordedRequest = mockServer.takeRequest(); - RecordedRequest proxyAuthRequest = mockServer.takeRequest(); + RecordedRequest recordedRequest = proxyServer.takeRequest(); - Assertions.assertThat(recordedRequest.getUrl()).isNotNull(); - // with mockwebserver3 getUrl() returns target URL not proxy URL - // successful return implies proxy was used correctly. - Assertions.assertThat(recordedRequest.getUrl().toString()).isEqualTo("http://foo.com:8086/ping"); - Assertions.assertThat(recordedRequest.getRequestLine()).isEqualTo("GET http://foo.com:8086/ping HTTP/1.1"); - - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(2); - String proxyAuthorization = proxyAuthRequest.getHeaders().get("Proxy-Authorization"); - Assertions.assertThat(proxyAuthorization) - .isEqualTo("Basic " + Base64.getEncoder().encodeToString("john:secret".getBytes())); + Assertions.assertThat(recordedRequest.getUrl()).isNotNull(); + Assertions.assertThat(recordedRequest.getUrl().host()).isEqualTo(mockServer.getHostName()); + Assertions.assertThat(recordedRequest.getUrl().port()).isEqualTo(mockServer.getPort()); + Assertions.assertThat(recordedRequest.getMethod()).isEqualTo("CONNECT"); + } } @Test - public void error() { + public void error() throws URISyntaxException, SSLException { mockServer.enqueue(createResponse(404)); restClient = new RestClient(new ClientConfig.Builder() @@ -363,13 +327,13 @@ public void error() { .build()); Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + () -> restClient.request(HttpMethod.GET, "ping")) .isInstanceOf(InfluxDBApiException.class) .hasMessage("HTTP status code: 404; Message: Not Found"); } @Test - public void errorFromHeader() { + public void errorFromHeader() throws URISyntaxException, SSLException { mockServer.enqueue(createResponse(500, Map.of("X-Influx-Error", "not used"), null)); @@ -378,90 +342,89 @@ public void errorFromHeader() { .build()); Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + () -> restClient.request(HttpMethod.GET, "ping")) .isInstanceOf(InfluxDBApiException.class) .hasMessage("HTTP status code: 500; Message: not used"); } @Test - public void errorFromBody() { + public void errorFromBody() throws URISyntaxException, SSLException { - mockServer.enqueue(createResponse(401, - Map.of("X-Influx-Errpr", "not used"), - "{\"message\":\"token does not have sufficient permissions\"}")); + mockServer.enqueue(createResponse(401, + Map.of("X-Influx-Errpr", "not used"), + "{\"message\":\"token does not have sufficient permissions\"}")); - restClient = new RestClient(new ClientConfig.Builder() - .host(baseURL) - .build()); + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage("HTTP status code: 401; Message: token does not have sufficient permissions"); + Assertions.assertThatThrownBy( + () -> restClient.request(HttpMethod.GET, "ping") + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 401; Message: token does not have sufficient permissions"); } @Test - public void errorFromBodyEdgeWithoutMessage() { // OSS/Edge error message + public void errorFromBodyEdgeWithoutMessage() throws URISyntaxException, SSLException { // OSS/Edge error message - mockServer.enqueue(createResponse(400, - null, - "{\"error\":\"parsing failed\"}")); + mockServer.enqueue(createResponse(400, + null, + "{\"error\":\"parsing failed\"}")); - restClient = new RestClient(new ClientConfig.Builder() + restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage("HTTP status code: 400; Message: parsing failed"); + Assertions.assertThatThrownBy( + () -> restClient.request(HttpMethod.GET, "ping") + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: parsing failed"); } @Test - public void errorFromBodyEdgeWithMessage() { // OSS/Edge specific error message + public void errorFromBodyEdgeWithMessage() + throws URISyntaxException, SSLException { // OSS/Edge specific error message - mockServer.enqueue(createResponse(400, - null, - "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}")); + mockServer.enqueue(createResponse(400, + null, + "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}")); - restClient = new RestClient(new ClientConfig.Builder() - .host(baseURL) - .build()); + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage("HTTP status code: 400; Message: invalid field value"); + Assertions.assertThatThrownBy( + () -> restClient.request(HttpMethod.GET, "ping") + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: invalid field value"); } @Test - public void errorFromBodyText() { + public void errorFromBodyText() throws URISyntaxException, IOException { + mockServer.enqueue(createResponse(402, null, "token is over the limit")); - mockServer.enqueue(createResponse(402, null, "token is over the limit")); - - restClient = new RestClient(new ClientConfig.Builder() - .host(baseURL) - .build()); + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) + Assertions.assertThatThrownBy( + () -> restClient.request(HttpMethod.GET, "ping") + ) .isInstanceOf(InfluxDBApiException.class) .hasMessage("HTTP status code: 402; Message: token is over the limit"); } @Test public void generateHttpException() { - HttpHeaders headers = HttpHeaders.of(Map.of( - "content-type", List.of("application/json"), - "retry-after", List.of("300")), - (key, value) -> true); + HttpHeaders headers = new DefaultHttpHeaders(); + headers.add("content-type", "application/json"); + headers.add("retry-after", "300"); - InfluxDBApiHttpException exception = new InfluxDBApiHttpException( - new InfluxDBApiException("test exception"), headers, 418); + InfluxDBApiNettyException exception = new InfluxDBApiNettyException( + new InfluxDBApiException("test exception"), headers, 418); Assertions.assertThat(exception.headers()).isEqualTo(headers); Assertions.assertThat(exception.statusCode()).isEqualTo(418); @@ -470,41 +433,40 @@ public void generateHttpException() { } @Test - public void errorHttpExceptionThrown() { + public void errorHttpExceptionThrown() throws URISyntaxException, SSLException { String retryDate = Instant.now().plus(300, ChronoUnit.SECONDS).toString(); - mockServer.enqueue(createResponse(503, - Map.of("retry-after", retryDate, "content-type", "application/json"), - "{\"message\":\"temporarily offline\"}")); + mockServer.enqueue(createResponse(503, + Map.of("retry-after", retryDate, "content-type", "application/json"), + "{\"message\":\"temporarily offline\"}")); - restClient = new RestClient(new ClientConfig.Builder() - .host(baseURL) - .build()); + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); - Throwable thrown = catchThrowable(() -> restClient.request( - "/api/v2/write", HttpMethod.POST, null, null, null) + Throwable thrown = catchThrowable(() -> restClient.request(HttpMethod.POST, "/api/v2/write") ); Assertions.assertThat(thrown).isNotNull(); - Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); - InfluxDBApiHttpException he = (InfluxDBApiHttpException) thrown; + Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiNettyException.class); + InfluxDBApiNettyException he = (InfluxDBApiNettyException) thrown; Assertions.assertThat(he.headers()).isNotNull(); Assertions.assertThat(he.getHeader("retry-after").get(0)) - .isNotNull().isEqualTo(retryDate); + .isNotNull().isEqualTo(retryDate); Assertions.assertThat(he.getHeader("content-type").get(0)) - .isNotNull().isEqualTo("application/json"); - Assertions.assertThat(he.getHeader("wumpus")).isNull(); + .isNotNull().isEqualTo("application/json"); + Assertions.assertThat(he.getHeader("wumpus").size()).isEqualTo(0); Assertions.assertThat(he.statusCode()).isEqualTo(503); Assertions.assertThat(he.getMessage()) - .isEqualTo("HTTP status code: 503; Message: temporarily offline"); + .isEqualTo("HTTP status code: 503; Message: temporarily offline"); } @Test public void getServerVersionV2Successful() throws Exception { String influxDBVersion = "v2.1.0"; mockServer.enqueue(createResponse(200, - Map.of("x-influxdb-version", influxDBVersion), - null)); + Map.of("x-influxdb-version", influxDBVersion), + null)); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) @@ -518,8 +480,8 @@ public void getServerVersionV2Successful() throws Exception { public void getServerVersionV3Successful() throws Exception { String influxDBVersion = "3.0.0"; mockServer.enqueue(createResponse(200, - null, - "{\"version\":\"" + influxDBVersion + "\"}")); + null, + "{\"version\":\"" + influxDBVersion + "\"}")); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) @@ -530,10 +492,11 @@ public void getServerVersionV3Successful() throws Exception { } @Test - public void getServerVersionError() { + public void getServerVersionError() + throws URISyntaxException, SSLException, ExecutionException, InterruptedException { MockResponse mockResponse = new MockResponse(200, - Headers.of("something", "something"), - "not json"); + Headers.of("something", "something"), + "not json"); mockServer.enqueue(mockResponse); restClient = new RestClient(new ClientConfig.Builder() @@ -544,7 +507,8 @@ public void getServerVersionError() { } @Test - public void getServerVersionErrorNoBody() { + public void getServerVersionErrorNoBody() + throws ExecutionException, InterruptedException, URISyntaxException, SSLException { mockServer.enqueue(new MockResponse(200, Headers.of(), "Test-Version")); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) @@ -552,4 +516,78 @@ public void getServerVersionErrorNoBody() { String version = restClient.getServerVersion(); Assertions.assertThat(version).isEqualTo(null); } + + @Test + public void nettyRestMutualSslContext() + throws ExecutionException, InterruptedException, URISyntaxException, IOException, UnrecoverableKeyException, + CertificateException, KeyStoreException, NoSuchAlgorithmException { + var password = "123456"; + var format = "PKCS12"; + + var keyFilePath = "src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/keystore.p12"; + var trustFilePath = "src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/truststore.p12"; + JdkSslContext serverSslContext = + (JdkSslContext) TestUtils.createNettySslContext(true, format, password, keyFilePath, trustFilePath, + false, true); + + keyFilePath = "src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/keystore.p12"; + trustFilePath = "src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/truststore.p12"; + SslContext clientSslContext = + TestUtils.createNettySslContext(false, format, password, keyFilePath, trustFilePath, false, false); + + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + nettyHttpClientConfig.configureSsl(() -> clientSslContext); + ClientConfig config = new ClientConfig.Builder().host("https://localhost:8080") + .nettyHttpClientConfig(nettyHttpClientConfig) + .build(); + + var influxDBVersion = "4.0.0"; + try ( + MockWebServer ignored = TestUtils.customDispatchServer("localhost", 8080, + new MockResponse(200, Headers.of(), "{\"version\":\"" + influxDBVersion + "\"}"), + serverSslContext, true); + RestClient restClient = new RestClient(config); + ) { + Assertions.assertThat(restClient.getServerVersion()).isEqualTo(influxDBVersion); + } + } + + // Make the call fails because this is mTLS but the client does not send its key store, + // note that: isDisableKeyStore = true + @Test + public void nettyRestMutualSslContextFail() + throws IOException, UnrecoverableKeyException, CertificateException, KeyStoreException, + NoSuchAlgorithmException { + var password = "123456"; + var format = "PKCS12"; + + var keyFilePath = "src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/keystore.p12"; + var trustFilePath = "src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/truststore.p12"; + JdkSslContext serverSslContext = + (JdkSslContext) TestUtils.createNettySslContext(true, format, password, keyFilePath, trustFilePath, + false, true); + + keyFilePath = "src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/keystore.p12"; + trustFilePath = "src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/truststore.p12"; + // The call failed because isDisableKeyStore = true + SslContext clientSslContext = + TestUtils.createNettySslContext(false, format, password, keyFilePath, trustFilePath, true, false); + + NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig(); + nettyHttpClientConfig.configureSsl(() -> clientSslContext); + ClientConfig config = new ClientConfig.Builder().host("https://localhost:8080") + .nettyHttpClientConfig(nettyHttpClientConfig) + .build(); + + try ( + MockWebServer ignored = TestUtils.customDispatchServer("localhost", 8080, + new MockResponse(400, Headers.of(), ""), + serverSslContext, true); + RestClient restClient = new RestClient(config); + ) { + restClient.getServerVersion(); + } catch (Exception e) { + Assertions.assertThat(e.getMessage()).contains("SSL"); + } + } } diff --git a/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/keystore.p12 b/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/keystore.p12 new file mode 100644 index 00000000..2d9140b2 Binary files /dev/null and b/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/keystore.p12 differ diff --git a/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/truststore.p12 b/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/truststore.p12 new file mode 100644 index 00000000..2d9847df Binary files /dev/null and b/src/test/java/com/influxdb/v3/client/testdata/client/pkcs12/truststore.p12 differ diff --git a/src/test/java/com/influxdb/v3/client/testdata/scripts/generate-certfile.txt b/src/test/java/com/influxdb/v3/client/testdata/scripts/generate-certfile.txt new file mode 100644 index 00000000..11b29a5d --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/testdata/scripts/generate-certfile.txt @@ -0,0 +1,30 @@ +==== + The MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +==== + +Generated by +keytool -genkeypair -alias server -keyalg RSA -storetype PKCS12 -keysize 2048 -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/server/pkcs12/keystore.p12 -dname "CN=localhost, OU=Dev, O=Company, L=City, ST=State, C=US" -storepass "123456" && keytool -genkeypair -alias client -keyalg RSA -storetype PKCS12 -keysize 2048 -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/client/pkcs12/keystore.p12 -dname "CN=localhost, OU=Dev, O=Company, L=City, ST=State, C=US" -storepass "123456" + +Export +keytool -exportcert -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/server/pkcs12/keystore.p12 -storetype PKCS12 -alias server -file /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/server/pkcs12/exported_cert.cer -storepass 123456 && keytool -exportcert -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/client/pkcs12/keystore.p12 -storetype PKCS12 -alias client -file /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/client/pkcs12/exported_cert.cer -storepass 123456 + +Import +keytool -importcert -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/client/pkcs12/truststore.p12 -storetype PKCS12 -alias server -file /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/server/pkcs12/exported_cert.cer -storepass 123456 && keytool -importcert -keystore /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/server/pkcs12/truststore.p12 -storetype PKCS12 -alias client -file /Users/home/Documents/sources/influxdb3-java/examples/src/main/java/com/influxdb/v3/netty/client/pkcs12/exported_cert.cer -storepass 123456 \ No newline at end of file diff --git a/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/keystore.p12 b/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/keystore.p12 new file mode 100644 index 00000000..9f7f51b3 Binary files /dev/null and b/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/keystore.p12 differ diff --git a/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/truststore.p12 b/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/truststore.p12 new file mode 100644 index 00000000..434d7404 Binary files /dev/null and b/src/test/java/com/influxdb/v3/client/testdata/server/pkcs12/truststore.p12 differ