Skip to content

Commit f9157d3

Browse files
authored
Merge pull request #448 from vert-x3/447-cluster-batch-command-same-slot-calculation-is-wrong
Fixed correct node selection for CLUSTER batch command
2 parents 16fc59e + 0148193 commit f9157d3

File tree

2 files changed

+112
-9
lines changed

2 files changed

+112
-9
lines changed

src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ public Future<List<Response>> batch(List<Request> requests) {
329329
LOG.debug("Empty batch");
330330
promise.complete(Collections.emptyList());
331331
} else {
332-
int currentSlot = -1;
332+
int correctSlot = -1;
333+
String currentEndpoint = null;
333334
boolean readOnly = false;
334335
boolean forceMasterEndpoint = false;
335336

@@ -353,6 +354,7 @@ public Future<List<Response>> batch(List<Request> requests) {
353354
final List<byte[]> keys = req.keys();
354355
forceMasterEndpoint |= MASTER_ONLY_COMMANDS.contains(cmd);
355356
int slot;
357+
String endpoint;
356358

357359
// process slots, need to verify if we can run this batch
358360
switch (keys.size()) {
@@ -362,10 +364,14 @@ public Future<List<Response>> batch(List<Request> requests) {
362364
case 1:
363365
// command is single key, as long as we're on the same slot, it's OK
364366
slot = ZModem.generate(keys.get(0));
367+
// as cluster server serves range of slots we need to compare to server range and not exact slot
368+
//always take master to make sure we have same endpoint
369+
endpoint = slots.endpointsForKey(slot)[0];
365370
// we are checking the first request key
366-
if (currentSlot == -1) {
367-
currentSlot = slot;
368-
} else if (currentSlot != slot) {
371+
if (currentEndpoint == null) {
372+
currentEndpoint = endpoint;
373+
correctSlot = slot;
374+
} else if (!currentEndpoint.equals(endpoint)) {
369375
// in cluster mode we currently do not handle batching commands which keys are not on the same slot
370376
promise.fail(buildCrossslotFailureMsg(req));
371377
return promise.future();
@@ -375,9 +381,11 @@ public Future<List<Response>> batch(List<Request> requests) {
375381
// multiple keys on the command
376382
for (byte[] key : keys) {
377383
slot = ZModem.generate(key);
378-
if (currentSlot == -1) {
379-
currentSlot = slot;
380-
} else if (currentSlot != slot) {
384+
endpoint = slots.endpointsForKey(slot)[0];
385+
if (currentEndpoint == null) {
386+
correctSlot = slot;
387+
currentEndpoint = endpoint;
388+
} else if (!currentEndpoint.equals(endpoint)) {
381389
// in cluster mode we currently do not handle batching commands which keys are not on the same slot
382390
promise.fail(buildCrossslotFailureMsg(req));
383391
return promise.future();
@@ -388,7 +396,8 @@ public Future<List<Response>> batch(List<Request> requests) {
388396
}
389397

390398
// all keys are on the same slot!
391-
batch(selectEndpoint(currentSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
399+
//we just need to decide which endpoint to use based on additional options
400+
batch(selectEndpoint(correctSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
392401
}
393402

394403
return promise.future();

src/test/java/io/vertx/redis/client/test/RedisClusterTest.java

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@
44
import io.vertx.core.Context;
55
import io.vertx.core.Future;
66
import io.vertx.core.buffer.Buffer;
7+
import io.vertx.core.json.JsonObject;
78
import io.vertx.ext.unit.Async;
89
import io.vertx.ext.unit.TestContext;
910
import io.vertx.ext.unit.junit.RunTestOnContext;
1011
import io.vertx.ext.unit.junit.VertxUnitRunner;
1112
import io.vertx.redis.client.*;
12-
import java.util.stream.Collectors;
13+
import io.vertx.redis.client.impl.ZModem;
1314
import org.junit.*;
1415
import org.junit.runner.RunWith;
1516
import org.testcontainers.containers.FixedHostPortGenericContainer;
1617
import org.testcontainers.containers.GenericContainer;
1718

1819
import java.util.*;
1920
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.stream.Collectors;
2022

2123
import static io.vertx.redis.client.Command.*;
2224
import static io.vertx.redis.client.Request.cmd;
@@ -1163,6 +1165,98 @@ public void setAndWaitEmptyBatch(TestContext should) {
11631165
));
11641166
}
11651167

1168+
@Test(timeout = 30_000)
1169+
@SuppressWarnings("unchecked")
1170+
public void batchSameSlotGroupByMultipleSlotsCommands(TestContext should) {
1171+
final Async test = should.async();
1172+
1173+
Redis.createClient(rule.vertx(), options)
1174+
.connect()
1175+
.onComplete(should.asyncAssertSuccess(cluster -> {
1176+
cluster
1177+
.send(cmd(CLUSTER).arg("SLOTS"))
1178+
.compose(reply -> {
1179+
if (reply == null || reply.size() == 0) {
1180+
// no slots available we can't really proceed
1181+
return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster.");
1182+
}
1183+
1184+
Map<Integer, JsonObject> slotRangeMap = new HashMap<>();
1185+
for (int i = 0; i < reply.size(); i++) {
1186+
Response s = reply.get(i);
1187+
JsonObject slotRange = new JsonObject()
1188+
.put("start", s.get(0).toInteger())
1189+
.put("end", s.get(1).toInteger());
1190+
slotRangeMap.put(i, slotRange);
1191+
}
1192+
1193+
Map<Integer, List<Request>> mapCommands = new HashMap<>();
1194+
for (int i = 0; i < 100000; i++) {
1195+
String key = "key-" + i;
1196+
String value = "value-" + i;
1197+
int group;
1198+
int slot = ZModem.generate(key);
1199+
for (Map.Entry<Integer, JsonObject> entry : slotRangeMap.entrySet()) {
1200+
if (slot >= entry.getValue().getInteger("start") && slot <= entry.getValue().getInteger("end")) {
1201+
mapCommands.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(cmd(SET).arg(key).arg(value));
1202+
break;
1203+
}
1204+
}
1205+
}
1206+
1207+
List<Future<List<Response>>> futures = new ArrayList<>();
1208+
for (Map.Entry<Integer, List<Request>> entry : mapCommands.entrySet()) {
1209+
futures.add(cluster.batch(entry.getValue()));
1210+
}
1211+
1212+
return Future.all(futures)
1213+
.onComplete(should.asyncAssertSuccess(responses -> {
1214+
should.assertEquals(mapCommands.values().stream().map(List::size).reduce(0, Integer::sum), responses.result().list().stream().map(item -> ((List<Request>) item).size()).reduce(0, Integer::sum));
1215+
test.complete();
1216+
}));
1217+
}).onFailure(should::fail);
1218+
})).onFailure(should::fail);
1219+
}
1220+
1221+
@Test(timeout = 30_000)
1222+
public void batchSameSlotsCommands(TestContext should) {
1223+
final Async test = should.async();
1224+
1225+
Redis.createClient(rule.vertx(), options)
1226+
.connect()
1227+
.onComplete(should.asyncAssertSuccess(cluster -> {
1228+
cluster
1229+
.send(cmd(CLUSTER).arg("SLOTS"))
1230+
.compose(reply -> {
1231+
if (reply == null || reply.size() == 0) {
1232+
// no slots available we can't really proceed
1233+
return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster.");
1234+
}
1235+
1236+
//take random slot
1237+
Response s = reply.get(0);
1238+
JsonObject slotRange = new JsonObject()
1239+
.put("start", s.get(0).toInteger())
1240+
.put("end", s.get(1).toInteger());
1241+
1242+
List<Request> rawCommands = new ArrayList<>();
1243+
for (int i = 0; i < 100000; i++) {
1244+
String key = "key-" + i;
1245+
String value = "value-" + i;
1246+
int endpoint;
1247+
int slot = ZModem.generate(key);
1248+
if (slot >= slotRange.getInteger("start") && slot <= slotRange.getInteger("end")) {
1249+
rawCommands.add(Request.cmd(Command.SET).arg(key).arg(value));
1250+
}
1251+
}
1252+
return cluster.batch(rawCommands).onComplete(should.asyncAssertSuccess(responses -> {
1253+
should.assertEquals(rawCommands.size(), responses.size());
1254+
test.complete();
1255+
}));
1256+
}).onFailure(should::fail);
1257+
})).onFailure(should::fail);
1258+
}
1259+
11661260
@Test(timeout = 30_000)
11671261
public void clusterInfoReturnsVerbatimString(TestContext should) {
11681262
final Async test = should.async();

0 commit comments

Comments
 (0)