1818 */
1919
2020import Session from './session' ;
21- import { Driver , READ , WRITE } from './driver' ;
21+ import { Driver } from './driver' ;
2222import { newError , SERVICE_UNAVAILABLE , SESSION_EXPIRED } from './error' ;
23- import RoundRobinArray from './internal/round-robin-array' ;
24- import RoutingTable from './internal/routing-table' ;
25- import Rediscovery from './internal/rediscovery' ;
23+ import { LoadBalancer } from './internal/connection-providers' ;
2624
2725/**
2826 * A driver that supports routing in a core-edge cluster.
@@ -31,8 +29,10 @@ class RoutingDriver extends Driver {
3129
3230 constructor ( url , userAgent , token = { } , config = { } ) {
3331 super ( url , userAgent , token , RoutingDriver . _validateConfig ( config ) ) ;
34- this . _routingTable = new RoutingTable ( new RoundRobinArray ( [ url ] ) ) ;
35- this . _rediscovery = new Rediscovery ( ) ;
32+ }
33+
34+ _createConnectionProvider ( address , connectionPool ) {
35+ return new LoadBalancer ( address , connectionPool ) ;
3636 }
3737
3838 _createSession ( connectionPromise ) {
@@ -50,10 +50,10 @@ class RoutingDriver extends Driver {
5050 let url = 'UNKNOWN' ;
5151 if ( conn ) {
5252 url = conn . url ;
53- this . _routingTable . forgetWriter ( conn . url ) ;
53+ this . _connectionProvider . forgetWriter ( conn . url ) ;
5454 } else {
5555 connectionPromise . then ( ( conn ) => {
56- this . _routingTable . forgetWriter ( conn . url ) ;
56+ this . _connectionProvider . forgetWriter ( conn . url ) ;
5757 } ) . catch ( ( ) => { /*ignore*/ } ) ;
5858 }
5959 return newError ( 'No longer possible to write to server at ' + url , SESSION_EXPIRED ) ;
@@ -63,94 +63,11 @@ class RoutingDriver extends Driver {
6363 } ) ;
6464 }
6565
66- _acquireConnection ( mode ) {
67- return this . _freshRoutingTable ( ) . then ( routingTable => {
68- if ( mode === READ ) {
69- return this . _acquireConnectionToServer ( routingTable . readers , "read" ) ;
70- } else if ( mode === WRITE ) {
71- return this . _acquireConnectionToServer ( routingTable . writers , "write" ) ;
72- } else {
73- throw newError ( 'Illegal session mode ' + mode ) ;
74- }
75- } ) ;
76- }
77-
78- _acquireConnectionToServer ( serversRoundRobinArray , serverName ) {
79- const address = serversRoundRobinArray . next ( ) ;
80- if ( ! address ) {
81- return Promise . reject ( newError ( 'No ' + serverName + ' servers available' , SESSION_EXPIRED ) ) ;
82- }
83- return this . _pool . acquire ( address ) ;
84- }
85-
86- _freshRoutingTable ( ) {
87- const currentRoutingTable = this . _routingTable ;
88-
89- if ( ! currentRoutingTable . isStale ( ) ) {
90- return Promise . resolve ( currentRoutingTable ) ;
91- }
92- return this . _refreshRoutingTable ( currentRoutingTable ) ;
93- }
94-
95- _refreshRoutingTable ( currentRoutingTable ) {
96- const knownRouters = currentRoutingTable . routers . toArray ( ) ;
97-
98- const refreshedTablePromise = knownRouters . reduce ( ( refreshedTablePromise , currentRouter , currentIndex ) => {
99- return refreshedTablePromise . then ( newRoutingTable => {
100- if ( newRoutingTable ) {
101- if ( ! newRoutingTable . writers . isEmpty ( ) ) {
102- // valid routing table was fetched - just return it, try next router otherwise
103- return newRoutingTable ;
104- }
105- } else {
106- // returned routing table was undefined, this means a connection error happened and we need to forget the
107- // previous router and try the next one
108- const previousRouter = knownRouters [ currentIndex - 1 ] ;
109- if ( previousRouter ) {
110- currentRoutingTable . forgetRouter ( previousRouter ) ;
111- }
112- }
113-
114- // try next router
115- const session = this . _createSessionForRediscovery ( currentRouter ) ;
116- return this . _rediscovery . lookupRoutingTableOnRouter ( session , currentRouter ) ;
117- } )
118- } , Promise . resolve ( null ) ) ;
119-
120- return refreshedTablePromise . then ( newRoutingTable => {
121- if ( newRoutingTable && ! newRoutingTable . writers . isEmpty ( ) ) {
122- this . _updateRoutingTable ( newRoutingTable ) ;
123- return newRoutingTable
124- }
125- throw newError ( 'Could not perform discovery. No routing servers available.' , SERVICE_UNAVAILABLE ) ;
126- } ) ;
127- }
128-
129- _createSessionForRediscovery ( routerAddress ) {
130- const connection = this . _pool . acquire ( routerAddress ) ;
131- const connectionPromise = Promise . resolve ( connection ) ;
132- // error transformer here is a no-op unlike the one in a regular session, this is so because errors are
133- // handled in the rediscovery promise chain and we do not need to do this in the error transformer
134- const errorTransformer = error => error ;
135- return new RoutingSession ( connectionPromise , errorTransformer ) ;
136- }
137-
13866 _forget ( url ) {
139- this . _routingTable . forget ( url ) ;
67+ this . _connectionProvider . forget ( url ) ;
14068 this . _pool . purge ( url ) ;
14169 }
14270
143- _updateRoutingTable ( newRoutingTable ) {
144- const currentRoutingTable = this . _routingTable ;
145-
146- // close old connections to servers not present in the new routing table
147- const staleServers = currentRoutingTable . serversDiff ( newRoutingTable ) ;
148- staleServers . forEach ( server => this . _pool . purge ( server ) ) ;
149-
150- // make this driver instance aware of the new table
151- this . _routingTable = newRoutingTable ;
152- }
153-
15471 static _validateConfig ( config ) {
15572 if ( config . trust === 'TRUST_ON_FIRST_USE' ) {
15673 throw newError ( 'The chosen trust mode is not compatible with a routing driver' ) ;
0 commit comments