@@ -29,32 +29,43 @@ class ConnectionProvider {
2929 acquireConnection ( mode ) {
3030 throw new Error ( 'Abstract method' ) ;
3131 }
32+
33+ _withAdditionalOnErrorCallback ( connectionPromise , driverOnErrorCallback ) {
34+ // install error handler from the driver on the connection promise; this callback is installed separately
35+ // so that it does not handle errors, instead it is just an additional error reporting facility.
36+ connectionPromise . catch ( error => driverOnErrorCallback ( error ) ) ;
37+ // return the original connection promise
38+ return connectionPromise ;
39+ }
3240}
3341
3442export class DirectConnectionProvider extends ConnectionProvider {
3543
36- constructor ( address , connectionPool ) {
44+ constructor ( address , connectionPool , driverOnErrorCallback ) {
3745 super ( ) ;
3846 this . _address = address ;
3947 this . _connectionPool = connectionPool ;
48+ this . _driverOnErrorCallback = driverOnErrorCallback ;
4049 }
4150
4251 acquireConnection ( mode ) {
43- return Promise . resolve ( this . _connectionPool . acquire ( this . _address ) ) ;
52+ const connectionPromise = Promise . resolve ( this . _connectionPool . acquire ( this . _address ) ) ;
53+ return this . _withAdditionalOnErrorCallback ( connectionPromise , this . _driverOnErrorCallback ) ;
4454 }
4555}
4656
4757export class LoadBalancer extends ConnectionProvider {
4858
49- constructor ( address , connectionPool ) {
59+ constructor ( address , connectionPool , driverOnErrorCallback ) {
5060 super ( ) ;
5161 this . _routingTable = new RoutingTable ( new RoundRobinArray ( [ address ] ) ) ;
5262 this . _rediscovery = new Rediscovery ( ) ;
5363 this . _connectionPool = connectionPool ;
64+ this . _driverOnErrorCallback = driverOnErrorCallback ;
5465 }
5566
5667 acquireConnection ( mode ) {
57- return this . _freshRoutingTable ( ) . then ( routingTable => {
68+ const connectionPromise = this . _freshRoutingTable ( ) . then ( routingTable => {
5869 if ( mode === READ ) {
5970 return this . _acquireConnectionToServer ( routingTable . readers , 'read' ) ;
6071 } else if ( mode === WRITE ) {
@@ -63,6 +74,7 @@ export class LoadBalancer extends ConnectionProvider {
6374 throw newError ( 'Illegal mode ' + mode ) ;
6475 }
6576 } ) ;
77+ return this . _withAdditionalOnErrorCallback ( connectionPromise , this . _driverOnErrorCallback ) ;
6678 }
6779
6880 forget ( address ) {
@@ -132,7 +144,8 @@ export class LoadBalancer extends ConnectionProvider {
132144 _createSessionForRediscovery ( routerAddress ) {
133145 const connection = this . _connectionPool . acquire ( routerAddress ) ;
134146 const connectionPromise = Promise . resolve ( connection ) ;
135- return new Session ( connectionPromise ) ;
147+ const connectionProvider = new SingleConnectionProvider ( connectionPromise ) ;
148+ return new Session ( READ , connectionProvider ) ;
136149 }
137150
138151 _updateRoutingTable ( newRoutingTable ) {
@@ -153,3 +166,17 @@ export class LoadBalancer extends ConnectionProvider {
153166 }
154167 }
155168}
169+
170+ export class SingleConnectionProvider extends ConnectionProvider {
171+
172+ constructor ( connectionPromise ) {
173+ super ( ) ;
174+ this . _connectionPromise = connectionPromise ;
175+ }
176+
177+ acquireConnection ( mode ) {
178+ const connectionPromise = this . _connectionPromise ;
179+ this . _connectionPromise = null ;
180+ return connectionPromise ;
181+ }
182+ }
0 commit comments