|
1 | 1 | package io.vertx.tests.redis.client; |
2 | 2 |
|
| 3 | +import io.vertx.core.json.JsonObject; |
3 | 4 | import io.vertx.ext.unit.Async; |
4 | 5 | import io.vertx.ext.unit.TestContext; |
5 | 6 | import io.vertx.ext.unit.junit.RunTestOnContext; |
|
18 | 19 | import org.junit.Test; |
19 | 20 | import org.junit.runner.RunWith; |
20 | 21 |
|
| 22 | +import java.util.concurrent.atomic.AtomicInteger; |
| 23 | + |
| 24 | +import static io.vertx.tests.redis.client.TestUtils.executeWhenConditionSatisfied; |
| 25 | + |
21 | 26 | @RunWith(VertxUnitRunner.class) |
22 | 27 | public class RedisPubSubClusterTest { |
23 | 28 |
|
@@ -69,22 +74,29 @@ public void testSubscribeMultipleTimes(TestContext should) { |
69 | 74 | }); |
70 | 75 | } |
71 | 76 |
|
72 | | - rule.vertx().eventBus().consumer("io.vertx.redis." + channel, msg -> { |
73 | | - rule.vertx().eventBus().publish(channel, msg.body()); |
| 77 | + AtomicInteger subs = new AtomicInteger(); |
| 78 | + rule.vertx().eventBus().<JsonObject>consumer("io.vertx.redis." + channel, msg -> { |
| 79 | + if ("subscribe".equals(msg.body().getString("type"))) { |
| 80 | + subs.incrementAndGet(); |
| 81 | + } |
| 82 | + if ("message".equals(msg.body().getString("type"))) { |
| 83 | + rule.vertx().eventBus().publish(channel, msg.body()); |
| 84 | + } |
74 | 85 | }); |
75 | 86 |
|
76 | 87 | subConn.handler(EventBusHandler.create(rule.vertx())); |
77 | 88 | subUnsub(channel, N, should.async(N)); |
78 | 89 |
|
79 | | - rule.vertx().setTimer(1000, id -> { |
80 | | - pubConn.send(Request.cmd(Command.PUBLISH).arg(channel).arg("hello")) |
81 | | - .onComplete(should.asyncAssertSuccess()); |
| 90 | + executeWhenConditionSatisfied(rule.vertx(), () -> subs.get() == 10, () -> { |
| 91 | + pubConn.send(Request.cmd(Command.PUBLISH).arg(channel).arg("hello")).onComplete(should.asyncAssertSuccess(publish -> { |
| 92 | + should.assertEquals(1, publish.toInteger()); |
| 93 | + })); |
82 | 94 | }); |
83 | 95 | } |
84 | 96 |
|
85 | 97 | private void subUnsub(String channel, int attempts, Async testSub) { |
86 | | - subConn.send(Request.cmd(Command.UNSUBSCRIBE).arg(channel)).onComplete(unreply -> { |
87 | | - subConn.send(Request.cmd(Command.SUBSCRIBE).arg(channel)).onComplete(reply -> { |
| 98 | + subConn.send(Request.cmd(Command.UNSUBSCRIBE).arg(channel)).onComplete(unsub -> { |
| 99 | + subConn.send(Request.cmd(Command.SUBSCRIBE).arg(channel)).onComplete(sub -> { |
88 | 100 | testSub.countDown(); |
89 | 101 | if (attempts > 1) { |
90 | 102 | subUnsub(channel, attempts - 1, testSub); |
|
0 commit comments