@@ -254,7 +254,7 @@ class Connection {
254254 * failing, and the connection getting ejected from the session pool.
255255 *
256256 * @param err an error object, forwarded to all current and future subscribers
257- * @private
257+ * @protected
258258 */
259259 _handleFatalError ( err ) {
260260 this . _isBroken = true ;
@@ -271,6 +271,12 @@ class Connection {
271271 }
272272
273273 _handleMessage ( msg ) {
274+ if ( this . _isBroken ) {
275+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
276+ // with the fatal error. all future observers will fail with same fatal error.
277+ return ;
278+ }
279+
274280 const payload = msg . fields [ 0 ] ;
275281
276282 switch ( msg . signature ) {
@@ -283,7 +289,7 @@ class Connection {
283289 try {
284290 this . _currentObserver . onCompleted ( payload ) ;
285291 } finally {
286- this . _currentObserver = this . _pendingObservers . shift ( ) ;
292+ this . _updateCurrentObserver ( ) ;
287293 }
288294 break ;
289295 case FAILURE :
@@ -292,7 +298,7 @@ class Connection {
292298 this . _currentFailure = newError ( payload . message , payload . code ) ;
293299 this . _currentObserver . onError ( this . _currentFailure ) ;
294300 } finally {
295- this . _currentObserver = this . _pendingObservers . shift ( ) ;
301+ this . _updateCurrentObserver ( ) ;
296302 // Things are now broken. Pending observers will get FAILURE messages routed until
297303 // We are done handling this failure.
298304 if ( ! this . _isHandlingFailure ) {
@@ -322,7 +328,7 @@ class Connection {
322328 else if ( this . _currentObserver . onError )
323329 this . _currentObserver . onError ( payload ) ;
324330 } finally {
325- this . _currentObserver = this . _pendingObservers . shift ( ) ;
331+ this . _updateCurrentObserver ( ) ;
326332 }
327333 break ;
328334 default :
@@ -429,6 +435,14 @@ class Connection {
429435 return this . _state . initializationCompleted ( ) ;
430436 }
431437
438+ /*
439+ * Pop next pending observer form the list of observers and make it current observer.
440+ * @protected
441+ */
442+ _updateCurrentObserver ( ) {
443+ this . _currentObserver = this . _pendingObservers . shift ( ) ;
444+ }
445+
432446 /**
433447 * Synchronize - flush all queued outgoing messages and route their responses
434448 * to their respective handlers.
@@ -480,7 +494,8 @@ class ConnectionState {
480494 }
481495
482496 /**
483- * Wrap the given observer to track connection's initialization state.
497+ * Wrap the given observer to track connection's initialization state. Connection is closed by the server if
498+ * processing of INIT message fails so returned observer will handle initialization failure as a fatal error.
484499 * @param {StreamObserver } observer the observer used for INIT message.
485500 * @return {StreamObserver } updated observer.
486501 */
@@ -497,8 +512,14 @@ class ConnectionState {
497512 this . _rejectPromise ( error ) ;
498513 this . _rejectPromise = null ;
499514 }
500- if ( observer && observer . onError ) {
501- observer . onError ( error ) ;
515+
516+ this . _connection . _updateCurrentObserver ( ) ; // make sure this same observer will not be called again
517+ try {
518+ if ( observer && observer . onError ) {
519+ observer . onError ( error ) ;
520+ }
521+ } finally {
522+ this . _connection . _handleFatalError ( error ) ;
502523 }
503524 } ,
504525 onCompleted : metaData => {
0 commit comments