Skip to content

Commit db8bcc3

Browse files
committed
1. Add _opt_reconnect to connection class for better reconnection (handle situation, when server closes connection)
2. Add "not_presented" and "presented" flags into Insert operation
1 parent e594774 commit db8bcc3

File tree

2 files changed

+39
-27
lines changed

2 files changed

+39
-27
lines changed

src/tarantool/connection.py

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,9 @@ def _send_request_wo_reconnect(self, request, space_name = None, field_defs = No
127127

128128
# Repeat request in a loop if the server returns completion_status == 1 (try again)
129129
for attempt in xrange(RETRY_MAX_ATTEMPTS): # pylint: disable=W0612
130-
try:
131-
self._socket.sendall(bytes(request))
132-
header, body = self._read_response()
133-
response = Response(self, header, body, space_name, field_defs, default_type)
134-
except socket.error as e:
135-
raise NetworkError(e)
130+
self._socket.sendall(bytes(request))
131+
header, body = self._read_response()
132+
response = Response(self, header, body, space_name, field_defs, default_type)
136133

137134
if response.completion_status != 1:
138135
return response
@@ -141,6 +138,24 @@ def _send_request_wo_reconnect(self, request, space_name = None, field_defs = No
141138
# Raise an error if the maximum number of attempts have been made
142139
raise DatabaseError(response.return_code, response.return_message)
143140

141+
def _opt_reconnect(self):
142+
attempt = 0
143+
while True:
144+
try:
145+
if not self._socket or not self._socket.recv(0, socket.MSG_DONTWAIT):
146+
time.sleep(self.reconnect_delay)
147+
self.connect()
148+
except socket.error as e:
149+
if e.errno == errno.EAGAIN:
150+
break
151+
else:
152+
time.slelep(self.reconnect_delay)
153+
self.connect()
154+
if attempt == self.reconnect_max_attempts:
155+
raise
156+
attempt += 1
157+
warn("%s : Reconnect attempt %d of %d"%(e.message, attempt, self.reconnect_max_attempts), NetworkWarning)
158+
144159

145160
def _send_request(self, request, space_name = None, field_defs = None, default_type = None):
146161
'''\
@@ -156,21 +171,8 @@ def _send_request(self, request, space_name = None, field_defs = None, default_t
156171

157172
connected = True
158173
attempt = 1
159-
while True:
160-
try:
161-
if not connected:
162-
time.sleep(self.reconnect_delay)
163-
self.connect()
164-
connected = True
165-
warn("Successfully reconnected", NetworkWarning)
166-
response = self._send_request_wo_reconnect(request, space_name, field_defs, default_type)
167-
break
168-
except NetworkError as e:
169-
if attempt > self.reconnect_max_attempts:
170-
raise
171-
warn("%s : Reconnect attempt %d of %d"%(e.message, attempt, self.reconnect_max_attempts), NetworkWarning)
172-
attempt += 1
173-
connected = False
174+
self._opt_reconnect()
175+
response = self._send_request_wo_reconnect(request, space_name, field_defs, default_type)
174176

175177
return response
176178

@@ -213,7 +215,7 @@ def call(self, func_name, *args, **kwargs):
213215
return response
214216

215217

216-
def insert(self, space_name, values, return_tuple=False):
218+
def insert(self, space_name, values, return_tuple=False, not_presented=False, presented=False):
217219
'''\
218220
Execute INSERT request.
219221
Insert single record into a space `space_name`.
@@ -222,14 +224,19 @@ def insert(self, space_name, values, return_tuple=False):
222224
:type space_name: int or str
223225
:param values: record to be inserted. The tuple must contain only scalar (integer or strings) values
224226
:type values: tuple
227+
225228
:param return_tuple: True indicates that it is required to return the inserted tuple back
226229
:type return_tuple: bool
227-
230+
:param not_presented: True indicates that there's must be no tuple with same primary key
231+
:type not_presented: bool
232+
:param presented: True indicates that there's must be tuple with same primary key
233+
:type presented: bool
234+
228235
:rtype: `Response` instance
229236
'''
230237
assert isinstance(values, tuple)
231238

232-
request = RequestInsert(self, space_name, values, return_tuple)
239+
request = RequestInsert(self, space_name, values, return_tuple, not_presented, presented)
233240
return self._send_request(request, space_name)
234241

235242

@@ -277,20 +284,23 @@ def update(self, space_name, key, op_list, return_tuple=False):
277284
return self._send_request(request, space_name)
278285

279286

280-
def ping(self):
287+
def ping(self, notime = False):
281288
'''\
282289
Execute PING request.
283290
Send empty request and receive empty response from server.
284291
285292
:return: response time in seconds
286293
:rtype: float
287294
'''
295+
self._opt_reconnect()
288296
t0 = time.time()
289297
self._socket.sendall(struct_LLL.pack(0xff00, 0, 0))
290298
request_type, body_length, request_id = struct_LLL.unpack(self._socket.recv(12)) # pylint: disable=W0612
291299
t1 = time.time()
292300
assert request_type == 0xff00
293301
assert body_length == 0
302+
if no_time:
303+
return "Success"
294304
return t1 - t0
295305

296306

src/tarantool/request.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,15 @@ class RequestInsert(Request):
144144
'''
145145
request_type = REQUEST_TYPE_INSERT
146146

147-
def __init__(self, conn, space_name, values, return_tuple): # pylint: disable=W0231
147+
def __init__(self, conn, space_name, values, return_tuple, not_presented, presented): # pylint: disable=W0231
148148
'''\
149149
'''
150150
super(RequestInsert, self).__init__(conn)
151151

152152
assert isinstance(values, (tuple, list))
153-
flags = 1 if return_tuple else 0
153+
flags = 1 if return_tuple else 0
154+
flags += 2 if not_presented else 0
155+
flags += 4 if presented else 0
154156

155157
space_no = self.conn.schema.space_no(space_name)
156158
request_body = \

0 commit comments

Comments
 (0)