diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a5488f..e4611dca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.6.0 [unreleased] +### Features + +1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`. + ## 1.5.0 [2025-10-22] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 4337f6b3..88d20aff 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -271,6 +271,7 @@ public VectorSchemaRoot next() { @Override public void close() { try { + flightStream.close(); AutoCloseables.close(autoCloseable); } catch (Exception e) { throw new RuntimeException(e); diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 42609398..63c95afa 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -26,6 +26,7 @@ import java.net.URL; import java.net.URLConnection; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -52,7 +53,7 @@ public class E2ETest { - private static final java.util.logging.Logger LOG = Logger.getLogger(E2ETest.class.getName()); + private static final Logger LOG = Logger.getLogger(E2ETest.class.getName()); @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @@ -381,6 +382,109 @@ public void testGetServerVersion() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testNoAllocatorMemoryLeak() { + + Instant now = Instant.now(); + String measurement = "test_" + now.toEpochMilli() % 1000; + + Assertions.assertThatNoException().isThrownBy(() -> { + + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + List points = List.of( + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now)); + + client.writePoints(points); + String query = "SELECT * FROM " + measurement; + + try (Stream stream = client.queryPoints(query)) { + // N.B. with other items remaining in the stream, an unclosed FlightStream + // will not clean up the residual items and will cause the BaseAllocator + // to throw an IllegalStateException: Memory was leaked... + // Test to ensure FlightStream was closed even though two more records + // remain in the stream + stream.findFirst() + .ifPresent(pointValues -> { + Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L); + Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14); + }); + } + } + }); + + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testMultipleQueries() throws Exception { + + Instant now = Instant.now(); + String measurement = "test_" + now.toEpochMilli() % 1000; + + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + List points = List.of( + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now)); + + client.writePoints(points); + String query = "SELECT * FROM " + measurement; + + for (int i = 0; i < 20; i++) { + try (Stream stream = client.queryPoints(query)) { + stream.forEach(pointValues -> { + Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L); + Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14); + }); + } + } + } + + + } + + private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) { influxDBClient.writePoint( Point.measurement("test1")