Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ public AsyncHttpScheduler exchangeHttpCore(HttpCore httpCore) {
return new AsyncHttpScheduler(httpCore, this.executor);
}

public void connect() {
((CloseableThreadPoolExecutor) executor).connect();
}

private static class CloseableThreadPoolExecutor implements CloseableExecutor {
private final ThreadPoolExecutor executor;
// can be accessed by multiple threads, so needs to be volatile
private volatile ThreadPoolExecutor executor;
private final ClientOptions options;

CloseableThreadPoolExecutor(final ClientOptions options) {
this.options = options;
executor = new ThreadPoolExecutor(
options.asyncHttpThreadpoolSize,
options.asyncHttpThreadpoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
new LinkedBlockingQueue<>()
);
}

Expand All @@ -59,9 +66,16 @@ public void close() throws Exception {
}
}

@Override
protected void finalize() throws Throwable {
close();
public void connect() {
if (executor.isShutdown()) {
executor = new ThreadPoolExecutor(
options.asyncHttpThreadpoolSize,
options.asyncHttpThreadpoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
}
}
}
}
4 changes: 4 additions & 0 deletions lib/src/main/java/io/ably/lib/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public void close() throws Exception {
asyncHttp.close();
}

public void connect() {
asyncHttp.connect();
}

/**
* [Internal Method]
* <p>
Expand Down
2 changes: 2 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public AblyRealtime(ClientOptions options) throws AblyException {
* <p>
* Spec: RTN11
*/
@Override
public void connect() {
super.connect(); // resets thread pool for async requests
connection.connect();
}

Expand Down
7 changes: 7 additions & 0 deletions lib/src/main/java/io/ably/lib/rest/AblyBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ public void close() throws Exception {
http.close();
}

/**
* Internal method, used for `RealtimeClient` only
*/
protected void connect() {
http.connect();
}

/**
* A collection of Channels associated with an Ably instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.ably.lib.test.common.Helpers;
import io.ably.lib.types.AsyncHttpPaginatedResponse;
import io.ably.lib.types.ErrorInfo;
import org.junit.Ignore;
import org.junit.Test;

Expand All @@ -23,6 +26,8 @@
import io.ably.lib.types.ProtocolMessage;
import org.junit.rules.Timeout;

import java.util.concurrent.atomic.AtomicLong;

public class RealtimeConnectTest extends ParameterizedTest {

public Timeout testTimeout = Timeout.seconds(30);
Expand Down Expand Up @@ -248,4 +253,35 @@ public void close_when_connecting() {
}
}

@Test
public void reopened_connection_rest_works() throws Exception {
try (AblyRealtime realtimeClient = new AblyRealtime(createOptions(testVars.keys[0].keyStr))) {
realtimeClient.close();
realtimeClient.connect();

long timestamp = System.currentTimeMillis();
AtomicLong ablyTime = new AtomicLong(0);
Helpers.CompletionWaiter waiter = new Helpers.CompletionWaiter();

realtimeClient.requestAsync("GET", "/time", null, null, null, new AsyncHttpPaginatedResponse.Callback() {
@Override
public void onResponse(AsyncHttpPaginatedResponse response) {
ablyTime.set(response.items()[0].getAsLong());
waiter.onSuccess();
}

@Override
public void onError(ErrorInfo reason) {
waiter.onError(reason);
}
});

waiter.waitFor();

long thirtySeconds = 30_000L;

assertTrue(ablyTime.get() > timestamp - thirtySeconds && ablyTime.get() < timestamp + thirtySeconds);
}
}

}
Loading