Skip to content

Commit f76f5a8

Browse files
authored
Merge pull request #1173 from ably/ECO-5621/fix-orphan-timer-threads
fix: orphan Timer instances on release
2 parents 112e5f2 + 935a265 commit f76f5a8

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Set;
99
import java.util.Timer;
1010
import java.util.TimerTask;
11+
import java.util.concurrent.atomic.AtomicBoolean;
1112

1213
import io.ably.lib.http.BasePaginatedQuery;
1314
import io.ably.lib.http.Http;
@@ -95,7 +96,7 @@ public abstract class ChannelBase extends EventEmitter<ChannelEvent, ChannelStat
9596
/**
9697
* @see #markAsReleased()
9798
*/
98-
private boolean released = false;
99+
private AtomicBoolean released = new AtomicBoolean(false);
99100

100101
@Nullable private final LiveObjectsPlugin liveObjectsPlugin;
101102

@@ -330,8 +331,8 @@ public void detach() throws AblyException {
330331
/**
331332
* Mark channel as released that means we can't perform any operation on this channel anymore
332333
*/
333-
public synchronized void markAsReleased() {
334-
released = true;
334+
public void markAsReleased() {
335+
released.set(true);
335336
}
336337

337338
/**
@@ -395,7 +396,7 @@ private void sendDetachMessage(CompletionListener listener) throws AblyException
395396
}
396397

397398
this.attachResume = false;
398-
if (released) {
399+
if (released.get()) {
399400
setDetached(null);
400401
} else {
401402
setState(ChannelState.detaching, null);
@@ -572,7 +573,7 @@ public void run() {
572573
}
573574

574575
private void checkChannelIsNotReleased() {
575-
if (released) throw new IllegalStateException("Unable to perform any operation on released channel");
576+
if (released.get()) throw new IllegalStateException("Unable to perform any operation on released channel");
576577
}
577578

578579
/**
@@ -621,17 +622,17 @@ synchronized private void detachWithTimeout(final CompletionListener listener) {
621622
final ChannelState originalState = state;
622623
Timer currentDetachTimer;
623624
try {
624-
currentDetachTimer = new Timer();
625+
currentDetachTimer = released.get() ? null : new Timer();
625626
} catch(Throwable t) {
626627
/* an exception instancing the timer can arise because the runtime is exiting */
627628
callCompletionListenerError(listener, ErrorInfo.fromThrowable(t));
628629
return;
629630
}
630-
attachTimer = released ? null : currentDetachTimer;
631+
attachTimer = currentDetachTimer;
631632

632633
try {
633634
// If channel has been released, completionListener won't be invoked anyway
634-
CompletionListener completionListener = released ? null : new CompletionListener() {
635+
CompletionListener completionListener = released.get() ? null : new CompletionListener() {
635636
@Override
636637
public void onSuccess() {
637638
clearAttachTimers();

lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Comparator;
3737
import java.util.HashMap;
3838
import java.util.List;
39+
import java.util.Set;
3940

4041
import static org.hamcrest.Matchers.equalTo;
4142
import static org.hamcrest.Matchers.is;
@@ -2573,6 +2574,29 @@ public void connect_on_closing_client_should_reinitialize_channels() throws Ably
25732574
}
25742575
}
25752576

2577+
@Test
2578+
public void release_should_not_prevent_graceful_test_end() throws Exception {
2579+
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
2580+
try (AblyRealtime ably = new AblyRealtime(opts)) {
2581+
ably.channels.get("channel_should_be_released");
2582+
ably.channels.release("channel_should_be_released");
2583+
}
2584+
2585+
Set<Thread> threads = Thread.getAllStackTraces().keySet();
2586+
List<Thread> timers = threads.stream()
2587+
.filter(t -> t.getName().startsWith("Timer-"))
2588+
.toList();
2589+
2590+
long timeout = 1_000;
2591+
long start = System.currentTimeMillis();
2592+
2593+
while(timers.stream().anyMatch(Thread::isAlive) && System.currentTimeMillis() - start < timeout) {
2594+
Thread.sleep(100);
2595+
}
2596+
2597+
assertFalse("Found orphan Timer threads", timers.stream().anyMatch(Thread::isAlive));
2598+
}
2599+
25762600
/**
25772601
* This test ensures that when the connection is manually triggered, the channel can successfully
25782602
* transition to the attached state without interference or rewriting of its immediate attach action.

0 commit comments

Comments
 (0)