Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@
package org.apache.ignite.internal.processors.cache.distributed.near;

import java.io.Serializable;
import java.util.Random;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
Expand Down Expand Up @@ -65,10 +61,10 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
"order by co._key, pr._key ";

/** */
protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
protected static final String QRY_LONG = "select id1 from (select pe.id as id1, co.id, pr._key\n" +
"from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
"where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
"order by pe.id desc";
"where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key\n" +
"order by pe.id) where id1 > sleep(10)";

/** */
protected static final int GRID_CNT = 2;
Expand Down Expand Up @@ -104,6 +100,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
cc.setRebalanceMode(SYNC);
cc.setLongQueryWarningTimeout(15_000);
cc.setAffinity(new RendezvousAffinityFunction(false, 60));
cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class);

switch (name) {
case "pe":
Expand Down Expand Up @@ -163,10 +160,8 @@ private void fillCaches() {

IgniteCache<Integer, Product> pr = grid(0).cache("pr");

Random rnd = new GridRandom();

for (int i = 0; i < PRODUCT_CNT; i++)
pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
pr.put(i, new Product(i, i % COMPANY_CNT));

IgniteCache<Integer, Person> pe = grid(0).cache("pe");

Expand All @@ -176,8 +171,8 @@ private void fillCaches() {
IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");

for (int i = 0; i < PURCHASE_CNT; i++) {
int persId = rnd.nextInt(PERS_CNT);
int prodId = rnd.nextInt(PRODUCT_CNT);
int persId = i % PERS_CNT;
int prodId = i % PRODUCT_CNT;

pu.put(i, new Purchase(persId, prodId));
}
Expand Down Expand Up @@ -258,20 +253,4 @@ protected static class Product implements Serializable {
this.companyId = companyId;
}
}

/** */
public static class Functions {
/** */
@QuerySqlFunction
public static int sleep() {
try {
U.sleep(1_000);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}

return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;

import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/**
* Test for cancel of query containing distributed joins.
*/
Expand Down Expand Up @@ -104,49 +107,40 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti
else {
cursor = cache.query(qry);

ignite.scheduler().runLocal(new Runnable() {
@Override public void run() {
cursor.close();
}
}, timeoutUnits, timeUnit);
ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit);
}

try (QueryCursor<List<?>> ignored = cursor) {
cursor.getAll();
int resSize = F.size(cursor.iterator());

if (checkCanceled)
fail("Query not canceled");
fail("Query not canceled, result size=" + resSize);
}
catch (CacheException ex) {
log().error("Got expected exception", ex);
catch (CacheException | IgniteException ex) {
log().error("Got exception", ex);

assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
}

// Give some time to clean up.
Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000);

checkCleanState();
}

/**
* Validates clean state on all participating nodes after query cancellation.
*/
private void checkCleanState() throws IgniteCheckedException {
private void checkCleanState() throws IgniteInterruptedCheckedException {
for (int i = 0; i < GRID_CNT; i++) {
IgniteEx grid = grid(i);

// Validate everything was cleaned up.
ConcurrentMap<UUID, ?> map = U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field(
grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess");

String msg = "Map executor state is not cleared";
ConcurrentMap<UUID, ?> map = U.field(
((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(), "qryRess");

// TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
for (Object result : map.values()) {
Map<Long, ?> m = U.field(result, "res");

assertEquals(msg, 0, m.size());
assertTrue("Map executor state is not cleared", waitForCondition(m::isEmpty, 1_000L));
}
}
}
Expand Down
Loading