Skip to content
This repository was archived by the owner on Sep 7, 2023. It is now read-only.

Commit 85803e7

Browse files
author
Alex Walker
authored
Make DatabaseManager more fault tolerant in Cluster (typedb#178)
## What is the goal of this PR?

We've introduced a couple of fault tolerance improvements into the Grakn Cluster version of `DatabaseManager`:

- `contains` and `all` will now try to retrieve information from every node, instead of failing if the first node is down.
- `create` and `delete` will now be applied to every working node, even if some nodes are down. For now, this means `create` will no longer throw if the database already exists, and `delete` will no longer throw if the database doesn't exist.

## What are the changes implemented in this PR?

- `contains` and `all` will now try to retrieve information from every node, instead of failing if the first node is down.
- `create` and `delete` will now be applied to every working node, even if some nodes are down.
1 parent d9305f7 commit 85803e7

File tree

9 files changed

+44
-22
lines changed

9 files changed

+44
-22
lines changed

dependencies/graknlabs/artifacts.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def graknlabs_grakn_core_artifacts():
2727
artifact_name = "grakn-core-server-{platform}-{version}.{ext}",
2828
tag_source = deployment["artifact.release"],
2929
commit_source = deployment["artifact.snapshot"],
30-
commit = "4d449aa198fd5cceca54cb3889114ab5aa1b8e5e",
30+
commit = "3227b9e07c9c2317c0e7eab29259204f92433d76",
3131
)
3232

3333
def graknlabs_grakn_cluster_artifacts():
@@ -37,5 +37,5 @@ def graknlabs_grakn_cluster_artifacts():
3737
artifact_name = "grakn-cluster-server-{platform}-{version}.{ext}",
3838
tag_source = deployment_private["artifact.release"],
3939
commit_source = deployment_private["artifact.snapshot"],
40-
commit = "93cbc149d7b9ce588e52d8132c8a0b2265658a3c",
40+
commit = "892073639b024904a413154011fb42d9dae090f3",
4141
)

dependencies/graknlabs/repositories.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ def graknlabs_behaviour():
3737
git_repository(
3838
name = "graknlabs_behaviour",
3939
remote = "https://github.com/graknlabs/behaviour",
40-
commit = "5a3b731b3ef154b5b1bd95b788dc374bd8873746" # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_behaviour
40+
commit = "9f1cf29952dddaaee96a9ce3b982a8e4d6d45c48" # sync-marker: do not remove this comment, this is used for sync-dependencies by @graknlabs_behaviour
4141
)

grakn/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ def core(address=DEFAULT_ADDRESS) -> "GraknClient":
4444
return _RPCGraknClient(address)
4545

4646
@staticmethod
47-
def cluster(address=DEFAULT_ADDRESS) -> "GraknClient":
48-
return _RPCGraknClientCluster(address)
47+
def cluster(addresses: List[str]) -> "GraknClient":
48+
return _RPCGraknClientCluster(addresses)
4949

5050
@abstractmethod
5151
def session(self, database: str, session_type: SessionType, options: GraknOptions = None) -> Session:
@@ -112,8 +112,8 @@ def channel(self):
112112
# _RPCGraknClientCluster must live in this package because of circular ref with GraknClient
113113
class _RPCGraknClientCluster(GraknClient):
114114

115-
def __init__(self, address: str):
116-
self._core_clients: Dict[Address.Server, _RPCGraknClient] = {addr: _RPCGraknClient(addr.client()) for addr in self._discover_cluster([address])}
115+
def __init__(self, addresses: List[str]):
116+
self._core_clients: Dict[Address.Server, _RPCGraknClient] = {addr: _RPCGraknClient(addr.client()) for addr in self._discover_cluster(addresses)}
117117
self._grakn_cluster_grpc_stubs = {addr: GraknClusterStub(client.channel()) for (addr, client) in self._core_clients.items()}
118118
self._databases = _RPCDatabaseManagerCluster({addr: client.databases() for (addr, client) in self._core_clients.items()})
119119
self._is_open = True

grakn/rpc/cluster/database_manager.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from typing import Dict, List
2121

22+
from grakn.common.exception import GraknClientException
2223
from grakn.rpc.cluster.address import Address
2324
from grakn.rpc.database_manager import DatabaseManager, _RPCDatabaseManager
2425

@@ -30,15 +31,29 @@ def __init__(self, database_managers: Dict[Address.Server, "_RPCDatabaseManager"
3031
self._database_managers = database_managers
3132

3233
def contains(self, name: str) -> bool:
33-
return next(iter(self._database_managers.values())).contains(name)
34+
errors = []
35+
for database_manager in self._database_managers.values():
36+
try:
37+
return database_manager.contains(name)
38+
except GraknClientException as e:
39+
errors.append(e)
40+
raise GraknClientException("Attempted connecting to all cluster members, but the following errors occurred: " + str([str(e) for e in errors]))
3441

3542
def create(self, name: str) -> None:
3643
for database_manager in self._database_managers.values():
37-
database_manager.create(name)
44+
if not database_manager.contains(name):
45+
database_manager.create(name)
3846

3947
def delete(self, name: str) -> None:
4048
for database_manager in self._database_managers.values():
41-
database_manager.delete(name)
49+
if database_manager.contains(name):
50+
database_manager.delete(name)
4251

4352
def all(self) -> List[str]:
44-
return next(iter(self._database_managers.values())).all()
53+
errors = []
54+
for database_manager in self._database_managers.values():
55+
try:
56+
return database_manager.all()
57+
except GraknClientException as e:
58+
errors.append(e)
59+
raise GraknClientException("Attempted connecting to all cluster members, but the following errors occurred: " + str([str(e) for e in errors]))

tests/behaviour/background/cluster/environment.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@
2222
from tests.behaviour.context import Context
2323

2424

25+
IGNORE_TAGS = ["ignore", "ignore-client-python", "ignore-cluster"]
26+
27+
2528
def before_all(context: Context):
2629
environment_base.before_all(context)
27-
context.client = GraknClient.cluster()
30+
context.client = GraknClient.cluster([GraknClient.DEFAULT_ADDRESS])
2831

2932

3033
def before_scenario(context: Context, scenario):
34+
for tag in IGNORE_TAGS:
35+
if tag in scenario.effective_tags:
36+
scenario.skip("tagged with @" + tag)
37+
return
3138
environment_base.before_scenario(context, scenario)
3239

3340

tests/behaviour/background/core/environment.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@
2222
from tests.behaviour.context import Context
2323

2424

25+
IGNORE_TAGS = ["ignore", "ignore-client-python", "ignore-core"]
26+
27+
2528
def before_all(context: Context):
2629
environment_base.before_all(context)
2730
context.client = GraknClient.core()
2831

2932

3033
def before_scenario(context: Context, scenario):
34+
for tag in IGNORE_TAGS:
35+
if tag in scenario.effective_tags:
36+
scenario.skip("tagged with @" + tag)
37+
return
3138
environment_base.before_scenario(context, scenario)
3239

3340

tests/behaviour/background/environment_base.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,13 @@
2525

2626
import time
2727

28-
IGNORE_TAGS = ["ignore", "ignore-client-python"]
29-
3028

3129
def before_all(context: Context):
3230
context.THREAD_POOL_SIZE = 32
3331
context.client = GraknClient.core()
3432

3533

3634
def before_scenario(context: Context, scenario):
37-
for tag in IGNORE_TAGS:
38-
if tag in scenario.effective_tags:
39-
scenario.skip("tagged with @" + tag)
40-
return
41-
4235
for database in context.client.databases().all():
4336
context.client.databases().delete(database)
4437
context.sessions = []

tests/integration/test_cluster_failover.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
class TestClusterFailover(TestCase):
3333

3434
def setUp(self):
35-
with GraknClient.cluster("localhost:11729") as client:
35+
with GraknClient.cluster(["localhost:11729", "localhost:21729", "localhost:31729"]) as client:
3636
if "grakn" in client.databases().all():
3737
client.databases().delete("grakn")
3838
client.databases().create("grakn")
@@ -56,7 +56,7 @@ def get_primary_replica(self):
5656
return self.get_primary_replica()
5757

5858
def test_put_entity_type_to_crashed_primary_replica(self):
59-
with GraknClient.cluster("localhost:11729") as client:
59+
with GraknClient.cluster(["localhost:11729", "localhost:21729", "localhost:31729"]) as client:
6060
assert client.databases().contains("grakn")
6161
primary_replica = self.get_primary_replica()
6262
print("Performing operations against the primary replica " + str(primary_replica))

tools/behave_rule.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def _rule_implementation(ctx):
6464
echo Starting Grakn Server
6565
mkdir ./grakn_distribution/"$DIRECTORY"/grakn_test
6666
./grakn_distribution/"$DIRECTORY"/grakn server --data grakn_test &
67-
sleep 8
67+
sleep 9
6868
6969
"""
7070
# TODO: If two step files have the same name, we should rename the second one to prevent conflict

0 commit comments

Comments
 (0)