Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 6af81c2

Browse files
author
Alex Walker
authored
Fault-tolerant Cluster database operations (#224)
## What is the goal of this PR? We improved the fault tolerance of our Cluster database operations, namely Create, Contains and Delete. ## What are the changes implemented in this PR? Make database operations "Create, Contains, Delete" fault tolerant
1 parent efa1c66 commit 6af81c2

File tree

9 files changed

+163
-160
lines changed

9 files changed

+163
-160
lines changed

tests/behaviour/typeql/typeql_steps.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,3 +474,10 @@ def step_impl(context: Context):
474474
for answer in context.answers:
475475
query = apply_query_template(template=context.text, answer=answer)
476476
assert_that(list(context.tx().query().match(query)), has_length(1))
477+
478+
479+
@step("each answer does not satisfy")
480+
def step_impl(context: Context):
481+
for answer in context.answers:
482+
query = apply_query_template(template=context.text, answer=answer)
483+
assert_that(list(context.tx().query().match(query)), has_length(0))

tests/integration/test_cluster_failover.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ def test_put_entity_type_to_crashed_primary_replica(self):
7979
print("Primary replica is hosted by server with PID %s" % primary_replica_server_pid)
8080
subprocess.check_call(["kill", "-9", primary_replica_server_pid])
8181
print("Primary replica stopped successfully.")
82-
sleep(0.5)
82+
sleep(5) # TODO: This ensures the server is actually shut down, but it's odd that it needs to be so long
8383
with client.session("typedb", SCHEMA) as session, session.transaction(READ) as tx:
8484
person = tx.concepts().get_entity_type("person")
8585
print("Retrieved entity type with label '%s' from new primary replica." % person.get_label())
8686
assert person.get_label().name() == "person"
8787
idx = str(primary_replica.address())[10]
88-
subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730" % (idx, idx), "--peer", "127.0.0.1:11729:11730", "--peer", "127.0.0.1:21729:21730", "--peer", "127.0.0.1:31729:31730"])
88+
subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730:%s1731" % (idx, idx, idx), "--peer", "127.0.0.1:11729:11730:11731", "--peer", "127.0.0.1:21729:21730:21731", "--peer", "127.0.0.1:31729:31730:31731"])
8989
lsof = None
9090
live_check_iteration = 0
9191
while not lsof and live_check_iteration < 60:
@@ -94,7 +94,6 @@ def test_put_entity_type_to_crashed_primary_replica(self):
9494
lsof = subprocess.check_output(["lsof", "-i", ":%s" % port])
9595
except subprocess.CalledProcessError:
9696
pass
97-
sleep(0.5)
9897

9998

10099
if __name__ == "__main__":

tools/behave_rule.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def _rule_implementation(ctx):
8383
if [[ $PRODUCT == "Core" ]]; then
8484
./typedb_distribution/"$DIRECTORY"/typedb server --port $PORT --data typedb_test &
8585
else
86-
./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1))" --data typedb_test &
86+
./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1)):$(($PORT+2))" --data typedb_test &
8787
fi
8888
8989
POLL_INTERVAL_SECS=0.5

tools/cluster_test_rule.bzl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ def _rule_implementation(ctx):
5656
echo Successfully unarchived TypeDB distribution. Creating 3 copies.
5757
cp -r typedb_distribution/$TYPEDB/ 1 && cp -r typedb_distribution/$TYPEDB/ 2 && cp -r typedb_distribution/$TYPEDB/ 3
5858
echo Starting 3 TypeDB servers.
59-
./1/typedb server --data server/data --address 127.0.0.1:11729:11730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
60-
./2/typedb server --data server/data --address 127.0.0.1:21729:21730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
61-
./3/typedb server --data server/data --address 127.0.0.1:31729:31730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
59+
./1/typedb server --data server/data --address 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &
60+
./2/typedb server --data server/data --address 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &
61+
./3/typedb server --data server/data --address 127.0.0.1:31729:31730:31731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &
6262
6363
POLL_INTERVAL_SECS=0.5
6464
MAX_RETRIES=60
@@ -88,7 +88,7 @@ def _rule_implementation(ctx):
8888
cmd += """
8989
echo Tests concluded with exit value $RESULT
9090
echo Stopping servers.
91-
kill $(jps | awk '/TypeDBServer/ {print $1}' | paste -sd " " -)
91+
kill $(jps | awk '/TypeDBNode/ {print $1}' | paste -sd " " -)
9292
exit $RESULT
9393
"""
9494

typedb/cluster/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
from typedb.api.client import TypeDBClusterClient
2424
from typedb.api.options import TypeDBOptions, TypeDBClusterOptions
2525
from typedb.api.session import SessionType
26-
from typedb.cluster.database import _ClusterDatabase
26+
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask
2727
from typedb.cluster.database_manager import _ClusterDatabaseManager
28-
from typedb.cluster.failsafe_task import _FailsafeTask
2928
from typedb.cluster.session import _ClusterSession
3029
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT
3130
from typedb.common.rpc.request_builder import cluster_server_manager_all_req

typedb/cluster/database.py

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,39 @@
1818
# specific language governing permissions and limitations
1919
# under the License.
2020
#
21+
from abc import ABC, abstractmethod
22+
from time import sleep
2123
from typing import Dict, Optional, Set, TYPE_CHECKING
2224

2325
import typedb_protocol.cluster.cluster_database_pb2 as cluster_database_proto
2426

2527
from typedb.api.database import ClusterDatabase
28+
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_REPLICA_NOT_PRIMARY, \
29+
CLUSTER_UNABLE_TO_CONNECT
30+
from typedb.common.rpc.request_builder import cluster_database_manager_get_req
2631
from typedb.core.database import _CoreDatabase
2732

2833
if TYPE_CHECKING:
29-
from typedb.cluster.database_manager import _ClusterDatabaseManager
34+
from typedb.cluster.client import _ClusterClient
3035

3136

3237
class _ClusterDatabase(ClusterDatabase):
3338

34-
def __init__(self, database: str, cluster_database_mgr: "_ClusterDatabaseManager"):
39+
def __init__(self, database: str, client: "_ClusterClient"):
3540
self._name = database
36-
self._database_mgr = cluster_database_mgr
41+
self._client = client
3742
self._databases: Dict[str, _CoreDatabase] = {}
3843
self._replicas: Set["_ClusterDatabase.Replica"] = set()
39-
for address in cluster_database_mgr.database_mgrs():
40-
core_database_mgr = cluster_database_mgr.database_mgrs()[address]
44+
cluster_db_mgr = client.databases()
45+
for address in cluster_db_mgr.database_mgrs():
46+
core_database_mgr = cluster_db_mgr.database_mgrs()[address]
4147
self._databases[address] = _CoreDatabase(core_database_mgr.stub(), name=database)
4248

4349
@staticmethod
44-
def of(proto_db: cluster_database_proto.ClusterDatabase, cluster_database_mgr: "_ClusterDatabaseManager") -> "_ClusterDatabase":
50+
def of(proto_db: cluster_database_proto.ClusterDatabase, client: "_ClusterClient") -> "_ClusterDatabase":
4551
assert proto_db.replicas
4652
database: str = proto_db.name
47-
database_cluster_rpc = _ClusterDatabase(database, cluster_database_mgr)
53+
database_cluster_rpc = _ClusterDatabase(database, client)
4854
for proto_replica in proto_db.replicas:
4955
database_cluster_rpc.replicas().add(_ClusterDatabase.Replica.of(proto_replica, database_cluster_rpc))
5056
print("Discovered database cluster: %s" % database_cluster_rpc)
@@ -57,9 +63,8 @@ def schema(self) -> str:
5763
return next(iter(self._databases.values())).schema()
5864

5965
def delete(self) -> None:
60-
for address in self._databases:
61-
if self._database_mgr.database_mgrs()[address].contains(self._name):
62-
self._databases[address].delete()
66+
delete_db_task = _DeleteDatabaseFailsafeTask(self._client, self._name, self._databases)
67+
delete_db_task.run_primary_replica()
6368

6469
def replicas(self):
6570
return self._replicas
@@ -142,3 +147,99 @@ def __hash__(self):
142147

143148
def __str__(self):
144149
return "%s/%s" % (self._address, self._database)
150+
151+
152+
# This class has to live here because of circular class creation between ClusterDatabase and FailsafeTask
153+
class _FailsafeTask(ABC):
154+
155+
PRIMARY_REPLICA_TASK_MAX_RETRIES = 10
156+
FETCH_REPLICAS_MAX_RETRIES = 10
157+
WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2
158+
159+
def __init__(self, client: "_ClusterClient", database: str):
160+
self.client = client
161+
self.database = database
162+
163+
@abstractmethod
164+
def run(self, replica: "_ClusterDatabase.Replica"):
165+
pass
166+
167+
def rerun(self, replica: "_ClusterDatabase.Replica"):
168+
return self.run(replica)
169+
170+
def run_primary_replica(self):
171+
if self.database not in self.client.database_by_name() or not self.client.database_by_name()[self.database].primary_replica():
172+
self._seek_primary_replica()
173+
replica = self.client.database_by_name()[self.database].primary_replica()
174+
retries = 0
175+
while True:
176+
try:
177+
return self.run(replica) if retries == 0 else self.rerun(replica)
178+
except TypeDBClientException as e:
179+
if e.error_message in [CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT]:
180+
print("Unable to open a session or transaction, retrying in 2s... %s" % str(e))
181+
sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
182+
replica = self._seek_primary_replica()
183+
else:
184+
raise e
185+
retries += 1
186+
if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES:
187+
raise self._cluster_not_available_exception()
188+
189+
def run_any_replica(self):
190+
if self.database in self.client.database_by_name():
191+
cluster_database = self.client.database_by_name()[self.database]
192+
else:
193+
cluster_database = self._fetch_database_replicas()
194+
195+
replicas = [cluster_database.preferred_replica()] + [replica for replica in cluster_database.replicas() if not replica.is_preferred()]
196+
retries = 0
197+
for replica in replicas:
198+
try:
199+
return self.run(replica) if retries == 0 else self.rerun(replica)
200+
except TypeDBClientException as e:
201+
if e.error_message is UNABLE_TO_CONNECT:
202+
print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e)))
203+
else:
204+
raise e
205+
retries += 1
206+
raise self._cluster_not_available_exception()
207+
208+
def _seek_primary_replica(self) -> "_ClusterDatabase.Replica":
209+
retries = 0
210+
while retries < self.FETCH_REPLICAS_MAX_RETRIES:
211+
cluster_database = self._fetch_database_replicas()
212+
if cluster_database.primary_replica():
213+
return cluster_database.primary_replica()
214+
else:
215+
sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
216+
retries += 1
217+
raise self._cluster_not_available_exception()
218+
219+
def _fetch_database_replicas(self) -> "_ClusterDatabase":
220+
for server_address in self.client.cluster_members():
221+
try:
222+
print("Fetching replica info from %s" % server_address)
223+
res = self.client.stub(server_address).databases_get(cluster_database_manager_get_req(self.database))
224+
cluster_database = _ClusterDatabase.of(res.database, self.client)
225+
self.client.database_by_name()[self.database] = cluster_database
226+
return cluster_database
227+
except TypeDBClientException as e:
228+
if e.error_message is UNABLE_TO_CONNECT:
229+
print("Unable to fetch replica info for database '%s' from %s. Attempting next address. %s" % (self.database, server_address, str(e)))
230+
else:
231+
raise e
232+
raise self._cluster_not_available_exception()
233+
234+
def _cluster_not_available_exception(self) -> TypeDBClientException:
235+
return TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, str([str(addr) for addr in self.client.cluster_members()]))
236+
237+
238+
class _DeleteDatabaseFailsafeTask(_FailsafeTask):
239+
240+
def __init__(self, client: "_ClusterClient", database: str, databases: Dict[str, _CoreDatabase]):
241+
super().__init__(client, database)
242+
self.databases = databases
243+
244+
def run(self, replica: _ClusterDatabase.Replica):
245+
self.databases.get(replica.address()).delete()

typedb/cluster/database_manager.py

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
# specific language governing permissions and limitations
1919
# under the License.
2020
#
21-
from typing import Dict, List, TYPE_CHECKING
21+
from typing import Dict, List, TYPE_CHECKING, Callable, TypeVar
2222

2323
from typedb.api.database import ClusterDatabaseManager
24-
from typedb.cluster.database import _ClusterDatabase
25-
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED
24+
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask
25+
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED, CLUSTER_REPLICA_NOT_PRIMARY, \
26+
DB_DOES_NOT_EXIST
2627
from typedb.common.rpc.request_builder import cluster_database_manager_get_req, cluster_database_manager_all_req
28+
from typedb.common.rpc.stub import TypeDBClusterStub
2729
from typedb.core.database_manager import _CoreDatabaseManager
2830

31+
T = TypeVar("T")
32+
2933
if TYPE_CHECKING:
3034
from typedb.cluster.client import _ClusterClient
3135

@@ -37,38 +41,48 @@ def __init__(self, client: "_ClusterClient"):
3741
self._database_mgrs: Dict[str, _CoreDatabaseManager] = {addr: client.databases() for (addr, client) in client.core_clients().items()}
3842

3943
def contains(self, name: str) -> bool:
40-
errors = []
41-
for address in self._database_mgrs:
42-
try:
43-
return self._database_mgrs[address].contains(name)
44-
except TypeDBClientException as e:
45-
errors.append("- %s: %s\n" % (address, e))
46-
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))
44+
return self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.contains(name))
4745

4846
def create(self, name: str) -> None:
49-
for database_manager in self._database_mgrs.values():
50-
if not database_manager.contains(name):
51-
database_manager.create(name)
47+
self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.create(name))
5248

5349
def get(self, name: str) -> _ClusterDatabase:
54-
errors = []
55-
for address in self._database_mgrs:
56-
try:
57-
res = self._client.stub(address).databases_get(cluster_database_manager_get_req(name))
58-
return _ClusterDatabase.of(res.database, self)
59-
except TypeDBClientException as e:
60-
errors.append("- %s: %s\n" % (address, e))
61-
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))
50+
return self._failsafe_task(name, lambda stub, core_db_mgr: self._get_database_task(name, stub))
51+
52+
def _get_database_task(self, name: str, stub: TypeDBClusterStub):
53+
if self.contains(name):
54+
res = stub.databases_get(cluster_database_manager_get_req(name))
55+
return _ClusterDatabase.of(res.database, self._client)
56+
raise TypeDBClientException.of(DB_DOES_NOT_EXIST, name)
6257

6358
def all(self) -> List[_ClusterDatabase]:
6459
errors = []
6560
for address in self._database_mgrs:
6661
try:
6762
res = self._client.stub(address).databases_all(cluster_database_manager_all_req())
68-
return [_ClusterDatabase.of(db, self) for db in res.databases]
63+
return [_ClusterDatabase.of(db, self._client) for db in res.databases]
6964
except TypeDBClientException as e:
7065
errors.append("- %s: %s\n" % (address, e))
7166
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))
7267

7368
def database_mgrs(self) -> Dict[str, _CoreDatabaseManager]:
7469
return self._database_mgrs
70+
71+
def _failsafe_task(self, name: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]):
72+
failsafe_task = _DatabaseManagerFailsafeTask(self._client, name, task)
73+
try:
74+
return failsafe_task.run_any_replica()
75+
except TypeDBClientException as e:
76+
if e.error_message == CLUSTER_REPLICA_NOT_PRIMARY:
77+
return failsafe_task.run_primary_replica()
78+
raise e
79+
80+
81+
class _DatabaseManagerFailsafeTask(_FailsafeTask):
82+
83+
def __init__(self, client: "_ClusterClient", database: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]):
84+
super().__init__(client, database)
85+
self.task = task
86+
87+
def run(self, replica: _ClusterDatabase.Replica) -> T:
88+
return self.task(self.client.stub(replica.address()), self.client.core_client(replica.address()).databases())

0 commit comments

Comments
 (0)