1818 */
1919
2020import Session from './session' ;
21- import { Pool } from './internal/pool' ;
22- import { connect } from "./internal/connector" ;
21+ import Pool from './internal/pool' ;
22+ import Integer from './integer' ;
23+ import { connect , scheme } from "./internal/connector" ;
2324import StreamObserver from './internal/stream-observer' ;
24- import { VERSION } from '../version' ;
25+ import VERSION from '../version' ;
26+ import "babel-polyfill" ;
2527
28+ let READ = 'READ' , WRITE = 'WRITE' ;
2629/**
2730 * A driver maintains one or more {@link Session sessions} with a remote
2831 * Neo4j instance. Through the {@link Session sessions} you can send statements
@@ -53,7 +56,7 @@ class Driver {
5356 this . _pool = new Pool (
5457 this . _createConnection . bind ( this ) ,
5558 this . _destroyConnection . bind ( this ) ,
56- this . _validateConnection . bind ( this ) ,
59+ Driver . _validateConnection . bind ( this ) ,
5760 config . connectionPoolSize
5861 ) ;
5962 }
@@ -63,7 +66,7 @@ class Driver {
6366 * @return {Connection } new connector-api session instance, a low level session API.
6467 * @access private
6568 */
66- _createConnection ( release ) {
69+ _createConnection ( release ) {
6770 let sessionId = this . _sessionIdGenerator ++ ;
6871 let streamObserver = new _ConnectionStreamObserver ( this ) ;
6972 let conn = connect ( this . _url , this . _config ) ;
@@ -80,7 +83,7 @@ class Driver {
8083 * @return {boolean } true if the connection is open
8184 * @access private
8285 **/
83- _validateConnection ( conn ) {
86+ static _validateConnection ( conn ) {
8487 return conn . isOpen ( ) ;
8588 }
8689
@@ -89,7 +92,7 @@ class Driver {
8992 * @return {Session } new session.
9093 * @access private
9194 */
92- _destroyConnection ( conn ) {
95+ _destroyConnection ( conn ) {
9396 delete this . _openSessions [ conn . _id ] ;
9497 conn . close ( ) ;
9598 }
@@ -109,7 +112,11 @@ class Driver {
109112 */
110113 session ( ) {
111114 let conn = this . _pool . acquire ( this . _url ) ;
112- return new Session ( conn , ( cb ) => {
115+ return this . _createSession ( conn ) ;
116+ }
117+
118+ _createSession ( conn ) {
119+ return new Session ( conn , ( cb ) => {
113120 // This gets called on Session#close(), and is where we return
114121 // the pooled 'connection' instance.
115122
@@ -126,7 +133,9 @@ class Driver {
126133 conn . _release ( ) ;
127134
128135 // Call user callback
129- if ( cb ) { cb ( ) ; }
136+ if ( cb ) {
137+ cb ( ) ;
138+ }
130139 } ) ;
131140 }
132141
@@ -144,25 +153,212 @@ class Driver {
144153 }
145154}
146155
156+ class RoundRobinArray {
157+ constructor ( items ) {
158+ this . _items = items || [ ] ;
159+ this . _index = 0 ;
160+ }
161+
162+ hop ( ) {
163+ let elem = this . _items [ this . _index ] ;
164+ this . _index = ( this . _index + 1 ) % ( this . _items . length - 1 ) ;
165+ return elem ;
166+ }
167+
168+ push ( elem ) {
169+ this . _items . push ( elem ) ;
170+ }
171+
172+ pushAll ( elems ) {
173+ Array . prototype . push . apply ( this . _items , elems ) ;
174+ }
175+
176+ empty ( ) {
177+ return this . _items . length === 0 ;
178+ }
179+
180+ clear ( ) {
181+ this . _items = [ ] ;
182+ this . _index = 0 ;
183+ }
184+
185+ size ( ) {
186+ return this . _items . length ;
187+ }
188+
189+ toArray ( ) {
190+ return this . _items ;
191+ }
192+
193+ remove ( item ) {
194+ let index = this . _items . indexOf ( item ) ;
195+ while ( index != - 1 ) {
196+ this . _items . splice ( index , 1 ) ;
197+ if ( index < this . _index ) {
198+ this . _index -= 1 ;
199+ }
200+ //make sure we are in range
201+ this . _index %= ( this . _items . length - 1 ) ;
202+ }
203+ }
204+ }
205+
206+ let GET_SERVERS = "CALL dbms.cluster.routing.getServers" ;
207+ class RoutingDriver extends Driver {
208+
209+ constructor ( url , userAgent = 'neo4j-javascript/0.0' , token = { } , config = { } ) {
210+ super ( url , userAgent , token , config ) ;
211+ this . _routers = new RoundRobinArray ( ) ;
212+ this . _routers . push ( url ) ;
213+ this . _readers = new RoundRobinArray ( ) ;
214+ this . _writers = new RoundRobinArray ( ) ;
215+ this . _expires = Date . now ( ) ;
216+ this . _checkServers ( ) ;
217+ }
218+
219+ _checkServers ( ) {
220+ if ( this . _expires < Date . now ( ) ||
221+ this . _routers . empty ( ) ||
222+ this . _readers . empty ( ) ||
223+ this . _writers . empty ( ) ) {
224+ this . _callServers ( ) ;
225+ }
226+ }
227+
228+ async _callServers ( ) {
229+ let seen = this . _allServers ( ) ;
230+ //clear writers and readers
231+ this . _writers . clear ( ) ;
232+ this . _readers . clear ( ) ;
233+ //we have to wait to clear routers until
234+ //we have discovered new ones
235+ let newRouters = new RoundRobinArray ( ) ;
236+ let success = false ;
237+
238+ while ( ! this . _routers . empty ( ) && ! success ) {
239+ let url = this . _routers . hop ( ) ;
240+ try {
241+ let res = await this . _call ( url ) ;
242+ if ( res . records . length != 1 ) continue ;
243+ let record = res . records [ 0 ] ;
244+ //Note we are loosing precision here but we are not
245+ //terribly worried since it is only
246+ //for dates more than 140000 years into the future.
247+ this . _expires += record . get ( 'ttl' ) . toNumber ( ) ;
248+ let servers = record . get ( 'servers' ) ;
249+ for ( let i = 0 ; i <= servers . length ; i ++ ) {
250+ let server = servers [ i ] ;
251+ seen . delete ( server ) ;
252+ let role = server [ 'role' ] ;
253+ let addresses = server [ 'addresses' ] ;
254+ if ( role === 'ROUTE' ) {
255+ newRouters . push ( server ) ;
256+ } else if ( role === 'WRITE' ) {
257+ this . _writers . push ( server ) ;
258+ } else if ( role === 'READ' ) {
259+ this . _readers . push ( server ) ;
260+ }
261+ }
262+
263+ if ( newRouters . empty ( ) ) continue ;
264+ //we have results
265+ this . _routers = newRouters ( ) ;
266+ //these are no longer valid according to server
267+ let self = this ;
268+ seen . forEach ( ( key ) => {
269+ self . _pool . purge ( key ) ;
270+ } ) ;
271+ success = true ;
272+ return ;
273+ } catch ( error ) {
274+ //continue
275+ this . _forget ( url ) ;
276+ console . log ( error ) ;
277+ }
278+ }
279+
280+ if ( this . onError ) {
281+ this . onError ( "Server could not perform discovery, please open a new driver with a different seed address." ) ;
282+ }
283+ this . close ( ) ;
284+ }
285+
286+ //TODO make nice, expose constants?
287+ session ( mode ) {
288+ let conn = this . _aquireConnection ( mode ) ;
289+ return this . _createSession ( conn ) ;
290+ }
291+
292+ _aquireConnection ( mode ) {
293+ //make sure we have enough servers
294+ this . _checkServers ( ) ;
295+
296+ let m = mode || WRITE ;
297+ if ( m === READ ) {
298+ return this . _pools . acquire ( this . _readers . hop ( ) ) ;
299+ } else if ( m === WRITE ) {
300+ return this . _pools . acquire ( this . _writers . hop ( ) ) ;
301+ } else {
302+ //TODO fail
303+ }
304+ }
305+
306+ _allServers ( ) {
307+ let seen = new Set ( this . _routers . toArray ( ) ) ;
308+ let writers = this . _writers . toArray ( )
309+ let readers = this . _readers . toArray ( )
310+ for ( let i = 0 ; i < writers . length ; i ++ ) {
311+ seen . add ( writers [ i ] ) ;
312+ }
313+ for ( let i = 0 ; i < readers . length ; i ++ ) {
314+ seen . add ( writers [ i ] ) ;
315+ }
316+ return seen ;
317+ }
318+
319+ async _call ( url ) {
320+ let conn = this . _pool . acquire ( url ) ;
321+ let session = this . _createSession ( conn ) ;
322+ console . log ( "calling " + GET_SERVERS ) ;
323+ return session . run ( GET_SERVERS )
324+ . then ( ( res ) => {
325+ session . close ( ) ;
326+ return res ;
327+ } ) . catch ( ( err ) => {
328+ this . _forget ( url ) ;
329+ return Promise . reject ( err ) ;
330+ } ) ;
331+ }
332+
333+ _forget ( url ) {
334+ this . _pools . purge ( url ) ;
335+ this . _routers . remove ( url ) ;
336+ this . _readers . remove ( url ) ;
337+ this . _writers . remove ( url ) ;
338+ }
339+ }
340+
147341/** Internal stream observer used for connection state */
148342class _ConnectionStreamObserver extends StreamObserver {
149343 constructor ( driver ) {
150344 super ( ) ;
151345 this . _driver = driver ;
152346 this . _hasFailed = false ;
153347 }
348+
154349 onError ( error ) {
155350 if ( ! this . _hasFailed ) {
156351 super . onError ( error ) ;
157- if ( this . _driver . onError ) {
352+ if ( this . _driver . onError ) {
158353 this . _driver . onError ( error ) ;
159354 }
160355 this . _hasFailed = true ;
161356 }
162357 }
358+
163359 onCompleted ( message ) {
164- if ( this . _driver . onCompleted ) {
165- this . _driver . onCompleted ( message ) ;
360+ if ( this . _driver . onCompleted ) {
361+ this . _driver . onCompleted ( message ) ;
166362 }
167363 }
168364}
@@ -205,7 +401,8 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
205401 * //
206402 * // TRUST_SYSTEM_CA_SIGNED_CERTIFICATES meand that you trust whatever certificates
207403 * // are in the default certificate chain of th
208- * trust: "TRUST_ON_FIRST_USE" | "TRUST_SIGNED_CERTIFICATES" | TRUST_CUSTOM_CA_SIGNED_CERTIFICATES | TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
404+ * trust: "TRUST_ON_FIRST_USE" | "TRUST_SIGNED_CERTIFICATES" | TRUST_CUSTOM_CA_SIGNED_CERTIFICATES |
405+ * TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
209406 *
210407 * // List of one or more paths to trusted encryption certificates. This only
211408 * // works in the NodeJS bundle, and only matters if you use "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES".
@@ -226,8 +423,13 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
226423 * @param {Object } config Configuration object. See the configuration section above for details.
227424 * @returns {Driver }
228425 */
229- function driver ( url , authToken , config = { } ) {
230- return new Driver ( url , USER_AGENT , authToken , config ) ;
426+ function driver ( url , authToken , config = { } ) {
427+ let sch = scheme ( url ) ;
428+ if ( sch === "bolt+routing://" ) {
429+ return new RoutingDriver ( url , USER_AGENT , authToken , config ) ;
430+ } else {
431+ return new Driver ( url , USER_AGENT , authToken , config ) ;
432+ }
231433}
232434
233435export { Driver , driver }
0 commit comments