@@ -365,35 +365,49 @@ def authenticate(self, user, password):
365365 self .password )
366366 return self ._send_request_wo_reconnect (request )
367367
368- def join (self , server_uuid ):
368+ def _join_v16 (self , server_uuid ):
369369 request = RequestJoin (self , server_uuid )
370- sync = request . _sync
371- resp = self . _send_request ( request )
370+ self . _socket . sendall ( bytes ( request ))
371+
372372 while True :
373- if self .version_id >= version_id (1 , 7 , 0 ) and resp .code == REQUEST_TYPE_OK :
374- # Send acknowledgement
375- ack = RequestOK (self , sync )
376- self ._socket .sendall (bytes (ack ))
373+ resp = Response (self , self ._read_response ());
377374 yield resp
378375 if resp .code == REQUEST_TYPE_OK or resp .code >= REQUEST_TYPE_ERROR :
379376 return
380- resp = Response (self , self ._read_response ())
381377 self .close () # close connection after JOIN
382378
383- def subscribe (self , cluster_uuid , server_uuid , vclock = {}):
384- # FIXME rudnyh: ^ 'vclock={}'? really? sure?
385- request = RequestSubscribe (self , cluster_uuid , server_uuid , vclock )
386- sync = request ._sync
387- resp = self ._send_request (request )
379+ def _join_v17 (self , server_uuid ):
380+ class JoinState :
381+ Handshake , Initial , Final , Done = range (4 )
382+
383+ request = RequestJoin (self , server_uuid )
384+ self ._socket .sendall (bytes (request ))
385+ state = JoinState .Handshake
388386 while True :
389- if self .version_id >= version_id (1 , 7 , 0 ):
390- # Send acknowledgement
391- ack = RequestOK (self , sync )
392- self ._socket .sendall (bytes (ack ))
387+ resp = Response (self , self ._read_response ())
393388 yield resp
394389 if resp .code >= REQUEST_TYPE_ERROR :
395390 return
391+ elif resp .code == REQUEST_TYPE_OK :
392+ state = state + 1
393+ if state == JoinState .Done :
394+ return
395+
396+ def join (self , server_uuid ):
397+ self ._opt_reconnect ()
398+ if self .version_id < version_id (1 , 7 , 0 ):
399+ return self ._join_v16 (server_uuid )
400+ return self ._join_v17 (server_uuid )
401+
402+ def subscribe (self , cluster_uuid , server_uuid , vclock = None ):
403+ vclock = vclock or {}
404+ request = RequestSubscribe (self , cluster_uuid , server_uuid , vclock )
405+ self ._socket .sendall (bytes (request ))
406+ while True :
396407 resp = Response (self , self ._read_response ())
408+ yield resp
409+ if resp .code >= REQUEST_TYPE_ERROR :
410+ return
397411 self .close () # close connection after SUBSCRIBE
398412
399413 def insert (self , space_name , values ):
0 commit comments