@@ -23,7 +23,6 @@ import { HostNameResolver } from '../channel'
2323import SingleConnectionProvider from './connection-provider-single'
2424import PooledConnectionProvider from './connection-provider-pooled'
2525import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
26- import { controller } from '../lang'
2726import {
2827 createChannelConnection ,
2928 ConnectionErrorHandler ,
@@ -76,13 +75,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
7675 )
7776 } )
7877
79- this . _updateRoutingTableTimeoutConfig = {
80- timeout : this . _config . updateRoutingTableTimeout ,
81- reason : ( ) => newError (
82- `Routing table update timed out in ${ this . _config . updateRoutingTableTimeout } ms.`
83- )
84- }
85-
8678 this . _routingContext = { ...routingContext , address : address . toString ( ) }
8779 this . _seedRouter = address
8880 this . _rediscovery = new Rediscovery ( this . _routingContext )
@@ -151,66 +143,53 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
151143 this . _handleAuthorizationExpired ( error , address , context . database )
152144 )
153145
154- const refreshRoutingTableJob = {
155- run : async ( _ , cancelationToken ) => {
156- const routingTable = await this . _freshRoutingTable ( {
157- accessMode,
158- database : context . database ,
159- bookmarks : bookmarks ,
160- impersonatedUser,
161- onDatabaseNameResolved : ( databaseName ) => {
162- context . database = context . database || databaseName
163- if ( onDatabaseNameResolved ) {
164- onDatabaseNameResolved ( databaseName )
165- }
166- } ,
167- cancelationToken
168- } )
169-
170- // select a target server based on specified access mode
171- if ( accessMode === READ ) {
172- address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
173- name = 'read'
174- } else if ( accessMode === WRITE ) {
175- address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
176- name = 'write'
177- } else {
178- throw newError ( 'Illegal mode ' + accessMode )
179- }
180-
181- // we couldn't select a target server
182- if ( ! address ) {
183- throw newError (
184- `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
185- SESSION_EXPIRED
186- )
146+ const routingTable = await this . _freshRoutingTable ( {
147+ accessMode,
148+ database : context . database ,
149+ bookmarks : bookmarks ,
150+ impersonatedUser,
151+ onDatabaseNameResolved : ( databaseName ) => {
152+ context . database = context . database || databaseName
153+ if ( onDatabaseNameResolved ) {
154+ onDatabaseNameResolved ( databaseName )
187155 }
188- return { routingTable, address }
189156 }
190- }
157+ } )
191158
192- const acquireConnectionJob = {
193- run : async ( { routingTable, address } ) => {
194- try {
195- const connection = await this . _acquireConnectionToServer (
196- address ,
197- name ,
198- routingTable
199- )
159+ // select a target server based on specified access mode
160+ if ( accessMode === READ ) {
161+ address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
162+ name = 'read'
163+ } else if ( accessMode === WRITE ) {
164+ address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
165+ name = 'write'
166+ } else {
167+ throw newError ( 'Illegal mode ' + accessMode )
168+ }
200169
201- return new DelegateConnection ( connection , databaseSpecificErrorHandler )
202- } catch ( error ) {
203- const transformed = databaseSpecificErrorHandler . handleAndTransformError (
204- error ,
205- address
206- )
207- throw transformed
208- }
209- } ,
210- onTimeout : connection => connection . _release ( )
170+ // we couldn't select a target server
171+ if ( ! address ) {
172+ throw newError (
173+ `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
174+ SESSION_EXPIRED
175+ )
211176 }
212177
213- return controller . runWithTimeout ( this . _sessionConnectionTimeoutConfig , refreshRoutingTableJob , acquireConnectionJob )
178+ try {
179+ const connection = await this . _acquireConnectionToServer (
180+ address ,
181+ name ,
182+ routingTable
183+ )
184+
185+ return new DelegateConnection ( connection , databaseSpecificErrorHandler )
186+ } catch ( error ) {
187+ const transformed = databaseSpecificErrorHandler . handleAndTransformError (
188+ error ,
189+ address
190+ )
191+ throw transformed
192+ }
214193 }
215194
216195 async _hasProtocolVersion ( versionPredicate ) {
@@ -322,28 +301,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
322301 return this . _connectionPool . acquire ( address )
323302 }
324303
325- _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller . CancelationToken ( ( ) => false ) } = { } ) {
326- const refreshRoutingTableJob = {
327- run : ( _ , refreshCancelationToken ) => {
328- const combinedCancelationToken = refreshCancelationToken . combine ( cancelationToken )
329- const currentRoutingTable = this . _routingTableRegistry . get (
330- database ,
331- ( ) => new RoutingTable ( { database } )
332- )
304+ _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = { } ) {
305+ const currentRoutingTable = this . _routingTableRegistry . get (
306+ database ,
307+ ( ) => new RoutingTable ( { database } )
308+ )
333309
334- if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
335- return currentRoutingTable
336- }
337- this . _log . info (
338- `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
339- )
340- return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , combinedCancelationToken )
341- }
310+ if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
311+ return currentRoutingTable
342312 }
343- return controller . runWithTimeout ( this . _updateRoutingTableTimeoutConfig , refreshRoutingTableJob )
313+ this . _log . info (
314+ `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
315+ )
316+ return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved )
344317 }
345318
346- _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , cancelationToken ) {
319+ _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved ) {
347320 const knownRouters = currentRoutingTable . routers
348321
349322 if ( this . _useSeedRouter ) {
@@ -352,17 +325,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
352325 currentRoutingTable ,
353326 bookmarks ,
354327 impersonatedUser ,
355- onDatabaseNameResolved ,
356- cancelationToken
328+ onDatabaseNameResolved
357329 )
358330 }
359331 return this . _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter (
360332 knownRouters ,
361333 currentRoutingTable ,
362334 bookmarks ,
363335 impersonatedUser ,
364- onDatabaseNameResolved ,
365- cancelationToken
336+ onDatabaseNameResolved
366337 )
367338 }
368339
@@ -371,8 +342,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
371342 currentRoutingTable ,
372343 bookmarks ,
373344 impersonatedUser ,
374- onDatabaseNameResolved ,
375- cancelationToken
345+ onDatabaseNameResolved
376346 ) {
377347 // we start with seed router, no routers were probed before
378348 const seenRouters = [ ]
@@ -381,8 +351,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
381351 this . _seedRouter ,
382352 currentRoutingTable ,
383353 bookmarks ,
384- impersonatedUser ,
385- cancelationToken
354+ impersonatedUser
386355 )
387356
388357 if ( newRoutingTable ) {
@@ -393,8 +362,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
393362 knownRouters ,
394363 currentRoutingTable ,
395364 bookmarks ,
396- impersonatedUser ,
397- cancelationToken
365+ impersonatedUser
398366 )
399367 newRoutingTable = newRoutingTable2
400368 error = error2 || error
@@ -413,15 +381,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
413381 currentRoutingTable ,
414382 bookmarks ,
415383 impersonatedUser ,
416- onDatabaseNameResolved ,
417- cancelationToken
384+ onDatabaseNameResolved
418385 ) {
419386 let [ newRoutingTable , error ] = await this . _fetchRoutingTableUsingKnownRouters (
420387 knownRouters ,
421388 currentRoutingTable ,
422389 bookmarks ,
423- impersonatedUser ,
424- cancelationToken
390+ impersonatedUser
425391 )
426392
427393 if ( ! newRoutingTable ) {
@@ -431,8 +397,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
431397 this . _seedRouter ,
432398 currentRoutingTable ,
433399 bookmarks ,
434- impersonatedUser ,
435- cancelationToken
400+ impersonatedUser
436401 )
437402 }
438403
@@ -448,15 +413,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
448413 knownRouters ,
449414 currentRoutingTable ,
450415 bookmarks ,
451- impersonatedUser ,
452- cancelationToken
416+ impersonatedUser
453417 ) {
454418 const [ newRoutingTable , error ] = await this . _fetchRoutingTable (
455419 knownRouters ,
456420 currentRoutingTable ,
457421 bookmarks ,
458- impersonatedUser ,
459- cancelationToken
422+ impersonatedUser
460423 )
461424
462425 if ( newRoutingTable ) {
@@ -481,19 +444,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
481444 seedRouter ,
482445 routingTable ,
483446 bookmarks ,
484- impersonatedUser ,
485- cancelationToken
447+ impersonatedUser
486448 ) {
487449 const resolvedAddresses = await this . _resolveSeedRouter ( seedRouter )
488450
489- cancelationToken . throwIfCancellationRequested ( )
490-
491451 // filter out all addresses that we've already tried
492452 const newAddresses = resolvedAddresses . filter (
493453 address => seenRouters . indexOf ( address ) < 0
494454 )
495455
496- return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken )
456+ return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser )
497457 }
498458
499459 async _resolveSeedRouter ( seedRouter ) {
@@ -505,7 +465,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
505465 return [ ] . concat . apply ( [ ] , dnsResolvedAddresses )
506466 }
507467
508- async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken ) {
468+ async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser ) {
509469 return routerAddresses . reduce (
510470 async ( refreshedTablePromise , currentRouter , currentIndex ) => {
511471 const [ newRoutingTable ] = await refreshedTablePromise
@@ -539,13 +499,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
539499 impersonatedUser
540500 ) , null ]
541501 } catch ( error ) {
542- cancelationToken . throwIfCancellationRequested ( )
543502 return this . _handleRediscoveryError ( error , currentRouter )
544503 } finally {
545- await session . close ( )
504+ session . close ( )
546505 }
547506 } else {
548- cancelationToken . throwIfCancellationRequested ( )
549507 // unable to acquire connection and create session towards the current router
550508 // return null to signal that the next router should be tried
551509 return [ null , error ]
0 commit comments