Skip to content

Commit ee9b459

Browse files
committed
Updated with Kirk's first review comments
1 parent 4c8672a commit ee9b459

File tree

9 files changed

+289
-356
lines changed

9 files changed

+289
-356
lines changed

geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java

Lines changed: 114 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@
1717
import static org.apache.geode.cache.Region.SEPARATOR;
1818
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
1919
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
20-
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
2120
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
2221
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
2322
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
2423
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
2524
import static org.apache.geode.test.dunit.VM.getVM;
2625
import static org.assertj.core.api.Assertions.assertThat;
27-
import static org.junit.Assert.assertEquals;
28-
import static org.junit.Assert.assertNotNull;
2926

3027
import java.io.File;
3128
import java.io.IOException;
@@ -62,7 +59,11 @@
6259
import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
6360
import org.apache.geode.internal.cache.PartitionedRegion;
6461
import org.apache.geode.internal.cache.RegionQueue;
62+
import org.apache.geode.internal.cache.execute.data.CustId;
63+
import org.apache.geode.internal.cache.execute.data.Order;
6564
import org.apache.geode.internal.cache.execute.data.OrderId;
65+
import org.apache.geode.internal.cache.execute.data.Shipment;
66+
import org.apache.geode.internal.cache.execute.data.ShipmentId;
6667
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
6768
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
6869
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
@@ -193,7 +194,6 @@ public void tearDown() {
193194

194195
protected Properties createLocatorConfig(int systemId, int locatorPort, int remoteLocatorPort) {
195196
Properties config = new Properties();
196-
config.setProperty(MCAST_PORT, "0");
197197
config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
198198
config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
199199
config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']');
@@ -259,7 +259,6 @@ protected GatewayReceiverFactory createGatewayReceiverFactory(int receiverPort)
259259

260260
protected Properties createServerConfig(int locatorPort) {
261261
Properties config = new Properties();
262-
config.setProperty(MCAST_PORT, "0");
263262
config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
264263
return config;
265264
}
@@ -288,10 +287,8 @@ protected boolean isRunning(GatewaySender sender) {
288287

289288
protected void validateRegionSize(String regionName, final int regionSize) {
290289
final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR + regionName);
291-
assertNotNull(r);
292-
if (regionSize != r.keySet().size()) {
293-
await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize));
294-
}
290+
assertThat(r).isNotNull();
291+
await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize));
295292
}
296293

297294
protected List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
@@ -310,7 +307,7 @@ protected List<Integer> getSenderStats(String senderId, final int expectedQueueS
310307
await()
311308
.untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
312309
}
313-
ArrayList<Integer> stats = new ArrayList<>();
310+
List<Integer> stats = new ArrayList<>();
314311
stats.add(statistics.getEventQueueSize());
315312
stats.add(statistics.getEventsReceived());
316313
stats.add(statistics.getEventsQueued());
@@ -340,26 +337,30 @@ protected GatewaySender getGatewaySender(String senderId) {
340337
return sender;
341338
}
342339

343-
protected void doPutsInsideTransactions(String regionName, Map<Object, Object> keyValues,
344-
int eventsPerTransaction) {
345-
Region<Object, Object> r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
346-
assertNotNull(r);
347-
int eventInTransaction = 0;
348-
CacheTransactionManager cacheTransactionManager =
349-
cacheRule.getCache().getCacheTransactionManager();
350-
for (Object key : keyValues.keySet()) {
351-
if (eventInTransaction == 0) {
352-
cacheTransactionManager.begin();
353-
}
354-
r.put(key, keyValues.get(key));
355-
if (++eventInTransaction == eventsPerTransaction) {
356-
cacheTransactionManager.commit();
357-
eventInTransaction = 0;
358-
}
340+
protected void doTxPuts(String regionName, final long putsPerTransaction,
341+
final long transactions) {
342+
doTxPuts(regionName, putsPerTransaction, transactions, 0);
343+
}
344+
345+
protected void doTxPuts(String regionName, final long putsPerTransaction,
346+
final long transactions, long initialKeyId) {
347+
Region<Object, Object> region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
348+
CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager();
349+
for (int i = 0; i < transactions; i++) {
350+
long keyId = initialKeyId + (i * putsPerTransaction);
351+
doOneTxWithPuts(region, mgr, putsPerTransaction, keyId);
359352
}
360-
if (eventInTransaction != 0) {
361-
cacheTransactionManager.commit();
353+
}
354+
355+
private void doOneTxWithPuts(Region<Object, Object> region, CacheTransactionManager mgr,
356+
long putsPerTransaction, long initialKeyId) {
357+
mgr.begin();
358+
for (int j = 0; j < putsPerTransaction; j++) {
359+
long key = initialKeyId + j;
360+
String value = "Value_" + key;
361+
region.put(key, value);
362362
}
363+
mgr.commit();
363364
}
364365

365366
protected void checkGatewayReceiverStats(int processBatches, int eventsReceived,
@@ -386,35 +387,42 @@ protected void checkGatewayReceiverStats(int processBatches, int eventsReceived,
386387
}
387388

388389
protected void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
389-
final long transactions, long offset) {
390-
Region<Object, Object> r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
391-
long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
392-
long j;
390+
final long transactions, long initialKeyId) {
391+
Region<Object, Object> region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
393392
CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager();
394393
for (int i = 0; i < transactions; i++) {
395-
boolean done = false;
396-
do {
394+
long keyId = initialKeyId + (i * putsPerTransaction);
395+
doOneTxWithPutsWithRetryIfError(region, mgr, putsPerTransaction, keyId);
396+
}
397+
}
398+
399+
private void doOneTxWithPutsWithRetryIfError(Region<Object, Object> region,
400+
CacheTransactionManager mgr, long putsPerTransaction, long initialKeyId) {
401+
while (true) {
402+
try {
403+
mgr.begin();
404+
for (int j = 0; j < putsPerTransaction; j++) {
405+
long key = initialKeyId + j;
406+
String value = "Value_" + key;
407+
region.put(key, value);
408+
}
409+
mgr.commit();
410+
return;
411+
} catch (TransactionException ignore) {
412+
} catch (IllegalStateException ignore) {
397413
try {
398-
mgr.begin();
399-
for (j = 0; j < putsPerTransaction; j++) {
400-
long key = keyOffset + ((j + (10L * i)) * 100);
401-
String value = "Value_" + key;
402-
r.put(key, value);
403-
}
404-
mgr.commit();
405-
done = true;
406-
} catch (TransactionException ignore) {
407-
} catch (IllegalStateException ignore) {
408-
try {
409-
mgr.rollback();
410-
} catch (Exception ignored) {
411-
}
414+
mgr.rollback();
415+
} catch (Exception ignored) {
412416
}
413-
} while (!done);
417+
}
414418
}
415419
}
416420

417421
public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
422+
createCustomerOrderShipmentPartitionedRegion(senderId, 0);
423+
}
424+
425+
public void createCustomerOrderShipmentPartitionedRegion(String senderId, int redundantCopies) {
418426
RegionFactory<Object, Object> fact =
419427
cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
420428
if (senderId != null) {
@@ -423,10 +431,12 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
423431

424432
PartitionAttributesFactory paf = new PartitionAttributesFactory();
425433
paf.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
434+
paf.setRedundantCopies(redundantCopies);
426435
fact.setPartitionAttributes(paf.create());
427436
fact.create(customerRegionName);
428437

429438
paf = new PartitionAttributesFactory();
439+
paf.setRedundantCopies(redundantCopies);
430440
paf.setColocatedWith(customerRegionName)
431441
.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
432442
fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
@@ -437,6 +447,7 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
437447
fact.create(orderRegionName);
438448

439449
paf = new PartitionAttributesFactory();
450+
paf.setRedundantCopies(redundantCopies);
440451
paf.setColocatedWith(orderRegionName)
441452
.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
442453
fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
@@ -447,33 +458,62 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
447458
fact.create(shipmentRegionName);
448459
}
449460

450-
public void doOrderAndShipmentPutsInsideTransactions(Map<Object, Object> keyValues,
451-
int eventsPerTransaction) {
452-
Region<Object, Object> orderRegion = cacheRule.getCache().getRegion(orderRegionName);
453-
Region<Object, Object> shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName);
454-
assertNotNull(orderRegion);
455-
assertNotNull(shipmentRegion);
456-
int eventInTransaction = 0;
461+
public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction,
462+
int transactions) {
463+
doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction, transactions, false);
464+
}
465+
466+
public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction,
467+
int transactions, boolean retryIfError) {
457468
CacheTransactionManager cacheTransactionManager =
458469
cacheRule.getCache().getCacheTransactionManager();
459-
for (Object key : keyValues.keySet()) {
460-
if (eventInTransaction == 0) {
461-
cacheTransactionManager.begin();
462-
}
463-
Region<Object, Object> r;
464-
if (key instanceof OrderId) {
465-
r = orderRegion;
470+
for (int i = 0; i < transactions; i++) {
471+
int keyId = i * eventsPerTransaction;
472+
if (retryIfError) {
473+
doOneTxOrderAndShipmentPutsWithRetryIfError(cacheTransactionManager, keyId,
474+
eventsPerTransaction, customerId);
466475
} else {
467-
r = shipmentRegion;
476+
doOneTxOrderAndShipmentPuts(cacheTransactionManager, keyId, eventsPerTransaction,
477+
customerId);
468478
}
469-
r.put(key, keyValues.get(key));
470-
if (++eventInTransaction == eventsPerTransaction) {
479+
}
480+
}
481+
482+
private void doOneTxOrderAndShipmentPuts(
483+
CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction,
484+
int customerId) {
485+
cacheTransactionManager.begin();
486+
doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId);
487+
cacheTransactionManager.commit();
488+
}
489+
490+
private void doOneTxOrderAndShipmentPutsWithRetryIfError(
491+
CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction,
492+
int customerId) {
493+
while (true) {
494+
try {
495+
cacheTransactionManager.begin();
496+
doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId);
471497
cacheTransactionManager.commit();
472-
eventInTransaction = 0;
498+
break;
499+
} catch (TransactionException exception) {
500+
} catch (IllegalStateException exception) {
501+
try {
502+
cacheTransactionManager.rollback();
503+
} catch (Exception ignored) {
504+
}
473505
}
474506
}
475-
if (eventInTransaction != 0) {
476-
cacheTransactionManager.commit();
507+
}
508+
509+
private void doOneOrderAndShipmentPuts(int keyId, int eventsPerTransaction, int customerId) {
510+
Region<Object, Object> orderRegion = cacheRule.getCache().getRegion(orderRegionName);
511+
Region<Object, Object> shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName);
512+
OrderId orderId = new OrderId(keyId, new CustId(customerId));
513+
orderRegion.put(orderId, new Order());
514+
for (int i = 0; i < eventsPerTransaction - 1; i++) {
515+
ShipmentId shipmentId = new ShipmentId(keyId + i, orderId);
516+
shipmentRegion.put(shipmentId, new Shipment());
477517
}
478518
}
479519

@@ -498,15 +538,15 @@ protected void checkGatewayReceiverStatsHA(int processBatches, int eventsReceive
498538

499539
protected void putGivenKeyValues(String regionName, Map<?, ?> keyValues) {
500540
Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR + regionName);
501-
assertNotNull(r);
541+
assertThat(r).isNotNull();
502542
for (Object key : keyValues.keySet()) {
503543
r.put(key, keyValues.get(key));
504544
}
505545
}
506546

507547
protected void checkConflatedStats(String senderId, final int eventsConflated) {
508548
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
509-
assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
549+
assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated);
510550
}
511551

512552
protected GatewaySenderStats getGatewaySenderStats(String senderId) {
@@ -517,12 +557,9 @@ protected GatewaySenderStats getGatewaySenderStats(String senderId) {
517557
protected void validateGatewaySenderQueueAllBucketsDrained(final String senderId) {
518558
GatewaySender sender = getGatewaySender(senderId);
519559
final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
520-
await().untilAsserted(() -> {
521-
assertThat(abstractSender.getEventQueueSize()).isEqualTo(0);
522-
});
523-
await().untilAsserted(() -> {
524-
assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0);
525-
});
560+
await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).isEqualTo(0));
561+
await()
562+
.untilAsserted(() -> assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0));
526563
}
527564

528565
public static void setNumDispatcherThreadsForTheRun(int numThreads) {

geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20+
import java.util.ArrayList;
2021
import java.util.HashMap;
22+
import java.util.List;
2123
import java.util.Map;
2224

2325
import junitparams.Parameters;
@@ -29,8 +31,6 @@
2931
import org.apache.geode.internal.cache.execute.data.CustId;
3032
import org.apache.geode.internal.cache.execute.data.Order;
3133
import org.apache.geode.internal.cache.execute.data.OrderId;
32-
import org.apache.geode.internal.cache.execute.data.Shipment;
33-
import org.apache.geode.internal.cache.execute.data.ShipmentId;
3434
import org.apache.geode.test.dunit.AsyncInvocation;
3535
import org.apache.geode.test.dunit.VM;
3636
import org.apache.geode.test.junit.categories.WanTest;
@@ -62,46 +62,37 @@ public void testPartitionedRegionPropagationWithGroupTransactionEventsAndMixOfEv
6262
}
6363

6464
int customers = 4;
65-
6665
int transactionsPerCustomer = 1000;
67-
final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
68-
for (int custId = 0; custId < customers; custId++) {
69-
for (int i = 0; i < transactionsPerCustomer; i++) {
70-
CustId custIdObject = new CustId(custId);
71-
OrderId orderId = new OrderId(i, custIdObject);
72-
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
73-
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
74-
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
75-
keyValuesInTransactions.put(orderId, new Order());
76-
keyValuesInTransactions.put(shipmentId1, new Shipment());
77-
keyValuesInTransactions.put(shipmentId2, new Shipment());
78-
keyValuesInTransactions.put(shipmentId3, new Shipment());
79-
}
80-
}
81-
8266
int ordersPerCustomerNotInTransactions = 1000;
8367

8468
final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
8569
for (int custId = 0; custId < customers; custId++) {
8670
for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
8771
CustId custIdObject = new CustId(custId);
88-
OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject);
72+
OrderId orderId =
73+
new OrderId(i + ordersPerCustomerNotInTransactions * customers, custIdObject);
8974
keyValuesNotInTransactions.put(orderId, new Order());
9075
}
9176
}
9277

9378
// eventsPerTransaction is 1 (orders) + 3 (shipments)
9479
int eventsPerTransaction = 4;
95-
AsyncInvocation<Void> putsInTransactionsInvocation =
96-
londonServer1VM.invokeAsync(
97-
() -> doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
98-
eventsPerTransaction));
80+
List<AsyncInvocation<Void>> putsInTransactionsInvocationList = new ArrayList<>(customers);
81+
for (int i = 0; i < customers; i++) {
82+
final int customerId = i;
83+
putsInTransactionsInvocationList.add(
84+
londonServer1VM.invokeAsync(
85+
() -> doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction,
86+
transactionsPerCustomer)));
87+
}
9988

10089
AsyncInvocation<Void> putsNotInTransactionsInvocation =
10190
londonServer2VM.invokeAsync(
10291
() -> putGivenKeyValues(orderRegionName, keyValuesNotInTransactions));
10392

104-
putsInTransactionsInvocation.await();
93+
for (AsyncInvocation<Void> putsInTransactionInvocation : putsInTransactionsInvocationList) {
94+
putsInTransactionInvocation.await();
95+
}
10596
putsNotInTransactionsInvocation.await();
10697

10798
int entries =

0 commit comments

Comments
 (0)