Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`.

### Bug Fixes

1. [#310](https://github.com/InfluxCommunity/influxdb3-java/pull/310): Ensure `QueryOptions` objects are left unchanged within the `queryData` implementation.

## 1.5.0 [2025-10-22]

### Features
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package com.influxdb.v3.client.internal;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -226,6 +227,11 @@ public Builder() {
/**
* Sets the absolute deadline for a rpc call.
*
* <p><i>Please note</i> the preferred approach is to set a <code>queryTimeout</code>
* Duration value globally in the ClientConfig
* ({@link com.influxdb.v3.client.config.ClientConfig.Builder#queryTimeout(Duration)}).
* This value will then be used to calculate a new Deadline with each call.</p>
*
* @param deadline The deadline
* @return this
*/
Expand All @@ -234,6 +240,17 @@ public Builder withDeadline(final @Nonnull Deadline deadline) {
return this;
}

/**
* Unsets absolute deadline. Note deadline may have been set
* via {@link #fromGrpcCallOptions(GrpcCallOptions)} method.
*
* @return this
*/
public Builder withoutDeadline() {
this.deadline = null;
return this;
}

/**
* Sets an {@code executor} to be used instead of the default
* executor specified with {@link ManagedChannelBuilder#executor}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,6 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
Arguments.checkNotNull(parameters, "parameters");
Arguments.checkNotNull(options, "options");

if (options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) {
options.setGrpcCallOptions(new GrpcCallOptions.Builder()
.fromGrpcCallOptions(options.grpcCallOptions())
.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS))
.build());
}

if (closed) {
throw new IllegalStateException("InfluxDBClient has been closed.");
}
Expand All @@ -421,7 +414,31 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
}
});

CallOption[] callOptions = options.grpcCallOptions().getCallOptions();
GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder()
.fromGrpcCallOptions(options.grpcCallOptions());

if (config.getQueryTimeout() == null) {
if (options.grpcCallOptions().getDeadline() != null
&& options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) {
LOG.warning("Query timeout "
+ options.grpcCallOptions().getDeadline()
+ " is 0 or negative and will be ignored.");
builder.withoutDeadline();
}
} else {
if (options.grpcCallOptions().getDeadline() == null) {
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
} else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) {
LOG.warning("Query timeout "
+ options.grpcCallOptions().getDeadline()
+ " is 0 or negative. Using config.queryTimeout "
+ config.getQueryTimeout()
+ " instead.");
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
}
}

CallOption[] callOptions = builder.build().getCallOptions();

return flightSqlClient.execute(
query,
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/influxdb/v3/client/query/QueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.influxdb.v3.client.query;

import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -190,4 +191,24 @@ public GrpcCallOptions grpcCallOptions() {
private boolean isNotDefined(final String option) {
return option == null || option.isEmpty();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
QueryOptions that = (QueryOptions) o;
return Objects.equals(this.database, that.database)
&& Objects.equals(this.queryType, that.queryType)
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.grpcCallOptions, that.grpcCallOptions);
}

@Override
public int hashCode() {
return Objects.hash(database, queryType, headers, grpcCallOptions);
}
}
20 changes: 9 additions & 11 deletions src/test/java/com/influxdb/v3/client/ITQueryWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public void queryTimeoutSuperceededByGrpcOptTest() {
.host(System.getenv("TESTING_INFLUXDB_URL"))
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
.queryTimeout(Duration.ofSeconds(3))
.queryTimeout(Duration.ofNanos(3))
.build());

String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
Expand All @@ -423,20 +423,18 @@ public void queryTimeoutSuperceededByGrpcOptTest() {

QueryOptions queryOptions = QueryOptions.defaultQueryOptions();
queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder()
.withDeadline(Deadline.after(5000, TimeUnit.NANOSECONDS))
.withDeadline(Deadline.after(500, TimeUnit.MILLISECONDS))
.build()
);

Throwable thrown = catchThrowable(() -> {
Stream<Object[]> stream = client.query(sql, queryOptions);
stream.forEach(row -> {
Assertions.assertThat(row).hasSize(1);
Assertions.assertThat(row[0]).isEqualTo(123.0);
});
Assertions.assertThatNoException().isThrownBy(() -> {
try (Stream<Object[]> stream = client.query(sql, queryOptions)) {
stream.forEach(row -> {
Assertions.assertThat(row).hasSize(1);
Assertions.assertThat(row[0]).isEqualTo(123.0);
});
}
});

Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*");
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,25 @@ void testToString() {
+ "maxOutboundMessageSize=5000}";
assertEquals(expected, options.toString());
}

@Test
void testDeadlineIsNotZeroOrStale() throws InterruptedException {
// Create a deadline 3 seconds from now
Deadline deadline = Deadline.after(3000, TimeUnit.MILLISECONDS);
GrpcCallOptions options = new GrpcCallOptions.Builder()
.withDeadline(deadline)
.build();

// Verify the deadline is in the future
assertNotNull(options.getDeadline());
long timeRemaining = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
Assertions.assertTrue(timeRemaining > 0, "Deadline should be in the future");
Assertions.assertTrue(timeRemaining <= 3000, "Deadline should not exceed 3 seconds");

// Wait a bit and verify deadline is still valid but closer
TimeUnit.MILLISECONDS.sleep(100);
long timeRemainingAfter = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
Assertions.assertTrue(timeRemainingAfter > 0, "Deadline should still be in the future");
Assertions.assertTrue(timeRemainingAfter < timeRemaining, "Deadline should be closer to expiration");
}
}
Loading
Loading