From ef118806bda69f6558a47dd2894f92370c3a6aa2 Mon Sep 17 00:00:00 2001 From: evgeny Date: Fri, 14 Mar 2025 13:28:52 +0000 Subject: [PATCH] [ECO-5246] fix: Realtime Client Reconnection Logic Previously, we cleared all channels on the close event, causing all previously acquired channels to become orphaned. We have now removed this logic and fixed the reconnection behavior to align with the spec --- .../io/ably/lib/realtime/AblyRealtime.java | 8 -- .../io/ably/lib/realtime/ChannelBase.java | 10 ++ .../ably/lib/transport/ConnectionManager.java | 32 +++++++ .../test/realtime/RealtimeChannelTest.java | 96 +++++++++++++++++++ 4 files changed, 138 insertions(+), 8 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 8e7c99a63..5c29a0dea 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -64,14 +64,6 @@ public AblyRealtime(ClientOptions options) throws AblyException { this.channels = channels; connection = new Connection(this, channels, platformAgentProvider); - /* remove all channels when the connection is closed, to avoid stalled state */ - connection.on(ConnectionEvent.closed, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) { - channels.clear(); - } - }); - if (!StringUtils.isNullOrEmpty(options.recover)) { RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover); if (recoveryKeyContext != null) { diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 800d7342e..028f6b23c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -681,6 +681,16 @@ public synchronized void setSuspended(ErrorInfo reason, boolean notifyStateChang } } + /** + * Internal + *

+ * (RTN11d) Resets channels back to initialized and clears error reason + */ + public synchronized void setReinitialized() { + clearAttachTimers(); + setState(ChannelState.initialized, null); + } + @Override protected void apply(ChannelStateListener listener, ChannelEvent event, Object... args) { try { diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 40763209d..26bd74cb7 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -232,8 +232,26 @@ StateIndication validateTransition(StateIndication target) { @Override void enact(StateIndication stateIndication, ConnectionStateChange change) { super.enact(stateIndication, change); + + if (hasConnectBeenInvokeOnClosedOrFailedState(change)) { + cleanMsgSerialAndErrorReason(); + } + connectImpl(stateIndication); } + + @Override + void enactForChannel(StateIndication stateIndication, ConnectionStateChange change, Channel channel) { + // (RTN11b) + if (change.previous == ConnectionState.closing) { + channel.setConnectionClosed(REASON_CLOSED); + } + + // (RTN11d) + if (hasConnectBeenInvokeOnClosedOrFailedState(change)) { + channel.setReinitialized(); + } + } } /************************************************** @@ -1559,6 +1577,20 @@ private void connectImpl(StateIndication request) { } } + /** + * (RTN11d) + */ + private void cleanMsgSerialAndErrorReason() { + this.msgSerial = 0; + this.connection.reason = null; + } + + private boolean hasConnectBeenInvokeOnClosedOrFailedState(ConnectionStateChange change) { + return change.previous == ConnectionState.failed + || change.previous == ConnectionState.closed + || change.previous == ConnectionState.closing; + } + /** * Close any existing transport * @param shouldAwaitConnection true if `CONNECTING` state, moves immediately to `CLOSING` diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index e6c8b8a6a..98605f775 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -979,6 +979,7 @@ public void attach_success_callback_for_channel_in_failed_state() { assertEquals("Simulated connection failure", channel.reason.message); ably.connect(); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); Helpers.CompletionWaiter attachListener = new Helpers.CompletionWaiter(); channel.attach(attachListener); @@ -2462,6 +2463,101 @@ public void detach_message_to_released_channel_is_dropped() throws AblyException } } + /* + * Spec: RTN11d + * Checks that all channels become if the state is CLOSED transitions all the channels to + * INITIALIZED and unsets: + * - RealtimeChannel.errorReason + * - Connection.errorReason + * - msgSerial + */ + @Test + public void connect_on_closed_client_should_reinitialize_channels() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + /* push a message to increase msgSerial */ + channel.publish("test", "test"); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + ably.close(); + new ChannelWaiter(channel).waitFor(ChannelState.detached); + assertEquals(ConnectionState.closed, ably.connection.state); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + ably.connect(); + + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals(ChannelState.initialized, channel.state); + + assertNull(channel.reason); + assertNull(ably.connection.reason); + assertEquals(ChannelState.initialized, channel.state); + assertEquals(0, ably.connection.connectionManager.msgSerial); + } + } + + /* + * Spec: RTN11b + * Checks that all channels become if the state is CLOSING transitions all the channels to + * INITIALIZED and unsets: + * - RealtimeChannel.errorReason + * - Connection.errorReason + * - msgSerial + */ + @Test + public void connect_on_closing_client_should_reinitialize_channels() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + /* push a message to increase msgSerial */ + channel.publish("test", "test"); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + List observedChannelStates = new ArrayList<>(); + channel.on(stateChange -> observedChannelStates.add(stateChange.current)); + + List observedConnectionStates = new ArrayList<>(); + ably.connection.on(stateChange -> observedConnectionStates.add(stateChange.current)); + + ably.close(); + ably.connect(); + + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.closing); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + + assertEquals(List.of(ConnectionState.closing, ConnectionState.connecting, ConnectionState.connected), observedConnectionStates); + assertEquals(ChannelState.initialized, channel.state); + + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + + assertNull(channel.reason); + assertEquals(0, ably.connection.connectionManager.msgSerial); + assertEquals(List.of(ChannelState.detached, ChannelState.initialized, ChannelState.attaching, ChannelState.attached), observedChannelStates); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel;