From 6e8a5a62bf00d0db6acb7e58414066cb52fb1d21 Mon Sep 17 00:00:00 2001 From: evgeny Date: Mon, 27 Oct 2025 17:07:55 +0000 Subject: [PATCH] fix: reconnect breaks async requests Introduced `connect()` in `AblyBase` to reset thread pools when `AblyRealtime.connect()` is called after `close()` --- .../io/ably/lib/http/AsyncHttpScheduler.java | 24 ++++++++++--- lib/src/main/java/io/ably/lib/http/Http.java | 4 +++ .../io/ably/lib/realtime/AblyRealtime.java | 2 ++ .../main/java/io/ably/lib/rest/AblyBase.java | 7 ++++ .../test/realtime/RealtimeConnectTest.java | 36 +++++++++++++++++++ 5 files changed, 68 insertions(+), 5 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java b/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java index 285cd856c..38b45905f 100644 --- a/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java +++ b/lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java @@ -33,16 +33,23 @@ public AsyncHttpScheduler exchangeHttpCore(HttpCore httpCore) { return new AsyncHttpScheduler(httpCore, this.executor); } + public void connect() { + ((CloseableThreadPoolExecutor) executor).connect(); + } + private static class CloseableThreadPoolExecutor implements CloseableExecutor { - private final ThreadPoolExecutor executor; + // can be accessed by multiple threads, so needs to be volatile + private volatile ThreadPoolExecutor executor; + private final ClientOptions options; CloseableThreadPoolExecutor(final ClientOptions options) { + this.options = options; executor = new ThreadPoolExecutor( options.asyncHttpThreadpoolSize, options.asyncHttpThreadpoolSize, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue() + new LinkedBlockingQueue<>() ); } @@ -59,9 +66,16 @@ public void close() throws Exception { } } - @Override - protected void finalize() throws Throwable { - close(); + public void connect() { + if (executor.isShutdown()) { + executor = new ThreadPoolExecutor( + options.asyncHttpThreadpoolSize, + options.asyncHttpThreadpoolSize, + KEEP_ALIVE_TIME, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>() + ); + } } } } diff --git a/lib/src/main/java/io/ably/lib/http/Http.java b/lib/src/main/java/io/ably/lib/http/Http.java index 708ccf13b..40ced89e7 100644 --- a/lib/src/main/java/io/ably/lib/http/Http.java +++ b/lib/src/main/java/io/ably/lib/http/Http.java @@ -21,6 +21,10 @@ public void close() throws Exception { asyncHttp.close(); } + public void connect() { + asyncHttp.connect(); + } + /** * [Internal Method] *

diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 174fcfbf6..d9053c8d2 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -96,7 +96,9 @@ public AblyRealtime(ClientOptions options) throws AblyException { *

* Spec: RTN11 */ + @Override public void connect() { + super.connect(); // resets thread pool for async requests connection.connect(); } diff --git a/lib/src/main/java/io/ably/lib/rest/AblyBase.java b/lib/src/main/java/io/ably/lib/rest/AblyBase.java index 54bbb539e..67f1d800b 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyBase.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyBase.java @@ -138,6 +138,13 @@ public void close() throws Exception { http.close(); } + /** + * Internal method, used for `RealtimeClient` only + */ + protected void connect() { + http.connect(); + } + /** * A collection of Channels associated with an Ably instance. */ diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectTest.java index 2e744f7df..044d1a979 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectTest.java @@ -6,6 +6,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.ably.lib.test.common.Helpers; +import io.ably.lib.types.AsyncHttpPaginatedResponse; +import io.ably.lib.types.ErrorInfo; import org.junit.Ignore; import org.junit.Test; @@ -23,6 +26,8 @@ import io.ably.lib.types.ProtocolMessage; import org.junit.rules.Timeout; +import java.util.concurrent.atomic.AtomicLong; + public class RealtimeConnectTest extends ParameterizedTest { public Timeout testTimeout = Timeout.seconds(30); @@ -248,4 +253,35 @@ public void close_when_connecting() { } } + @Test + public void reopened_connection_rest_works() throws Exception { + try (AblyRealtime realtimeClient = new AblyRealtime(createOptions(testVars.keys[0].keyStr))) { + realtimeClient.close(); + realtimeClient.connect(); + + long timestamp = System.currentTimeMillis(); + AtomicLong ablyTime = new AtomicLong(0); + Helpers.CompletionWaiter waiter = new Helpers.CompletionWaiter(); + + realtimeClient.requestAsync("GET", "/time", null, null, null, new AsyncHttpPaginatedResponse.Callback() { + @Override + public void onResponse(AsyncHttpPaginatedResponse response) { + ablyTime.set(response.items()[0].getAsLong()); + waiter.onSuccess(); + } + + @Override + public void onError(ErrorInfo reason) { + waiter.onError(reason); + } + }); + + waiter.waitFor(); + + long thirtySeconds = 30_000L; + + assertTrue(ablyTime.get() > timestamp - thirtySeconds && ablyTime.get() < timestamp + thirtySeconds); + } + } + }