Skip to content

Commit cb93bc1

Browse files
committed
Add basiс support for JOIN and SUBSCRIBE commands
1 parent 0ac9a67 commit cb93bc1

File tree

4 files changed

+97
-10
lines changed

4 files changed

+97
-10
lines changed

tarantool/connection.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
RequestDelete,
2828
RequestEval,
2929
RequestInsert,
30+
RequestJoin,
3031
RequestReplace,
3132
RequestPing,
3233
RequestSelect,
34+
RequestSubscribe,
3335
RequestUpdate,
3436
RequestAuthenticate)
3537

@@ -39,6 +41,8 @@
3941
RECONNECT_MAX_ATTEMPTS,
4042
RECONNECT_DELAY,
4143
RETRY_MAX_ATTEMPTS,
44+
REQUEST_TYPE_OK,
45+
REQUEST_TYPE_ERROR,
4246
IPROTO_GREETING_SIZE)
4347
from tarantool.error import (
4448
NetworkError,
@@ -332,6 +336,27 @@ def authenticate(self, user, password):
332336
self.password)
333337
return self._send_request_wo_reconnect(request)
334338

339+
def join(self, server_uuid):
340+
request = RequestJoin(self, server_uuid)
341+
resp = self._send_request(request)
342+
while True:
343+
yield resp
344+
if resp.code == REQUEST_TYPE_OK or \
345+
resp.code >= REQUEST_TYPE_ERROR:
346+
return
347+
resp = Response(self, self._read_response())
348+
self.close() # close connection after JOIN
349+
350+
def subscribe(self, cluster_uuid, server_uuid, vclock = {}):
351+
request = RequestSubscribe(self, cluster_uuid, server_uuid, vclock)
352+
resp = self._send_request(request)
353+
while True:
354+
yield resp
355+
if resp.code >= REQUEST_TYPE_ERROR:
356+
return
357+
resp = Response(self, self._read_response())
358+
self.close() # close connection after SUBSCRIBE
359+
335360
def insert(self, space_name, values):
336361
'''
337362
Execute INSERT request.

tarantool/const.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@
1212
IPROTO_TUPLE = 0x21
1313
IPROTO_FUNCTION_NAME = 0x22
1414
IPROTO_USER_NAME = 0x23
15+
IPROTO_SERVER_UUID = 0x24
16+
IPROTO_CLUSTER_UUID = 0x25
17+
IPROTO_VCLOCK = 0x26
1518
IPROTO_EXPR = 0x27
1619
IPROTO_DATA = 0x30
1720
IPROTO_ERROR = 0x31
1821

1922
IPROTO_GREETING_SIZE = 128
2023

2124
REQUEST_TYPE_OK = 0
22-
REQUEST_TYPE_PING = 64
2325
REQUEST_TYPE_SELECT = 1
2426
REQUEST_TYPE_INSERT = 2
2527
REQUEST_TYPE_REPLACE = 3
@@ -28,6 +30,9 @@
2830
REQUEST_TYPE_CALL = 6
2931
REQUEST_TYPE_AUTHENTICATE = 7
3032
REQUEST_TYPE_EVAL = 8
33+
REQUEST_TYPE_PING = 64
34+
REQUEST_TYPE_JOIN = 65
35+
REQUEST_TYPE_SUBSCRIBE = 66
3136
REQUEST_TYPE_ERROR = 1 << 15
3237

3338

tarantool/request.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
IPROTO_TUPLE,
2020
IPROTO_FUNCTION_NAME,
2121
IPROTO_ITERATOR,
22+
IPROTO_SERVER_UUID,
23+
IPROTO_CLUSTER_UUID,
24+
IPROTO_VCLOCK,
2225
IPROTO_EXPR,
2326
REQUEST_TYPE_PING,
2427
REQUEST_TYPE_SELECT,
@@ -28,7 +31,9 @@
2831
REQUEST_TYPE_UPDATE,
2932
REQUEST_TYPE_CALL,
3033
REQUEST_TYPE_EVAL,
31-
REQUEST_TYPE_AUTHENTICATE
34+
REQUEST_TYPE_AUTHENTICATE,
35+
REQUEST_TYPE_JOIN,
36+
REQUEST_TYPE_SUBSCRIBE
3237
)
3338

3439

@@ -226,3 +231,35 @@ class RequestPing(Request):
226231
def __init__(self, conn):
227232
super(RequestPing, self).__init__(conn)
228233
self._bytes = self.header(0)
234+
235+
class RequestJoin(Request):
236+
237+
'''
238+
Represents JOIN request
239+
'''
240+
request_type = REQUEST_TYPE_JOIN
241+
242+
# pylint: disable=W0231
243+
def __init__(self, conn, server_uuid):
244+
super(RequestJoin, self).__init__(conn)
245+
request_body = msgpack.dumps({ IPROTO_SERVER_UUID: server_uuid })
246+
self._bytes = self.header(len(request_body)) + request_body
247+
248+
class RequestSubscribe(Request):
249+
250+
'''
251+
Represents SUBSCRIBE request
252+
'''
253+
request_type = REQUEST_TYPE_SUBSCRIBE
254+
255+
# pylint: disable=W0231
256+
def __init__(self, conn, cluster_uuid, server_uuid, vclock):
257+
super(RequestSubscribe, self).__init__(conn)
258+
assert isinstance(vclock, dict)
259+
260+
request_body = msgpack.dumps({
261+
IPROTO_CLUSTER_UUID: cluster_uuid,
262+
IPROTO_SERVER_UUID: server_uuid,
263+
IPROTO_VCLOCK: vclock
264+
})
265+
self._bytes = self.header(len(request_body)) + request_body

tarantool/response.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,26 @@ def __init__(self, conn, response):
4848

4949
self._sync = header.get(IPROTO_SYNC, 0)
5050
self.conn = conn
51-
code = header[IPROTO_CODE]
52-
body = None
51+
self._code = header[IPROTO_CODE]
52+
self._body = {}
5353
try:
54-
body = unpacker.unpack()
54+
self._body = unpacker.unpack()
5555
except msgpack.OutOfData:
56-
body = {}
56+
pass
5757

58-
if code == REQUEST_TYPE_OK:
58+
if self._code < REQUEST_TYPE_ERROR:
5959
self._return_code = 0;
6060
self._completion_status = 0
61-
self._data = body.get(IPROTO_DATA, None)
61+
self._data = self._body.get(IPROTO_DATA, None)
6262
# Backward-compatibility
6363
if isinstance(self._data, (list, tuple)):
6464
self.extend(self._data)
6565
else:
6666
self.append(self._data)
6767
else:
6868
# Separate return_code and completion_code
69-
self._return_message = body.get(IPROTO_ERROR, "")
70-
self._return_code = code & (REQUEST_TYPE_ERROR - 1)
69+
self._return_message = self._body.get(IPROTO_ERROR, "")
70+
self._return_code = self._code & (REQUEST_TYPE_ERROR - 1)
7171
self._completion_status = 2
7272
self._data = None
7373
if self.conn.error:
@@ -99,6 +99,26 @@ def rowcount(self):
9999
'''
100100
return len(self)
101101

102+
@property
103+
def body(self):
104+
'''\
105+
:type: dict
106+
107+
Required field in the server response.
108+
Contains raw response body.
109+
'''
110+
return self._body
111+
112+
@property
113+
def code(self):
114+
'''\
115+
:type: int
116+
117+
Required field in the server response.
118+
Contains response type id.
119+
'''
120+
return self._code
121+
102122
@property
103123
def return_code(self):
104124
'''\

0 commit comments

Comments
 (0)