diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 8e7c99a63..5c29a0dea 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -64,14 +64,6 @@ public AblyRealtime(ClientOptions options) throws AblyException { this.channels = channels; connection = new Connection(this, channels, platformAgentProvider); - /* remove all channels when the connection is closed, to avoid stalled state */ - connection.on(ConnectionEvent.closed, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) { - channels.clear(); - } - }); - if (!StringUtils.isNullOrEmpty(options.recover)) { RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover); if (recoveryKeyContext != null) { diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 800d7342e..028f6b23c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -681,6 +681,16 @@ public synchronized void setSuspended(ErrorInfo reason, boolean notifyStateChang } } + /** + * Internal + *

+ * (RTN11d) Resets channels back to initialized and clears error reason + */ + public synchronized void setReinitialized() { + clearAttachTimers(); + setState(ChannelState.initialized, null); + } + @Override protected void apply(ChannelStateListener listener, ChannelEvent event, Object... args) { try { diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 40763209d..26bd74cb7 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -232,8 +232,26 @@ StateIndication validateTransition(StateIndication target) { @Override void enact(StateIndication stateIndication, ConnectionStateChange change) { super.enact(stateIndication, change); + + if (hasConnectBeenInvokeOnClosedOrFailedState(change)) { + cleanMsgSerialAndErrorReason(); + } + connectImpl(stateIndication); } + + @Override + void enactForChannel(StateIndication stateIndication, ConnectionStateChange change, Channel channel) { + // (RTN11b) + if (change.previous == ConnectionState.closing) { + channel.setConnectionClosed(REASON_CLOSED); + } + + // (RTN11d) + if (hasConnectBeenInvokeOnClosedOrFailedState(change)) { + channel.setReinitialized(); + } + } } /************************************************** @@ -1559,6 +1577,20 @@ private void connectImpl(StateIndication request) { } } + /** + * (RTN11d) + */ + private void cleanMsgSerialAndErrorReason() { + this.msgSerial = 0; + this.connection.reason = null; + } + + private boolean hasConnectBeenInvokeOnClosedOrFailedState(ConnectionStateChange change) { + return change.previous == ConnectionState.failed + || change.previous == ConnectionState.closed + || change.previous == ConnectionState.closing; + } + /** * Close any existing transport * @param shouldAwaitConnection true if `CONNECTING` state, moves immediately to `CLOSING` diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index e6c8b8a6a..98605f775 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -979,6 +979,7 @@ public void attach_success_callback_for_channel_in_failed_state() { assertEquals("Simulated connection failure", channel.reason.message); ably.connect(); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); Helpers.CompletionWaiter attachListener = new Helpers.CompletionWaiter(); channel.attach(attachListener); @@ -2462,6 +2463,101 @@ public void detach_message_to_released_channel_is_dropped() throws AblyException } } + /* + * Spec: RTN11d + * Checks that all channels become if the state is CLOSED transitions all the channels to + * INITIALIZED and unsets: + * - RealtimeChannel.errorReason + * - Connection.errorReason + * - msgSerial + */ + @Test + public void connect_on_closed_client_should_reinitialize_channels() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + /* push a message to increase msgSerial */ + channel.publish("test", "test"); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + ably.close(); + new ChannelWaiter(channel).waitFor(ChannelState.detached); + assertEquals(ConnectionState.closed, ably.connection.state); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + ably.connect(); + + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals(ChannelState.initialized, channel.state); + + assertNull(channel.reason); + assertNull(ably.connection.reason); + assertEquals(ChannelState.initialized, channel.state); + assertEquals(0, ably.connection.connectionManager.msgSerial); + } + } + + /* + * Spec: RTN11b + * Checks that all channels become if the state is CLOSING transitions all the channels to + * INITIALIZED and unsets: + * - RealtimeChannel.errorReason + * - Connection.errorReason + * - msgSerial + */ + @Test + public void connect_on_closing_client_should_reinitialize_channels() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + /* push a message to increase msgSerial */ + channel.publish("test", "test"); + assertEquals(1, ably.connection.connectionManager.msgSerial); + + List observedChannelStates = new ArrayList<>(); + channel.on(stateChange -> observedChannelStates.add(stateChange.current)); + + List observedConnectionStates = new ArrayList<>(); + ably.connection.on(stateChange -> observedConnectionStates.add(stateChange.current)); + + ably.close(); + ably.connect(); + + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.closing); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + + assertEquals(List.of(ConnectionState.closing, ConnectionState.connecting, ConnectionState.connected), observedConnectionStates); + assertEquals(ChannelState.initialized, channel.state); + + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + + assertNull(channel.reason); + assertEquals(0, ably.connection.connectionManager.msgSerial); + assertEquals(List.of(ChannelState.detached, ChannelState.initialized, ChannelState.attaching, ChannelState.attached), observedChannelStates); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel;