|
13 | 13 | import java.util.HashMap; |
14 | 14 | import java.util.List; |
15 | 15 | import java.util.Locale; |
| 16 | +import java.util.Map; |
| 17 | +import java.util.concurrent.atomic.AtomicInteger; |
| 18 | +import java.util.stream.Collectors; |
16 | 19 |
|
17 | 20 | import com.google.gson.Gson; |
18 | 21 | import com.google.gson.JsonArray; |
19 | 22 | import com.google.gson.JsonElement; |
20 | 23 | import com.google.gson.JsonObject; |
21 | 24 | import com.google.gson.JsonPrimitive; |
| 25 | +import io.ably.lib.types.ChannelOptions; |
22 | 26 | import io.ably.lib.types.MessageAction; |
23 | 27 | import io.ably.lib.types.MessageExtras; |
24 | 28 | import io.ably.lib.types.Param; |
@@ -1010,4 +1014,44 @@ public void should_have_serial_action_createdAt() throws AblyException { |
1010 | 1014 | assertNull(msgComplete.waitFor(1, 10_000)); |
1011 | 1015 | } |
1012 | 1016 | } |
| 1017 | + |
| 1018 | + @Test |
| 1019 | + public void should_not_duplicate_messages() throws Exception { |
| 1020 | + ClientOptions opts = createOptions(testVars.keys[0].keyStr); |
| 1021 | + String testChannelName = "my-channel" + System.currentTimeMillis(); |
| 1022 | + try (AblyRest rest = new AblyRest(opts)) { |
| 1023 | + final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName); |
| 1024 | + |
| 1025 | + Message[] messages = new Message[] { |
| 1026 | + new Message("name", "message 1"), |
| 1027 | + new Message("name", "message 2"), |
| 1028 | + new Message("name", "message 3"), |
| 1029 | + }; |
| 1030 | + |
| 1031 | + channel.publish(messages); |
| 1032 | + } |
| 1033 | + |
| 1034 | + try (AblyRealtime realtime = new AblyRealtime(opts)) { |
| 1035 | + final ChannelOptions options = new ChannelOptions(); |
| 1036 | + options.params = new HashMap<>(); |
| 1037 | + options.params.put("rewind", "10"); |
| 1038 | + final Channel channel = realtime.channels.get(testChannelName, options); |
| 1039 | + final CompletionWaiter completionWaiter = new CompletionWaiter(); |
| 1040 | + final AtomicInteger counter = new AtomicInteger(); |
| 1041 | + |
| 1042 | + channel.subscribe(message -> { |
| 1043 | + int value = counter.incrementAndGet(); |
| 1044 | + if (value == 3) completionWaiter.onSuccess(); |
| 1045 | + }); |
| 1046 | + |
| 1047 | + completionWaiter.waitFor(); |
| 1048 | + |
| 1049 | + assertEquals("Should be exactly 3 messages", 3, counter.get()); |
| 1050 | + |
| 1051 | + Thread.sleep(1500); |
| 1052 | + |
| 1053 | + assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get()); |
| 1054 | + } |
| 1055 | + } |
| 1056 | + |
1013 | 1057 | } |
0 commit comments