From 728e3de80eaa348f5a6ad84c002819d5c556d8ad Mon Sep 17 00:00:00 2001 From: evgeny Date: Fri, 17 Oct 2025 12:28:31 +0100 Subject: [PATCH] fix: resolve presence re-entry on channel re-attach - Ensure presence update messages are correctly sent. - Add logging for presence updates in `Presence`, improving debuggability. - Update presence re-entry tests. --- .../io/ably/lib/realtime/ChannelBase.java | 4 +- .../java/io/ably/lib/realtime/Presence.java | 4 +- .../test/realtime/RealtimePresenceTest.java | 182 +++++++----------- .../lib/test/realtime/RealtimeResumeTest.java | 16 +- 4 files changed, 89 insertions(+), 117 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index d12114fcb..283e9879d 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -461,13 +461,13 @@ private void setAttached(ProtocolMessage message) { if(state == ChannelState.attached) { Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name)); if (!message.hasFlag(Flag.resumed)) { // RTL12 - presence.onAttached(message.hasFlag(Flag.has_presence)); emitUpdate(message.error, false); + presence.onAttached(message.hasFlag(Flag.has_presence)); } } else { - presence.onAttached(message.hasFlag(Flag.has_presence)); setState(ChannelState.attached, message.error, message.hasFlag(Flag.resumed)); + presence.onAttached(message.hasFlag(Flag.has_presence)); } } diff --git a/lib/src/main/java/io/ably/lib/realtime/Presence.java b/lib/src/main/java/io/ably/lib/realtime/Presence.java index a74d3bf50..c56297fc0 100644 --- a/lib/src/main/java/io/ably/lib/realtime/Presence.java +++ b/lib/src/main/java/io/ably/lib/realtime/Presence.java @@ -755,9 +755,11 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr case initialized: channel.attach(); case attaching: + Log.v(TAG, "updatePresence(); put message in pending presence queue"); pendingPresence.add(new QueuedPresence(msg, listener)); break; case attached: + Log.v(TAG, "updatePresence(); send message to connection manager"); ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name); message.presence = new PresenceMessage[] { msg }; ConnectionManager connectionManager = ably.connection.connectionManager; @@ -938,7 +940,7 @@ public void onSuccess() { @Override public void onError(ErrorInfo reason) { String errorString = String.format(Locale.ROOT, "Cannot automatically re-enter %s on channel %s (%s)", - item.clientId, channel.name, reason.message); + item.clientId, channel.name, reason == null ? "" : reason.message); Log.e(TAG, errorString); channel.emitUpdate(new ErrorInfo(errorString, 91004), true); } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java index 85357acd2..2534139bd 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java @@ -2250,132 +2250,94 @@ public void realtime_presence_get_throws_when_channel_failed() throws AblyExcept * * Tests RTP17, RTP19, RTP19a, RTP5f, RTP6b */ - @Ignore("FIXME: fix exception") @Test - public void realtime_presence_suspended_reenter() throws AblyException { - AblyRealtime ably = null; - try { - MockWebsocketFactory mockTransport = new MockWebsocketFactory(); - DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr); - fillInOptions(opts); - opts.transportFactory = mockTransport; + public void realtime_presence_suspended_reenter() throws Exception { + MockWebsocketFactory mockTransport = new MockWebsocketFactory(); + DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr); + fillInOptions(opts); + opts.transportFactory = mockTransport; + final String channelName = "presence_suspended_reenter" + testParams.name; + mockTransport.allowSend(); - for (int i=0; i<2; i++) { - final String channelName = "presence_suspended_reenter" + testParams.name + i; + try (AblyRealtime ably = new AblyRealtime(opts)) { - mockTransport.allowSend(); + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + connectionWaiter.waitFor(ConnectionState.connected); - ably = new AblyRealtime(opts); + final Channel channel = ably.channels.get(channelName); + channel.attach(); + ChannelWaiter channelWaiter = new ChannelWaiter(channel); + + channelWaiter.waitFor(ChannelState.attached); - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); - connectionWaiter.waitFor(ConnectionState.connected); + final String presenceData = "PRESENCE_DATA"; + + final ArrayList leaveMessages = new ArrayList<>(); + /* Subscribe for message type, test RTP6b */ + channel.presence.subscribe(Action.leave, new Presence.PresenceListener() { + @Override + public void onPresenceMessage(PresenceMessage message) { + leaveMessages.add(message); + } + }); - final Channel channel = ably.channels.get(channelName); - channel.attach(); - ChannelWaiter channelWaiter = new ChannelWaiter(channel); + channel.presence.enterClient(testClientId1, presenceData); - channelWaiter.waitFor(ChannelState.attached); + /* + * We put testClientId2 presence data into the client library presence map but we + * don't send it to the server + */ + ProtocolMessage msg = new ProtocolMessage(); + msg.connectionId = "randomConnectionId"; + msg.action = ProtocolMessage.Action.sync; + msg.channel = channelName; + msg.presence = new PresenceMessage[]{ + new PresenceMessage() {{ + action = Action.present; + id = String.format("%s:0:0", "randomConnectionId"); + timestamp = System.currentTimeMillis(); + clientId = testClientId2; + connectionId = "randomConnectionId"; + data = presenceData; + }} + }; + ably.connection.connectionManager.onMessage(null, msg); - final String presenceData = "PRESENCE_DATA"; - final String connId = ably.connection.id; + ably.connection.connectionManager.requestState(ConnectionState.suspended); + channelWaiter.waitFor(ChannelState.suspended); - /* - * On the first run to test RTP19a we don't enter client1 so the server on - * return from suspend sees no presence data and sends ATTACHED without HAS_PRESENCE - * The client then should remove all the members from the presence map and then - * re-enter client2. On the second loop run we enter client1 and receive ATTACHED with - * HAS_PRESENCE - */ - final boolean[] wrongPresenceEmitted = new boolean[] {false}; - if (i == 1) { - CompletionWaiter completionWaiter = new CompletionWaiter(); - channel.presence.enterClient(testClientId1, presenceData, completionWaiter); - completionWaiter.waitFor(); + /* + * When restoring from suspended state server will send sync message erasing + * testClientId2 record from the presence map. + */ + ably.connection.connectionManager.requestState(ConnectionState.connected); + channelWaiter.waitFor(ChannelState.attached); + long reconnectTimestamp = System.currentTimeMillis(); - // RTP5f: after this point there should be no presence event for client1 - channel.presence.subscribe(new Presence.PresenceListener() { - @Override - public void onPresenceMessage(PresenceMessage message) { - if (message.clientId.equals(testClientId1)) - wrongPresenceEmitted[0] = true; - } - }); - } + PaginatedResult presentMembers; - final ArrayList leaveMessages = new ArrayList<>(); - /* Subscribe for message type, test RTP6b */ - channel.presence.subscribe(Action.leave, new Presence.PresenceListener() { - @Override - public void onPresenceMessage(PresenceMessage message) { - leaveMessages.add(message); - } - }); - - /* - * We put testClientId2 presence data into the client library presence map but we - * don't send it to the server - */ - - mockTransport.blockSend(); - channel.presence.enterClient(testClientId2, presenceData); - - ProtocolMessage msg = new ProtocolMessage(); - msg.connectionId = connId; - msg.action = ProtocolMessage.Action.sync; - msg.channel = channelName; - msg.presence = new PresenceMessage[]{ - new PresenceMessage() {{ - action = Action.present; - id = String.format("%s:0:0", connId); - timestamp = System.currentTimeMillis(); - clientId = testClientId2; - connectionId = connId; - data = presenceData; - }} - }; - ably.connection.connectionManager.onMessage(null, msg); - - mockTransport.allowSend(); - - ably.connection.connectionManager.requestState(ConnectionState.suspended); - channelWaiter.waitFor(ChannelState.suspended); - - /* - * When restoring from suspended state server will send sync message erasing - * testClientId2 record from the presence map. Client should re-send presence message - * for testClientId2 and restore its presence data. - */ - - ably.connection.connectionManager.requestState(ConnectionState.connected); - channelWaiter.waitFor(ChannelState.attached); - long reconnectTimestamp = System.currentTimeMillis(); - - try { - Thread.sleep(500); - } catch (InterruptedException e) { + try (AblyRest restClient = new AblyRest(opts)) { + long timeout = 10_000; + presentMembers = restClient.channels.get(channelName).presence.get(null); + while (presentMembers.items().length != 1 && System.currentTimeMillis() - reconnectTimestamp < timeout) { + Thread.sleep(250); + presentMembers = restClient.channels.get(channelName).presence.get(null); } + } - AblyRest ablyRest = new AblyRest(opts); - io.ably.lib.rest.Channel restChannel = ablyRest.channels.get(channelName); - assertEquals("Verify presence data is received by the server", - restChannel.presence.get(null).items().length, i==0 ? 1 : 2); + assertEquals("Verify presence data is received by the server", + 1, presentMembers.items().length); - /* In both cases we should have one leave message in the leaveMessages */ - assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1); + assertEquals(testClientId1, presentMembers.items()[0].clientId); + assertEquals(presenceData, presentMembers.items()[0].data); - PresenceMessage leaveMessage = leaveMessages.get(0); - assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave); - assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2); - assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData); - assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000); + assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1); - /* According to RTP5f there should be no presence event emitted for client1 */ - assertFalse("Verify no presence event emitted on return from suspend on SYNC for client1", - wrongPresenceEmitted[0]); - } - } finally { - if(ably != null) - ably.close(); + PresenceMessage leaveMessage = leaveMessages.get(0); + assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave); + assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2); + assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData); + assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000); } } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 068054528..eceb070cb 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -988,15 +990,21 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { System.out.println("presence_resume_test: sent message with client: "+presenceMessage.clientId +" " + " action:"+presenceMessage.action); } - assertEquals("Second round of messages has incorrect size", 6, transport.getSentPresenceMessages().size()); + + Set sentClientIds = transport.getSentPresenceMessages().stream() + .map(it -> it.clientId).collect(Collectors.toSet()); + + assertEquals("Second round of messages has incorrect size", + 9, + sentClientIds.size() + ); //make sure they were sent with correct client ids final Map sentPresenceMap = new HashMap<>(); for (PresenceMessage presenceMessage: transport.getSentPresenceMessages()){ sentPresenceMap.put(presenceMessage.clientId, presenceMessage); } - - for (int i = 3; i < 9; i++) { - assertTrue("Client id isn't there:" + clients[i], sentPresenceMap.containsKey(clients[i])); + for (String client: clients) { + assertTrue("Client id isn't there:" + client, sentPresenceMap.containsKey(client)); } } }