From 5fa24b7bf9b4a07086d4f4a1faeaa8d6ba1e9223 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 4 Nov 2025 16:22:59 +0100 Subject: [PATCH 01/17] fix: (WIP) ensure FlightStream is closed properly to avoid BaseAllocator memory leaks. --- .../v3/client/internal/FlightSqlClient.java | 5 +- .../v3/client/integration/E2ETest.java | 65 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 4337f6b3..857f4a37 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -77,6 +77,7 @@ final class FlightSqlClient implements AutoCloseable { private final Map defaultHeaders = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); + private final List autoCloseables = new ArrayList<>(); FlightSqlClient(@Nonnull final ClientConfig config) { this(config, null); @@ -133,6 +134,7 @@ Stream execute(@Nonnull final String query, Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); FlightStream stream = client.getStream(ticket, callOptionArray); FlightSqlIterator iterator = new FlightSqlIterator(stream); + autoCloseables.add(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); return StreamSupport.stream(spliterator, false).onClose(iterator::close); @@ -140,7 +142,8 @@ Stream execute(@Nonnull final String query, @Override public void close() throws Exception { - client.close(); + autoCloseables.add(client); + AutoCloseables.close(autoCloseables); } @Nonnull diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 42609398..40a423be 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -25,16 +25,22 @@ import java.net.ConnectException; import java.net.URL; import java.net.URLConnection; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; +import com.influxdb.v3.client.internal.GrpcCallOptions; +import io.grpc.Deadline; import org.apache.arrow.flight.FlightRuntimeException; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -381,6 +387,65 @@ public void testGetServerVersion() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testNoAllocatorMemoryLeak() throws Exception { + // TODO need to get logger Error messages for BaseAllocator and verify none was sent + + Instant now = Instant.now(); + String measurement = "test_" + now.toEpochMilli() % 1000; + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + List points = List.of( + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now)); + + client.writePoints(points); + String query = "SELECT * FROM " + measurement; + + Assertions.assertThatNoException().isThrownBy(() -> { + // System.out.println("PASS"); + // throw new Exception("NO PASS"); + + try (Stream stream = client.queryPoints(query)) { + // PointValues[] pvs = stream.toArray(PointValues[]::new); + stream.findFirst() + .ifPresent(pointValues -> { + System.out.println(pointValues.getField("rads")); + }); + //stream.forEach(pointValues -> { + // System.out.println("DEBUG PointValues: " + pointValues.getField("pi")); + //}); + } + }); + + + + + } + + } + + private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) { influxDBClient.writePoint( Point.measurement("test1") From c2b45e8979c4c1b5299bd6fe10686e5bf19b423e Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 4 Nov 2025 16:43:20 +0100 Subject: [PATCH 02/17] chore: remove unused imports in E2E test --- .../java/com/influxdb/v3/client/integration/E2ETest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 40a423be..27e3747e 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -25,22 +25,17 @@ import java.net.ConnectException; import java.net.URL; import java.net.URLConnection; -import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; -import com.influxdb.v3.client.internal.GrpcCallOptions; -import io.grpc.Deadline; import org.apache.arrow.flight.FlightRuntimeException; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -392,6 +387,7 @@ public void testGetServerVersion() throws Exception { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test public void testNoAllocatorMemoryLeak() throws Exception { + // TODO need to get logger Error messages for BaseAllocator and verify none was sent Instant now = Instant.now(); From 5b9ce2be0451604b2cbc0c4c1d64f2fe8dd114f1 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 6 Nov 2025 16:49:30 +0100 Subject: [PATCH 03/17] fix: additional workaround for closing FlightStream and test update. --- .../v3/client/internal/FlightSqlClient.java | 11 ++- .../v3/client/integration/E2ETest.java | 84 +++++++++---------- 2 files changed, 49 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 857f4a37..4dbd5641 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -257,7 +257,16 @@ private FlightSqlIterator(@Nonnull final FlightStream flightStream) { @Override public boolean hasNext() { - return flightStream.next(); + boolean nextable = flightStream.next(); + if (!nextable) { + // Nothing left to read - close the stream + try { + flightStream.close(); + } catch (Exception e) { + LOG.error("Error while closing FlightStream: ", e); + } + } + return nextable; } @Override diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 27e3747e..250b5ad5 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -53,7 +52,7 @@ public class E2ETest { - private static final java.util.logging.Logger LOG = Logger.getLogger(E2ETest.class.getName()); + private static final java.util.logging.Logger LOG = java.util.logging.Logger.getLogger(E2ETest.class.getName()); @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @@ -386,58 +385,53 @@ public void testGetServerVersion() throws Exception { @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - public void testNoAllocatorMemoryLeak() throws Exception { - - // TODO need to get logger Error messages for BaseAllocator and verify none was sent + public void testNoAllocatorMemoryLeak() { Instant now = Instant.now(); String measurement = "test_" + now.toEpochMilli() % 1000; - try (InfluxDBClient client = InfluxDBClient.getInstance( - System.getenv("TESTING_INFLUXDB_URL"), - System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), - System.getenv("TESTING_INFLUXDB_DATABASE"), - null)) { - - List points = List.of( - Point.measurement(measurement) - .setTag("type", "test") - .setFloatField("rads", 3.14) - .setIntegerField("life", 42) - .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), - Point.measurement(measurement) - .setTag("type", "test") - .setFloatField("rads", 3.14) - .setIntegerField("life", 42) - .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), - Point.measurement(measurement) - .setTag("type", "test") - .setFloatField("rads", 3.14) - .setIntegerField("life", 42) - .setTimestamp(now)); - - client.writePoints(points); - String query = "SELECT * FROM " + measurement; - - Assertions.assertThatNoException().isThrownBy(() -> { - // System.out.println("PASS"); - // throw new Exception("NO PASS"); + + Assertions.assertThatNoException().isThrownBy(() -> { + + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + List points = List.of( + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now)); + + client.writePoints(points); + String query = "SELECT * FROM " + measurement; try (Stream stream = client.queryPoints(query)) { - // PointValues[] pvs = stream.toArray(PointValues[]::new); + // N.B. with other items remaining in the stream, an unclosed FlightStream + // will not clean up the residual items and will cause the BaseAllocator + // to throw an IllegalStateException: Memory was leaked... + // Test to ensure FlightStream was closed even though two more records + // remain in the stream stream.findFirst() .ifPresent(pointValues -> { - System.out.println(pointValues.getField("rads")); + Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L); + Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14); }); - //stream.forEach(pointValues -> { - // System.out.println("DEBUG PointValues: " + pointValues.getField("pi")); - //}); } - }); - - - - - } + } + }); } From ce5da130d05f2a898a7ae0e65fbd1771515682fc Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 7 Nov 2025 14:05:02 +0100 Subject: [PATCH 04/17] chore: free references to unused FlightStreams to help GC --- .../v3/client/internal/FlightSqlClient.java | 29 ++++++++++- .../v3/client/integration/E2ETest.java | 49 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 4dbd5641..c3bad9b5 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -30,10 +30,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nonnull; @@ -72,6 +74,8 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); + private static int AUTOCLOSEABLE_CHECK_LIMIT = 10; + private static Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); private final FlightClient client; @@ -134,12 +138,34 @@ Stream execute(@Nonnull final String query, Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); FlightStream stream = client.getStream(ticket, callOptionArray); FlightSqlIterator iterator = new FlightSqlIterator(stream); - autoCloseables.add(stream); + addToAutoCloseable(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); return StreamSupport.stream(spliterator, false).onClose(iterator::close); } + private void addToAutoCloseable(AutoCloseable closeable) { + // need to occasionally clean up references to closed streams + // in order to ensure memory can get freed. + if(autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { + LOG.info("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); + + ListIterator iter = autoCloseables.listIterator(); + while(iter.hasNext()){ + AutoCloseable autoCloseable = iter.next(); + if(CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)){ + LOG.info("removing closed stream {}", autoCloseable); + CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable); + iter.remove(); + } + } + } + + autoCloseables.add(closeable); + CLOSEABLE_CLOSED_LEDGER.put(closeable, false); + LOG.debug("autoCloseables count {}, LEDGER count {}", autoCloseables.size(), CLOSEABLE_CLOSED_LEDGER.size()); + } + @Override public void close() throws Exception { autoCloseables.add(client); @@ -262,6 +288,7 @@ public boolean hasNext() { // Nothing left to read - close the stream try { flightStream.close(); + CLOSEABLE_CLOSED_LEDGER.replace(flightStream, true); } catch (Exception e) { LOG.error("Error while closing FlightStream: ", e); } diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 250b5ad5..59642524 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -435,6 +435,55 @@ public void testNoAllocatorMemoryLeak() { } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testMultipleQueries() throws Exception { + + Instant now = Instant.now(); + String measurement = "test_" + now.toEpochMilli() % 1000; + + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + List points = List.of( + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(2, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now.minus(1, ChronoUnit.SECONDS)), + Point.measurement(measurement) + .setTag("type", "test") + .setFloatField("rads", 3.14) + .setIntegerField("life", 42) + .setTimestamp(now)); + + client.writePoints(points); + String query = "SELECT * FROM " + measurement; + + // TODO just checking FlightStream cleanup should be test in FlightSqlClient + for(int i = 0; i < 20; i++) { + try (Stream stream = client.queryPoints(query)) { + stream.forEach(pointValues -> { + Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L); + Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14); + }); + } + } + } + + + } + private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) { influxDBClient.writePoint( From 5142e6cb84b27ade0c131f7fb9bc9afefc9264e0 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 7 Nov 2025 14:19:50 +0100 Subject: [PATCH 05/17] chore: clean up lint issues --- .../influxdb/v3/client/internal/FlightSqlClient.java | 12 ++++++------ .../com/influxdb/v3/client/integration/E2ETest.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index c3bad9b5..27f29594 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -74,8 +74,8 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); - private static int AUTOCLOSEABLE_CHECK_LIMIT = 10; - private static Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); + private static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; + private static final Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); private final FlightClient client; @@ -144,16 +144,16 @@ Stream execute(@Nonnull final String query, return StreamSupport.stream(spliterator, false).onClose(iterator::close); } - private void addToAutoCloseable(AutoCloseable closeable) { + private void addToAutoCloseable(@Nonnull final AutoCloseable closeable) { // need to occasionally clean up references to closed streams // in order to ensure memory can get freed. - if(autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { + if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { LOG.info("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); ListIterator iter = autoCloseables.listIterator(); - while(iter.hasNext()){ + while (iter.hasNext()) { AutoCloseable autoCloseable = iter.next(); - if(CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)){ + if (CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)) { LOG.info("removing closed stream {}", autoCloseable); CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable); iter.remove(); diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 59642524..416e3083 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -471,7 +471,7 @@ public void testMultipleQueries() throws Exception { String query = "SELECT * FROM " + measurement; // TODO just checking FlightStream cleanup should be test in FlightSqlClient - for(int i = 0; i < 20; i++) { + for (int i = 0; i < 20; i++) { try (Stream stream = client.queryPoints(query)) { stream.forEach(pointValues -> { Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L); From 4a151d76a8cc8159728db8b34fa2a03ca6266352 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 7 Nov 2025 15:27:09 +0100 Subject: [PATCH 06/17] test: add test of FlightStream overhead. --- .../v3/client/internal/FlightSqlClient.java | 10 +++--- .../v3/client/integration/E2ETest.java | 1 - .../client/internal/FlightSqlClientTest.java | 31 +++++++++++++++++++ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 27f29594..57748d98 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -75,13 +75,13 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); private static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; - private static final Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); + static final Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); private final FlightClient client; private final Map defaultHeaders = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); - private final List autoCloseables = new ArrayList<>(); + final List autoCloseables = new ArrayList<>(); FlightSqlClient(@Nonnull final ClientConfig config) { this(config, null); @@ -144,17 +144,17 @@ Stream execute(@Nonnull final String query, return StreamSupport.stream(spliterator, false).onClose(iterator::close); } - private void addToAutoCloseable(@Nonnull final AutoCloseable closeable) { + private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closeable) { // need to occasionally clean up references to closed streams // in order to ensure memory can get freed. if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { - LOG.info("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); + LOG.debug("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); ListIterator iter = autoCloseables.listIterator(); while (iter.hasNext()) { AutoCloseable autoCloseable = iter.next(); if (CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)) { - LOG.info("removing closed stream {}", autoCloseable); + LOG.debug("removing closed stream {}", autoCloseable); CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable); iter.remove(); } diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 416e3083..94773923 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -470,7 +470,6 @@ public void testMultipleQueries() throws Exception { client.writePoints(points); String query = "SELECT * FROM " + measurement; - // TODO just checking FlightStream cleanup should be test in FlightSqlClient for (int i = 0; i < 20; i++) { try (Stream stream = client.queryPoints(query)) { stream.forEach(pointValues -> { diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index a6873c56..05e31384 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -370,6 +370,36 @@ void createProxyDetector() { } } + + @Test + public void multipleFlightStreamsFreed() throws Exception { + ClientConfig clientConfig = new ClientConfig.Builder() + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .build(); + + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { + + for (int i = 0; i < 20; i++) { + Stream stream = flightSqlClient.execute( + "select * from cpu", + "mydb", + QueryType.SQL, + Map.of(), + Map.of()); + + stream.forEach(VectorSchemaRoot::contentToTSVString); + } + Assertions.assertThat(flightSqlClient.autoCloseables.size()).isEqualTo(9); + // N.B. can pick up references from other tests... + Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()).isLessThan(20); + for (AutoCloseable closeable : FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.keySet()) { + Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.get(closeable)).isTrue(); // Is closed + } + + } + } + static class HeaderCaptureMiddleware implements FlightServerMiddleware { private final Map headers = new HashMap<>(); @@ -418,4 +448,5 @@ public HeaderCaptureMiddleware onCallStarted(final CallInfo callInfo, return lastInstance; } } + } From 02635449eb3b29ce695afc49e23927bbee26617a Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 7 Nov 2025 16:40:02 +0100 Subject: [PATCH 07/17] test: fix flakey test for FlightStream overhead. --- .../v3/client/internal/FlightSqlClient.java | 2 +- .../client/internal/FlightSqlClientTest.java | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 57748d98..bc1e4729 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -74,7 +74,7 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); - private static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; + static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; static final Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); private final FlightClient client; diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 05e31384..1ba3ad57 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -23,7 +23,9 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -380,6 +382,7 @@ public void multipleFlightStreamsFreed() throws Exception { try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { + List autoCloseables = new ArrayList<>(); for (int i = 0; i < 20; i++) { Stream stream = flightSqlClient.execute( "select * from cpu", @@ -389,14 +392,19 @@ public void multipleFlightStreamsFreed() throws Exception { Map.of()); stream.forEach(VectorSchemaRoot::contentToTSVString); + autoCloseables.add(flightSqlClient.autoCloseables.get(flightSqlClient.autoCloseables.size() - 1)); + } Assertions.assertThat(flightSqlClient.autoCloseables.size()).isEqualTo(9); - // N.B. can pick up references from other tests... - Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()).isLessThan(20); - for (AutoCloseable closeable : FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.keySet()) { - Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.get(closeable)).isTrue(); // Is closed + for (int i = 0; i < autoCloseables.size(); i++) { + if (i < FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT + 1) { + Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isFalse(); + Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.get(autoCloseables.get(i))).isNull(); + } else { + Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); + Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); + } } - } } From 5f2328bcdeb1846fe5f5e5868c58e78175f9b725 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 7 Nov 2025 16:56:29 +0100 Subject: [PATCH 08/17] docs: update CHANGELOG.md --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a5488f..0331578f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## 1.6.0 [unreleased] +### Features + +1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`. + 1. Track new streams in an AutoCloseable list. + 2. Close streams automatically when they are empty. + 3. Ensure that references to closed streams are regularly removed to aid garbage collection. + ## 1.5.0 [2025-10-22] ### Features From 91f57a1f32a5ab33cc0621019fa090d8f8f45032 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 10:50:13 +0100 Subject: [PATCH 09/17] chore: cleanup flightStream ledger of FlightSqlClient close. --- .../java/com/influxdb/v3/client/internal/FlightSqlClient.java | 3 +++ .../com/influxdb/v3/client/internal/FlightSqlClientTest.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index bc1e4729..58bdfe44 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -170,6 +170,9 @@ private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closea public void close() throws Exception { autoCloseables.add(client); AutoCloseables.close(autoCloseables); + for (AutoCloseable closeable : autoCloseables) { + CLOSEABLE_CLOSED_LEDGER.remove(closeable); + } } @Nonnull diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 1ba3ad57..498ff354 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -405,7 +405,11 @@ public void multipleFlightStreamsFreed() throws Exception { Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); } } + Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()) + .isEqualTo(FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT - 1); } + Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()).isEqualTo(0); + } static class HeaderCaptureMiddleware implements FlightServerMiddleware { From 6541ea59cfe671b57115ae46abbd1e002d44178a Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 10:57:37 +0100 Subject: [PATCH 10/17] test: suppress assertion that makes test flakey in CI. --- .../com/influxdb/v3/client/internal/FlightSqlClientTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 498ff354..1ba3ad57 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -405,11 +405,7 @@ public void multipleFlightStreamsFreed() throws Exception { Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); } } - Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()) - .isEqualTo(FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT - 1); } - Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()).isEqualTo(0); - } static class HeaderCaptureMiddleware implements FlightServerMiddleware { From cda5fb7b954543dc99ccb18954794345d2e6d846 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 13:48:14 +0100 Subject: [PATCH 11/17] chore: remove static LEDGER field from FlightSqlClient --- .../v3/client/internal/FlightSqlClient.java | 64 +++++++++++-------- .../client/internal/FlightSqlClientTest.java | 1 - 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 58bdfe44..6783e842 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -35,7 +35,6 @@ import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.Spliterators; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nonnull; @@ -75,7 +74,6 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; - static final Map CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>(); private final FlightClient client; @@ -136,7 +134,7 @@ Stream execute(@Nonnull final String query, CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption); Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - FlightStream stream = client.getStream(ticket, callOptionArray); + StatefulFlightStream stream = new StatefulFlightStream(client.getStream(ticket, callOptionArray)); FlightSqlIterator iterator = new FlightSqlIterator(stream); addToAutoCloseable(stream); @@ -150,29 +148,30 @@ private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closea if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { LOG.debug("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); - ListIterator iter = autoCloseables.listIterator(); - while (iter.hasNext()) { - AutoCloseable autoCloseable = iter.next(); - if (CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)) { - LOG.debug("removing closed stream {}", autoCloseable); - CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable); + cleanAutoCloseables(); + } + + autoCloseables.add(closeable); + LOG.debug("autoCloseables count {}", autoCloseables.size()); + } + + public void cleanAutoCloseables() { + ListIterator iter = autoCloseables.listIterator(); + while (iter.hasNext()) { + AutoCloseable autoCloseable = iter.next(); + if (autoCloseable.getClass() == FlightSqlClient.StatefulFlightStream.class) { + if (((FlightSqlClient.StatefulFlightStream) autoCloseable).closed) { iter.remove(); } } } - - autoCloseables.add(closeable); - CLOSEABLE_CLOSED_LEDGER.put(closeable, false); - LOG.debug("autoCloseables count {}, LEDGER count {}", autoCloseables.size(), CLOSEABLE_CLOSED_LEDGER.size()); } @Override public void close() throws Exception { autoCloseables.add(client); AutoCloseables.close(autoCloseables); - for (AutoCloseable closeable : autoCloseables) { - CLOSEABLE_CLOSED_LEDGER.remove(closeable); - } + cleanAutoCloseables(); } @Nonnull @@ -274,24 +273,39 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina }; } + private static final class StatefulFlightStream implements AutoCloseable { + FlightStream flightStream; + Boolean closed; + + public StatefulFlightStream(@Nonnull final FlightStream flightStream) { + this.flightStream = flightStream; + this.closed = false; + } + + @Override + public void close() throws Exception { + this.flightStream.close(); + this.closed = true; + } + } + private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); - private final FlightStream flightStream; + private final StatefulFlightStream sFlightStream; - private FlightSqlIterator(@Nonnull final FlightStream flightStream) { - this.flightStream = flightStream; + private FlightSqlIterator(@Nonnull final StatefulFlightStream sFlightStream) { + this.sFlightStream = sFlightStream; } @Override public boolean hasNext() { - boolean nextable = flightStream.next(); + boolean nextable = sFlightStream.flightStream.next(); if (!nextable) { // Nothing left to read - close the stream try { - flightStream.close(); - CLOSEABLE_CLOSED_LEDGER.replace(flightStream, true); + sFlightStream.close(); } catch (Exception e) { LOG.error("Error while closing FlightStream: ", e); } @@ -301,13 +315,13 @@ public boolean hasNext() { @Override public VectorSchemaRoot next() { - if (flightStream.getRoot() == null) { + if (sFlightStream.flightStream.getRoot() == null) { throw new NoSuchElementException(); } - autoCloseable.add(flightStream.getRoot()); + autoCloseable.add(sFlightStream.flightStream.getRoot()); - return flightStream.getRoot(); + return sFlightStream.flightStream.getRoot(); } @Override diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 1ba3ad57..2e427ae9 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -399,7 +399,6 @@ public void multipleFlightStreamsFreed() throws Exception { for (int i = 0; i < autoCloseables.size(); i++) { if (i < FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT + 1) { Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isFalse(); - Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.get(autoCloseables.get(i))).isNull(); } else { Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); From cfd772ac1e9fab73bceab6558502d5fbae35aca9 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 15:03:17 +0100 Subject: [PATCH 12/17] fix: remove List from FlightSqlClient and simplify closing FlightStream. --- .../v3/client/internal/FlightSqlClient.java | 74 +++---------------- .../client/internal/FlightSqlClientTest.java | 35 --------- 2 files changed, 10 insertions(+), 99 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 6783e842..7b6e2643 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -73,13 +73,11 @@ final class FlightSqlClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class); - static final int AUTOCLOSEABLE_CHECK_LIMIT = 10; private final FlightClient client; private final Map defaultHeaders = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); - final List autoCloseables = new ArrayList<>(); FlightSqlClient(@Nonnull final ClientConfig config) { this(config, null); @@ -134,44 +132,16 @@ Stream execute(@Nonnull final String query, CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption); Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - StatefulFlightStream stream = new StatefulFlightStream(client.getStream(ticket, callOptionArray)); + FlightStream stream = client.getStream(ticket, callOptionArray); FlightSqlIterator iterator = new FlightSqlIterator(stream); - addToAutoCloseable(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); return StreamSupport.stream(spliterator, false).onClose(iterator::close); } - private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closeable) { - // need to occasionally clean up references to closed streams - // in order to ensure memory can get freed. - if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) { - LOG.debug("checking to cleanup stale flight streams from {} known streams", autoCloseables.size()); - - cleanAutoCloseables(); - } - - autoCloseables.add(closeable); - LOG.debug("autoCloseables count {}", autoCloseables.size()); - } - - public void cleanAutoCloseables() { - ListIterator iter = autoCloseables.listIterator(); - while (iter.hasNext()) { - AutoCloseable autoCloseable = iter.next(); - if (autoCloseable.getClass() == FlightSqlClient.StatefulFlightStream.class) { - if (((FlightSqlClient.StatefulFlightStream) autoCloseable).closed) { - iter.remove(); - } - } - } - } - @Override public void close() throws Exception { - autoCloseables.add(client); - AutoCloseables.close(autoCloseables); - cleanAutoCloseables(); + client.close(); } @Nonnull @@ -273,60 +243,36 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina }; } - private static final class StatefulFlightStream implements AutoCloseable { - FlightStream flightStream; - Boolean closed; - - public StatefulFlightStream(@Nonnull final FlightStream flightStream) { - this.flightStream = flightStream; - this.closed = false; - } - - @Override - public void close() throws Exception { - this.flightStream.close(); - this.closed = true; - } - } - private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); - private final StatefulFlightStream sFlightStream; + private final FlightStream flightStream; - private FlightSqlIterator(@Nonnull final StatefulFlightStream sFlightStream) { - this.sFlightStream = sFlightStream; + private FlightSqlIterator(@Nonnull final FlightStream flightStream) { + this.flightStream = flightStream; } @Override public boolean hasNext() { - boolean nextable = sFlightStream.flightStream.next(); - if (!nextable) { - // Nothing left to read - close the stream - try { - sFlightStream.close(); - } catch (Exception e) { - LOG.error("Error while closing FlightStream: ", e); - } - } - return nextable; + return flightStream.next(); } @Override public VectorSchemaRoot next() { - if (sFlightStream.flightStream.getRoot() == null) { + if (flightStream.getRoot() == null) { throw new NoSuchElementException(); } - autoCloseable.add(sFlightStream.flightStream.getRoot()); + autoCloseable.add(flightStream.getRoot()); - return sFlightStream.flightStream.getRoot(); + return flightStream.getRoot(); } @Override public void close() { try { + flightStream.close(); AutoCloseables.close(autoCloseable); } catch (Exception e) { throw new RuntimeException(e); diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 2e427ae9..912450d7 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -372,41 +372,6 @@ void createProxyDetector() { } } - - @Test - public void multipleFlightStreamsFreed() throws Exception { - ClientConfig clientConfig = new ClientConfig.Builder() - .host(server.getLocation().getUri().toString()) - .token("my-token".toCharArray()) - .build(); - - try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { - - List autoCloseables = new ArrayList<>(); - for (int i = 0; i < 20; i++) { - Stream stream = flightSqlClient.execute( - "select * from cpu", - "mydb", - QueryType.SQL, - Map.of(), - Map.of()); - - stream.forEach(VectorSchemaRoot::contentToTSVString); - autoCloseables.add(flightSqlClient.autoCloseables.get(flightSqlClient.autoCloseables.size() - 1)); - - } - Assertions.assertThat(flightSqlClient.autoCloseables.size()).isEqualTo(9); - for (int i = 0; i < autoCloseables.size(); i++) { - if (i < FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT + 1) { - Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isFalse(); - } else { - Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); - Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue(); - } - } - } - } - static class HeaderCaptureMiddleware implements FlightServerMiddleware { private final Map headers = new HashMap<>(); From 6554a2a1d5ad3b7833d72a87bd0f97a0c1de4324 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 15:08:52 +0100 Subject: [PATCH 13/17] chore: fix lint issues --- .../java/com/influxdb/v3/client/internal/FlightSqlClient.java | 1 - .../com/influxdb/v3/client/internal/FlightSqlClientTest.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 7b6e2643..88d20aff 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; import java.util.Spliterator; diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 912450d7..21bd04dd 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -23,9 +23,7 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.stream.IntStream; import java.util.stream.Stream; From 3180a0746902a4428eb83349b2d73908372a1fdd Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 15:31:12 +0100 Subject: [PATCH 14/17] docs: remove no-longer pertinent information from CHANGELOG.md --- CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0331578f..e4611dca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,6 @@ ### Features 1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`. - 1. Track new streams in an AutoCloseable list. - 2. Close streams automatically when they are empty. - 3. Ensure that references to closed streams are regularly removed to aid garbage collection. ## 1.5.0 [2025-10-22] From cd0c122974af1939331fcc82c0f27e9130cc13dd Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 15:48:41 +0100 Subject: [PATCH 15/17] tests: restore logging in E2E test. --- src/test/java/com/influxdb/v3/client/integration/E2ETest.java | 3 ++- .../com/influxdb/v3/client/internal/FlightSqlClientTest.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 94773923..63c95afa 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -52,7 +53,7 @@ public class E2ETest { - private static final java.util.logging.Logger LOG = java.util.logging.Logger.getLogger(E2ETest.class.getName()); + private static final Logger LOG = Logger.getLogger(E2ETest.class.getName()); @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 21bd04dd..a6873c56 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -418,5 +418,4 @@ public HeaderCaptureMiddleware onCallStarted(final CallInfo callInfo, return lastInstance; } } - } From 694caaee31445d43e61b5ce9ebd3250fdbddf3a1 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 10 Nov 2025 17:27:47 +0100 Subject: [PATCH 16/17] chore: attempt SNAPSHOT publish --- .circleci/config.yml | 4 +++- pom.xml | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5212dd91..2b24c933 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -232,4 +232,6 @@ workflows: - tests-java filters: branches: - only: main + only: + - main + - fix/arrowBaseAllocatorMemoryLeak diff --git a/pom.xml b/pom.xml index 70be09cb..cc549e63 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ com.influxdb influxdb3-java jar - 1.6.0-SNAPSHOT + 1.6.0.1-SNAPSHOT InfluxDB 3 Java Client From dbc6f66451b4e0e6056ecd2e1290c0add3672e00 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 11 Nov 2025 10:31:01 +0100 Subject: [PATCH 17/17] chore: revert SNAPSHOT version and CI config - snapshot was deployed. --- .circleci/config.yml | 4 +--- pom.xml | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 2b24c933..5212dd91 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -232,6 +232,4 @@ workflows: - tests-java filters: branches: - only: - - main - - fix/arrowBaseAllocatorMemoryLeak + only: main diff --git a/pom.xml b/pom.xml index cc549e63..70be09cb 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ com.influxdb influxdb3-java jar - 1.6.0.1-SNAPSHOT + 1.6.0-SNAPSHOT InfluxDB 3 Java Client