Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 5594249

Browse files
author
Alex Walker
authored
Bind Cluster sessions to single server nodes, and add naive load balancing (#183)
## What is the goal of this PR?

Previously, each Cluster session would lazily create Core sessions on demand as Transactions were opened. This delegated too much responsibility to the Transaction. Now, each Cluster session is bound to a single server node, which may be a primary or a secondary node depending on the Options passed to the Session.

We also select secondary nodes in a fairer, more balanced way: the server indicates which node is its first preference for secondary sessions, and the client tries to obey the server's decision, with fallback to backup nodes only in the event of a failure.

## What are the changes implemented in this PR?

- On creating a Cluster session, a Core session is immediately created to a server node of the appropriate type
- Node selection is based on the `read_any_replica` Option passed to the Session
- When `read_any_replica` is `true`, the client will first select nodes that the server has indicated are "preferred secondary", with fallback to backup nodes only in the event of a failure
1 parent 71816c5 commit 5594249

File tree

8 files changed

+222
-131
lines changed

8 files changed

+222
-131
lines changed

dependencies/graknlabs/artifacts.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def graknlabs_grakn_core_artifacts():
2727
artifact_name = "grakn-core-server-{platform}-{version}.{ext}",
2828
tag_source = deployment["artifact.release"],
2929
commit_source = deployment["artifact.snapshot"],
30-
commit = "3227b9e07c9c2317c0e7eab29259204f92433d76",
30+
commit = "4d449aa198fd5cceca54cb3889114ab5aa1b8e5e",
3131
)
3232

3333
def graknlabs_grakn_cluster_artifacts():
@@ -37,5 +37,5 @@ def graknlabs_grakn_cluster_artifacts():
3737
artifact_name = "grakn-cluster-server-{platform}-{version}.{ext}",
3838
tag_source = deployment_private["artifact.release"],
3939
commit_source = deployment_private["artifact.snapshot"],
40-
commit = "93cbc149d7b9ce588e52d8132c8a0b2265658a3c",
40+
commit = "55bb7a5bb99fecf6990b0e8ed3220b264f8de452",
4141
)

grakn/client.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
# Repackaging these symbols allows them to be imported from "grakn.client"
2828
from grakn.common.exception import GraknClientException # noqa # pylint: disable=unused-import
2929
from grakn.concept.type.value_type import ValueType # noqa # pylint: disable=unused-import
30-
from grakn.options import GraknOptions
30+
from grakn.options import GraknOptions, GraknClusterOptions
31+
from grakn.rpc.cluster.failsafe_task import FailsafeTask
32+
from grakn.rpc.cluster.replica_info import ReplicaInfo
3133
from grakn.rpc.cluster.server_address import ServerAddress
3234
from grakn.rpc.cluster.database_manager import _DatabaseManagerClusterRPC
33-
from grakn.rpc.cluster.session import _SessionClusterRPC
35+
from grakn.rpc.cluster.session import SessionClusterRPC
3436
from grakn.rpc.database_manager import DatabaseManager, _DatabaseManagerRPC
3537
from grakn.rpc.session import Session, SessionType, _SessionRPC
3638
from grakn.rpc.transaction import TransactionType # noqa # pylint: disable=unused-import
@@ -109,22 +111,29 @@ def channel(self):
109111
return self._channel
110112

111113

112-
# _RPCGraknClientCluster must live in this package because of circular ref with GraknClient
114+
# _ClientClusterRPC must live in this package because of circular ref with GraknClient
113115
class _ClientClusterRPC(GraknClient):
114116

115117
def __init__(self, addresses: List[str]):
116-
self._core_clients: Dict[ServerAddress, _ClientRPC] = {addr: _ClientRPC(addr.client()) for addr in self._discover_cluster(addresses)}
118+
self._core_clients: Dict[ServerAddress, _ClientRPC] = {addr: _ClientRPC(addr.client()) for addr in self._fetch_cluster_servers(addresses)}
117119
self._grakn_cluster_grpc_stubs = {addr: GraknClusterStub(client.channel()) for (addr, client) in self._core_clients.items()}
118-
self._databases = _DatabaseManagerClusterRPC({addr: client.databases() for (addr, client) in self._core_clients.items()})
120+
self._database_managers = _DatabaseManagerClusterRPC({addr: client.databases() for (addr, client) in self._core_clients.items()})
121+
self._replica_info_map: Dict[str, ReplicaInfo] = {}
119122
self._is_open = True
120123

121124
def session(self, database: str, session_type: SessionType, options=None) -> Session:
122125
if not options:
123126
options = GraknOptions.cluster()
124-
return _SessionClusterRPC(self, database, session_type, options)
127+
return self._session_any_replica(database, session_type, options) if options.read_any_replica else self._session_primary_replica(database, session_type, options)
128+
129+
def _session_primary_replica(self, database: str, session_type: SessionType, options=None) -> SessionClusterRPC:
130+
return _OpenSessionFailsafeTask(database, session_type, options, self).run_primary_replica()
131+
132+
def _session_any_replica(self, database: str, session_type: SessionType, options=None) -> SessionClusterRPC:
133+
return _OpenSessionFailsafeTask(database, session_type, options, self).run_any_replica()
125134

126135
def databases(self) -> DatabaseManager:
127-
return self._databases
136+
return self._database_managers
128137

129138
def is_open(self) -> bool:
130139
return self._is_open
@@ -140,6 +149,9 @@ def __enter__(self):
140149
def __exit__(self, exc_type, exc_val, exc_tb):
141150
self.close()
142151

152+
def replica_info_map(self) -> Dict[str, ReplicaInfo]:
153+
return self._replica_info_map
154+
143155
def cluster_members(self) -> Set[ServerAddress]:
144156
return set(self._core_clients.keys())
145157

@@ -149,16 +161,27 @@ def core_client(self, address: ServerAddress) -> _ClientRPC:
149161
def grakn_cluster_grpc_stub(self, address: ServerAddress) -> GraknClusterStub:
150162
return self._grakn_cluster_grpc_stubs.get(address)
151163

152-
def _discover_cluster(self, addresses: List[str]) -> Set[ServerAddress]:
164+
def _fetch_cluster_servers(self, addresses: List[str]) -> Set[ServerAddress]:
153165
for address in addresses:
154166
try:
155167
with _ClientRPC(address) as client:
156168
print("Performing cluster discovery to %s..." % address)
157169
grakn_cluster_stub = GraknClusterStub(client.channel())
158-
res = grakn_cluster_stub.cluster_discover(cluster_proto.Cluster.Discover.Req())
170+
res = grakn_cluster_stub.cluster_servers(cluster_proto.Cluster.Servers.Req())
159171
members = set([ServerAddress.parse(srv) for srv in res.servers])
160172
print("Discovered %s" % [str(member) for member in members])
161173
return members
162-
except RpcError:
163-
print("Cluster discovery to %s failed." % address)
174+
except RpcError as e:
175+
print("Cluster discovery to %s failed. %s" % (address, str(e)))
164176
raise GraknClientException("Unable to connect to Grakn Cluster. Attempted connecting to the cluster members, but none are available: %s" % str(addresses))
177+
178+
179+
class _OpenSessionFailsafeTask(FailsafeTask):
180+
181+
def __init__(self, database: str, session_type: SessionType, options: GraknClusterOptions, client: "_ClientClusterRPC"):
182+
super().__init__(client, database)
183+
self.session_type = session_type
184+
self.options = options
185+
186+
def run(self, replica: ReplicaInfo.Replica):
187+
return SessionClusterRPC(self.client, replica.address(), self.database, self.session_type, self.options)

grakn/rpc/cluster/failsafe_task.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
import time
20+
from abc import ABC, abstractmethod
21+
22+
import grakn_protocol.protobuf.cluster.database_pb2 as database_proto
23+
from grpc import RpcError, StatusCode
24+
25+
from grakn.common.exception import GraknClientException
26+
from grakn.rpc.cluster.replica_info import ReplicaInfo
27+
28+
29+
class FailsafeTask(ABC):
30+
31+
PRIMARY_REPLICA_TASK_MAX_RETRIES = 10
32+
FETCH_REPLICAS_MAX_RETRIES = 10
33+
WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2
34+
35+
def __init__(self, client, database: str):
36+
self.client = client
37+
self.database = database
38+
39+
@abstractmethod
40+
def run(self, replica: ReplicaInfo.Replica):
41+
pass
42+
43+
def rerun(self, replica: ReplicaInfo.Replica):
44+
return self.run(replica)
45+
46+
def run_primary_replica(self):
47+
if self.database not in self.client.replica_info_map() or not self.client.replica_info_map()[self.database].primary_replica():
48+
self._seek_primary_replica()
49+
replica = self.client.replica_info_map()[self.database].primary_replica()
50+
retries = 0
51+
while True:
52+
try:
53+
return self.run(replica) if retries == 0 else self.rerun(replica)
54+
except GraknClientException as e:
55+
# TODO: propagate exception from the server in a less brittle way
56+
if "[RPL01]" in str(e): # The contacted replica reported that it was not the primary replica
57+
print("Unable to open a session or transaction, retrying in 2s... %s" % str(e))
58+
time.sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
59+
replica = self._seek_primary_replica()
60+
else:
61+
raise e
62+
# TODO: introduce a special type that extends RpcError and Call
63+
except RpcError as e:
64+
# TODO: this logic should be extracted into GraknClientException
65+
# TODO: error message should be checked in a less brittle way
66+
if e.code() == StatusCode.UNAVAILABLE or "[INT07]" in str(e) or "Received RST_STREAM" in str(e):
67+
print("Unable to open a session or transaction, retrying in 2s... %s" % str(e))
68+
time.sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
69+
replica = self._seek_primary_replica()
70+
else:
71+
raise e
72+
retries += 1
73+
if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES:
74+
raise self._cluster_not_available_exception()
75+
76+
def run_any_replica(self):
77+
if self.database in self.client.replica_info_map():
78+
replica_info = self.client.replica_info_map()[self.database]
79+
else:
80+
replica_info = self._fetch_database_replicas()
81+
82+
replicas = [replica_info.preferred_secondary_replica()] + [replica for replica in replica_info.replicas() if not replica.is_preferred_secondary()]
83+
retries = 0
84+
for replica in replicas:
85+
try:
86+
return self.run(replica) if retries == 0 else self.rerun(replica)
87+
except RpcError as e:
88+
if e.code() == StatusCode.UNAVAILABLE or "[INT07]" in str(e) or "Received RST_STREAM" in str(e):
89+
print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e)))
90+
else:
91+
raise e
92+
retries += 1
93+
raise self._cluster_not_available_exception()
94+
95+
def _seek_primary_replica(self) -> ReplicaInfo.Replica:
96+
retries = 0
97+
while retries < self.FETCH_REPLICAS_MAX_RETRIES:
98+
replica_info = self._fetch_database_replicas()
99+
if replica_info.primary_replica():
100+
return replica_info.primary_replica()
101+
else:
102+
time.sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
103+
retries += 1
104+
raise self._cluster_not_available_exception()
105+
106+
def _fetch_database_replicas(self) -> ReplicaInfo:
107+
for server_address in self.client.cluster_members():
108+
try:
109+
print("Fetching replica info from %s" % server_address)
110+
db_replicas_req = database_proto.Database.Replicas.Req()
111+
db_replicas_req.database = self.database
112+
res = self.client.grakn_cluster_grpc_stub(server_address).database_replicas(db_replicas_req)
113+
replica_info = ReplicaInfo.of_proto(res)
114+
print("Requested database discovery from peer %s, and got response: %s" % (str(server_address), str([str(replica) for replica in replica_info.replicas()])))
115+
self.client.replica_info_map()[self.database] = replica_info
116+
return replica_info
117+
except RpcError as e:
118+
print("Unable to perform database discovery to %s. Attempting next address. %s" % (str(server_address), str(e)))
119+
raise self._cluster_not_available_exception()
120+
121+
def _cluster_not_available_exception(self) -> GraknClientException:
122+
addresses = str([str(addr) for addr in self.client.cluster_members()])
123+
return GraknClientException("Unable to connect to Grakn Cluster. Attempted connecting to the cluster members, but none are available: '%s'" % addresses)

grakn/rpc/cluster/replica_info.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, replicas: Dict["ReplicaInfo.Replica.Id", "ReplicaInfo.Replica
3030
self._replicas = replicas
3131

3232
@staticmethod
33-
def of_proto(res: database_proto.Database.Discover.Res) -> "ReplicaInfo":
33+
def of_proto(res: database_proto.Database.Replicas.Res) -> "ReplicaInfo":
3434
replica_map: Dict["ReplicaInfo.Replica.Id", "ReplicaInfo.Replica"] = {}
3535
for replica_proto in res.replicas:
3636
replica_id = ReplicaInfo.Replica.Id(ServerAddress.parse(replica_proto.address), replica_proto.database)
@@ -41,22 +41,30 @@ def primary_replica(self) -> Optional["ReplicaInfo.Replica"]:
4141
primaries = [replica for replica in self._replicas.values() if replica.is_primary()]
4242
return max(primaries, key=lambda r: r.term) if primaries else None
4343

44+
def preferred_secondary_replica(self) -> "ReplicaInfo.Replica":
45+
return next(iter([replica for replica in self._replicas.values() if replica.is_preferred_secondary()]), next(iter(self._replicas.values())))
46+
4447
def replicas(self):
4548
return self._replicas.values()
4649

50+
def __str__(self):
51+
return str([str(replica) for replica in self._replicas.values()])
52+
4753
class Replica:
4854

49-
def __init__(self, replica_id: "ReplicaInfo.Replica.Id", term: int, is_primary: bool):
55+
def __init__(self, replica_id: "ReplicaInfo.Replica.Id", term: int, is_primary: bool, is_preferred_secondary: bool):
5056
self._replica_id = replica_id
5157
self._term = term
5258
self._is_primary = is_primary
59+
self._is_preferred_secondary = is_preferred_secondary
5360

5461
@staticmethod
55-
def of_proto(replica_proto: database_proto.Database.Discover.Res.Replica) -> "ReplicaInfo.Replica":
62+
def of_proto(replica_proto: database_proto.Database.Replica) -> "ReplicaInfo.Replica":
5663
return ReplicaInfo.Replica(
5764
replica_id=ReplicaInfo.Replica.Id(ServerAddress.parse(replica_proto.address), replica_proto.database),
5865
term=replica_proto.term,
59-
is_primary=replica_proto.is_primary
66+
is_primary=replica_proto.primary,
67+
is_preferred_secondary=replica_proto.preferred_secondary
6068
)
6169

6270
def replica_id(self) -> "ReplicaInfo.Replica.Id":
@@ -68,15 +76,21 @@ def term(self) -> int:
6876
def is_primary(self) -> bool:
6977
return self._is_primary
7078

79+
def is_preferred_secondary(self) -> bool:
80+
return self._is_preferred_secondary
81+
82+
def address(self) -> ServerAddress:
83+
return self._replica_id.address()
84+
7185
def __eq__(self, other):
7286
if self is other:
7387
return True
7488
if not other or type(self) != type(other):
7589
return False
76-
return self._term == other.term() and self._is_primary == other.is_primary()
90+
return self._term == other.term() and self._is_primary == other.is_primary() and self._is_preferred_secondary == other.is_preferred_secondary()
7791

7892
def __hash__(self):
79-
return hash((self._is_primary, self._term))
93+
return hash((self._is_primary, self._is_preferred_secondary, self._term))
8094

8195
def __str__(self):
8296
return "%s:%s:%d" % (str(self._replica_id), "P" if self._is_primary else "S", self._term)

0 commit comments

Comments
 (0)