diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 8e7c99a63..5c29a0dea 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -64,14 +64,6 @@ public AblyRealtime(ClientOptions options) throws AblyException { this.channels = channels; connection = new Connection(this, channels, platformAgentProvider); - /* remove all channels when the connection is closed, to avoid stalled state */ - connection.on(ConnectionEvent.closed, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) { - channels.clear(); - } - }); - if (!StringUtils.isNullOrEmpty(options.recover)) { RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover); if (recoveryKeyContext != null) { diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 800d7342e..028f6b23c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -681,6 +681,16 @@ public synchronized void setSuspended(ErrorInfo reason, boolean notifyStateChang } } + /** + * Internal + *
+ * (RTN11d) Resets channels back to initialized and clears error reason
+ */
+ public synchronized void setReinitialized() {
+ clearAttachTimers();
+ setState(ChannelState.initialized, null);
+ }
+
@Override
protected void apply(ChannelStateListener listener, ChannelEvent event, Object... args) {
try {
diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
index 40763209d..26bd74cb7 100644
--- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
+++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
@@ -232,8 +232,26 @@ StateIndication validateTransition(StateIndication target) {
@Override
void enact(StateIndication stateIndication, ConnectionStateChange change) {
super.enact(stateIndication, change);
+
+ if (hasConnectBeenInvokeOnClosedOrFailedState(change)) {
+ cleanMsgSerialAndErrorReason();
+ }
+
connectImpl(stateIndication);
}
+
+ @Override
+ void enactForChannel(StateIndication stateIndication, ConnectionStateChange change, Channel channel) {
+ // (RTN11b)
+ if (change.previous == ConnectionState.closing) {
+ channel.setConnectionClosed(REASON_CLOSED);
+ }
+
+ // (RTN11d)
+ if (hasConnectBeenInvokeOnClosedOrFailedState(change)) {
+ channel.setReinitialized();
+ }
+ }
}
/**************************************************
@@ -1559,6 +1577,20 @@ private void connectImpl(StateIndication request) {
}
}
+ /**
+ * (RTN11d)
+ */
+ private void cleanMsgSerialAndErrorReason() {
+ this.msgSerial = 0;
+ this.connection.reason = null;
+ }
+
+ private boolean hasConnectBeenInvokeOnClosedOrFailedState(ConnectionStateChange change) {
+ return change.previous == ConnectionState.failed
+ || change.previous == ConnectionState.closed
+ || change.previous == ConnectionState.closing;
+ }
+
/**
* Close any existing transport
* @param shouldAwaitConnection true if `CONNECTING` state, moves immediately to `CLOSING`
diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
index e6c8b8a6a..98605f775 100644
--- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
+++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
@@ -979,6 +979,7 @@ public void attach_success_callback_for_channel_in_failed_state() {
assertEquals("Simulated connection failure", channel.reason.message);
ably.connect();
+ new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
Helpers.CompletionWaiter attachListener = new Helpers.CompletionWaiter();
channel.attach(attachListener);
@@ -2462,6 +2463,101 @@ public void detach_message_to_released_channel_is_dropped() throws AblyException
}
}
+ /*
+ * Spec: RTN11d
+ * Checks that all channels become if the state is CLOSED transitions all the channels to
+ * INITIALIZED and unsets:
+ * - RealtimeChannel.errorReason
+ * - Connection.errorReason
+ * - msgSerial
+ */
+ @Test
+ public void connect_on_closed_client_should_reinitialize_channels() throws AblyException {
+ ClientOptions opts = createOptions(testVars.keys[0].keyStr);
+ try (AblyRealtime ably = new AblyRealtime(opts)) {
+
+ /* wait until connected */
+ new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
+ assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);
+
+ /* create a channel and attach */
+ final Channel channel = ably.channels.get("channel");
+ channel.attach();
+ new ChannelWaiter(channel).waitFor(ChannelState.attached);
+ assertEquals("Verify attached state reached", channel.state, ChannelState.attached);
+
+ /* push a message to increase msgSerial */
+ channel.publish("test", "test");
+ assertEquals(1, ably.connection.connectionManager.msgSerial);
+
+ ably.close();
+ new ChannelWaiter(channel).waitFor(ChannelState.detached);
+ assertEquals(ConnectionState.closed, ably.connection.state);
+ assertEquals(1, ably.connection.connectionManager.msgSerial);
+
+ ably.connect();
+
+ new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
+ assertEquals(ChannelState.initialized, channel.state);
+
+ assertNull(channel.reason);
+ assertNull(ably.connection.reason);
+ assertEquals(ChannelState.initialized, channel.state);
+ assertEquals(0, ably.connection.connectionManager.msgSerial);
+ }
+ }
+
+ /*
+ * Spec: RTN11b
+ * Checks that all channels become if the state is CLOSING transitions all the channels to
+ * INITIALIZED and unsets:
+ * - RealtimeChannel.errorReason
+ * - Connection.errorReason
+ * - msgSerial
+ */
+ @Test
+ public void connect_on_closing_client_should_reinitialize_channels() throws AblyException {
+ ClientOptions opts = createOptions(testVars.keys[0].keyStr);
+ try (AblyRealtime ably = new AblyRealtime(opts)) {
+
+ /* wait until connected */
+ (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected);
+ assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);
+
+ /* create a channel and attach */
+ final Channel channel = ably.channels.get("channel");
+ channel.attach();
+ new ChannelWaiter(channel).waitFor(ChannelState.attached);
+ assertEquals("Verify attached state reached", channel.state, ChannelState.attached);
+
+ /* push a message to increase msgSerial */
+ channel.publish("test", "test");
+ assertEquals(1, ably.connection.connectionManager.msgSerial);
+
+ List