Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.6.0 [unreleased]

### Features

1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`.

## 1.5.0 [2025-10-22]

### Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ public VectorSchemaRoot next() {
@Override
public void close() {
try {
flightStream.close();
AutoCloseables.close(autoCloseable);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
106 changes: 105 additions & 1 deletion src/test/java/com/influxdb/v3/client/integration/E2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URL;
import java.net.URLConnection;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -52,7 +53,7 @@

public class E2ETest {

private static final java.util.logging.Logger LOG = Logger.getLogger(E2ETest.class.getName());
private static final Logger LOG = Logger.getLogger(E2ETest.class.getName());

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
Expand Down Expand Up @@ -381,6 +382,109 @@ public void testGetServerVersion() throws Exception {
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testNoAllocatorMemoryLeak() {

Instant now = Instant.now();
String measurement = "test_" + now.toEpochMilli() % 1000;

Assertions.assertThatNoException().isThrownBy(() -> {

try (InfluxDBClient client = InfluxDBClient.getInstance(
System.getenv("TESTING_INFLUXDB_URL"),
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
System.getenv("TESTING_INFLUXDB_DATABASE"),
null)) {

List<Point> points = List.of(
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now));

client.writePoints(points);
String query = "SELECT * FROM " + measurement;

try (Stream<PointValues> stream = client.queryPoints(query)) {
// N.B. with other items remaining in the stream, an unclosed FlightStream
// will not clean up the residual items and will cause the BaseAllocator
// to throw an IllegalStateException: Memory was leaked...
// Test to ensure FlightStream was closed even though two more records
// remain in the stream
stream.findFirst()
.ifPresent(pointValues -> {
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
});
}
}
});

}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testMultipleQueries() throws Exception {

Instant now = Instant.now();
String measurement = "test_" + now.toEpochMilli() % 1000;

try (InfluxDBClient client = InfluxDBClient.getInstance(
System.getenv("TESTING_INFLUXDB_URL"),
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
System.getenv("TESTING_INFLUXDB_DATABASE"),
null)) {

List<Point> points = List.of(
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
Point.measurement(measurement)
.setTag("type", "test")
.setFloatField("rads", 3.14)
.setIntegerField("life", 42)
.setTimestamp(now));

client.writePoints(points);
String query = "SELECT * FROM " + measurement;

for (int i = 0; i < 20; i++) {
try (Stream<PointValues> stream = client.queryPoints(query)) {
stream.forEach(pointValues -> {
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
});
}
}
}


}


private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) {
influxDBClient.writePoint(
Point.measurement("test1")
Expand Down
Loading