Skip to content

Commit 479fd42

Browse files
committed
fix RedisPubSubTest
This test used to fail intermittently. The problem is that when the `SUBSCRIBE` command returns, there's no guarantee that the subscription has actually completed. That is only guaranteed after the subscription connection receives the `subscribe` message. The fix is to simply wait until that happens and only then run `PUBLISH` on the publish connection.
1 parent a9aed02 commit 479fd42

File tree

2 files changed

+54
-11
lines changed

2 files changed

+54
-11
lines changed

src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
import org.junit.Test;
1919
import org.junit.runner.RunWith;
2020

21-
import java.util.Set;
21+
import java.util.List;
2222
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
import static io.vertx.redis.client.Command.PSUBSCRIBE;
2525
import static io.vertx.redis.client.Command.PUBLISH;
2626
import static io.vertx.redis.client.Command.SUBSCRIBE;
2727
import static io.vertx.redis.client.Request.cmd;
28+
import static io.vertx.tests.redis.client.TestUtils.executeWhenConditionSatisfied;
2829

2930
@RunWith(VertxUnitRunner.class)
3031
public class RedisPubSubTest {
@@ -73,8 +74,12 @@ public void publishSubscribe_withHandler(TestContext test) {
7374
rule.vertx().eventBus().consumer("io.vertx.redis.news", msg -> async.countDown());
7475

7576
subConn.handler(EventBusHandler.create(rule.vertx()));
76-
subConn.send(Request.cmd(Command.SUBSCRIBE).arg("news")).onComplete(test.asyncAssertSuccess(result -> {
77-
pubConn.send(Request.cmd(Command.PUBLISH).arg("news").arg("foo"));
77+
subConn.send(Request.cmd(Command.SUBSCRIBE).arg("news")).onComplete(test.asyncAssertSuccess(subscribe -> {
78+
executeWhenConditionSatisfied(rule.vertx(), () -> async.count() == 1, () -> {
79+
pubConn.send(Request.cmd(Command.PUBLISH).arg("news").arg("foo")).onComplete(test.asyncAssertSuccess(publish -> {
80+
test.assertEquals(1, publish.toInteger());
81+
}));
82+
});
7883
}));
7984
}
8085

@@ -85,8 +90,12 @@ public void publishPSubscribe_withHandler(TestContext test) {
8590
rule.vertx().eventBus().consumer("io.vertx.redis.new*", msg -> async.countDown());
8691

8792
subConn.handler(EventBusHandler.create(rule.vertx()));
88-
subConn.send(Request.cmd(Command.PSUBSCRIBE).arg("new*")).onComplete(test.asyncAssertSuccess(result -> {
89-
pubConn.send(Request.cmd(Command.PUBLISH).arg("news").arg("foo"));
93+
subConn.send(Request.cmd(Command.PSUBSCRIBE).arg("new*")).onComplete(test.asyncAssertSuccess(subscribe -> {
94+
executeWhenConditionSatisfied(rule.vertx(), () -> async.count() == 1, () -> {
95+
pubConn.send(Request.cmd(Command.PUBLISH).arg("news").arg("foo")).onComplete(test.asyncAssertSuccess(publish -> {
96+
test.assertEquals(1, publish.toInteger());
97+
}));
98+
});
9099
}));
91100
}
92101

@@ -119,8 +128,11 @@ public void publishSubscribe_naive(TestContext test) {
119128
});
120129

121130
subConn.send(cmd(SUBSCRIBE).arg("mychannel")).onComplete(test.asyncAssertSuccess(subscribe -> {
122-
pubConn.send(cmd(PUBLISH).arg("mychannel").arg(123456))
123-
.onComplete(test.asyncAssertSuccess(test::assertNotNull));
131+
executeWhenConditionSatisfied(rule.vertx(), () -> subscribeCnt.get() == 1, () -> {
132+
pubConn.send(cmd(PUBLISH).arg("mychannel").arg(123456)).onComplete(test.asyncAssertSuccess(publish -> {
133+
test.assertEquals(1, publish.toInteger());
134+
}));
135+
});
124136
}));
125137
}
126138

@@ -152,15 +164,19 @@ public void publishPSubscribe_naive(TestContext test) {
152164
}
153165
});
154166

155-
Set<String> patterns = Set.of("A*", "B*", "C*", "D*", "E*", "F*");
167+
List<String> patterns = List.of("A*", "B*", "C*", "D*", "E*", "F*");
168+
List<String> matchingChannels = List.of("A", "B1", "Co", "DDD", "E234", "F");
156169

157170
Request psub = cmd(PSUBSCRIBE);
158171
patterns.forEach(psub::arg);
159172

160173
subConn.send(psub).onComplete(test.asyncAssertSuccess(subscribe -> {
161-
patterns.forEach(p -> {
162-
pubConn.send(cmd(PUBLISH).arg(p).arg(System.nanoTime()))
163-
.onComplete(test.asyncAssertSuccess(test::assertNotNull));
174+
executeWhenConditionSatisfied(rule.vertx(), () -> psubscribeCnt.get() == 6, () -> {
175+
for (String ch : matchingChannels) {
176+
pubConn.send(cmd(PUBLISH).arg(ch).arg(System.nanoTime())).onComplete(test.asyncAssertSuccess(publish -> {
177+
test.assertEquals(1, publish.toInteger());
178+
}));
179+
}
164180
});
165181
}));
166182
}

src/test/java/io/vertx/tests/redis/client/TestUtils.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,44 @@
22

33
import io.vertx.core.Completable;
44
import io.vertx.core.Future;
5+
import io.vertx.core.Handler;
56
import io.vertx.core.Promise;
67
import io.vertx.core.Vertx;
78

89
import java.util.UUID;
10+
import java.util.function.BooleanSupplier;
911
import java.util.function.Supplier;
1012

1113
public class TestUtils {
14+
/**
15+
* Returns a short random string that is usable as a key in Redis with very high probability
16+
* of being unique.
17+
* <p>
18+
* Implementation note: this is currently a random UUID, but that is <em>not</em> guaranteed
19+
* and may change without notice.
20+
*/
1221
public static String randomKey() {
1322
return UUID.randomUUID().toString();
1423
}
1524

25+
/**
26+
* Waits until the given {@code condition} is {@code true} and then invokes the given {@code action} once,
27+
* returning its result. Waiting is not active, it uses the Vert.x {@linkplain Vertx#setTimer(long, Handler) timer}
28+
* facility.
29+
*/
30+
public static void executeWhenConditionSatisfied(Vertx vertx, BooleanSupplier condition, Runnable action) {
31+
if (condition.getAsBoolean()) {
32+
action.run();
33+
} else {
34+
vertx.setTimer(5, ignored -> {
35+
executeWhenConditionSatisfied(vertx, condition, action);
36+
});
37+
}
38+
}
39+
40+
/**
41+
* Retries the given {@code action} until it succeeds or until the number of retries reaches the given maximum.
42+
*/
1643
public static <T> Future<T> retryUntilSuccess(Vertx vertx, Supplier<Future<T>> action, int maxRetries) {
1744
Promise<T> promise = Promise.promise();
1845
retryUntilSuccess(vertx, action, maxRetries, promise);

0 commit comments

Comments
 (0)