From 1ac4165f86459c0956a241a88e2babf7ff06fdf3 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 19 Nov 2025 23:53:13 +0100 Subject: [PATCH 1/5] fix: memory leak in query stream close --- .../v3/client/issues/MemoryLeakIssueTest.java | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java diff --git a/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java b/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java new file mode 100644 index 00000000..70438b2f --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java @@ -0,0 +1,158 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.issues; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.PointValues; +import com.influxdb.v3.client.config.ClientConfig; + +public class MemoryLeakIssueTest { + + private static final Logger LOG = Logger.getLogger(MemoryLeakIssueTest.class.getName()); + + /** + * Tests that interrupting a thread during stream consumption does not cause Arrow memory leaks. + *

+ * This test creates a query thread that slowly consumes results, then interrupts it mid-processing. + * The interrupt causes FlightStream.close() to throw InterruptedException, which previously bypassed + * cleanup code and left Apache Arrow buffers unreleased. With the fix, client.close() should complete + * successfully without throwing "Memory was leaked" errors. + */ + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + void testStreamCloseWithThreadInterrupt() throws Exception { + String host = System.getenv("TESTING_INFLUXDB_URL"); + String token = System.getenv("TESTING_INFLUXDB_TOKEN"); + String database = System.getenv("TESTING_INFLUXDB_DATABASE"); + String measurement = "memory_leak_test_" + System.currentTimeMillis(); + + + // Prepare config + ClientConfig config = new ClientConfig.Builder() + .host(host) + .token(token.toCharArray()) + .database(database) + .writeNoSync(true) + .build(); + + // Write test data + try (InfluxDBClient client = InfluxDBClient.getInstance(config)) { + LOG.info("Writing test data..."); + for (int i = 0; i < 100; i++) { + client.writeRecord(String.format("%s,id=%04d temp=%f", + measurement, i, 20.0 + Math.random() * 10)); + } + } + + TimeUnit.MILLISECONDS.sleep(500); + + // Query data + InfluxDBClient client = InfluxDBClient.getInstance(config); + //noinspection TryFinallyCanBeTryWithResources + try { + String sql = String.format("SELECT * FROM %s", measurement); + + // Synchronization to ensure we interrupt during consumption + CountDownLatch consumingStarted = new CountDownLatch(1); + AtomicInteger rowsProcessed = new AtomicInteger(0); + AtomicInteger exceptionsThrown = new AtomicInteger(0); + + Thread queryThread = new Thread(() -> { + try (Stream stream = client.queryPoints(sql)) { + LOG.info("queryPoints returned"); + stream.forEach(pv -> { + int count = rowsProcessed.incrementAndGet(); + + // Signal that we've started consuming + if (count == 1) { + LOG.info("Started consuming - ready for interrupt"); + consumingStarted.countDown(); + } + + try { + // Slow consumption to ensure we're mid-stream when interrupted + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + LOG.info("INTERRUPTED during consume! (after " + count + " rows)"); + Thread.currentThread().interrupt(); + // Throw exception to stop stream consumption immediately; try-with-resources will then + // close stream in interrupted state + throw new RuntimeException("Interrupted", e); + } + }); + } catch (Exception e) { + exceptionsThrown.incrementAndGet(); + LOG.info("Exception caught: " + e); + } + }); + + LOG.info("Starting consumer thread..."); + queryThread.start(); + + // Wait for thread to start consuming + if (!consumingStarted.await(10, TimeUnit.SECONDS)) { + Assertions.fail("Thread didn't start consuming in time!"); + } + + // Give it a moment to be mid-processing + TimeUnit.MILLISECONDS.sleep(50); + + // Interrupt during processing + LOG.info("Interrupting thread..."); + queryThread.interrupt(); + + // Wait for thread to finish + queryThread.join(10000); + + // Verify that thread started processing rows + if (rowsProcessed.get() == 0) { + Assertions.fail("Thread didn't process any rows"); + } + + // Verify that exception was thrown due to interrupt + if (exceptionsThrown.get() == 0) { + Assertions.fail("No exception was thrown - interrupt might not have worked"); + } + + } catch (Exception e) { + LOG.severe("Test failed: " + e.getMessage()); + throw e; + } finally { + // Now close the client. + // It should not throw `Memory was leaked by query. Memory leaked: (...)` error! + LOG.info("Closing the client..."); + client.close(); + } + } +} From a1df23d98ff0b9f86bd23b42f2b367c40e6b284c Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Thu, 20 Nov 2025 00:27:23 +0100 Subject: [PATCH 2/5] test: wait for data to be available --- .../v3/client/issues/MemoryLeakIssueTest.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java b/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java index 70438b2f..995ce1a3 100644 --- a/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java +++ b/src/test/java/com/influxdb/v3/client/issues/MemoryLeakIssueTest.java @@ -56,7 +56,7 @@ void testStreamCloseWithThreadInterrupt() throws Exception { String token = System.getenv("TESTING_INFLUXDB_TOKEN"); String database = System.getenv("TESTING_INFLUXDB_DATABASE"); String measurement = "memory_leak_test_" + System.currentTimeMillis(); - + String sql = String.format("SELECT * FROM %s", measurement); // Prepare config ClientConfig config = new ClientConfig.Builder() @@ -66,23 +66,40 @@ void testStreamCloseWithThreadInterrupt() throws Exception { .writeNoSync(true) .build(); - // Write test data try (InfluxDBClient client = InfluxDBClient.getInstance(config)) { + // Write test data LOG.info("Writing test data..."); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 3; i++) { client.writeRecord(String.format("%s,id=%04d temp=%f", measurement, i, 20.0 + Math.random() * 10)); } - } - TimeUnit.MILLISECONDS.sleep(500); + // Wait for data to be queryable (CI environments can be slower) + LOG.info("Waiting for data to be available..."); + int attempts = 0; + boolean hasData = false; + while (attempts < 10 && !hasData) { + try (Stream testStream = client.queryPoints(sql)) { + hasData = testStream.findFirst().isPresent(); + } + if (!hasData) { + LOG.info("Data not yet available, waiting... (attempt " + (attempts + 1) + "/10)"); + TimeUnit.MILLISECONDS.sleep(500); + attempts++; + } + } + + if (!hasData) { + Assertions.fail("No data available after writing and waiting " + (attempts * 500) + "ms"); + } + LOG.info("Data is available, starting test..."); + + } // Query data InfluxDBClient client = InfluxDBClient.getInstance(config); //noinspection TryFinallyCanBeTryWithResources try { - String sql = String.format("SELECT * FROM %s", measurement); - // Synchronization to ensure we interrupt during consumption CountDownLatch consumingStarted = new CountDownLatch(1); AtomicInteger rowsProcessed = new AtomicInteger(0); @@ -117,7 +134,6 @@ void testStreamCloseWithThreadInterrupt() throws Exception { } }); - LOG.info("Starting consumer thread..."); queryThread.start(); // Wait for thread to start consuming From 96e5f5b3d526a4dfd23ea7fff02f0a4e409cd189 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Thu, 20 Nov 2025 00:34:07 +0100 Subject: [PATCH 3/5] fix: add fix --- .../v3/client/internal/FlightSqlClient.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 88d20aff..d80c7e87 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -270,11 +270,43 @@ public VectorSchemaRoot next() { @Override public void close() { + Exception pendingException = null; + + // Try to close FlightStream try { flightStream.close(); + } catch (Exception e) { + LOG.warn("FlightStream close failed: {}", e.toString()); + pendingException = e; + + // Retry close - first attempt drained stream but threw exception before cleanup, + // retry finds stream already drained and completes cleanup successfully + try { + flightStream.close(); + // Retry succeeded - clear the exception + pendingException = null; + } catch (Exception retryException) { + // Retry also failed - keep original exception + // but continue to close collected Arrow resources anyway + LOG.error("FlightStream close failed even after retry attempt", retryException); + } + } + + // ALWAYS try to close collected Arrow resources + try { AutoCloseables.close(autoCloseable); } catch (Exception e) { - throw new RuntimeException(e); + LOG.error("AutoCloseable close failed", e); + if (pendingException != null) { + pendingException.addSuppressed(e); + } else { + pendingException = e; + } + } + + // Throw pending exceptions + if (pendingException != null) { + throw new RuntimeException(pendingException); } } } From c1638372745cb492d174d69efa2e02ed70fcfde7 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Thu, 20 Nov 2025 08:27:31 +0100 Subject: [PATCH 4/5] docs: update CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 330b77d5..ba5c7ab5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.7.0 [unreleased] +### Bug Fixes + +1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts. + ## 1.6.0 [2025-11-14] ### Features From 454802b02b352d503b9f531266d95441c2261b94 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Thu, 20 Nov 2025 08:33:02 +0100 Subject: [PATCH 5/5] fix: lint --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba5c7ab5..672a429a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Bug Fixes -1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts. +1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts. ## 1.6.0 [2025-11-14]