Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,13 @@ private void setAttached(ProtocolMessage message) {
if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
if (!message.hasFlag(Flag.resumed)) { // RTL12
presence.onAttached(message.hasFlag(Flag.has_presence));
emitUpdate(message.error, false);
presence.onAttached(message.hasFlag(Flag.has_presence));
}
}
else {
presence.onAttached(message.hasFlag(Flag.has_presence));
setState(ChannelState.attached, message.error, message.hasFlag(Flag.resumed));
presence.onAttached(message.hasFlag(Flag.has_presence));
}
}

Expand Down
4 changes: 3 additions & 1 deletion lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,11 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
case initialized:
channel.attach();
case attaching:
Log.v(TAG, "updatePresence(); put message in pending presence queue");
pendingPresence.add(new QueuedPresence(msg, listener));
break;
case attached:
Log.v(TAG, "updatePresence(); send message to connection manager");
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
message.presence = new PresenceMessage[] { msg };
ConnectionManager connectionManager = ably.connection.connectionManager;
Expand Down Expand Up @@ -938,7 +940,7 @@ public void onSuccess() {
@Override
public void onError(ErrorInfo reason) {
String errorString = String.format(Locale.ROOT, "Cannot automatically re-enter %s on channel %s (%s)",
item.clientId, channel.name, reason.message);
item.clientId, channel.name, reason == null ? "" : reason.message);
Log.e(TAG, errorString);
channel.emitUpdate(new ErrorInfo(errorString, 91004), true);
}
Expand Down
182 changes: 72 additions & 110 deletions lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2250,132 +2250,94 @@ public void realtime_presence_get_throws_when_channel_failed() throws AblyExcept
*
* Tests RTP17, RTP19, RTP19a, RTP5f, RTP6b
*/
@Ignore("FIXME: fix exception")
@Test
public void realtime_presence_suspended_reenter() throws AblyException {
AblyRealtime ably = null;
try {
MockWebsocketFactory mockTransport = new MockWebsocketFactory();
DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr);
fillInOptions(opts);
opts.transportFactory = mockTransport;
public void realtime_presence_suspended_reenter() throws Exception {
MockWebsocketFactory mockTransport = new MockWebsocketFactory();
DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr);
fillInOptions(opts);
opts.transportFactory = mockTransport;
final String channelName = "presence_suspended_reenter" + testParams.name;
mockTransport.allowSend();

for (int i=0; i<2; i++) {
final String channelName = "presence_suspended_reenter" + testParams.name + i;
try (AblyRealtime ably = new AblyRealtime(opts)) {

mockTransport.allowSend();
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
connectionWaiter.waitFor(ConnectionState.connected);

ably = new AblyRealtime(opts);
final Channel channel = ably.channels.get(channelName);
channel.attach();
ChannelWaiter channelWaiter = new ChannelWaiter(channel);

channelWaiter.waitFor(ChannelState.attached);

ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
connectionWaiter.waitFor(ConnectionState.connected);
final String presenceData = "PRESENCE_DATA";

final ArrayList<PresenceMessage> leaveMessages = new ArrayList<>();
/* Subscribe for message type, test RTP6b */
channel.presence.subscribe(Action.leave, new Presence.PresenceListener() {
@Override
public void onPresenceMessage(PresenceMessage message) {
leaveMessages.add(message);
}
});

final Channel channel = ably.channels.get(channelName);
channel.attach();
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.presence.enterClient(testClientId1, presenceData);

channelWaiter.waitFor(ChannelState.attached);
/*
* We put testClientId2 presence data into the client library presence map but we
* don't send it to the server
*/
ProtocolMessage msg = new ProtocolMessage();
msg.connectionId = "randomConnectionId";
msg.action = ProtocolMessage.Action.sync;
msg.channel = channelName;
msg.presence = new PresenceMessage[]{
new PresenceMessage() {{
action = Action.present;
id = String.format("%s:0:0", "randomConnectionId");
timestamp = System.currentTimeMillis();
clientId = testClientId2;
connectionId = "randomConnectionId";
data = presenceData;
}}
};
ably.connection.connectionManager.onMessage(null, msg);

final String presenceData = "PRESENCE_DATA";
final String connId = ably.connection.id;
ably.connection.connectionManager.requestState(ConnectionState.suspended);
channelWaiter.waitFor(ChannelState.suspended);

/*
* On the first run to test RTP19a we don't enter client1 so the server on
* return from suspend sees no presence data and sends ATTACHED without HAS_PRESENCE
* The client then should remove all the members from the presence map and then
* re-enter client2. On the second loop run we enter client1 and receive ATTACHED with
* HAS_PRESENCE
*/
final boolean[] wrongPresenceEmitted = new boolean[] {false};
if (i == 1) {
CompletionWaiter completionWaiter = new CompletionWaiter();
channel.presence.enterClient(testClientId1, presenceData, completionWaiter);
completionWaiter.waitFor();
/*
* When restoring from suspended state server will send sync message erasing
* testClientId2 record from the presence map.
*/
ably.connection.connectionManager.requestState(ConnectionState.connected);
channelWaiter.waitFor(ChannelState.attached);
long reconnectTimestamp = System.currentTimeMillis();

// RTP5f: after this point there should be no presence event for client1
channel.presence.subscribe(new Presence.PresenceListener() {
@Override
public void onPresenceMessage(PresenceMessage message) {
if (message.clientId.equals(testClientId1))
wrongPresenceEmitted[0] = true;
}
});
}
PaginatedResult<PresenceMessage> presentMembers;

final ArrayList<PresenceMessage> leaveMessages = new ArrayList<>();
/* Subscribe for message type, test RTP6b */
channel.presence.subscribe(Action.leave, new Presence.PresenceListener() {
@Override
public void onPresenceMessage(PresenceMessage message) {
leaveMessages.add(message);
}
});

/*
* We put testClientId2 presence data into the client library presence map but we
* don't send it to the server
*/

mockTransport.blockSend();
channel.presence.enterClient(testClientId2, presenceData);

ProtocolMessage msg = new ProtocolMessage();
msg.connectionId = connId;
msg.action = ProtocolMessage.Action.sync;
msg.channel = channelName;
msg.presence = new PresenceMessage[]{
new PresenceMessage() {{
action = Action.present;
id = String.format("%s:0:0", connId);
timestamp = System.currentTimeMillis();
clientId = testClientId2;
connectionId = connId;
data = presenceData;
}}
};
ably.connection.connectionManager.onMessage(null, msg);

mockTransport.allowSend();

ably.connection.connectionManager.requestState(ConnectionState.suspended);
channelWaiter.waitFor(ChannelState.suspended);

/*
* When restoring from suspended state server will send sync message erasing
* testClientId2 record from the presence map. Client should re-send presence message
* for testClientId2 and restore its presence data.
*/

ably.connection.connectionManager.requestState(ConnectionState.connected);
channelWaiter.waitFor(ChannelState.attached);
long reconnectTimestamp = System.currentTimeMillis();

try {
Thread.sleep(500);
} catch (InterruptedException e) {
try (AblyRest restClient = new AblyRest(opts)) {
long timeout = 10_000;
presentMembers = restClient.channels.get(channelName).presence.get(null);
while (presentMembers.items().length != 1 && System.currentTimeMillis() - reconnectTimestamp < timeout) {
Thread.sleep(250);
presentMembers = restClient.channels.get(channelName).presence.get(null);
}
}

AblyRest ablyRest = new AblyRest(opts);
io.ably.lib.rest.Channel restChannel = ablyRest.channels.get(channelName);
assertEquals("Verify presence data is received by the server",
restChannel.presence.get(null).items().length, i==0 ? 1 : 2);
assertEquals("Verify presence data is received by the server",
1, presentMembers.items().length);

/* In both cases we should have one leave message in the leaveMessages */
assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1);
assertEquals(testClientId1, presentMembers.items()[0].clientId);
assertEquals(presenceData, presentMembers.items()[0].data);

PresenceMessage leaveMessage = leaveMessages.get(0);
assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave);
assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2);
assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData);
assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000);
assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1);

/* According to RTP5f there should be no presence event emitted for client1 */
assertFalse("Verify no presence event emitted on return from suspend on SYNC for client1",
wrongPresenceEmitted[0]);
}
} finally {
if(ably != null)
ably.close();
PresenceMessage leaveMessage = leaveMessages.get(0);
assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave);
assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2);
assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData);
assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -988,15 +990,21 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException {
System.out.println("presence_resume_test: sent message with client: "+presenceMessage.clientId +" " +
" action:"+presenceMessage.action);
}
assertEquals("Second round of messages has incorrect size", 6, transport.getSentPresenceMessages().size());

Set<String> sentClientIds = transport.getSentPresenceMessages().stream()
.map(it -> it.clientId).collect(Collectors.toSet());

assertEquals("Second round of messages has incorrect size",
9,
sentClientIds.size()
);
//make sure they were sent with correct client ids
final Map<String,PresenceMessage> sentPresenceMap = new HashMap<>();
for (PresenceMessage presenceMessage: transport.getSentPresenceMessages()){
sentPresenceMap.put(presenceMessage.clientId, presenceMessage);
}

for (int i = 3; i < 9; i++) {
assertTrue("Client id isn't there:" + clients[i], sentPresenceMap.containsKey(clients[i]));
for (String client: clients) {
assertTrue("Client id isn't there:" + client, sentPresenceMap.containsKey(client));
}
}
}
Expand Down
Loading