@@ -27,6 +27,7 @@ import {newError} from './../error';
2727import ChannelConfig from './ch-config' ;
2828import { parseHost , parsePort } from './util' ;
2929import StreamObserver from './stream-observer' ;
30+ import { ServerVersion , VERSION_3_2_0 } from './server-version' ;
3031
3132let Channel ;
3233if ( NodeChannel . available ) {
@@ -472,8 +473,18 @@ class Connection {
472473 return this . _packer . packable ( value , ( err ) => this . _handleFatalError ( err ) ) ;
473474 }
474475
475- setServerVersion ( version ) {
476- this . server . version = version ;
476+ /**
477+ * @protected
478+ */
479+ _markInitialized ( metadata ) {
480+ const serverVersion = metadata ? metadata . server : null ;
481+ if ( ! this . server . version ) {
482+ this . server . version = serverVersion ;
483+ const version = ServerVersion . fromString ( serverVersion ) ;
484+ if ( version . compareTo ( VERSION_3_2_0 ) < 0 ) {
485+ this . _packer . disableByteArrays ( ) ;
486+ }
487+ }
477488 }
478489}
479490
@@ -486,11 +497,15 @@ class ConnectionState {
486497 constructor ( connection ) {
487498 this . _connection = connection ;
488499
489- this . _initialized = false ;
490- this . _initializationError = null ;
500+ this . _initRequested = false ;
501+ this . _initError = null ;
491502
492- this . _resolvePromise = null ;
493- this . _rejectPromise = null ;
503+ this . _resolveInitPromise = null ;
504+ this . _rejectInitPromise = null ;
505+ this . _initPromise = new Promise ( ( resolve , reject ) => {
506+ this . _resolveInitPromise = resolve ;
507+ this . _rejectInitPromise = reject ;
508+ } ) ;
494509 }
495510
496511 /**
@@ -507,11 +522,7 @@ class ConnectionState {
507522 }
508523 } ,
509524 onError : error => {
510- this . _initializationError = error ;
511- if ( this . _rejectPromise ) {
512- this . _rejectPromise ( error ) ;
513- this . _rejectPromise = null ;
514- }
525+ this . _processFailure ( error ) ;
515526
516527 this . _connection . _updateCurrentObserver ( ) ; // make sure this same observer will not be called again
517528 try {
@@ -523,14 +534,9 @@ class ConnectionState {
523534 }
524535 } ,
525536 onCompleted : metaData => {
526- if ( metaData && metaData . server ) {
527- this . _connection . setServerVersion ( metaData . server ) ;
528- }
529- this . _initialized = true ;
530- if ( this . _resolvePromise ) {
531- this . _resolvePromise ( this . _connection ) ;
532- this . _resolvePromise = null ;
533- }
537+ this . _connection . _markInitialized ( metaData ) ;
538+ this . _resolveInitPromise ( this . _connection ) ;
539+
534540 if ( observer && observer . onCompleted ) {
535541 observer . onCompleted ( metaData ) ;
536542 }
@@ -543,15 +549,28 @@ class ConnectionState {
543549 * @return {Promise<Connection> } the result of connection initialization.
544550 */
545551 initializationCompleted ( ) {
546- if ( this . _initialized ) {
547- return Promise . resolve ( this . _connection ) ;
548- } else if ( this . _initializationError ) {
549- return Promise . reject ( this . _initializationError ) ;
552+ this . _initRequested = true ;
553+
554+ if ( this . _initError ) {
555+ const error = this . _initError ;
556+ this . _initError = null ; // to reject initPromise only once
557+ this . _rejectInitPromise ( error ) ;
558+ }
559+
560+ return this . _initPromise ;
561+ }
562+
563+ /**
564+ * @private
565+ */
566+ _processFailure ( error ) {
567+ if ( this . _initRequested ) {
568+ // someone is waiting for initialization to complete, reject the promise
569+ this . _rejectInitPromise ( error ) ;
550570 } else {
551- return new Promise ( ( resolve , reject ) => {
552- this . _resolvePromise = resolve ;
553- this . _rejectPromise = reject ;
554- } ) ;
571+ // no one is waiting for initialization, memorize the error but do not reject the promise
572+ // to avoid unnecessary unhandled promise rejection warnings
573+ this . _initError = error ;
555574 }
556575 }
557576}
0 commit comments