@@ -24,6 +24,7 @@ import {alloc} from './buf';
2424import { Node , Path , PathSegment , Relationship , UnboundRelationship } from '../graph-types' ;
2525import { newError } from './../error' ;
2626import ChannelConfig from './ch-config' ;
27+ import StreamObserver from './stream-observer' ;
2728
2829let Channel ;
2930if ( NodeChannel . available ) {
@@ -272,7 +273,7 @@ class Connection {
272273 * failing, and the connection getting ejected from the session pool.
273274 *
274275 * @param err an error object, forwarded to all current and future subscribers
275- * @private
276+ * @protected
276277 */
277278 _handleFatalError ( err ) {
278279 this . _isBroken = true ;
@@ -289,6 +290,12 @@ class Connection {
289290 }
290291
291292 _handleMessage ( msg ) {
293+ if ( this . _isBroken ) {
294+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
295+ // with the fatal error. all future observers will fail with same fatal error.
296+ return ;
297+ }
298+
292299 const payload = msg . fields [ 0 ] ;
293300
294301 switch ( msg . signature ) {
@@ -301,7 +308,7 @@ class Connection {
301308 try {
302309 this . _currentObserver . onCompleted ( payload ) ;
303310 } finally {
304- this . _currentObserver = this . _pendingObservers . shift ( ) ;
311+ this . _updateCurrentObserver ( ) ;
305312 }
306313 break ;
307314 case FAILURE :
@@ -310,7 +317,7 @@ class Connection {
310317 this . _currentFailure = newError ( payload . message , payload . code ) ;
311318 this . _currentObserver . onError ( this . _currentFailure ) ;
312319 } finally {
313- this . _currentObserver = this . _pendingObservers . shift ( ) ;
320+ this . _updateCurrentObserver ( ) ;
314321 // Things are now broken. Pending observers will get FAILURE messages routed until
315322 // We are done handling this failure.
316323 if ( ! this . _isHandlingFailure ) {
@@ -340,7 +347,7 @@ class Connection {
340347 else if ( this . _currentObserver . onError )
341348 this . _currentObserver . onError ( payload ) ;
342349 } finally {
343- this . _currentObserver = this . _pendingObservers . shift ( ) ;
350+ this . _updateCurrentObserver ( ) ;
344351 }
345352 break ;
346353 default :
@@ -351,7 +358,8 @@ class Connection {
351358 /** Queue an INIT-message to be sent to the database */
352359 initialize ( clientName , token , observer ) {
353360 log ( "C" , "INIT" , clientName , token ) ;
354- this . _queueObserver ( observer ) ;
361+ const initObserver = new InitObserver ( this , observer ) ;
362+ this . _queueObserver ( initObserver ) ;
355363 this . _packer . packStruct ( INIT , [ this . _packable ( clientName ) , this . _packable ( token ) ] ,
356364 ( err ) => this . _handleFatalError ( err ) ) ;
357365 this . _chunker . messageBoundary ( ) ;
@@ -437,6 +445,14 @@ class Connection {
437445 }
438446 }
439447
448+ /**
449+ * Pop next pending observer form the list of observers and make it current observer.
450+ * @protected
451+ */
452+ _updateCurrentObserver ( ) {
453+ this . _currentObserver = this . _pendingObservers . shift ( ) ;
454+ }
455+
440456 /**
441457 * Synchronize - flush all queued outgoing messages and route their responses
442458 * to their respective handlers.
@@ -489,6 +505,42 @@ function connect(url, config = {}, connectionErrorCode = null) {
489505 return new Connection ( new Ch ( channelConfig ) , completeUrl ) ;
490506}
491507
508+ /**
509+ * Observer that wraps user-defined observer for INIT message and handles initialization failures. Connection is
510+ * closed by the server if processing of INIT message fails so this observer will handle initialization failure
511+ * as a fatal error.
512+ */
513+ class InitObserver extends StreamObserver {
514+
515+ /**
516+ * @constructor
517+ * @param {Connection } connection the connection used to send INIT message.
518+ * @param {StreamObserver } originalObserver the observer to wrap and delegate calls to.
519+ */
520+ constructor ( connection , originalObserver ) {
521+ super ( ) ;
522+ this . _connection = connection ;
523+ this . _originalObserver = originalObserver || NO_OP_OBSERVER ;
524+ }
525+
526+ onNext ( record ) {
527+ this . _originalObserver . onNext ( record ) ;
528+ }
529+
530+ onError ( error ) {
531+ this . _connection . _updateCurrentObserver ( ) ; // make sure this same observer will not be called again
532+ try {
533+ this . _originalObserver . onError ( error ) ;
534+ } finally {
535+ this . _connection . _handleFatalError ( error ) ;
536+ }
537+ }
538+
539+ onCompleted ( metaData ) {
540+ this . _originalObserver . onCompleted ( metaData ) ;
541+ }
542+ }
543+
492544export {
493545 connect ,
494546 parseScheme ,
0 commit comments