|
4 | 4 | import io.vertx.core.Context; |
5 | 5 | import io.vertx.core.Future; |
6 | 6 | import io.vertx.core.buffer.Buffer; |
| 7 | +import io.vertx.core.json.JsonObject; |
7 | 8 | import io.vertx.ext.unit.Async; |
8 | 9 | import io.vertx.ext.unit.TestContext; |
9 | 10 | import io.vertx.ext.unit.junit.RunTestOnContext; |
@@ -1169,63 +1170,91 @@ public void setAndWaitEmptyBatch(TestContext should) { |
1169 | 1170 | public void batchSameSlotGroupByMultipleSlotsCommands(TestContext should) { |
1170 | 1171 | final Async test = should.async(); |
1171 | 1172 |
|
1172 | | - Map<Integer, List<Request>> mapCommands = new HashMap<>(); |
1173 | | - for (int i = 0; i < 100000; i++) { |
1174 | | - String key = "key-" + i; |
1175 | | - String value = "value-" + i; |
1176 | | - int endpoint; |
1177 | | - int slot = ZModem.generate(key); |
1178 | | - if (slot < 16384 / (options.getEndpoints().size() / 2)) { |
1179 | | - endpoint = 0; |
1180 | | - } else if (slot <= 2 * 16384 / (options.getEndpoints().size() / 2)) { |
1181 | | - endpoint = 1; |
1182 | | - } else { |
1183 | | - endpoint = 2; |
1184 | | - } |
1185 | | - List<Request> commands = mapCommands.computeIfAbsent(endpoint, k -> new ArrayList<>()); |
1186 | | - commands.add(Request.cmd(Command.SET).arg(key).arg(value)); |
1187 | | - } |
1188 | | - |
1189 | 1173 | Redis.createClient(rule.vertx(), options) |
1190 | 1174 | .connect() |
1191 | 1175 | .onComplete(should.asyncAssertSuccess(cluster -> { |
1192 | | - List<Future<List<Response>>> futures = new ArrayList<>(); |
1193 | | - for (Map.Entry<Integer, List<Request>> entry : mapCommands.entrySet()) { |
1194 | | - futures.add(cluster.batch(entry.getValue())); |
1195 | | - } |
1196 | | - Future.all(futures).onComplete(should.asyncAssertSuccess(responses -> { |
1197 | | - should.assertEquals(mapCommands.values().stream().map(List::size).reduce(0 , Integer::sum), responses.result().list().stream().map(item -> ((List<Request>) item).size()).reduce(0, Integer::sum)); |
| 1176 | + cluster |
| 1177 | + .send(cmd(CLUSTER).arg("SLOTS")) |
| 1178 | + .compose(reply -> { |
| 1179 | + if (reply == null || reply.size() == 0) { |
| 1180 | + // no slots available we can't really proceed |
| 1181 | + return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster."); |
| 1182 | + } |
1198 | 1183 |
|
1199 | | - test.countDown(); |
1200 | | - })).onFailure(should::fail); |
1201 | | - } |
1202 | | - )); |
| 1184 | + Map<Integer, JsonObject> slotRangeMap = new HashMap<>(); |
| 1185 | + for (int i = 0; i < reply.size(); i++) { |
| 1186 | + Response s = reply.get(i); |
| 1187 | + JsonObject slotRange = new JsonObject() |
| 1188 | + .put("start", s.get(0).toInteger()) |
| 1189 | + .put("end", s.get(1).toInteger()); |
| 1190 | + slotRangeMap.put(i, slotRange); |
| 1191 | + } |
| 1192 | + |
| 1193 | + Map<Integer, List<Request>> mapCommands = new HashMap<>(); |
| 1194 | + for (int i = 0; i < 100000; i++) { |
| 1195 | + String key = "key-" + i; |
| 1196 | + String value = "value-" + i; |
| 1197 | + int group; |
| 1198 | + int slot = ZModem.generate(key); |
| 1199 | + for (Map.Entry<Integer, JsonObject> entry : slotRangeMap.entrySet()) { |
| 1200 | + if (slot >= entry.getValue().getInteger("start") && slot <= entry.getValue().getInteger("end")) { |
| 1201 | + mapCommands.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(cmd(SET).arg(key).arg(value)); |
| 1202 | + break; |
| 1203 | + } |
| 1204 | + } |
| 1205 | + } |
| 1206 | + |
| 1207 | + List<Future<List<Response>>> futures = new ArrayList<>(); |
| 1208 | + for (Map.Entry<Integer, List<Request>> entry : mapCommands.entrySet()) { |
| 1209 | + futures.add(cluster.batch(entry.getValue())); |
| 1210 | + } |
| 1211 | + |
| 1212 | + return Future.all(futures) |
| 1213 | + .onComplete(should.asyncAssertSuccess(responses -> { |
| 1214 | + should.assertEquals(mapCommands.values().stream().map(List::size).reduce(0, Integer::sum), responses.result().list().stream().map(item -> ((List<Request>) item).size()).reduce(0, Integer::sum)); |
| 1215 | + test.complete(); |
| 1216 | + })); |
| 1217 | + }).onFailure(should::fail); |
| 1218 | + })).onFailure(should::fail); |
1203 | 1219 | } |
1204 | 1220 |
|
1205 | 1221 | @Test(timeout = 30_000) |
1206 | 1222 | public void batchSameSlotsCommands(TestContext should) { |
1207 | 1223 | final Async test = should.async(); |
1208 | 1224 |
|
1209 | | - List<Request> rawCommands = new ArrayList<>(); |
1210 | | - for (int i = 0; i < 100000; i++) { |
1211 | | - String key = "key-" + i; |
1212 | | - String value = "value-" + i; |
1213 | | - int endpoint; |
1214 | | - int slot = ZModem.generate(key); |
1215 | | - if (slot < 16384 / (options.getEndpoints().size() / 2)) { |
1216 | | - rawCommands.add(Request.cmd(Command.SET).arg(key).arg(value)); |
1217 | | - } |
1218 | | - } |
1219 | | - |
1220 | 1225 | Redis.createClient(rule.vertx(), options) |
1221 | 1226 | .connect() |
1222 | 1227 | .onComplete(should.asyncAssertSuccess(cluster -> { |
1223 | | - cluster.batch(rawCommands).onComplete(should.asyncAssertSuccess(responses -> { |
1224 | | - should.assertEquals(rawCommands.size(), responses.size()); |
1225 | | - test.countDown(); |
1226 | | - })).onFailure(should::fail); |
1227 | | - } |
1228 | | - )); |
| 1228 | + cluster |
| 1229 | + .send(cmd(CLUSTER).arg("SLOTS")) |
| 1230 | + .compose(reply -> { |
| 1231 | + if (reply == null || reply.size() == 0) { |
| 1232 | + // no slots available we can't really proceed |
| 1233 | + return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster."); |
| 1234 | + } |
| 1235 | + |
| 1236 | + //take random slot |
| 1237 | + Response s = reply.get(0); |
| 1238 | + JsonObject slotRange = new JsonObject() |
| 1239 | + .put("start", s.get(0).toInteger()) |
| 1240 | + .put("end", s.get(1).toInteger()); |
| 1241 | + |
| 1242 | + List<Request> rawCommands = new ArrayList<>(); |
| 1243 | + for (int i = 0; i < 100000; i++) { |
| 1244 | + String key = "key-" + i; |
| 1245 | + String value = "value-" + i; |
| 1246 | + int endpoint; |
| 1247 | + int slot = ZModem.generate(key); |
| 1248 | + if (slot >= slotRange.getInteger("start") && slot <= slotRange.getInteger("end")) { |
| 1249 | + rawCommands.add(Request.cmd(Command.SET).arg(key).arg(value)); |
| 1250 | + } |
| 1251 | + } |
| 1252 | + return cluster.batch(rawCommands).onComplete(should.asyncAssertSuccess(responses -> { |
| 1253 | + should.assertEquals(rawCommands.size(), responses.size()); |
| 1254 | + test.complete(); |
| 1255 | + })); |
| 1256 | + }).onFailure(should::fail); |
| 1257 | + })).onFailure(should::fail); |
1229 | 1258 | } |
1230 | 1259 |
|
1231 | 1260 | @Test(timeout = 30_000) |
|
0 commit comments