@@ -24,6 +24,7 @@ import {alloc} from './buf';
2424import { Node , Path , PathSegment , Relationship , UnboundRelationship } from '../graph-types' ;
2525import { newError } from './../error' ;
2626import ChannelConfig from './ch-config' ;
27+ import StreamObserver from './stream-observer' ;
2728
2829let Channel ;
2930if ( NodeChannel . available ) {
@@ -57,7 +58,7 @@ UNBOUND_RELATIONSHIP = 0x72,
5758PATH = 0x50 ,
5859//sent before version negotiation
5960MAGIC_PREAMBLE = 0x6060B017 ,
60- DEBUG = true ;
61+ DEBUG = false ;
6162
6263let URLREGEX = new RegExp ( [
6364 "([^/]+//)?" , // scheme
@@ -272,7 +273,7 @@ class Connection {
272273 * failing, and the connection getting ejected from the session pool.
273274 *
274275 * @param err an error object, forwarded to all current and future subscribers
275- * @private
276+ * @protected
276277 */
277278 _handleFatalError ( err ) {
278279 this . _isBroken = true ;
@@ -288,21 +289,13 @@ class Connection {
288289 }
289290 }
290291
291- /**
292- * Mark this connection as failed because processing of INIT message failed and server will close the connection.
293- * Initialization failure is a fatal error for the connection.
294- * @param {Neo4jError } error the initialization error.
295- * @param {StreamObserver } initObserver the initialization observer that noticed the failure.
296- * @protected
297- */
298- _initializationFailed ( error , initObserver ) {
299- if ( this . _currentObserver === initObserver ) {
300- this . _currentObserver = null ; // init observer detected the failure and should not be notified again
292+ _handleMessage ( msg ) {
293+ if ( this . _isBroken ) {
294+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
295+ // with the fatal error. all future observers will fail with same fatal error.
296+ return ;
301297 }
302- this . _handleFatalError ( error ) ;
303- }
304298
305- _handleMessage ( msg ) {
306299 const payload = msg . fields [ 0 ] ;
307300
308301 switch ( msg . signature ) {
@@ -315,7 +308,7 @@ class Connection {
315308 try {
316309 this . _currentObserver . onCompleted ( payload ) ;
317310 } finally {
318- this . _currentObserver = this . _pendingObservers . shift ( ) ;
311+ this . _updateCurrentObserver ( ) ;
319312 }
320313 break ;
321314 case FAILURE :
@@ -324,7 +317,7 @@ class Connection {
324317 this . _currentFailure = newError ( payload . message , payload . code ) ;
325318 this . _currentObserver . onError ( this . _currentFailure ) ;
326319 } finally {
327- this . _currentObserver = this . _pendingObservers . shift ( ) ;
320+ this . _updateCurrentObserver ( ) ;
328321 // Things are now broken. Pending observers will get FAILURE messages routed until
329322 // We are done handling this failure.
330323 if ( ! this . _isHandlingFailure ) {
@@ -354,7 +347,7 @@ class Connection {
354347 else if ( this . _currentObserver . onError )
355348 this . _currentObserver . onError ( payload ) ;
356349 } finally {
357- this . _currentObserver = this . _pendingObservers . shift ( ) ;
350+ this . _updateCurrentObserver ( ) ;
358351 }
359352 break ;
360353 default :
@@ -452,6 +445,14 @@ class Connection {
452445 }
453446 }
454447
448+ /**
449+ * Pop next pending observer form the list of observers and make it current observer.
450+ * @protected
451+ */
452+ _updateCurrentObserver ( ) {
453+ this . _currentObserver = this . _pendingObservers . shift ( ) ;
454+ }
455+
455456 /**
456457 * Synchronize - flush all queued outgoing messages and route their responses
457458 * to their respective handlers.
@@ -509,14 +510,15 @@ function connect(url, config = {}, connectionErrorCode = null) {
509510 * closed by the server if processing of INIT message fails so this observer will handle initialization failure
510511 * as a fatal error.
511512 */
512- class InitObserver {
513+ class InitObserver extends StreamObserver {
513514
514515 /**
515516 * @constructor
516517 * @param {Connection } connection the connection used to send INIT message.
517518 * @param {StreamObserver } originalObserver the observer to wrap and delegate calls to.
518519 */
519520 constructor ( connection , originalObserver ) {
521+ super ( ) ;
520522 this . _connection = connection ;
521523 this . _originalObserver = originalObserver || NO_OP_OBSERVER ;
522524 }
@@ -526,10 +528,11 @@ class InitObserver {
526528 }
527529
528530 onError ( error ) {
531+ this . _connection . _updateCurrentObserver ( ) ; // make sure this same observer will not be called again
529532 try {
530533 this . _originalObserver . onError ( error ) ;
531534 } finally {
532- this . _connection . _initializationFailed ( error , this ) ;
535+ this . _connection . _handleFatalError ( error ) ;
533536 }
534537 }
535538
0 commit comments