@@ -16,18 +16,13 @@ var states = {
1616 CONNECTING : 0 ,
1717 CONNECTED : 1 ,
1818 AWAITING : 2 ,
19- REQUESTING : 3 ,
20- GETTING : 4 ,
21- INITED : 5 ,
19+ DESTROYED : 5 ,
2220 DISONECTED : 6 ,
2321 PREHELLO : 7 ,
2422 AWAITING_LENGTH : 8
2523} ;
2624
27- var commandType = {
28- CONNECT : 0 ,
29- REQUEST : 1
30- } ;
25+ var requestMethods = [ 'select' , 'delete' , 'insert' , 'replace' , 'update' , 'eval' , 'call' ] ;
3126
3227var requestId = {
3328 _id : 0 ,
@@ -43,11 +38,9 @@ var defaultOptions = {
4338 port : '3301' ,
4439 username : null ,
4540 password : null ,
46- timeout : 5000 ,
47- reconnect : true
41+ timeout : 5000
4842} ;
4943
50-
5144function TarantoolConnection ( options ) {
5245 this . socket = new net . Socket ( {
5346 readable : true ,
@@ -57,20 +50,20 @@ function TarantoolConnection (options){
5750 this . emitter = new EventEmitter ( ) ;
5851 this . options = _ . extend ( defaultOptions , options ) ;
5952 this . commandsQueue = [ ] ;
53+ this . awaitingDestroy = false ;
6054 this . socket . on ( 'connect' , this . onConnect . bind ( this ) ) ;
6155 this . socket . on ( 'error' , this . onError . bind ( this ) ) ;
6256 this . socket . on ( 'data' , this . onData . bind ( this ) ) ;
63- this . responseEnded = true ;
6457}
6558
6659TarantoolConnection . prototype . onData = function ( data ) {
6760 switch ( this . state ) {
6861 case states . PREHELLO :
6962 for ( var i = 0 ; i < this . commandsQueue . length ; i ++ )
7063 {
71- if ( this . commandsQueue [ i ] [ 0 ] == commandType . CONNECT )
64+ if ( this . commandsQueue [ i ] [ 0 ] == tarantoolConstants . RequestCode . rqConnect )
7265 {
73- this . commandsQueue [ i ] [ 1 ] ( true ) ;
66+ this . commandsQueue [ i ] [ 1 ] . resolve ( true ) ;
7467 this . commandsQueue . splice ( i , 1 ) ;
7568 i -- ;
7669 }
@@ -143,41 +136,55 @@ TarantoolConnection.prototype._processResponse = function(buffer){
143136 var dataBuffer = Buffer . concat ( [ new Buffer ( [ 0x92 ] ) , buffer ] ) ;
144137 var obj = msgpack . unpack ( dataBuffer ) ;
145138 var reqId = obj [ 0 ] [ 1 ] ;
146- var task = this . commandsQueue . filter ( function ( t ) {
147- return t [ 1 ] == reqId ;
148- } ) [ 0 ] ;
139+ for ( var i = 0 ; i < this . commandsQueue . length ; i ++ )
140+ if ( this . commandsQueue [ i ] [ 1 ] == reqId )
141+ {
142+ var task = this . commandsQueue [ i ] ;
143+ this . commandsQueue . splice ( i , 1 ) ;
144+ break ;
145+ }
149146 var dfd = task [ 2 ] ;
150147 var success = obj [ 0 ] [ 0 ] == 0 ? true : false ;
151148 if ( success )
152149 dfd . resolve ( this . _processResponseBody ( task [ 0 ] , obj [ 1 ] [ tarantoolConstants . KeysCode . data ] ) ) ;
153150 else
154151 dfd . reject ( obj [ 1 ] [ tarantoolConstants . KeysCode . error ] ) ;
152+ if ( this . awaitingDestroy && this . commandsQueue . length == 1 )
153+ {
154+ this . commandsQueue [ 0 ] [ 2 ] . resolve ( true ) ;
155+ this . socket . destroy ( ) ;
156+ }
155157} ;
156158
157159TarantoolConnection . prototype . _processResponseBody = function ( cmd , data ) {
158160 return cmd == tarantoolConstants . RequestCode . rqAuth ? true : data ;
159161} ;
160162
161- TarantoolConnection . prototype . _dropAll = function ( ) {
162-
163- } ;
164-
165163TarantoolConnection . prototype . onConnect = function ( ) {
166164 this . state = states . PREHELLO ;
167165} ;
168166
169167TarantoolConnection . prototype . onError = function ( error ) {
170- for ( var i = 0 ; i < this . commandsQueue . length ; i ++ )
171- this . commandsQueue [ i ] [ 2 ] ( error ) ;
168+ this . _interupt ( ) ;
169+ tihs . _stubMethods ( ) ;
170+ this . socket . destroy ( ) ;
172171 this . commandsQueue = [ ] ;
173172} ;
174173
174+ TarantoolConnection . prototype . _interupt = function ( ) {
175+ for ( var i = 0 ; i < this . commandsQueue . length ; i ++ ) {
176+ var dfd = this . commandsQueue [ i ] [ 0 ] == tarantoolConstants . RequestCode . rqConnect ? this . commandsQueue [ i ] [ 1 ]
177+ : this . commandsQueue [ i ] [ 2 ] ;
178+ dfd . reject ( error ) ;
179+ }
180+ }
181+
175182TarantoolConnection . prototype . connect = function ( ) {
183+ var dfd = vow . defer ( ) ;
176184 this . state = states . CONNECTING ;
177- return new Promise ( function ( resolve , reject ) {
178- this . commandsQueue . push ( [ commandType . CONNECT , resolve , reject ] ) ;
179- this . socket . connect ( { port : this . options . port , host : this . options . host } ) ;
180- } . bind ( this ) ) ;
185+ this . commandsQueue . push ( [ tarantoolConstants . RequestCode . rqConnect , dfd ] ) ;
186+ this . socket . connect ( { port : this . options . port , host : this . options . host } ) ;
187+ return dfd . promise ( ) ;
181188} ;
182189
183190TarantoolConnection . prototype . ping = function ( ) {
@@ -269,7 +276,6 @@ TarantoolConnection.prototype.eval = function(expression, tuple){
269276 return dfd . promise ( ) ;
270277} ;
271278
272-
273279TarantoolConnection . prototype . call = function ( functionName , tuple ) {
274280 var dfd = vow . defer ( ) ;
275281 var reqId = requestId . getId ( ) ;
@@ -285,7 +291,6 @@ TarantoolConnection.prototype.call = function(functionName, tuple){
285291 return dfd . promise ( ) ;
286292} ;
287293
288-
289294TarantoolConnection . prototype . insert = function ( spaceId , tuple ) {
290295 var reqId = requestId . getId ( ) ;
291296 return this . _replaceInsert ( tarantoolConstants . RequestCode . rqInsert , reqId , spaceId , tuple ) ;
@@ -355,14 +360,39 @@ TarantoolConnection.prototype._request = function(header, body){
355360} ;
356361
357362TarantoolConnection . prototype . destroy = function ( interupt ) {
363+ var dfd = vow . defer ( ) ;
358364 if ( interupt )
359365 {
360- this . socket . destroy ( ) ;
366+ this . _interupt ( ) ;
367+ dfd . resolve ( true ) ;
361368 }
362369 else
363370 {
364- this . commandsQueue . push ( 'destroy' ) ;
371+ if ( this . commandsQueue . length )
372+ {
373+ this . commandsQueue . push ( [ tarantoolConstants . RequestCode . rqDestroy , - 1 , dfd ] ) ;
374+ this . awaitingDestroy = true ;
375+ //disable methods
376+ this . _stubMethods ( ) ;
377+ }
378+ else
379+ {
380+ this . socket . destroy ( ) ;
381+ dfd . resolve ( true ) ;
382+ }
365383 }
384+ return dfd . promise ( ) ;
385+ } ;
386+
387+ TarantoolConnection . prototype . _notAvailableMethod = function ( ) {
388+ var dfd = vow . defer ( ) ;
389+ dfd . reject ( 'connection will be destroyed or already destroyed, create another one' ) ;
390+ return dfd . promise ( ) ;
391+ } ;
392+
393+ TarantoolConnection . prototype . _stubMethods = function ( ) {
394+ for ( var i = 0 ; i < requestMethods . length ; i ++ )
395+ this [ requestMethods [ i ] ] = this . _notAvailableMethod ;
366396} ;
367397
368398module . exports = TarantoolConnection ;
0 commit comments