From 5e8a68330fd1cc374aedb245e7e8b9e65200d7fa Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Mon, 23 Feb 2026 10:58:24 +0100 Subject: [PATCH 1/3] fix(kahadb-store): Fix Windows CI hanging by adding timeout for forked processes and ensuring proper resource cleanup --- .../store/kahadb/JournalCorruptionExceptionTest.java | 1 + .../store/kahadb/JournalCorruptionIndexRecoveryTest.java | 6 ++++-- .../apache/activemq/store/kahadb/JournalFdRecoveryTest.java | 5 +++-- pom.xml | 6 ++++++ 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java index 3806fe05fc8..83054413e4f 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java @@ -168,6 +168,7 @@ private void corruptLocationAtDataFileIndex(int id) throws IOException { randomAccessFile.seek(offset); randomAccessFile.write(bla, 0, bla.length); randomAccessFile.getFD().sync(); + dataFile.closeRandomAccessFile(randomAccessFile); } private int getNumberOfJournalFiles() throws IOException { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java index 0a4d4fcb5c4..d69594ff35e 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; @@ -185,9 +186,9 @@ public void testRecoveryAfterCorruption() throws Exception { } private void whackIndex(File dataDir) { - File indexToDelete = new File(dataDir, "db.data"); + final File indexToDelete = new File(dataDir, "db.data"); LOG.info("Whacking index: " + indexToDelete); - indexToDelete.delete(); + IOHelper.deleteFileNonBlocking(indexToDelete); } private void corruptBatchMiddle(int i) throws IOException { @@ -228,6 +229,7 @@ private void corruptBatch(int id, boolean atEnd) throws IOException { Arrays.fill(bla, fill); randomAccessFile.seek(offset); randomAccessFile.write(bla, 0, bla.length); + dataFile.closeRandomAccessFile(randomAccessFile); } private int getNumberOfJournalFiles() throws IOException { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java index 8a34b88944b..6ec60213e14 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java @@ -18,6 +18,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -306,9 +307,9 @@ private void whackIndex(File dataDir) throws Exception { } private void whackFile(File dataDir, String name) throws Exception { - File indexToDelete = new File(dataDir, name); + final File indexToDelete = new File(dataDir, name); LOG.info("Whacking index: " + indexToDelete); - indexToDelete.delete(); + IOHelper.deleteFileNonBlocking(indexToDelete); } private int getNumberOfJournalFiles() throws IOException { diff --git a/pom.xml b/pom.xml index a7a4f5e106f..d7a7fffdf6f 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,11 @@ + + 300 activemq-${project.version} Apache ActiveMQ @@ -973,6 +978,7 @@ true 1 false + ${surefire.forkedProcessTimeout} false true From 905db6e94aba1c57c9ab61032b1a9bdd1fa47090 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Mon, 23 Feb 2026 16:08:34 +0100 Subject: [PATCH 2/3] fix(http): Increase forked process timeout for slower HTTP/WS/SSL tests --- activemq-http/pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index 0a9cc4eec33..2bae67a3f1e 100644 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -177,6 +177,8 @@ maven-surefire-plugin 1 + + 600 From 514a5d614f9d6bdd22b6b5af672641047f31b2fb Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Mon, 23 Feb 2026 16:59:55 +0100 Subject: [PATCH 3/3] Refactor LoadBalanceTest to improve bridge formation wait logic and add consumer count check --- .../java/org/apache/bugs/LoadBalanceTest.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java index 3d24867ddbe..e36afa66b43 100644 --- a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java +++ b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java @@ -35,9 +35,12 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.util.Wait; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -336,26 +339,27 @@ public Message createMessage(final Session session) broker.stop(); } - // need to ensure broker bridge is alive before starting the consumer - // peeking at the internals will give us this info + // Wait until both brokers have their local consumer AND the remote demand subscription + // from the other broker's bridge (>= 2 consumers per queue). This guarantees: + // 1. Both local consumers (container1, container2) are truly subscribed + // 2. The network bridges are fully started and have propagated demand subscriptions private void waitForBridgeFormation() throws Exception { - long done = System.currentTimeMillis() + 30000; - while (done > System.currentTimeMillis()) { - if (hasBridge("one") && hasBridge("two")) { - return; - } - Thread.sleep(1000); - } + assertTrue("Both brokers should have local + bridge demand consumers for " + TESTING_QUEUE, + Wait.waitFor(() -> getQueueConsumerCount("one") >= 2 && getQueueConsumerCount("two") >= 2, + 30000, 100)); } - private boolean hasBridge(String name) { - boolean result = false; - BrokerService broker = BrokerRegistry.getInstance().lookup(name); - if (broker != null && !broker.getNetworkConnectors().isEmpty()) { - if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { - result = true; - } + private int getQueueConsumerCount(String brokerName) { + try { + final BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + if (broker == null) { + return 0; } - return result; + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + final Destination dest = regionBroker.getDestinationMap().get(new ActiveMQQueue(TESTING_QUEUE)); + return dest != null ? dest.getConsumers().size() : 0; + } catch (Exception ignored) { + return 0; + } } }