From b6c46cad182e3a87a28832378c0067bda4d9d9fd Mon Sep 17 00:00:00 2001 From: zstan Date: Tue, 17 Feb 2026 10:24:02 +0300 Subject: [PATCH 1/2] IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests --- ...eQueryAbstractDistributedJoinSelfTest.java | 14 ++++--------- ...ancelOrTimeoutDistributedJoinSelfTest.java | 20 +++++++++---------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index b919a1a871efb..eabcc2675c1d2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@ -18,16 +18,13 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Serializable; -import java.util.Random; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; -import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -77,7 +74,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd private static final int PERS_CNT = 600; /** */ - private static final int PURCHASE_CNT = 6_000; + private static final int PURCHASE_CNT = 24_000; /** */ private static final int COMPANY_CNT = 25; @@ -103,7 +100,6 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd cc.setAtomicityMode(TRANSACTIONAL); cc.setRebalanceMode(SYNC); cc.setLongQueryWarningTimeout(15_000); - cc.setAffinity(new RendezvousAffinityFunction(false, 60)); switch (name) { case "pe": @@ -163,10 +159,8 @@ private void fillCaches() { IgniteCache pr = grid(0).cache("pr"); - Random rnd = new GridRandom(); - for (int i = 0; i < PRODUCT_CNT; i++) - pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); + pr.put(i, new Product(i, i % COMPANY_CNT)); IgniteCache pe = grid(0).cache("pe"); @@ -176,8 +170,8 @@ private void fillCaches() { IgniteCache pu = grid(0).cache("pu"); for (int i = 0; i < PURCHASE_CNT; i++) { - int persId = rnd.nextInt(PERS_CNT); - int prodId = rnd.nextInt(PRODUCT_CNT); + int persId = i % PERS_CNT; + int prodId = i % PRODUCT_CNT; pu.put(i, new Purchase(persId, prodId)); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 07eabab9779cb..71419d27aab58 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@ -25,7 +25,6 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -91,7 +90,7 @@ public void testTimeout4() throws Exception { /** */ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit, boolean timeout, boolean checkCanceled) throws Exception { - SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true); + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);; IgniteCache cache = ignite.cache(cacheName); @@ -104,18 +103,17 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti else { cursor = cache.query(qry); - ignite.scheduler().runLocal(new Runnable() { - @Override public void run() { - cursor.close(); - } - }, timeoutUnits, timeUnit); + ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit); } try (QueryCursor> ignored = cursor) { - cursor.getAll(); + int resSize = 0; + for (List ignored1 : cursor) { + ++resSize; + } if (checkCanceled) - fail("Query not canceled"); + fail("Query not canceled, result size=" + resSize); } catch (CacheException ex) { log().error("Got expected exception", ex); @@ -124,7 +122,7 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti } // Give some time to clean up. - Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000); + Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 1_000); checkCleanState(); } @@ -132,7 +130,7 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti /** * Validates clean state on all participating nodes after query cancellation. */ - private void checkCleanState() throws IgniteCheckedException { + private void checkCleanState() { for (int i = 0; i < GRID_CNT; i++) { IgniteEx grid = grid(i); From 47e1aef92134e845ace2356b99091261d24747ad Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 20 Feb 2026 15:20:43 +0300 Subject: [PATCH 2/2] fix after review --- ...eQueryAbstractDistributedJoinSelfTest.java | 31 +++++-------------- ...ancelOrTimeoutDistributedJoinSelfTest.java | 30 ++++++++---------- 2 files changed, 21 insertions(+), 40 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index eabcc2675c1d2..5a3e5f6c71db2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@ -19,14 +19,13 @@ import java.io.Serializable; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -62,10 +61,10 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd "order by co._key, pr._key "; /** */ - protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" + + protected static final String QRY_LONG = "select id1 from (select pe.id as id1, co.id, pr._key\n" + "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + - "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + - "order by pe.id desc"; + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key\n" + + "order by pe.id) where id1 > sleep(10)"; /** */ protected static final int GRID_CNT = 2; @@ -74,7 +73,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd private static final int PERS_CNT = 600; /** */ - private static final int PURCHASE_CNT = 24_000; + private static final int PURCHASE_CNT = 6_000; /** */ private static final int COMPANY_CNT = 25; @@ -100,6 +99,8 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd cc.setAtomicityMode(TRANSACTIONAL); cc.setRebalanceMode(SYNC); cc.setLongQueryWarningTimeout(15_000); + cc.setAffinity(new RendezvousAffinityFunction(false, 60)); + cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class); switch (name) { case "pe": @@ -252,20 +253,4 @@ protected static class Product implements Serializable { this.companyId = companyId; } } - - /** */ - public static class Functions { - /** */ - @QuerySqlFunction - public static int sleep() { - try { - U.sleep(1_000); - } - catch (IgniteInterruptedCheckedException ignored) { - // No-op. - } - - return 0; - } - } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 71419d27aab58..d56413c8369dc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@ -25,16 +25,20 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.GridProcessor; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.junit.Test; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * Test for cancel of query containing distributed joins. */ @@ -90,7 +94,7 @@ public void testTimeout4() throws Exception { /** */ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit, boolean timeout, boolean checkCanceled) throws Exception { - SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);; + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true); IgniteCache cache = ignite.cache(cacheName); @@ -107,44 +111,36 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti } try (QueryCursor> ignored = cursor) { - int resSize = 0; - for (List ignored1 : cursor) { - ++resSize; - } + int resSize = F.size(cursor.iterator()); if (checkCanceled) fail("Query not canceled, result size=" + resSize); } - catch (CacheException ex) { - log().error("Got expected exception", ex); + catch (CacheException | IgniteException ex) { + log().error("Got exception", ex); assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class)); } - // Give some time to clean up. - Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 1_000); - checkCleanState(); } /** * Validates clean state on all participating nodes after query cancellation. */ - private void checkCleanState() { + private void checkCleanState() throws IgniteInterruptedCheckedException { for (int i = 0; i < GRID_CNT; i++) { IgniteEx grid = grid(i); // Validate everything was cleaned up. - ConcurrentMap map = U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field( - grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess"); - - String msg = "Map executor state is not cleared"; + ConcurrentMap map = U.field( + ((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(), "qryRess"); // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query. for (Object result : map.values()) { Map m = U.field(result, "res"); - assertEquals(msg, 0, m.size()); + assertTrue("Map executor state is not cleared", waitForCondition(m::isEmpty, 1_000L)); } } }