Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ public AblyRealtime(ClientOptions options) throws AblyException {
this.channels = channels;
connection = new Connection(this, channels, platformAgentProvider);

/* remove all channels when the connection is closed, to avoid stalled state */
connection.on(ConnectionEvent.closed, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) {
channels.clear();
}
});

if (!StringUtils.isNullOrEmpty(options.recover)) {
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,16 @@ public synchronized void setSuspended(ErrorInfo reason, boolean notifyStateChang
}
}

/**
* Internal
* <p>
* (RTN11d) Resets channels back to initialized and clears error reason
*/
public synchronized void setReinitialized() {
clearAttachTimers();
setState(ChannelState.initialized, null);
}

@Override
protected void apply(ChannelStateListener listener, ChannelEvent event, Object... args) {
try {
Expand Down
32 changes: 32 additions & 0 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,26 @@ StateIndication validateTransition(StateIndication target) {
@Override
void enact(StateIndication stateIndication, ConnectionStateChange change) {
super.enact(stateIndication, change);

if (hasConnectBeenInvokeOnClosedOrFailedState(change)) {
cleanMsgSerialAndErrorReason();
}

connectImpl(stateIndication);
}

@Override
void enactForChannel(StateIndication stateIndication, ConnectionStateChange change, Channel channel) {
// (RTN11b)
if (change.previous == ConnectionState.closing) {
channel.setConnectionClosed(REASON_CLOSED);
}

// (RTN11d)
if (hasConnectBeenInvokeOnClosedOrFailedState(change)) {
channel.setReinitialized();
}
}
}

/**************************************************
Expand Down Expand Up @@ -1559,6 +1577,20 @@ private void connectImpl(StateIndication request) {
}
}

/**
* (RTN11d)
*/
private void cleanMsgSerialAndErrorReason() {
this.msgSerial = 0;
this.connection.reason = null;
}

private boolean hasConnectBeenInvokeOnClosedOrFailedState(ConnectionStateChange change) {
return change.previous == ConnectionState.failed
|| change.previous == ConnectionState.closed
|| change.previous == ConnectionState.closing;
}

/**
* Close any existing transport
* @param shouldAwaitConnection true if `CONNECTING` state, moves immediately to `CLOSING`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ public void attach_success_callback_for_channel_in_failed_state() {
assertEquals("Simulated connection failure", channel.reason.message);

ably.connect();
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);

Helpers.CompletionWaiter attachListener = new Helpers.CompletionWaiter();
channel.attach(attachListener);
Expand Down Expand Up @@ -2462,6 +2463,101 @@ public void detach_message_to_released_channel_is_dropped() throws AblyException
}
}

/*
* Spec: RTN11d
* Checks that all channels become if the state is CLOSED transitions all the channels to
* INITIALIZED and unsets:
* - RealtimeChannel.errorReason
* - Connection.errorReason
* - msgSerial
*/
@Test
public void connect_on_closed_client_should_reinitialize_channels() throws AblyException {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
try (AblyRealtime ably = new AblyRealtime(opts)) {

/* wait until connected */
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);

/* create a channel and attach */
final Channel channel = ably.channels.get("channel");
channel.attach();
new ChannelWaiter(channel).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached", channel.state, ChannelState.attached);

/* push a message to increase msgSerial */
channel.publish("test", "test");
assertEquals(1, ably.connection.connectionManager.msgSerial);

ably.close();
new ChannelWaiter(channel).waitFor(ChannelState.detached);
assertEquals(ConnectionState.closed, ably.connection.state);
assertEquals(1, ably.connection.connectionManager.msgSerial);

ably.connect();

new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
assertEquals(ChannelState.initialized, channel.state);

assertNull(channel.reason);
assertNull(ably.connection.reason);
assertEquals(ChannelState.initialized, channel.state);
assertEquals(0, ably.connection.connectionManager.msgSerial);
}
}

/*
* Spec: RTN11b
* Checks that all channels become if the state is CLOSING transitions all the channels to
* INITIALIZED and unsets:
* - RealtimeChannel.errorReason
* - Connection.errorReason
* - msgSerial
*/
@Test
public void connect_on_closing_client_should_reinitialize_channels() throws AblyException {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
try (AblyRealtime ably = new AblyRealtime(opts)) {

/* wait until connected */
(new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected);
assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);

/* create a channel and attach */
final Channel channel = ably.channels.get("channel");
channel.attach();
new ChannelWaiter(channel).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached", channel.state, ChannelState.attached);

/* push a message to increase msgSerial */
channel.publish("test", "test");
assertEquals(1, ably.connection.connectionManager.msgSerial);

List<ChannelState> observedChannelStates = new ArrayList<>();
channel.on(stateChange -> observedChannelStates.add(stateChange.current));

List<ConnectionState> observedConnectionStates = new ArrayList<>();
ably.connection.on(stateChange -> observedConnectionStates.add(stateChange.current));

ably.close();
ably.connect();

new ConnectionWaiter(ably.connection).waitFor(ConnectionState.closing);
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);

assertEquals(List.of(ConnectionState.closing, ConnectionState.connecting, ConnectionState.connected), observedConnectionStates);
assertEquals(ChannelState.initialized, channel.state);

channel.attach();
new ChannelWaiter(channel).waitFor(ChannelState.attached);

assertNull(channel.reason);
assertEquals(0, ably.connection.connectionManager.msgSerial);
assertEquals(List.of(ChannelState.detached, ChannelState.initialized, ChannelState.attaching, ChannelState.attached), observedChannelStates);
}
}

static class DetachingProtocolListener implements DebugOptions.RawProtocolListener {

public Channel theChannel;
Expand Down
Loading