From c517a0b877fcc60996447294388e175b5e420919 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 15:31:50 +0100 Subject: [PATCH 01/11] fix: mutating QueryOptions arg in query method can lead to 0 and stale deadlines. --- .../client/internal/InfluxDBClientImpl.java | 21 +-- .../v3/client/query/QueryOptions.java | 59 +++++++++ .../client/internal/GrpcCallOptionsTest.java | 21 +++ .../v3/client/query/QueryOptionsTest.java | 121 ++++++++++++------ 4 files changed, 175 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 5eb05f23..c7ed5bcc 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -397,14 +397,7 @@ private Stream queryData(@Nonnull final String query, Arguments.checkNonEmpty(query, "query"); Arguments.checkNotNull(parameters, "parameters"); Arguments.checkNotNull(options, "options"); - - if (options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) { - options.setGrpcCallOptions(new GrpcCallOptions.Builder() - .fromGrpcCallOptions(options.grpcCallOptions()) - .withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)) - .build()); - } - + if (closed) { throw new IllegalStateException("InfluxDBClient has been closed."); } @@ -421,7 +414,17 @@ private Stream queryData(@Nonnull final String query, } }); - CallOption[] callOptions = options.grpcCallOptions().getCallOptions(); + // Copy call options to create a new deadline for this query + // if queryTimeout is configured and deadline isn't explicitly set + GrpcCallOptions grpcCallOptionsCopy = options.grpcCallOptions(); + if (grpcCallOptionsCopy.getDeadline() == null && config.getQueryTimeout() != null) { + grpcCallOptionsCopy = new GrpcCallOptions.Builder() + .fromGrpcCallOptions(grpcCallOptionsCopy) + .withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)) + .build(); + } + + CallOption[] callOptions = grpcCallOptionsCopy.getCallOptions(); return flightSqlClient.execute( query, diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 02f2a200..71034b25 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -21,11 +21,16 @@ */ package com.influxdb.v3.client.query; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import io.grpc.Deadline; + import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.Arguments; import com.influxdb.v3.client.internal.GrpcCallOptions; @@ -190,4 +195,58 @@ public GrpcCallOptions grpcCallOptions() { private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } + + @Override + protected QueryOptions clone() { + QueryOptions clone; + HashMap cloneHeaders = new HashMap<>(this.headers); + for (String key : this.headers.keySet()) { + cloneHeaders.put(key, this.headers.get(key)); + } + try { + clone = (QueryOptions) super.clone(); + } catch (final CloneNotSupportedException e) { + clone = new QueryOptions(this.database, this.queryType, cloneHeaders); + } + if (this.grpcCallOptions != null) { + GrpcCallOptions.Builder grpcOptsBuilder = new GrpcCallOptions.Builder(); + if (this.grpcCallOptions.getDeadline() != null) { + grpcOptsBuilder.withDeadline(Deadline.after(this.grpcCallOptions + .getDeadline() + .timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)); + } + grpcOptsBuilder.withExecutor(this.grpcCallOptions.getExecutor()); + grpcOptsBuilder.withCompressorName(this.grpcCallOptions.getCompressorName()); + if (this.grpcCallOptions.getWaitForReady() != null + && this.grpcCallOptions.getWaitForReady()) { + grpcOptsBuilder.withWaitForReady(); + } + grpcOptsBuilder.withMaxInboundMessageSize(this.grpcCallOptions.getMaxInboundMessageSize()); + grpcOptsBuilder.withMaxOutboundMessageSize(this.grpcCallOptions.getMaxOutboundMessageSize()); + clone.grpcCallOptions = grpcOptsBuilder.build(); + } else { + clone.setGrpcCallOptions(null); + } + return clone; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + QueryOptions that = (QueryOptions) o; + return Objects.equals(this.database, that.database) + && Objects.equals(this.queryType, that.queryType) + && Objects.equals(this.headers, that.headers) + && Objects.equals(this.grpcCallOptions, that.grpcCallOptions); + } + + @Override + public int hashCode() { + return Objects.hash(database, queryType, headers, grpcCallOptions); + } } diff --git a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java index 6bb2cd22..fdb9c5d9 100644 --- a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java @@ -252,4 +252,25 @@ void testToString() { + "maxOutboundMessageSize=5000}"; assertEquals(expected, options.toString()); } + + @Test + void testDeadlineIsNotZeroOrStale() throws InterruptedException { + // Create a deadline 3 seconds from now + Deadline deadline = Deadline.after(3000, TimeUnit.MILLISECONDS); + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withDeadline(deadline) + .build(); + + // Verify the deadline is in the future + assertNotNull(options.getDeadline()); + long timeRemaining = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS); + Assertions.assertTrue(timeRemaining > 0, "Deadline should be in the future"); + Assertions.assertTrue(timeRemaining <= 3000, "Deadline should not exceed 3 seconds"); + + // Wait a bit and verify deadline is still valid but closer + TimeUnit.MILLISECONDS.sleep(100); + long timeRemainingAfter = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS); + Assertions.assertTrue(timeRemainingAfter > 0, "Deadline should still be in the future"); + Assertions.assertTrue(timeRemainingAfter < timeRemaining, "Deadline should be closer to expiration"); + } } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index ecfd988c..8f375fa8 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.URI; +import java.time.Duration; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -65,15 +66,15 @@ private static int findFreePort() throws IOException { @BeforeEach void before() { configBuilder = new ClientConfig.Builder() - .host("http://localhost:8086") - .token("my-token".toCharArray()); + .host("http://localhost:8086") + .token("my-token".toCharArray()); } @Test void optionsOverrideAll() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions("your-database", QueryType.InfluxQL); @@ -84,8 +85,8 @@ void optionsOverrideAll() { @Test void optionsOverrideDatabase() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions("your-database"); @@ -96,8 +97,8 @@ void optionsOverrideDatabase() { @Test void optionsOverrideQueryType() { ClientConfig config = configBuilder - .database("my-database") - .build(); + .database("my-database") + .build(); QueryOptions options = new QueryOptions(QueryType.InfluxQL); @@ -119,21 +120,21 @@ void setInboundMessageSizeSmall() throws Exception { String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); ClientConfig.Builder builder = new ClientConfig.Builder() - .host(host) - .database("test"); + .host(host) + .database("test"); // Set very small message size for testing GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(200) - .build(); + .withMaxInboundMessageSize(200) + .build(); QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { try (Stream stream = influxDBClient.queryPoints( - "Select * from \"nothing\"", - queryOptions + "Select * from \"nothing\"", + queryOptions )) { try { Assertions.assertThatThrownBy(stream::count); @@ -159,13 +160,13 @@ void setInboundMessageSizeLarge() throws Exception { String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); ClientConfig clientConfig = new ClientConfig.Builder() - .host(host) - .database("test") - .build(); + .host(host) + .database("test") + .build(); GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(1024 * 1024 * 1024) - .build(); + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); @@ -173,8 +174,8 @@ void setInboundMessageSizeLarge() throws Exception { try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { Assertions.assertThatNoException().isThrownBy(() -> { Stream stream = influxDBClient.queryPoints( - "Select * from \"nothing\"", - queryOptions); + "Select * from \"nothing\"", + queryOptions); Assertions.assertThat(stream.count()).isEqualTo(rowCount); stream.close(); }); @@ -197,12 +198,12 @@ void setGrpcCallOptions() { String compressorName = "name"; GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder().withExecutor(executor) - .withMaxInboundMessageSize(1024) - .withMaxOutboundMessageSize(1024) - .withWaitForReady() - .withDeadline(deadline) - .withCompressorName(compressorName) - .build(); + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withWaitForReady() + .withDeadline(deadline) + .withCompressorName(compressorName) + .build(); QueryOptions options = new QueryOptions("test"); options.setGrpcCallOptions(grpcCallOption); @@ -221,16 +222,16 @@ void grpcCallOptions() { Executor executor = Executors.newSingleThreadExecutor(); Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(1024) - .withMaxOutboundMessageSize(1024) - .withCompressorName("my-compressor") - .withWaitForReady() - .withExecutor(executor) - .withDeadline(deadline) - .build(); + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withCompressorName("my-compressor") + .withWaitForReady() + .withExecutor(executor) + .withDeadline(deadline) + .build(); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) - .usePlaintext() - .build(); + .usePlaintext() + .build(); FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); for (CallOption option : grpcCallOption.getCallOptions()) { stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); @@ -238,13 +239,57 @@ void grpcCallOptions() { io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()) - .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); + .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()) - .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); + .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); Assertions.assertThat(stubCallOptions.getExecutor()).isEqualTo(grpcCallOption.getExecutor()); Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); } + @Test + public void queryOptionsUnchangedByCall() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .queryTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = queryOptions.clone(); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } } From 2452b0923364756daba6ab2026ad2d3bd4c0b852 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 15:44:48 +0100 Subject: [PATCH 02/11] chore: cleanup lint --- .../com/influxdb/v3/client/internal/InfluxDBClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index c7ed5bcc..885b8592 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -397,7 +397,7 @@ private Stream queryData(@Nonnull final String query, Arguments.checkNonEmpty(query, "query"); Arguments.checkNotNull(parameters, "parameters"); Arguments.checkNotNull(options, "options"); - + if (closed) { throw new IllegalStateException("InfluxDBClient has been closed."); } From 6b4ebfed064a2280be3c26c10de53ccc5298e2fb Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 17:24:49 +0100 Subject: [PATCH 03/11] tests: add test of QueryOptions.clone and comparison. --- .../v3/client/query/QueryOptions.java | 6 +---- .../v3/client/query/QueryOptionsTest.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 71034b25..b0085166 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -29,8 +29,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import io.grpc.Deadline; - import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.Arguments; import com.influxdb.v3.client.internal.GrpcCallOptions; @@ -211,9 +209,7 @@ protected QueryOptions clone() { if (this.grpcCallOptions != null) { GrpcCallOptions.Builder grpcOptsBuilder = new GrpcCallOptions.Builder(); if (this.grpcCallOptions.getDeadline() != null) { - grpcOptsBuilder.withDeadline(Deadline.after(this.grpcCallOptions - .getDeadline() - .timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)); + grpcOptsBuilder.withDeadline(this.grpcCallOptions.getDeadline().offset(0, TimeUnit.MILLISECONDS)); } grpcOptsBuilder.withExecutor(this.grpcCallOptions.getExecutor()); grpcOptsBuilder.withCompressorName(this.grpcCallOptions.getCompressorName()); diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 8f375fa8..28f35c28 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -25,6 +25,7 @@ import java.net.ServerSocket; import java.net.URI; import java.time.Duration; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -248,6 +249,30 @@ void grpcCallOptions() { Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); } + @Test + public void queryOptionsCloneTest() { + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .withMaxOutboundMessageSize(1024 * 1024 * 1024) + .withDeadline(Deadline.after(2, TimeUnit.MINUTES)) + .build(); + + Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); + + QueryOptions queryOptions = new QueryOptions("myQueryOptions", QueryType.SQL, headers); + queryOptions.setGrpcCallOptions(grpcCallOption); + + QueryOptions clone = queryOptions.clone(); + Assertions.assertThat(clone).isEqualTo(queryOptions); + // not the same object + Assertions.assertThat(queryOptions == clone).isFalse(); + // deep copy grpc options + Assertions.assertThat(queryOptions.grpcCallOptions() == clone.grpcCallOptions()).isFalse(); + // deep copy headers + Assertions.assertThat(queryOptions.headersSafe() == clone.headersSafe()).isFalse(); + } + @Test public void queryOptionsUnchangedByCall() throws IOException { int freePort = findFreePort(); @@ -276,6 +301,8 @@ public void queryOptionsUnchangedByCall() throws IOException { queryOptions.setGrpcCallOptions(grpcCallOption); QueryOptions originalQueryOptions = queryOptions.clone(); Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + System.out.println("DEBUG queryOptions.grpcOptions " + queryOptions.grpcCallOptions()); + System.out.println("DEBUG originalQueryOptions.grpcOptions " + originalQueryOptions.grpcCallOptions()); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { Assertions.assertThatNoException().isThrownBy(() -> { From 743a8aafe45b88bc739cc4f5890dcde0d61c1264 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 17:50:27 +0100 Subject: [PATCH 04/11] docs: update CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4611dca..74ffc98d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ 1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`. +### Bug Fixes + +1. [#310](https://github.com/InfluxCommunity/influxdb3-java/pull/310): Ensure `QueryOptions` objects are left unchanged within the `queryData` implementation. + ## 1.5.0 [2025-10-22] ### Features From d5239c63e9a9342eaf9ea5692ced7b3e4fa4b48c Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 17:58:04 +0100 Subject: [PATCH 05/11] tests: additional QueryOptions test --- .../influxdb/v3/client/query/QueryOptionsTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 28f35c28..e62e4b4c 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -256,6 +256,8 @@ public void queryOptionsCloneTest() { .withMaxInboundMessageSize(1024 * 1024 * 1024) .withMaxOutboundMessageSize(1024 * 1024 * 1024) .withDeadline(Deadline.after(2, TimeUnit.MINUTES)) + .withCompressorName("my-compressor") + .withWaitForReady() .build(); Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); @@ -273,6 +275,16 @@ public void queryOptionsCloneTest() { Assertions.assertThat(queryOptions.headersSafe() == clone.headersSafe()).isFalse(); } + @Test + public void queryOptionsCompareTest() { + Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); + + QueryOptions queryOptions = new QueryOptions("myQueryOptions", QueryType.SQL, headers); + Assertions.assertThat(queryOptions).isEqualTo(queryOptions); + Assertions.assertThat(queryOptions).isNotEqualTo("Some String"); + Assertions.assertThat(queryOptions).isNotEqualTo(null); + } + @Test public void queryOptionsUnchangedByCall() throws IOException { int freePort = findFreePort(); From 3e4f4be18c12fb28986c0e0b2482fd9a49977fb3 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 12 Nov 2025 18:13:36 +0100 Subject: [PATCH 06/11] tests: add assert --- src/main/java/com/influxdb/v3/client/query/QueryOptions.java | 2 -- .../java/com/influxdb/v3/client/query/QueryOptionsTest.java | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index b0085166..a2d53e26 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -220,8 +220,6 @@ protected QueryOptions clone() { grpcOptsBuilder.withMaxInboundMessageSize(this.grpcCallOptions.getMaxInboundMessageSize()); grpcOptsBuilder.withMaxOutboundMessageSize(this.grpcCallOptions.getMaxOutboundMessageSize()); clone.grpcCallOptions = grpcOptsBuilder.build(); - } else { - clone.setGrpcCallOptions(null); } return clone; } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index e62e4b4c..70e60120 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -266,7 +266,9 @@ public void queryOptionsCloneTest() { queryOptions.setGrpcCallOptions(grpcCallOption); QueryOptions clone = queryOptions.clone(); + Assertions.assertThat(clone).isEqualTo(queryOptions); + Assertions.assertThat(queryOptions.hashCode()).isEqualTo(clone.hashCode()); // not the same object Assertions.assertThat(queryOptions == clone).isFalse(); // deep copy grpc options From 924e01cf5479594a05bc04be1eb1ea2482152508 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 13 Nov 2025 10:05:03 +0100 Subject: [PATCH 07/11] chore: remove debug messages from test --- .../java/com/influxdb/v3/client/query/QueryOptionsTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 70e60120..93d9f4c9 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -315,8 +315,6 @@ public void queryOptionsUnchangedByCall() throws IOException { queryOptions.setGrpcCallOptions(grpcCallOption); QueryOptions originalQueryOptions = queryOptions.clone(); Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); - System.out.println("DEBUG queryOptions.grpcOptions " + queryOptions.grpcCallOptions()); - System.out.println("DEBUG originalQueryOptions.grpcOptions " + originalQueryOptions.grpcCallOptions()); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { Assertions.assertThatNoException().isThrownBy(() -> { From 178f1bb325a2e0d004de86736e864443d53736af Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 13 Nov 2025 10:59:11 +0100 Subject: [PATCH 08/11] chore: refactor gRPC Deadline checks and add logger warning for impractical values. --- .../client/internal/InfluxDBClientImpl.java | 25 +++++++---- .../v3/client/query/QueryOptionsTest.java | 44 +++++++++++++++++++ 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 885b8592..bec91dd7 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -414,17 +414,24 @@ private Stream queryData(@Nonnull final String query, } }); - // Copy call options to create a new deadline for this query - // if queryTimeout is configured and deadline isn't explicitly set - GrpcCallOptions grpcCallOptionsCopy = options.grpcCallOptions(); - if (grpcCallOptionsCopy.getDeadline() == null && config.getQueryTimeout() != null) { - grpcCallOptionsCopy = new GrpcCallOptions.Builder() - .fromGrpcCallOptions(grpcCallOptionsCopy) - .withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)) - .build(); + GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder() + .fromGrpcCallOptions(options.grpcCallOptions()); + + if (options.grpcCallOptions().getDeadline() == null) { + if (config.getQueryTimeout() != null) { + builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + } + } else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.NANOSECONDS) <= 0) { + LOG.warning("Received impractical gRPC call options deadline " + + options.grpcCallOptions().getDeadline()); + if (config.getQueryTimeout() != null) { + LOG.warning("Using configuration query timeout " + + config.getQueryTimeout() + " as a replacement."); + builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + } } - CallOption[] callOptions = grpcCallOptionsCopy.getCallOptions(); + CallOption[] callOptions = builder.build().getCallOptions(); return flightSqlClient.execute( query, diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 93d9f4c9..b04df729 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -329,6 +329,50 @@ public void queryOptionsUnchangedByCall() throws IOException { } catch (Exception e) { throw new RuntimeException(e); } + } + + @Test + public void impracticalGRPCDeadlineReplacedByQueryTimeout() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .queryTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .withDeadline(Deadline.after(-2, TimeUnit.MILLISECONDS)) + .build(); + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = queryOptions.clone(); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } } } From 719e8b11f2cf1ab212f59096d44f19f4c650378c Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 13 Nov 2025 14:39:22 +0100 Subject: [PATCH 09/11] chore: clarify logic of choosing Deadline value, update tests, remove QueryOptions.clone() --- .../v3/client/internal/GrpcCallOptions.java | 11 +++ .../client/internal/InfluxDBClientImpl.java | 25 +++-- .../v3/client/query/QueryOptions.java | 32 ------ .../com/influxdb/v3/client/ITQueryWrite.java | 21 ++-- .../v3/client/query/QueryOptionsTest.java | 97 +++++++++++++------ 5 files changed, 105 insertions(+), 81 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 365be44e..ae1462e0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -234,6 +234,17 @@ public Builder withDeadline(final @Nonnull Deadline deadline) { return this; } + /** + * Unsets absolute deadline. Note deadline may have been set + * via {@link #fromGrpcCallOptions(GrpcCallOptions)} method. + * + * @return this + */ + public Builder withoutDeadline() { + this.deadline = null; + return this; + } + /** * Sets an {@code executor} to be used instead of the default * executor specified with {@link ManagedChannelBuilder#executor}. diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index bec91dd7..7b7034c0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -417,16 +417,23 @@ private Stream queryData(@Nonnull final String query, GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder() .fromGrpcCallOptions(options.grpcCallOptions()); - if (options.grpcCallOptions().getDeadline() == null) { - if (config.getQueryTimeout() != null) { - builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + if (config.getQueryTimeout() == null) { + if (options.grpcCallOptions().getDeadline() != null + && options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { + LOG.warning("Query timeout " + + options.grpcCallOptions().getDeadline() + + " is 0 or negative and will be ignored."); + builder.withoutDeadline(); } - } else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.NANOSECONDS) <= 0) { - LOG.warning("Received impractical gRPC call options deadline " - + options.grpcCallOptions().getDeadline()); - if (config.getQueryTimeout() != null) { - LOG.warning("Using configuration query timeout " - + config.getQueryTimeout() + " as a replacement."); + } else { + if (options.grpcCallOptions().getDeadline() == null) { + builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); + } else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) { + LOG.warning("Query timeout " + + options.grpcCallOptions().getDeadline() + + " is 0 or negative. Using config.queryTimeout " + + config.getQueryTimeout() + + " instead."); builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS)); } } diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index a2d53e26..a2611c0b 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -21,10 +21,8 @@ */ package com.influxdb.v3.client.query; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -194,36 +192,6 @@ private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } - @Override - protected QueryOptions clone() { - QueryOptions clone; - HashMap cloneHeaders = new HashMap<>(this.headers); - for (String key : this.headers.keySet()) { - cloneHeaders.put(key, this.headers.get(key)); - } - try { - clone = (QueryOptions) super.clone(); - } catch (final CloneNotSupportedException e) { - clone = new QueryOptions(this.database, this.queryType, cloneHeaders); - } - if (this.grpcCallOptions != null) { - GrpcCallOptions.Builder grpcOptsBuilder = new GrpcCallOptions.Builder(); - if (this.grpcCallOptions.getDeadline() != null) { - grpcOptsBuilder.withDeadline(this.grpcCallOptions.getDeadline().offset(0, TimeUnit.MILLISECONDS)); - } - grpcOptsBuilder.withExecutor(this.grpcCallOptions.getExecutor()); - grpcOptsBuilder.withCompressorName(this.grpcCallOptions.getCompressorName()); - if (this.grpcCallOptions.getWaitForReady() != null - && this.grpcCallOptions.getWaitForReady()) { - grpcOptsBuilder.withWaitForReady(); - } - grpcOptsBuilder.withMaxInboundMessageSize(this.grpcCallOptions.getMaxInboundMessageSize()); - grpcOptsBuilder.withMaxOutboundMessageSize(this.grpcCallOptions.getMaxOutboundMessageSize()); - clone.grpcCallOptions = grpcOptsBuilder.build(); - } - return clone; - } - @Override public boolean equals(final Object o) { if (this == o) { diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index 49e9110c..0b299322 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -412,7 +412,7 @@ public void queryTimeoutSuperceededByGrpcOptTest() { .host(System.getenv("TESTING_INFLUXDB_URL")) .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) .database(System.getenv("TESTING_INFLUXDB_DATABASE")) - .queryTimeout(Duration.ofSeconds(3)) + .queryTimeout(Duration.ofNanos(3)) .build()); String measurement = "timeout_test_" + Math.round(Math.random() * 100_000); @@ -423,20 +423,21 @@ public void queryTimeoutSuperceededByGrpcOptTest() { QueryOptions queryOptions = QueryOptions.defaultQueryOptions(); queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder() - .withDeadline(Deadline.after(5000, TimeUnit.NANOSECONDS)) + .withDeadline(Deadline.after(500, TimeUnit.MILLISECONDS)) .build() ); - Throwable thrown = catchThrowable(() -> { - Stream stream = client.query(sql, queryOptions); - stream.forEach(row -> { - Assertions.assertThat(row).hasSize(1); - Assertions.assertThat(row[0]).isEqualTo(123.0); - }); + Assertions.assertThatNoException().isThrownBy(() -> { + try (Stream stream = client.query(sql, queryOptions)) { + stream.forEach(row -> { + Assertions.assertThat(row).hasSize(1); + Assertions.assertThat(row[0]).isEqualTo(123.0); + }); + } }); - Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); - Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*"); + // Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); + // Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*"); } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index b04df729..c17e118f 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -25,11 +25,13 @@ import java.net.ServerSocket; import java.net.URI; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import javax.annotation.Nonnull; import io.grpc.Deadline; import io.grpc.ManagedChannel; @@ -249,34 +251,6 @@ void grpcCallOptions() { Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); } - @Test - public void queryOptionsCloneTest() { - - GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxInboundMessageSize(1024 * 1024 * 1024) - .withMaxOutboundMessageSize(1024 * 1024 * 1024) - .withDeadline(Deadline.after(2, TimeUnit.MINUTES)) - .withCompressorName("my-compressor") - .withWaitForReady() - .build(); - - Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); - - QueryOptions queryOptions = new QueryOptions("myQueryOptions", QueryType.SQL, headers); - queryOptions.setGrpcCallOptions(grpcCallOption); - - QueryOptions clone = queryOptions.clone(); - - Assertions.assertThat(clone).isEqualTo(queryOptions); - Assertions.assertThat(queryOptions.hashCode()).isEqualTo(clone.hashCode()); - // not the same object - Assertions.assertThat(queryOptions == clone).isFalse(); - // deep copy grpc options - Assertions.assertThat(queryOptions.grpcCallOptions() == clone.grpcCallOptions()).isFalse(); - // deep copy headers - Assertions.assertThat(queryOptions.headersSafe() == clone.headersSafe()).isFalse(); - } - @Test public void queryOptionsCompareTest() { Map headers = Map.of("k1", "v1", "k2", "v2", "k3", "v3"); @@ -313,7 +287,7 @@ public void queryOptionsUnchangedByCall() throws IOException { QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); - QueryOptions originalQueryOptions = queryOptions.clone(); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { @@ -358,7 +332,7 @@ public void impracticalGRPCDeadlineReplacedByQueryTimeout() throws IOException { QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOptions(grpcCallOption); - QueryOptions originalQueryOptions = queryOptions.clone(); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { @@ -375,4 +349,67 @@ public void impracticalGRPCDeadlineReplacedByQueryTimeout() throws IOException { throw new RuntimeException(e); } } + + @Test + public void impracticalGRPCTimeoutIgnored() throws IOException { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + int rowCount = 10; + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, + TestUtils.simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .writeTimeout(Duration.ofSeconds(60)) + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .withDeadline(Deadline.after(-2, TimeUnit.MILLISECONDS)) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + QueryOptions originalQueryOptions = cloneQueryOptions(queryOptions, clientConfig); + Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"sensors\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static QueryOptions cloneQueryOptions(@Nonnull final QueryOptions orig, + @Nonnull final ClientConfig clientConfig) { + + HashMap cloneHeaders = new HashMap<>(orig.headersSafe()); + for (String key : orig.headersSafe().keySet()) { + cloneHeaders.put(key, orig.headersSafe().get(key)); + } + + QueryOptions clone = new QueryOptions(orig.databaseSafe(clientConfig), + orig.queryTypeSafe(), + cloneHeaders); + + GrpcCallOptions.Builder grpcOptsBuilder = new GrpcCallOptions.Builder() + .fromGrpcCallOptions(orig.grpcCallOptions()); + + clone.setGrpcCallOptions(grpcOptsBuilder.build()); + return clone; + } } From bf987dd3325427a7bdfea146409f9dab991ffbfb Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 13 Nov 2025 15:08:11 +0100 Subject: [PATCH 10/11] chore: remove commented assertions. --- src/test/java/com/influxdb/v3/client/ITQueryWrite.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index 0b299322..d9bf4dd2 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -435,9 +435,6 @@ public void queryTimeoutSuperceededByGrpcOptTest() { }); } }); - - // Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); - // Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*"); } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") From e169db7d3dc6929f2a2fbf3a832b75d40da32061 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 13 Nov 2025 15:23:00 +0100 Subject: [PATCH 11/11] docs: update Javadoc with preferred Deadline approach. --- .../com/influxdb/v3/client/internal/GrpcCallOptions.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index ae1462e0..acc5f344 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -21,6 +21,7 @@ */ package com.influxdb.v3.client.internal; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -226,6 +227,11 @@ public Builder() { /** * Sets the absolute deadline for a rpc call. * + *

Please note the preferred approach is to set a queryTimeout + * Duration value globally in the ClientConfig + * ({@link com.influxdb.v3.client.config.ClientConfig.Builder#queryTimeout(Duration)}). + * This value will then be used to calculate a new Deadline with each call.

+ * * @param deadline The deadline * @return this */