@@ -22,7 +22,7 @@ import NodeChannel from './ch-node';
2222import { Chunker , Dechunker } from './chunking' ;
2323import packStreamUtil from './packstream-util' ;
2424import { alloc } from './buf' ;
25- import { newError } from './../error' ;
25+ import { newError , PROTOCOL_ERROR } from './../error' ;
2626import ChannelConfig from './ch-config' ;
2727import urlUtil from './url-util' ;
2828import StreamObserver from './stream-observer' ;
@@ -120,7 +120,7 @@ class Connection {
120120 this . _packer = packStreamUtil . createLatestPacker ( this . _chunker ) ;
121121 this . _unpacker = packStreamUtil . createLatestUnpacker ( disableLosslessIntegers ) ;
122122
123- this . _isHandlingFailure = false ;
123+ this . _ackFailureMuted = false ;
124124 this . _currentFailure = null ;
125125
126126 this . _state = new ConnectionState ( this ) ;
@@ -241,25 +241,8 @@ class Connection {
241241 this . _currentObserver . onError ( this . _currentFailure ) ;
242242 } finally {
243243 this . _updateCurrentObserver ( ) ;
244- // Things are now broken. Pending observers will get FAILURE messages routed until
245- // We are done handling this failure.
246- if ( ! this . _isHandlingFailure ) {
247- this . _isHandlingFailure = true ;
248-
249- // isHandlingFailure was false, meaning this is the first failure message
250- // we see from this failure. We may see several others, one for each message
251- // we had "optimistically" already sent after whatever it was that failed.
252- // We only want to and need to ACK the first one, which is why we are tracking
253- // this _isHandlingFailure thing.
254- this . _ackFailure ( {
255- onNext : NO_OP ,
256- onError : NO_OP ,
257- onCompleted : ( ) => {
258- this . _isHandlingFailure = false ;
259- this . _currentFailure = null ;
260- }
261- } ) ;
262- }
244+ // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
245+ this . _ackFailureIfNeeded ( ) ;
263246 }
264247 break ;
265248 case IGNORED :
@@ -268,7 +251,7 @@ class Connection {
268251 if ( this . _currentFailure && this . _currentObserver . onError )
269252 this . _currentObserver . onError ( this . _currentFailure ) ;
270253 else if ( this . _currentObserver . onError )
271- this . _currentObserver . onError ( payload ) ;
254+ this . _currentObserver . onError ( newError ( 'Ignored either because of an error or RESET' ) ) ;
272255 } finally {
273256 this . _updateCurrentObserver ( ) ;
274257 }
@@ -282,80 +265,122 @@ class Connection {
282265 initialize ( clientName , token , observer ) {
283266 log ( "C" , "INIT" , clientName , token ) ;
284267 const initObserver = this . _state . wrap ( observer ) ;
285- this . _queueObserver ( initObserver ) ;
286- this . _packer . packStruct ( INIT , [ this . _packable ( clientName ) , this . _packable ( token ) ] ,
287- ( err ) => this . _handleFatalError ( err ) ) ;
288- this . _chunker . messageBoundary ( ) ;
289- this . sync ( ) ;
268+ const queued = this . _queueObserver ( initObserver ) ;
269+ if ( queued ) {
270+ this . _packer . packStruct ( INIT , [ this . _packable ( clientName ) , this . _packable ( token ) ] ,
271+ ( err ) => this . _handleFatalError ( err ) ) ;
272+ this . _chunker . messageBoundary ( ) ;
273+ this . sync ( ) ;
274+ }
290275 }
291276
292277 /** Queue a RUN-message to be sent to the database */
293278 run ( statement , params , observer ) {
294279 log ( "C" , "RUN" , statement , params ) ;
295- this . _queueObserver ( observer ) ;
296- this . _packer . packStruct ( RUN , [ this . _packable ( statement ) , this . _packable ( params ) ] ,
297- ( err ) => this . _handleFatalError ( err ) ) ;
298- this . _chunker . messageBoundary ( ) ;
280+ const queued = this . _queueObserver ( observer ) ;
281+ if ( queued ) {
282+ this . _packer . packStruct ( RUN , [ this . _packable ( statement ) , this . _packable ( params ) ] ,
283+ ( err ) => this . _handleFatalError ( err ) ) ;
284+ this . _chunker . messageBoundary ( ) ;
285+ }
299286 }
300287
301288 /** Queue a PULL_ALL-message to be sent to the database */
302289 pullAll ( observer ) {
303290 log ( "C" , "PULL_ALL" ) ;
304- this . _queueObserver ( observer ) ;
305- this . _packer . packStruct ( PULL_ALL , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
306- this . _chunker . messageBoundary ( ) ;
291+ const queued = this . _queueObserver ( observer ) ;
292+ if ( queued ) {
293+ this . _packer . packStruct ( PULL_ALL , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
294+ this . _chunker . messageBoundary ( ) ;
295+ }
307296 }
308297
309298 /** Queue a DISCARD_ALL-message to be sent to the database */
310299 discardAll ( observer ) {
311300 log ( "C" , "DISCARD_ALL" ) ;
312- this . _queueObserver ( observer ) ;
313- this . _packer . packStruct ( DISCARD_ALL , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
314- this . _chunker . messageBoundary ( ) ;
301+ const queued = this . _queueObserver ( observer ) ;
302+ if ( queued ) {
303+ this . _packer . packStruct ( DISCARD_ALL , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
304+ this . _chunker . messageBoundary ( ) ;
305+ }
315306 }
316307
317- /** Queue a RESET-message to be sent to the database. Mutes failure handling. */
318- resetAsync ( observer ) {
319- log ( "C" , "RESET_ASYNC" ) ;
320- this . _isHandlingFailure = true ;
321- let self = this ;
322- let wrappedObs = {
323- onNext : observer ? observer . onNext : NO_OP ,
324- onError : observer ? observer . onError : NO_OP ,
325- onCompleted : ( ) => {
326- self . _isHandlingFailure = false ;
327- if ( observer ) {
328- observer . onCompleted ( ) ;
308+ /**
309+ * Send a RESET-message to the database. Mutes failure handling.
310+ * Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
311+ * @return {Promise<void> } promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
312+ */
313+ resetAndFlush ( ) {
314+ log ( 'C' , 'RESET' ) ;
315+ this . _ackFailureMuted = true ;
316+
317+ return new Promise ( ( resolve , reject ) => {
318+ const observer = {
319+ onNext : record => {
320+ const neo4jError = this . _handleProtocolError ( 'Received RECORD as a response for RESET: ' + JSON . stringify ( record ) ) ;
321+ reject ( neo4jError ) ;
322+ } ,
323+ onError : error => {
324+ if ( this . _isBroken ) {
325+ // handling a fatal error, no need to raise a protocol violation
326+ reject ( error ) ;
327+ } else {
328+ const neo4jError = this . _handleProtocolError ( 'Received FAILURE as a response for RESET: ' + error ) ;
329+ reject ( neo4jError ) ;
330+ }
331+ } ,
332+ onCompleted : ( ) => {
333+ this . _ackFailureMuted = false ;
334+ resolve ( ) ;
329335 }
336+ } ;
337+ const queued = this . _queueObserver ( observer ) ;
338+ if ( queued ) {
339+ this . _packer . packStruct ( RESET , [ ] , err => this . _handleFatalError ( err ) ) ;
340+ this . _chunker . messageBoundary ( ) ;
341+ this . sync ( ) ;
330342 }
331- } ;
332- this . _queueObserver ( wrappedObs ) ;
333- this . _packer . packStruct ( RESET , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
334- this . _chunker . messageBoundary ( ) ;
343+ } ) ;
335344 }
336345
337- /** Queue a RESET-message to be sent to the database */
338- reset ( observer ) {
339- log ( 'C' , 'RESET' ) ;
340- this . _queueObserver ( observer ) ;
341- this . _packer . packStruct ( RESET , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
342- this . _chunker . messageBoundary ( ) ;
343- }
346+ _ackFailureIfNeeded ( ) {
347+ if ( this . _ackFailureMuted ) {
348+ return ;
349+ }
344350
345- /** Queue a ACK_FAILURE-message to be sent to the database */
346- _ackFailure ( observer ) {
347- log ( "C" , "ACK_FAILURE" ) ;
348- this . _queueObserver ( observer ) ;
349- this . _packer . packStruct ( ACK_FAILURE , [ ] , ( err ) => this . _handleFatalError ( err ) ) ;
350- this . _chunker . messageBoundary ( ) ;
351+ log ( 'C' , 'ACK_FAILURE' ) ;
352+
353+ const observer = {
354+ onNext : record => {
355+ this . _handleProtocolError ( 'Received RECORD as a response for ACK_FAILURE: ' + JSON . stringify ( record ) ) ;
356+ } ,
357+ onError : error => {
358+ if ( ! this . _isBroken && ! this . _ackFailureMuted ) {
359+ // not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
360+ this . _handleProtocolError ( 'Received FAILURE as a response for ACK_FAILURE: ' + error ) ;
361+ } else {
362+ this . _currentFailure = null ;
363+ }
364+ } ,
365+ onCompleted : ( ) => {
366+ this . _currentFailure = null ;
367+ }
368+ } ;
369+
370+ const queued = this . _queueObserver ( observer ) ;
371+ if ( queued ) {
372+ this . _packer . packStruct ( ACK_FAILURE , [ ] , err => this . _handleFatalError ( err ) ) ;
373+ this . _chunker . messageBoundary ( ) ;
374+ this . sync ( ) ;
375+ }
351376 }
352377
353378 _queueObserver ( observer ) {
354379 if ( this . _isBroken ) {
355380 if ( observer && observer . onError ) {
356381 observer . onError ( this . _error ) ;
357382 }
358- return ;
383+ return false ;
359384 }
360385 observer = observer || NO_OP_OBSERVER ;
361386 observer . onCompleted = observer . onCompleted || NO_OP ;
@@ -366,6 +391,7 @@ class Connection {
366391 } else {
367392 this . _pendingObservers . push ( observer ) ;
368393 }
394+ return true ;
369395 }
370396
371397 /**
@@ -427,6 +453,15 @@ class Connection {
427453 }
428454 }
429455 }
456+
457+ _handleProtocolError ( message ) {
458+ this . _ackFailureMuted = false ;
459+ this . _currentFailure = null ;
460+ this . _updateCurrentObserver ( ) ;
461+ const error = newError ( message , PROTOCOL_ERROR ) ;
462+ this . _handleFatalError ( error ) ;
463+ return error ;
464+ }
430465}
431466
432467class ConnectionState {
0 commit comments