Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions activemq-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<!-- HTTP/WS/SSL tests are inherently slower: tests have legitimate Thread.sleep() calls up to 35s -->
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private void corruptLocationAtDataFileIndex(int id) throws IOException {
randomAccessFile.seek(offset);
randomAccessFile.write(bla, 0, bla.length);
randomAccessFile.getFD().sync();
dataFile.closeRandomAccessFile(randomAccessFile);
}

private int getNumberOfJournalFiles() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
Expand Down Expand Up @@ -185,9 +186,9 @@ public void testRecoveryAfterCorruption() throws Exception {
}

private void whackIndex(File dataDir) {
File indexToDelete = new File(dataDir, "db.data");
final File indexToDelete = new File(dataDir, "db.data");
LOG.info("Whacking index: " + indexToDelete);
indexToDelete.delete();
IOHelper.deleteFileNonBlocking(indexToDelete);
}

private void corruptBatchMiddle(int i) throws IOException {
Expand Down Expand Up @@ -228,6 +229,7 @@ private void corruptBatch(int id, boolean atEnd) throws IOException {
Arrays.fill(bla, fill);
randomAccessFile.seek(offset);
randomAccessFile.write(bla, 0, bla.length);
dataFile.closeRandomAccessFile(randomAccessFile);
}

private int getNumberOfJournalFiles() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
Expand Down Expand Up @@ -306,9 +307,9 @@ private void whackIndex(File dataDir) throws Exception {
}

private void whackFile(File dataDir, String name) throws Exception {
File indexToDelete = new File(dataDir, name);
final File indexToDelete = new File(dataDir, name);
LOG.info("Whacking index: " + indexToDelete);
indexToDelete.delete();
IOHelper.deleteFileNonBlocking(indexToDelete);
}

private int getNumberOfJournalFiles() throws IOException {
Expand Down
38 changes: 21 additions & 17 deletions activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -336,26 +339,27 @@ public Message createMessage(final Session session)
broker.stop();
}

// need to ensure broker bridge is alive before starting the consumer
// peeking at the internals will give us this info
// Wait until both brokers have their local consumer AND the remote demand subscription
// from the other broker's bridge (>= 2 consumers per queue). This guarantees:
// 1. Both local consumers (container1, container2) are truly subscribed
// 2. The network bridges are fully started and have propagated demand subscriptions
private void waitForBridgeFormation() throws Exception {
long done = System.currentTimeMillis() + 30000;
while (done > System.currentTimeMillis()) {
if (hasBridge("one") && hasBridge("two")) {
return;
}
Thread.sleep(1000);
}
assertTrue("Both brokers should have local + bridge demand consumers for " + TESTING_QUEUE,
Wait.waitFor(() -> getQueueConsumerCount("one") >= 2 && getQueueConsumerCount("two") >= 2,
30000, 100));
}

private boolean hasBridge(String name) {
boolean result = false;
BrokerService broker = BrokerRegistry.getInstance().lookup(name);
if (broker != null && !broker.getNetworkConnectors().isEmpty()) {
if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
result = true;
}
private int getQueueConsumerCount(String brokerName) {
try {
final BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
if (broker == null) {
return 0;
}
return result;
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
final Destination dest = regionBroker.getDestinationMap().get(new ActiveMQQueue(TESTING_QUEUE));
return dest != null ? dest.getConsumers().size() : 0;
} catch (Exception ignored) {
return 0;
}
}
}
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@

<surefire.argLine></surefire.argLine>
<maven.surefire.allow.securitymanager></maven.surefire.allow.securitymanager>
<!-- Per-class JVM timeout (reuseForks=false means one JVM per test class).
Kills a forked JVM that has been running longer than this many seconds,
preventing CI from hanging indefinitely on Windows file-lock issues or
any other test that never terminates. Override per-module if needed. -->
<surefire.forkedProcessTimeout>300</surefire.forkedProcessTimeout>

<siteId>activemq-${project.version}</siteId>
<projectName>Apache ActiveMQ</projectName>
Expand Down Expand Up @@ -973,6 +978,7 @@
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>${surefire.forkedProcessTimeout}</forkedProcessTimeoutInSeconds>
<failIfNoTests>false</failIfNoTests>
<systemPropertyVariables>
<java.awt.headless>true</java.awt.headless>
Expand Down