diff --git a/CHANGELOG.md b/CHANGELOG.md index e4611dca..74ffc98d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ 1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`. +### Bug Fixes + +1. [#310](https://github.com/InfluxCommunity/influxdb3-java/pull/310): Ensure `QueryOptions` objects are left unchanged within the `queryData` implementation. + ## 1.5.0 [2025-10-22] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 365be44e..acc5f344 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -21,6 +21,7 @@ */ package com.influxdb.v3.client.internal; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -226,6 +227,11 @@ public Builder() { /** * Sets the absolute deadline for a rpc call. * + *

Please note the preferred approach is to set a queryTimeout + * Duration value globally in the ClientConfig + * ({@link com.influxdb.v3.client.config.ClientConfig.Builder#queryTimeout(Duration)}). + * This value will then be used to calculate a new Deadline with each call.

+ * * @param deadline The deadline * @return this */ @@ -234,6 +240,17 @@ public Builder withDeadline(final @Nonnull Deadline deadline) { return this; } + /** + * Unsets absolute deadline. Note deadline may have been set + * via {@link #fromGrpcCallOptions(GrpcCallOptions)} method. + * + * @return this + */ + public Builder withoutDeadline() { + this.deadline = null; + return this; + } + /** * Sets an {@code executor} to be used instead of the default * executor specified with {@link ManagedChannelBuilder#executor}. diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 5eb05f23..7b7034c0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -398,13 +398,6 @@ private Stream queryData(@Nonnull final String query, Arguments.checkNotNull(parameters, "parameters"); Arguments.checkNotNull(options, "options"); - if (options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) { - options.setGrpcCallOptions(new GrpcCallOptions.Builder() - .fromGrpcCallOptions(options.grpcCallOptions()) - .withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)) - .build()); - } - if (closed) { throw new IllegalStateException("InfluxDBClient has been closed."); } @@ -421,7 +414,31 @@ private Stream queryData(@Nonnull final String query, } }); - CallOption[] callOptions = options.grpcCallOptions().getCallOptions(); + GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder() + .fromGrpcCallOptions(options.grpcCallOptions()); + + if (config.getQueryTimeout() == null) { + if (options.grpcCallOptions().getDeadline() != null + && options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { + LOG.warning("Query timeout " + + options.grpcCallOptions().getDeadline() + + " is 0 or negative and will be ignored."); + builder.withoutDeadline(); + } + } else { + if (options.grpcCallOptions().getDeadline() == null) { + builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + } else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { + LOG.warning("Query timeout " + + options.grpcCallOptions().getDeadline() + + " is 0 or negative. Using config.queryTimeout " + + config.getQueryTimeout() + + " instead."); + builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + } + } + + CallOption[] callOptions = builder.build().getCallOptions(); return flightSqlClient.execute( query, diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 02f2a200..a2611c0b 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -22,6 +22,7 @@ package com.influxdb.v3.client.query; import java.util.Map; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -190,4 +191,24 @@ public GrpcCallOptions grpcCallOptions() { private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + QueryOptions that = (QueryOptions) o; + return Objects.equals(this.database, that.database) + && Objects.equals(this.queryType, that.queryType) + && Objects.equals(this.headers, that.headers) + && Objects.equals(this.grpcCallOptions, that.grpcCallOptions); + } + + @Override + public int hashCode() { + return Objects.hash(database, queryType, headers, grpcCallOptions); + } } diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index 49e9110c..d9bf4dd2 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -412,7 +412,7 @@ public void queryTimeoutSuperceededByGrpcOptTest() { .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) .database(System.getenv("TESTING_INFLUXDB_DATABASE")) - .queryTimeout(Duration.ofSeconds(3)) + .queryTimeout(Duration.ofNanos(3)) .build()); String measurement = "timeout_test_" + Math.round(Math.random() * 100_000); @@ -423,20 +423,18 @@ public void queryTimeoutSuperceededByGrpcOptTest() { QueryOptions queryOptions = QueryOptions.defaultQueryOptions(); queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder() - .withDeadline(Deadline.after(5000, TimeUnit.NANOSECONDS)) + .withDeadline(Deadline.after(500, TimeUnit.MILLISECONDS)) .build() ); - Throwable thrown = catchThrowable(() -> { - Stream stream = client.query(sql, queryOptions); - stream.forEach(row -> { - Assertions.assertThat(row).hasSize(1); - Assertions.assertThat(row[0]).isEqualTo(123.0); - }); + Assertions.assertThatNoException().isThrownBy(() -> { + try (Stream stream = client.query(sql, queryOptions)) { + stream.forEach(row -> { + Assertions.assertThat(row).hasSize(1); + Assertions.assertThat(row[0]).isEqualTo(123.0); + }); + } }); - - Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); - Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*"); } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java index 6bb2cd22..fdb9c5d9 100644 --- a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java @@ -252,4 +252,25 @@ void testToString() { + "maxOutboundMessageSize=5000}"; assertEquals(expected, options.toString()); } + + @Test + void testDeadlineIsNotZeroOrStale() throws InterruptedException { + // Create a deadline 3 seconds from now + Deadline deadline = Deadline.after(3000, TimeUnit.MILLISECONDS); + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withDeadline(deadline) + .build(); + + // Verify the deadline is in the future + assertNotNull(options.getDeadline()); + long timeRemaining = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS); + Assertions.assertTrue(timeRemaining > 0, "Deadline should be in the future"); + Assertions.assertTrue(timeRemaining <= 3000, "Deadline should not exceed 3 seconds"); + + // Wait a bit and verify deadline is still valid but closer + TimeUnit.MILLISECONDS.sleep(100); + long timeRemainingAfter = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS); + Assertions.assertTrue(timeRemainingAfter > 0, "Deadline should still be in the future"); + Assertions.assertTrue(timeRemainingAfter < timeRemaining, "Deadline should be closer to expiration"); + } } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index ecfd988c..c17e118f 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -24,10 +24,14 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import javax.annotation.Nonnull; import io.grpc.Deadline; import io.grpc.ManagedChannel; @@ -65,15 +69,15 @@ private static int findFreePort() throws IOException { @BeforeEach void before() { configBuilder = new ClientConfig.Builder() - .host("http://localhost:8086") - .token("my-token".toCharArray()); + .host("http://localhost:8086") + .token("my-token".toCharArray()); } @Test void optionsOverrideAll() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions("your-database", QueryType.InfluxQL); @@ -84,8 +88,8 @@ void optionsOverrideAll() { @Test void optionsOverrideDatabase() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions("your-database"); @@ -96,8 +100,8 @@ void optionsOverrideDatabase() { @Test void optionsOverrideQueryType() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions(QueryType.InfluxQL); @@ -119,21 +123,21 @@ void setInboundMessageSizeSmall() throws Exception { String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); ClientConfig.Builder builder = new ClientConfig.Builder() - .host(host) - .database("test"); + .host(host) + .database("test"); // Set very small message size for testing GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(200) - .build(); + .withMaxInboundMessageSize(200) + .build(); QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { try (Stream stream = influxDBClient.queryPoints( - "Select * from \"nothing\"", - queryOptions + "Select * from \"nothing\"", + queryOptions )) { try { Assertions.assertThatThrownBy(stream::count); @@ -159,13 +163,13 @@ void setInboundMessageSizeLarge() throws Exception { String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); ClientConfig clientConfig = new ClientConfig.Builder() - .host(host) - .database("test") - .build(); + .host(host) + .database("test") + .build(); GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(1024 * 1024 * 1024) - .build(); + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); @@ -173,8 +177,8 @@ void setInboundMessageSizeLarge() throws Exception { try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { Assertions.assertThatNoException().isThrownBy(() -> { Stream stream = influxDBClient.queryPoints( - "Select * from \"nothing\"", - queryOptions); + "Select * from \"nothing\"", + queryOptions); Assertions.assertThat(stream.count()).isEqualTo(rowCount); stream.close(); }); @@ -197,12 +201,12 @@ void setGrpcCallOptions() { String compressorName = "name"; GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder().withExecutor(executor) - .withMaxInboundMessageSize(1024) - .withMaxOutboundMessageSize(1024) - .withWaitForReady() - .withDeadline(deadline) - .withCompressorName(compressorName) - .build(); + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withWaitForReady() + .withDeadline(deadline) + .withCompressorName(compressorName) + .build(); QueryOptions options = new QueryOptions("test"); options.setGrpcCallOptions(grpcCallOption); @@ -221,16 +225,16 @@ void grpcCallOptions() { Executor executor = Executors.newSingleThreadExecutor(); Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(1024) - .withMaxOutboundMessageSize(1024) - .withCompressorName("my-compressor") - .withWaitForReady() - .withExecutor(executor) - .withDeadline(deadline) - .build(); + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withCompressorName("my-compressor") + .withWaitForReady() + .withExecutor(executor) + .withDeadline(deadline) + .build(); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) - .usePlaintext() - .build(); + .usePlaintext() + .build(); FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); for (CallOption option : grpcCallOption.getCallOptions()) { stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); @@ -238,13 +242,174 @@ void grpcCallOptions() { io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()) - .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); + .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()) - .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); + .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); Assertions.assertThat(stubCallOptions.getExecutor()).isEqualTo(grpcCallOption.getExecutor()); Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); } + @Test + public void queryOptionsCompareTest() { + Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); + + QueryOptions queryOptions = new QueryOptions("myQueryOptions", QueryType.SQL, headers); + Assertions.assertThat(queryOptions).isEqualTo(queryOptions); + Assertions.assertThat(queryOptions).isNotEqualTo("Some String"); + Assertions.assertThat(queryOptions).isNotEqualTo(null); + } + + @Test + public void queryOptionsUnchangedByCall() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .queryTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void impracticalGRPCDeadlineReplacedByQueryTimeout() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .queryTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .withDeadline(Deadline.after(-2, TimeUnit.MILLISECONDS)) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void impracticalGRPCTimeoutIgnored() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .withDeadline(Deadline.after(-2, TimeUnit.MILLISECONDS)) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static QueryOptions cloneQueryOptions(@Nonnull final QueryOptions orig, + @Nonnull final ClientConfig clientConfig) { + + HashMap cloneHeaders = new HashMap<>(orig.headersSafe()); + for (String key : orig.headersSafe().keySet()) { + cloneHeaders.put(key, orig.headersSafe().get(key)); + } + + QueryOptions clone = new QueryOptions(orig.databaseSafe(clientConfig), + orig.queryTypeSafe(), + cloneHeaders); + + GrpcCallOptions.Builder grpcOptsBuilder = new GrpcCallOptions.Builder() + .fromGrpcCallOptions(orig.grpcCallOptions()); + + clone.setGrpcCallOptions(grpcOptsBuilder.build()); + return clone; + } }