diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index b919a1a871efb..5a3e5f6c71db2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@ -18,18 +18,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Serializable; -import java.util.Random; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; -import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -65,10 +61,10 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd "order by co._key, pr._key "; /** */ - protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" + + protected static final String QRY_LONG = "select id1 from (select pe.id as id1, co.id, pr._key\n" + "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + - "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + - "order by pe.id desc"; + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key\n" + + "order by pe.id) where id1 > sleep(10)"; /** */ protected static final int GRID_CNT = 2; @@ -104,6 +100,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd cc.setRebalanceMode(SYNC); cc.setLongQueryWarningTimeout(15_000); cc.setAffinity(new RendezvousAffinityFunction(false, 60)); + cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class); switch (name) { case "pe": @@ -163,10 +160,8 @@ private void fillCaches() { IgniteCache pr = grid(0).cache("pr"); - Random rnd = new GridRandom(); - for (int i = 0; i < PRODUCT_CNT; i++) - pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); + pr.put(i, new Product(i, i % COMPANY_CNT)); IgniteCache pe = grid(0).cache("pe"); @@ -176,8 +171,8 @@ private void fillCaches() { IgniteCache pu = grid(0).cache("pu"); for (int i = 0; i < PURCHASE_CNT; i++) { - int persId = rnd.nextInt(PERS_CNT); - int prodId = rnd.nextInt(PRODUCT_CNT); + int persId = i % PERS_CNT; + int prodId = i % PRODUCT_CNT; pu.put(i, new Purchase(persId, prodId)); } @@ -258,20 +253,4 @@ protected static class Product implements Serializable { this.companyId = companyId; } } - - /** */ - public static class Functions { - /** */ - @QuerySqlFunction - public static int sleep() { - try { - U.sleep(1_000); - } - catch (IgniteInterruptedCheckedException ignored) { - // No-op. - } - - return 0; - } - } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 07eabab9779cb..d56413c8369dc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@ -25,17 +25,20 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.GridProcessor; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.junit.Test; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * Test for cancel of query containing distributed joins. */ @@ -104,49 +107,40 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti else { cursor = cache.query(qry); - ignite.scheduler().runLocal(new Runnable() { - @Override public void run() { - cursor.close(); - } - }, timeoutUnits, timeUnit); + ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit); } try (QueryCursor> ignored = cursor) { - cursor.getAll(); + int resSize = F.size(cursor.iterator()); if (checkCanceled) - fail("Query not canceled"); + fail("Query not canceled, result size=" + resSize); } - catch (CacheException ex) { - log().error("Got expected exception", ex); + catch (CacheException | IgniteException ex) { + log().error("Got exception", ex); assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class)); } - // Give some time to clean up. - Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000); - checkCleanState(); } /** * Validates clean state on all participating nodes after query cancellation. */ - private void checkCleanState() throws IgniteCheckedException { + private void checkCleanState() throws IgniteInterruptedCheckedException { for (int i = 0; i < GRID_CNT; i++) { IgniteEx grid = grid(i); // Validate everything was cleaned up. - ConcurrentMap map = U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field( - grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess"); - - String msg = "Map executor state is not cleared"; + ConcurrentMap map = U.field( + ((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(), "qryRess"); // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query. for (Object result : map.values()) { Map m = U.field(result, "res"); - assertEquals(msg, 0, m.size()); + assertTrue("Map executor state is not cleared", waitForCondition(m::isEmpty, 1_000L)); } } }