Skip to content
This repository was archived by the owner on Sep 7, 2023. It is now read-only.

Commit 68ebb2d

Browse files
author
Alex Walker
authored
Expose replica info when getting Cluster databases (typedb#186)
## What is the goal of this PR? We now expose additional information when fetching databases in Cluster. The database name, server address, primary/secondary state, and term number are all available. This is a **breaking change**. `client.databases().all()` now returns `List[Database]` (previously it was `List[str]`). To get the old functionality (listing the names), use `[db.name() for db in client.databases().all()]`. Additionally, `client.databases().delete()` has been deleted, and superseded by the new syntax: `client.databases().get(name).delete()`. ## What are the changes implemented in this PR? - Delete `client.databases().delete`, and add `client.databases().get`, which returns a new `Database` object that has a `delete` method - Add `GraknClientCluster` and `DatabaseManagerCluster`, which return `DatabaseCluster` objects when `all` and `get` are called. `DatabaseCluster` objects expose their database name, server address, primary/secondary state and term number.
1 parent 03bb8f7 commit 68ebb2d

File tree

19 files changed

+410
-238
lines changed

19 files changed

+410
-238
lines changed

dependencies/graknlabs/artifacts.bzl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ def graknlabs_grakn_core_artifacts():
2727
artifact_name = "grakn-core-server-{platform}-{version}.{ext}",
2828
tag_source = deployment["artifact.release"],
2929
commit_source = deployment["artifact.snapshot"],
30-
commit = "4d449aa198fd5cceca54cb3889114ab5aa1b8e5e",
30+
commit = "5f96fb705f8f256e8cb1b918fe85893056c827d2",
3131
)
3232

3333
def graknlabs_grakn_cluster_artifacts():
3434
native_artifact_files(
3535
name = "graknlabs_grakn_cluster_artifact",
3636
group_name = "graknlabs_grakn_cluster",
37-
artifact_name = "grakn-cluster-server-{platform}-{version}.{ext}",
37+
artifact_name = "grakn-cluster-all-{platform}-{version}.{ext}",
3838
tag_source = deployment_private["artifact.release"],
3939
commit_source = deployment_private["artifact.snapshot"],
40-
commit = "55bb7a5bb99fecf6990b0e8ed3220b264f8de452",
40+
commit = "cc3382fe6be239973f1e7d8437a71683e3dff809",
4141
)

grakn/client.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
from grakn.common.exception import GraknClientException # noqa # pylint: disable=unused-import
2929
from grakn.concept.type.value_type import ValueType # noqa # pylint: disable=unused-import
3030
from grakn.options import GraknOptions, GraknClusterOptions
31-
from grakn.rpc.cluster.failsafe_task import FailsafeTask
32-
from grakn.rpc.cluster.replica_info import ReplicaInfo
31+
from grakn.rpc.cluster.failsafe_task import _FailsafeTask
32+
from grakn.rpc.cluster.database import DatabaseCluster, _DatabaseClusterRPC # noqa # pylint: disable=unused-import
3333
from grakn.rpc.cluster.server_address import ServerAddress
34-
from grakn.rpc.cluster.database_manager import _DatabaseManagerClusterRPC
34+
from grakn.rpc.cluster.database_manager import DatabaseManagerCluster, _DatabaseManagerClusterRPC
3535
from grakn.rpc.cluster.session import SessionClusterRPC
3636
from grakn.rpc.database_manager import DatabaseManager, _DatabaseManagerRPC
3737
from grakn.rpc.session import Session, SessionType, _SessionRPC
@@ -46,7 +46,7 @@ def core(address=DEFAULT_ADDRESS) -> "GraknClient":
4646
return _ClientRPC(address)
4747

4848
@staticmethod
49-
def cluster(addresses: List[str]) -> "GraknClient":
49+
def cluster(addresses: List[str]) -> "GraknClientCluster":
5050
return _ClientClusterRPC(addresses)
5151

5252
@abstractmethod
@@ -65,6 +65,10 @@ def is_open(self) -> bool:
6565
def close(self) -> None:
6666
pass
6767

68+
@abstractmethod
69+
def is_cluster(self) -> bool:
70+
pass
71+
6872
@abstractmethod
6973
def __enter__(self):
7074
pass
@@ -74,6 +78,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7478
pass
7579

7680

81+
class GraknClientCluster(GraknClient):
82+
83+
@abstractmethod
84+
def databases(self) -> DatabaseManagerCluster:
85+
pass
86+
87+
7788
class _ClientRPC(GraknClient):
7889

7990
def __init__(self, address: str):
@@ -97,6 +108,9 @@ def close(self):
97108
self._channel.close()
98109
self._is_open = False
99110

111+
def is_cluster(self) -> bool:
112+
return False
113+
100114
def __enter__(self):
101115
return self
102116

@@ -112,13 +126,13 @@ def channel(self):
112126

113127

114128
# _ClientClusterRPC must live in this package because of circular ref with GraknClient
115-
class _ClientClusterRPC(GraknClient):
129+
class _ClientClusterRPC(GraknClientCluster):
116130

117131
def __init__(self, addresses: List[str]):
118132
self._core_clients: Dict[ServerAddress, _ClientRPC] = {addr: _ClientRPC(addr.client()) for addr in self._fetch_cluster_servers(addresses)}
119133
self._grakn_cluster_grpc_stubs = {addr: GraknClusterStub(client.channel()) for (addr, client) in self._core_clients.items()}
120-
self._database_managers = _DatabaseManagerClusterRPC({addr: client.databases() for (addr, client) in self._core_clients.items()})
121-
self._replica_info_map: Dict[str, ReplicaInfo] = {}
134+
self._database_managers = _DatabaseManagerClusterRPC(self, {addr: client.databases() for (addr, client) in self._core_clients.items()})
135+
self._cluster_databases: Dict[str, _DatabaseClusterRPC] = {}
122136
self._is_open = True
123137

124138
def session(self, database: str, session_type: SessionType, options=None) -> Session:
@@ -132,7 +146,7 @@ def _session_primary_replica(self, database: str, session_type: SessionType, opt
132146
def _session_any_replica(self, database: str, session_type: SessionType, options=None) -> SessionClusterRPC:
133147
return _OpenSessionFailsafeTask(database, session_type, options, self).run_any_replica()
134148

135-
def databases(self) -> DatabaseManager:
149+
def databases(self) -> DatabaseManagerCluster:
136150
return self._database_managers
137151

138152
def is_open(self) -> bool:
@@ -143,14 +157,17 @@ def close(self) -> None:
143157
client.close()
144158
self._is_open = False
145159

160+
def is_cluster(self) -> bool:
161+
return True
162+
146163
def __enter__(self):
147164
return self
148165

149166
def __exit__(self, exc_type, exc_val, exc_tb):
150167
self.close()
151168

152-
def replica_info_map(self) -> Dict[str, ReplicaInfo]:
153-
return self._replica_info_map
169+
def cluster_databases(self) -> Dict[str, _DatabaseClusterRPC]:
170+
return self._cluster_databases
154171

155172
def cluster_members(self) -> Set[ServerAddress]:
156173
return set(self._core_clients.keys())
@@ -176,12 +193,12 @@ def _fetch_cluster_servers(self, addresses: List[str]) -> Set[ServerAddress]:
176193
raise GraknClientException("Unable to connect to Grakn Cluster. Attempted connecting to the cluster members, but none are available: %s" % str(addresses))
177194

178195

179-
class _OpenSessionFailsafeTask(FailsafeTask):
196+
class _OpenSessionFailsafeTask(_FailsafeTask):
180197

181198
def __init__(self, database: str, session_type: SessionType, options: GraknClusterOptions, client: "_ClientClusterRPC"):
182199
super().__init__(client, database)
183200
self.session_type = session_type
184201
self.options = options
185202

186-
def run(self, replica: ReplicaInfo.Replica):
203+
def run(self, replica: _DatabaseClusterRPC.Replica):
187204
return SessionClusterRPC(self.client, replica.address(), self.database, self.session_type, self.options)

grakn/grakn_proto_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# specific language governing permissions and limitations
1717
# under the License.
1818
#
19-
from typing import Type, Union
19+
from typing import Union
2020

2121
import grakn_protocol.protobuf.options_pb2 as options_proto
2222

grakn/rpc/cluster/database.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
from typing import Dict, Optional, Set
20+
21+
import grakn_protocol.protobuf.cluster.database_pb2 as database_proto
22+
23+
from grakn.rpc.cluster.server_address import ServerAddress
24+
from grakn.rpc.database import DatabaseCluster, _DatabaseRPC
25+
26+
27+
class _DatabaseClusterRPC(DatabaseCluster):
28+
29+
def __init__(self, database_manager_cluster, database: str):
30+
self._databases: Dict[ServerAddress, _DatabaseRPC] = {}
31+
for address in database_manager_cluster.database_managers():
32+
database_manager = database_manager_cluster.database_managers()[address]
33+
self._databases[address] = _DatabaseRPC(database_manager, name=database)
34+
self._name = database
35+
self._database_manager_cluster = database_manager_cluster
36+
self._replicas: Set["_DatabaseClusterRPC.Replica"] = set()
37+
38+
@staticmethod
39+
def of(proto_db: database_proto.Database, database_manager_cluster) -> "_DatabaseClusterRPC":
40+
assert proto_db.replicas
41+
database: str = proto_db.name
42+
database_cluster_rpc = _DatabaseClusterRPC(database_manager_cluster, database)
43+
for proto_replica in proto_db.replicas:
44+
database_cluster_rpc.replicas().add(_DatabaseClusterRPC.Replica.of(proto_replica, database_cluster_rpc))
45+
print("Discovered database cluster: " + str(database_cluster_rpc))
46+
return database_cluster_rpc
47+
48+
def primary_replica(self) -> Optional["_DatabaseClusterRPC.Replica"]:
49+
primaries = [replica for replica in self._replicas if replica.is_primary()]
50+
return max(primaries, key=lambda r: r.term) if primaries else None
51+
52+
def preferred_secondary_replica(self) -> "_DatabaseClusterRPC.Replica":
53+
return next(iter([replica for replica in self._replicas if replica.is_preferred_secondary()]), next(iter(self._replicas)))
54+
55+
def name(self) -> str:
56+
return self._name
57+
58+
def delete(self) -> None:
59+
for address in self._databases:
60+
if self._database_manager_cluster.database_managers()[address].contains(self._name):
61+
self._databases[address].delete()
62+
63+
def replicas(self):
64+
return self._replicas
65+
66+
def __str__(self):
67+
return self._name
68+
69+
class Replica:
70+
71+
def __init__(self, database: "_DatabaseClusterRPC", address: ServerAddress, term: int, is_primary: bool, is_preferred_secondary: bool):
72+
self._database = database
73+
self._replica_id = _DatabaseClusterRPC.Replica.Id(address, database.name())
74+
self._term = term
75+
self._is_primary = is_primary
76+
self._is_preferred_secondary = is_preferred_secondary
77+
78+
@staticmethod
79+
def of(proto_replica: database_proto.Database.Replica, database: "_DatabaseClusterRPC") -> "_DatabaseClusterRPC.Replica":
80+
return _DatabaseClusterRPC.Replica(database, ServerAddress.parse(proto_replica.address), proto_replica.term,
81+
proto_replica.primary, proto_replica.preferred_secondary)
82+
83+
def replica_id(self) -> "_DatabaseClusterRPC.Replica.Id":
84+
return self._replica_id
85+
86+
def term(self) -> int:
87+
return self._term
88+
89+
def is_primary(self) -> bool:
90+
return self._is_primary
91+
92+
def is_preferred_secondary(self) -> bool:
93+
return self._is_preferred_secondary
94+
95+
def address(self) -> ServerAddress:
96+
return self._replica_id.address()
97+
98+
def __eq__(self, other):
99+
if self is other:
100+
return True
101+
if not other or type(self) != type(other):
102+
return False
103+
return self._replica_id == other.replica_id() and self._term == other.term() and self._is_primary == other.is_primary() and self._is_preferred_secondary == other.is_preferred_secondary()
104+
105+
def __hash__(self):
106+
return hash((self._replica_id, self._is_primary, self._is_preferred_secondary, self._term))
107+
108+
def __str__(self):
109+
return "%s:%s:%d" % (str(self._replica_id), "P" if self._is_primary else "S", self._term)
110+
111+
class Id:
112+
113+
def __init__(self, address: ServerAddress, database: str):
114+
self._address = address
115+
self._database = database
116+
117+
def address(self) -> ServerAddress:
118+
return self._address
119+
120+
def database(self) -> str:
121+
return self._database
122+
123+
def __eq__(self, other):
124+
if self is other:
125+
return True
126+
if not other or type(self) != type(other):
127+
return False
128+
return self._address == other.address() and self._database == other.database()
129+
130+
def __hash__(self):
131+
return hash((self._address, self._database))
132+
133+
def __str__(self):
134+
return "%s/%s" % (str(self._address), self._database)

grakn/rpc/cluster/database_manager.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,33 @@
1616
# specific language governing permissions and limitations
1717
# under the License.
1818
#
19-
19+
from abc import abstractmethod
2020
from typing import Dict, List
2121

22+
import grakn_protocol.protobuf.cluster.database_pb2 as database_proto
23+
2224
from grakn.common.exception import GraknClientException
25+
from grakn.rpc.cluster.database import _DatabaseClusterRPC
2326
from grakn.rpc.cluster.server_address import ServerAddress
27+
from grakn.rpc.database import DatabaseCluster
2428
from grakn.rpc.database_manager import DatabaseManager, _DatabaseManagerRPC
2529

2630

27-
class _DatabaseManagerClusterRPC(DatabaseManager):
31+
class DatabaseManagerCluster(DatabaseManager):
32+
33+
@abstractmethod
34+
def get(self, name: str) -> DatabaseCluster:
35+
pass
36+
37+
@abstractmethod
38+
def all(self) -> List[DatabaseCluster]:
39+
pass
2840

29-
def __init__(self, database_managers: Dict[ServerAddress, "_DatabaseManagerRPC"]):
30-
assert database_managers
41+
42+
class _DatabaseManagerClusterRPC(DatabaseManagerCluster):
43+
44+
def __init__(self, client, database_managers: Dict[ServerAddress, "_DatabaseManagerRPC"]):
45+
self._client = client
3146
self._database_managers = database_managers
3247

3348
def contains(self, name: str) -> bool:
@@ -44,16 +59,27 @@ def create(self, name: str) -> None:
4459
if not database_manager.contains(name):
4560
database_manager.create(name)
4661

47-
def delete(self, name: str) -> None:
48-
for database_manager in self._database_managers.values():
49-
if database_manager.contains(name):
50-
database_manager.delete(name)
62+
def get(self, name: str) -> DatabaseCluster:
63+
errors = []
64+
for address in self._database_managers:
65+
try:
66+
database_get_req = database_proto.Database.Get.Req()
67+
database_get_req.name = name
68+
res = self._client.grakn_cluster_grpc_stub(address).database_get(database_get_req)
69+
return _DatabaseClusterRPC.of(res.database, self)
70+
except GraknClientException as e:
71+
errors.append(e)
72+
raise GraknClientException("Attempted connecting to all cluster members, but the following errors occurred: " + str([str(e) for e in errors]))
5173

52-
def all(self) -> List[str]:
74+
def all(self) -> List[DatabaseCluster]:
5375
errors = []
54-
for database_manager in self._database_managers.values():
76+
for address in self._database_managers:
5577
try:
56-
return database_manager.all()
78+
res = self._client.grakn_cluster_grpc_stub(address).database_all(database_proto.Database.All.Req())
79+
return [_DatabaseClusterRPC.of(db, self) for db in res.databases]
5780
except GraknClientException as e:
5881
errors.append(e)
5982
raise GraknClientException("Attempted connecting to all cluster members, but the following errors occurred: " + str([str(e) for e in errors]))
83+
84+
def database_managers(self) -> Dict[ServerAddress, _DatabaseManagerRPC]:
85+
return self._database_managers

0 commit comments

Comments
 (0)