Skip to content

Commit 1fd2f79

Browse files
committed
Fixed correct node selection for CLUSTER batch command
1 parent 16fc59e commit 1fd2f79

File tree

2 files changed

+83
-9
lines changed

2 files changed

+83
-9
lines changed

src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ public Future<List<Response>> batch(List<Request> requests) {
329329
LOG.debug("Empty batch");
330330
promise.complete(Collections.emptyList());
331331
} else {
332-
int currentSlot = -1;
332+
int correctSlot = -1;
333+
String currentEndpoint = null;
333334
boolean readOnly = false;
334335
boolean forceMasterEndpoint = false;
335336

@@ -353,6 +354,7 @@ public Future<List<Response>> batch(List<Request> requests) {
353354
final List<byte[]> keys = req.keys();
354355
forceMasterEndpoint |= MASTER_ONLY_COMMANDS.contains(cmd);
355356
int slot;
357+
String endpoint;
356358

357359
// process slots, need to verify if we can run this batch
358360
switch (keys.size()) {
@@ -362,10 +364,14 @@ public Future<List<Response>> batch(List<Request> requests) {
362364
case 1:
363365
// command is single key, as long as we're on the same slot, it's OK
364366
slot = ZModem.generate(keys.get(0));
367+
// as cluster server serves range of slots we need to compare to server range and not exact slot
368+
//always take master to make sure we have same endpoint
369+
endpoint = slots.endpointsForKey(slot)[0];
365370
// we are checking the first request key
366-
if (currentSlot == -1) {
367-
currentSlot = slot;
368-
} else if (currentSlot != slot) {
371+
if (currentEndpoint == null) {
372+
currentEndpoint = endpoint;
373+
correctSlot = slot;
374+
} else if (!currentEndpoint.equals(endpoint)) {
369375
// in cluster mode we currently do not handle batching commands which keys are not on the same slot
370376
promise.fail(buildCrossslotFailureMsg(req));
371377
return promise.future();
@@ -375,9 +381,11 @@ public Future<List<Response>> batch(List<Request> requests) {
375381
// multiple keys on the command
376382
for (byte[] key : keys) {
377383
slot = ZModem.generate(key);
378-
if (currentSlot == -1) {
379-
currentSlot = slot;
380-
} else if (currentSlot != slot) {
384+
endpoint = slots.endpointsForKey(slot)[0];
385+
if (currentEndpoint == null) {
386+
correctSlot = slot;
387+
currentEndpoint = endpoint;
388+
} else if (!currentEndpoint.equals(endpoint)) {
381389
// in cluster mode we currently do not handle batching commands which keys are not on the same slot
382390
promise.fail(buildCrossslotFailureMsg(req));
383391
return promise.future();
@@ -388,7 +396,8 @@ public Future<List<Response>> batch(List<Request> requests) {
388396
}
389397

390398
// all keys are on the same slot!
391-
batch(selectEndpoint(currentSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
399+
//we just need to decide which endpoint to use based on additional options
400+
batch(selectEndpoint(correctSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
392401
}
393402

394403
return promise.future();

src/test/java/io/vertx/redis/client/test/RedisClusterTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99
import io.vertx.ext.unit.junit.RunTestOnContext;
1010
import io.vertx.ext.unit.junit.VertxUnitRunner;
1111
import io.vertx.redis.client.*;
12-
import java.util.stream.Collectors;
12+
import io.vertx.redis.client.impl.ZModem;
1313
import org.junit.*;
1414
import org.junit.runner.RunWith;
1515
import org.testcontainers.containers.FixedHostPortGenericContainer;
1616
import org.testcontainers.containers.GenericContainer;
1717

1818
import java.util.*;
1919
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.stream.Collectors;
2021

2122
import static io.vertx.redis.client.Command.*;
2223
import static io.vertx.redis.client.Request.cmd;
@@ -1163,6 +1164,70 @@ public void setAndWaitEmptyBatch(TestContext should) {
11631164
));
11641165
}
11651166

1167+
@Test(timeout = 30_000)
1168+
@SuppressWarnings("unchecked")
1169+
public void batchSameSlotGroupByMultipleSlotsCommands(TestContext should) {
1170+
final Async test = should.async();
1171+
1172+
Map<Integer, List<Request>> mapCommands = new HashMap<>();
1173+
for (int i = 0; i < 100000; i++) {
1174+
String key = "key-" + i;
1175+
String value = "value-" + i;
1176+
int endpoint;
1177+
int slot = ZModem.generate(key);
1178+
if (slot < 16384 / (options.getEndpoints().size() / 2)) {
1179+
endpoint = 0;
1180+
} else if (slot <= 2 * 16384 / (options.getEndpoints().size() / 2)) {
1181+
endpoint = 1;
1182+
} else {
1183+
endpoint = 2;
1184+
}
1185+
List<Request> commands = mapCommands.computeIfAbsent(endpoint, k -> new ArrayList<>());
1186+
commands.add(Request.cmd(Command.SET).arg(key).arg(value));
1187+
}
1188+
1189+
Redis.createClient(rule.vertx(), options)
1190+
.connect()
1191+
.onComplete(should.asyncAssertSuccess(cluster -> {
1192+
List<Future<List<Response>>> futures = new ArrayList<>();
1193+
for (Map.Entry<Integer, List<Request>> entry : mapCommands.entrySet()) {
1194+
futures.add(cluster.batch(entry.getValue()));
1195+
}
1196+
Future.all(futures).onComplete(should.asyncAssertSuccess(responses -> {
1197+
should.assertEquals(mapCommands.values().stream().map(List::size).reduce(0 , Integer::sum), responses.result().list().stream().map(item -> ((List<Request>) item).size()).reduce(0, Integer::sum));
1198+
1199+
test.countDown();
1200+
})).onFailure(should::fail);
1201+
}
1202+
));
1203+
}
1204+
1205+
@Test(timeout = 30_000)
1206+
public void batchSameSlotsCommands(TestContext should) {
1207+
final Async test = should.async();
1208+
1209+
List<Request> rawCommands = new ArrayList<>();
1210+
for (int i = 0; i < 100000; i++) {
1211+
String key = "key-" + i;
1212+
String value = "value-" + i;
1213+
int endpoint;
1214+
int slot = ZModem.generate(key);
1215+
if (slot < 16384 / (options.getEndpoints().size() / 2)) {
1216+
rawCommands.add(Request.cmd(Command.SET).arg(key).arg(value));
1217+
}
1218+
}
1219+
1220+
Redis.createClient(rule.vertx(), options)
1221+
.connect()
1222+
.onComplete(should.asyncAssertSuccess(cluster -> {
1223+
cluster.batch(rawCommands).onComplete(should.asyncAssertSuccess(responses -> {
1224+
should.assertEquals(rawCommands.size(), responses.size());
1225+
test.countDown();
1226+
})).onFailure(should::fail);
1227+
}
1228+
));
1229+
}
1230+
11661231
@Test(timeout = 30_000)
11671232
public void clusterInfoReturnsVerbatimString(TestContext should) {
11681233
final Async test = should.async();

0 commit comments

Comments
 (0)