From ffc9e48b8c193899c87cdd2a6c3a81ff5ed0687e Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 24 Sep 2025 16:40:27 +0100 Subject: [PATCH 1/3] [ECO-5514] fix: improve WebSocket transport lifecycle and activity management * Force-cancel hanging socket connections * Replace synchronized blocks with fine-grained locking using `activityTimerMonitor` to avoid deadlocks. * Refactor `close()` for safer access to shared fields and handle uninitialized cases gracefully. * Simplify activity timer logic and ensure consistent disposal of `WebSocketClient` and `WebSocketHandler`. --- .../ably/lib/transport/ConnectionManager.java | 10 ++ .../lib/transport/WebSocketTransport.java | 144 +++++++++++------- 2 files changed, 101 insertions(+), 53 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 89107d91e..9ec2061fa 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -857,6 +857,16 @@ public void requestState(StateIndication state) { requestState(null, state); } + /** + * Determines if the given WebSocketTransport instance is the currently active transport. + * + * @param transport the WebSocketTransport instance to check against the active transport + * @return true if the provided transport is the currently active transport, false otherwise + */ + boolean isActiveTransport(WebSocketTransport transport) { + return transport == this.transport; + } + private synchronized void requestState(ITransport transport, StateIndication stateIndication) { Log.v(TAG, "requestState(): requesting " + stateIndication.state + "; id = " + connection.id); addAction(new AsynchronousStateChangeAction(transport, stateIndication)); diff --git a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java index 69ce91a34..c8634577b 100644 --- a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java +++ b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java @@ -52,8 +52,11 @@ public class WebSocketTransport implements ITransport { private ConnectListener connectListener; private WebSocketClient webSocketClient; private final WebSocketEngine webSocketEngine; + private WebSocketHandler webSocketHandler; private boolean activityCheckTurnedOff = false; + private boolean connectHasBeenCalled = false; + /****************** * protected constructor ******************/ @@ -94,8 +97,13 @@ private static WebSocketEngine createWebSocketEngine(TransportParams params) { * ITransport methods ******************/ + /** + * Connect is called once when we create transport; + * after transport is closed, we never call `connect` again + */ @Override public void connect(ConnectListener connectListener) { + ensureConnectCalledOnce(); this.connectListener = connectListener; try { boolean isTls = params.options.tls; @@ -107,9 +115,8 @@ public void connect(ConnectListener connectListener) { wsUri = HttpUtils.encodeParams(wsUri, connectParams); Log.d(TAG, "connect(); wsUri = " + wsUri); - synchronized (this) { - webSocketClient = this.webSocketEngine.create(wsUri, new WebSocketHandler(this::receive)); - } + webSocketHandler = new WebSocketHandler(this::receive); + webSocketClient = this.webSocketEngine.create(wsUri, webSocketHandler); webSocketClient.connect(); } catch (AblyException e) { Log.e(TAG, "Unexpected exception attempting connection; wsUri = " + wsUri, e); @@ -120,14 +127,36 @@ public void connect(ConnectListener connectListener) { } } + /** + * `connect()` can't be called more than once + */ + private synchronized void ensureConnectCalledOnce() { + if (connectHasBeenCalled) throw new IllegalStateException("WebSocketTransport is already initialized"); + connectHasBeenCalled = true; + } + @Override public void close() { Log.d(TAG, "close()"); - synchronized (this) { - if (webSocketClient != null) { - webSocketClient.close(); - webSocketClient = null; - } + // Take local snapshots of the shared references. Callback threads (e.g., onClose) + // may concurrently set these fields to null. + // + // Intentionally avoid synchronizing here: + // - The WebSocket library may invoke our WebSocketHandler while holding its own + // internal locks. + // - If close() also acquired a lock on WebSocketTransport, we could invert the + // lock order and create a circular wait (deadlock): close() waits for the WS + // library to release its lock, while the WS library waits for a lock on + // WebSocketTransport. + final WebSocketClient client = webSocketClient; + final WebSocketHandler handler = webSocketHandler; + if (client != null && handler != null) { + // Record activity so the activity timer remains armed. If a graceful close + // stalls, the timer can detect inactivity and force-cancel the socket. + handler.flagActivity(); + client.close(); + } else { + Log.w(TAG, "close() called on uninitialized or already closed transport"); } } @@ -191,6 +220,11 @@ public String toString() { public String getURL() { return wsUri; } + + private boolean isActiveTransport() { + return connectionManager.isActiveTransport(this); + } + //interface to transfer Protocol message from websocket interface WebSocketReceiver { void onMessage(ProtocolMessage protocolMessage) throws AblyException; @@ -217,10 +251,15 @@ class WebSocketHandler implements WebSocketListener { * WsClient private members ***************************/ - private Timer timer = new Timer(); + private final Timer timer = new Timer(); private TimerTask activityTimerTask = null; private long lastActivityTime; + /** + * Monitor for activity timer events + */ + private final Object activityTimerMonitor = new Object(); + WebSocketHandler(WebSocketReceiver receiver) { this.receiver = receiver; } @@ -318,66 +357,65 @@ public void onOldJavaVersionDetected(Throwable throwable) { Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", throwable); } - private synchronized void dispose() { - /* dispose timer */ - try { - timer.cancel(); - timer = null; - } catch (IllegalStateException e) { - } + private void dispose() { + timer.cancel(); } - private synchronized void flagActivity() { - lastActivityTime = System.currentTimeMillis(); - connectionManager.setLastActivity(lastActivityTime); - if (activityTimerTask == null && connectionManager.maxIdleInterval != 0 && !activityCheckTurnedOff) { - /* No timer currently running because previously there was no - * maxIdleInterval configured, but now there is a - * maxIdleInterval configured. Call checkActivity so a timer - * gets started. This happens when flagActivity gets called - * just after processing the connect message that configures - * maxIdleInterval. */ - checkActivity(); + private void flagActivity() { + if (isActiveTransport()) { + lastActivityTime = System.currentTimeMillis(); + connectionManager.setLastActivity(lastActivityTime); } + + if (connectionManager.maxIdleInterval == 0) { + Log.v(TAG, "checkActivity: turned off because maxIdleInterval is 0"); + return; + } + + if (activityCheckTurnedOff) { + Log.v(TAG, "checkActivity: turned off for test purpose"); + return; + } + + checkActivity(); } - private synchronized void checkActivity() { + private void checkActivity() { long timeout = getActivityTimeout(); + if (timeout == 0) { Log.v(TAG, "checkActivity: infinite timeout"); return; } - // Check if timer already running - if (activityTimerTask != null) { - return; + synchronized (activityTimerMonitor) { + // Check if timer already running + if (activityTimerTask == null) { + // Start the activity timer task + startActivityTimer(timeout + 100); + } } - - // Start the activity timer task - startActivityTimer(timeout + 100); } - private synchronized void startActivityTimer(long timeout) { - if (activityTimerTask == null) { - schedule((activityTimerTask = new TimerTask() { - public void run() { - try { - onActivityTimerExpiry(); - } catch (Throwable t) { - Log.e(TAG, "Unexpected exception in activity timer handler", t); - } + private void startActivityTimer(long timeout) { + activityTimerTask = new TimerTask() { + public void run() { + try { + onActivityTimerExpiry(); + } catch (Exception exception) { + Log.e(TAG, "Unexpected exception in activity timer handler", exception); + webSocketClient.cancel(ABNORMAL_CLOSE, "Activity timer closed unexpectedly"); } - }), timeout); - } + } + }; + schedule(activityTimerTask, timeout); } - private synchronized void schedule(TimerTask task, long delay) { - if (timer != null) { - try { - timer.schedule(task, delay); - } catch (IllegalStateException ise) { - Log.e(TAG, "Unexpected exception scheduling activity timer", ise); - } + private void schedule(TimerTask task, long delay) { + try { + timer.schedule(task, delay); + } catch (IllegalStateException ise) { + Log.w(TAG, "Timer has already been canceled", ise); } } @@ -392,7 +430,7 @@ private void onActivityTimerExpiry() { return; } - synchronized (this) { + synchronized (activityTimerMonitor) { activityTimerTask = null; // Otherwise, we've had some activity, restart the timer for the next timeout Log.v(TAG, "onActivityTimerExpiry: ok"); From a6f0f7583406ddd8b91b9728c846783e00e16db3 Mon Sep 17 00:00:00 2001 From: evgeny Date: Thu, 25 Sep 2025 14:56:59 +0100 Subject: [PATCH 2/3] test: add unit tests for WebSocketTransport lifecycle and robustness - Test edge cases like multiple `connect()` calls, forced closures, and redundant `onClose` events. --- .../ably/lib/transport/ConnectionManager.java | 8 + .../lib/transport/WebSocketTransport.java | 6 +- .../java/io/ably/lib/test/common/Helpers.java | 10 ++ .../lib/transport/WebSocketTransportTest.java | 153 ++++++++++++++++++ 4 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 lib/src/test/java/io/ably/lib/transport/WebSocketTransportTest.java diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 9ec2061fa..6ce013b7c 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -31,6 +31,7 @@ import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ConnectionDetails; import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Param; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.ProtocolSerializer; import io.ably.lib.util.Log; @@ -857,6 +858,13 @@ public void requestState(StateIndication state) { requestState(null, state); } + /** + * Get query params representing the current authentication method and credentials. + */ + Param[] getAuthParams() throws AblyException { + return ably.auth.getAuthParams(); + } + /** * Determines if the given WebSocketTransport instance is the currently active transport. * diff --git a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java index c8634577b..d417cd23c 100644 --- a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java +++ b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java @@ -109,7 +109,7 @@ public void connect(ConnectListener connectListener) { boolean isTls = params.options.tls; String wsScheme = isTls ? "wss://" : "ws://"; wsUri = wsScheme + params.host + ':' + params.port + "/"; - Param[] authParams = connectionManager.ably.auth.getAuthParams(); + Param[] authParams = connectionManager.getAuthParams(); Param[] connectParams = params.getConnectParams(authParams); if (connectParams.length > 0) wsUri = HttpUtils.encodeParams(wsUri, connectParams); @@ -415,7 +415,7 @@ private void schedule(TimerTask task, long delay) { try { timer.schedule(task, delay); } catch (IllegalStateException ise) { - Log.w(TAG, "Timer has already been canceled", ise); + Log.w(TAG, "Timer has already has been canceled", ise); } } @@ -439,7 +439,7 @@ private void onActivityTimerExpiry() { } private long getActivityTimeout() { - return connectionManager.maxIdleInterval + connectionManager.ably.options.realtimeRequestTimeout; + return connectionManager.maxIdleInterval + params.options.realtimeRequestTimeout; } } diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index 5b0f328c8..724e818af 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -969,6 +969,16 @@ public static boolean equalNullableStrings(String one, String two) { return (one == null) ? (two == null) : one.equals(two); } + public static void setPrivateField(Object object, String fieldName, Object value) { + try { + Field connectionStateField = object.getClass().getDeclaredField(fieldName); + connectionStateField.setAccessible(true); + connectionStateField.set(object, value); + } catch (Exception e) { + fail("Failed accessing " + fieldName + " with error " + e); + } + } + public static class RawHttpRequest { public String id; public URL url; diff --git a/lib/src/test/java/io/ably/lib/transport/WebSocketTransportTest.java b/lib/src/test/java/io/ably/lib/transport/WebSocketTransportTest.java new file mode 100644 index 000000000..c9baad554 --- /dev/null +++ b/lib/src/test/java/io/ably/lib/transport/WebSocketTransportTest.java @@ -0,0 +1,153 @@ +package io.ably.lib.transport; + +import io.ably.lib.network.WebSocketClient; +import io.ably.lib.network.WebSocketEngine; +import io.ably.lib.network.WebSocketListener; +import io.ably.lib.test.common.Helpers; +import io.ably.lib.test.util.EmptyPlatformAgentProvider; +import io.ably.lib.transport.ITransport.TransportParams; +import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.Param; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for WebSocketTransport, specifically testing activity timer behavior + * when WebSocket close operations get stuck or fail to trigger onClose handlers. + */ +public class WebSocketTransportTest { + + private ConnectionManager mockConnectionManager; + + private WebSocketEngine mockEngine; + + private WebSocketTransport transport; + + private WebSocketClient mockWebSocketClient; + + private TransportParams transportParams; + + @Before + public void setUp() throws Exception { + mockConnectionManager = mock(ConnectionManager.class); + mockEngine = mock(WebSocketEngine.class); + mockWebSocketClient = mock(WebSocketClient.class); + when(mockEngine.isPingListenerSupported()).thenReturn(true); + when(mockEngine.create(any(), any())).thenReturn(mockWebSocketClient); + when(mockConnectionManager.getAuthParams()).thenReturn(new Param[]{}); + + mockConnectionManager.maxIdleInterval = 10; + + // Setup transport params + transportParams = new TransportParams(new ClientOptions(), new EmptyPlatformAgentProvider()); + transportParams.host = "realtime.ably.io"; + transportParams.port = 443; + transportParams.options.realtimeRequestTimeout = 10; + } + + private WebSocketTransport createWebSocketTransport() { + WebSocketTransport transport = new WebSocketTransport(transportParams, mockConnectionManager); + Helpers.setPrivateField(transport, "webSocketEngine", mockEngine); + return transport; + } + + @Test + public void throwExceptionsIfConnectCalledTwice() { + final WebSocketTransport transport = createWebSocketTransport(); + ITransport.ConnectListener connectListener = mock(ITransport.ConnectListener.class); + transport.connect(connectListener); + assertThrows(IllegalStateException.class, () -> + transport.connect(connectListener) + ); + } + + @Test + public void shouldCallCancelIfNotClosedGracefully() { + AtomicReference webSocketListenerRef = new AtomicReference<>(); + + when(mockEngine.create(any(), any())).thenAnswer(invocation -> { + webSocketListenerRef.set(invocation.getArgumentAt(1, WebSocketListener.class)); + return mockWebSocketClient; + }); + + doAnswer(invocation -> { + webSocketListenerRef.get().onClose( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class) + ); + return null; + }).when(mockWebSocketClient).cancel(anyInt(), anyString()); + + final WebSocketTransport transport = createWebSocketTransport(); + ITransport.ConnectListener connectListener = mock(ITransport.ConnectListener.class); + transport.connect(connectListener); + transport.close(); + // check that we tried to close gracefully + verify(mockWebSocketClient).close(); + // check that we closed forcibly at the end + verify(mockWebSocketClient, timeout(1_000)).cancel(eq(1006), anyString()); + // verify that we call listener at the end + verify(connectListener).onTransportUnavailable(eq(transport), any()); + } + + /** + * `onClose` can be called twice, e.g. from activity timer force close and from manual `close()` + * It shouldn't result in any exceptions + */ + @Test + public void shouldNotThrowExceptionIfSeveralCloseEventsHappened() { + AtomicReference listenerRef = new AtomicReference<>(); + + when(mockEngine.create(any(), any())).thenAnswer(invocation -> { + listenerRef.set(invocation.getArgumentAt(1, WebSocketListener.class)); + return mockWebSocketClient; + }); + + + final WebSocketTransport transport = createWebSocketTransport(); + ITransport.ConnectListener connectListener = mock(ITransport.ConnectListener.class); + transport.connect(connectListener); + + listenerRef.get().onClose(1000, "OK"); + listenerRef.get().onClose(1006, "Abnormal close"); + + verify(connectListener, times(2)).onTransportUnavailable(eq(transport), any()); + } + + /** + * Calling `close()` on transport triggers the activity timer. + * Test checks that if it has been disposed it won't do anything. + */ + @Test + public void shouldNotThrowExceptionIfCloseCalledOnAlreadyClosedTransport() { + AtomicReference listenerRef = new AtomicReference<>(); + + when(mockEngine.create(any(), any())).thenAnswer(invocation -> { + listenerRef.set(invocation.getArgumentAt(1, WebSocketListener.class)); + return mockWebSocketClient; + }); + + + final WebSocketTransport transport = createWebSocketTransport(); + ITransport.ConnectListener connectListener = mock(ITransport.ConnectListener.class); + transport.connect(connectListener); + + listenerRef.get().onClose(1006, "Abnormal close"); + transport.close(); + verify(connectListener, timeout(1_000)).onTransportUnavailable(eq(transport), any()); + } +} From f14844a15298a34186f3cbd8c88917006634b6bc Mon Sep 17 00:00:00 2001 From: evgeny Date: Fri, 26 Sep 2025 09:57:56 +0100 Subject: [PATCH 3/3] chore: ensure thread-safe access to activity timer fields in WebSocketTransport - marked `lastActivityTime` as volatile, it's low cost, but it will ensure that the timer reads fresh value, otherwise it can read very old value (although it's very unlikely) - check activity timer before going to the synchronized block --- .../main/java/io/ably/lib/transport/ConnectionManager.java | 2 +- .../java/io/ably/lib/transport/WebSocketTransport.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 6ce013b7c..3a680fb2a 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -2020,7 +2020,7 @@ private boolean isFatalError(ErrorInfo err) { private ErrorInfo stateError; private ConnectParams pendingConnect; private boolean suppressRetry; /* for tests only; modified via reflection */ - private ITransport transport; + private volatile ITransport transport; private long suspendTime; public long msgSerial; private long lastActivity; diff --git a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java index d417cd23c..6d7c087f0 100644 --- a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java +++ b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java @@ -252,8 +252,8 @@ class WebSocketHandler implements WebSocketListener { ***************************/ private final Timer timer = new Timer(); - private TimerTask activityTimerTask = null; - private long lastActivityTime; + private volatile TimerTask activityTimerTask = null; + private volatile long lastActivityTime; /** * Monitor for activity timer events @@ -388,6 +388,9 @@ private void checkActivity() { return; } + // prevent going to the synchronized block if the timer is active + if (activityTimerTask != null) return; + synchronized (activityTimerMonitor) { // Check if timer already running if (activityTimerTask == null) {