Skip to content

Commit 93ab3ba

Browse files
committed
Routing context in Bolt URI
Commit makes it possible to specify routing parameters in Bolt URI query. These parameters are then send to server when doing rediscovery. This is true only for 3.2+ Neo4j database because it supports `getRoutingTable(context)` procedure. Renamed `GetServersUtil` to `RoutingUtil` because it now knows about both routing procedures `getServers` and `getRoutingTable`.
1 parent 4f8c75b commit 93ab3ba

18 files changed

+500
-115
lines changed

src/v1/driver.js

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ class Driver {
5858
Driver._validateConnection.bind(this),
5959
config.connectionPoolSize
6060
);
61-
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
61+
62+
/**
63+
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
64+
* @type {ConnectionProvider}
65+
* @private
66+
*/
67+
this._connectionProvider = null;
6268
}
6369

6470
/**
@@ -115,7 +121,8 @@ class Driver {
115121
*/
116122
session(mode, bookmark) {
117123
const sessionMode = Driver._validateSessionMode(mode);
118-
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
124+
const connectionProvider = this._getOrCreateConnectionProvider();
125+
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
119126
}
120127

121128
static _validateSessionMode(rawMode) {
@@ -142,6 +149,14 @@ class Driver {
142149
return SERVICE_UNAVAILABLE;
143150
}
144151

152+
_getOrCreateConnectionProvider() {
153+
if (!this._connectionProvider) {
154+
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this);
155+
this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback);
156+
}
157+
return this._connectionProvider;
158+
}
159+
145160
_driverOnErrorCallback(error) {
146161
const userDefinedOnErrorCallback = this.onError;
147162
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
@@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver {
189204
if (this._driver.onCompleted) {
190205
this._driver.onCompleted(message);
191206
}
192-
if (this._conn && message && message.server) {
193-
this._conn.setServerVersion(message.server);
207+
208+
if (this._observer && this._observer.onComplete) {
209+
this._observer.onCompleted(message);
194210
}
195211
}
196212
}

src/v1/index.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,18 @@ function driver(url, authToken, config = {}) {
128128
assertString(url, 'Bolt URL');
129129
const scheme = parseScheme(url);
130130
const routingContext = parseRoutingContext(url);
131-
if (scheme === "bolt+routing://") {
131+
if (scheme === 'bolt+routing://') {
132132
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
133-
} else if (scheme === "bolt://") {
134-
if(!isEmptyObjectOrNull(routingContext))
135-
{
136-
throw new Error("Routing context are not supported with scheme 'bolt'. Given URI: '" + url + "'");
133+
} else if (scheme === 'bolt://') {
134+
if (!isEmptyObjectOrNull(routingContext)) {
135+
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
137136
}
138137
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
139138
} else {
140-
throw new Error("Unknown scheme: " + scheme);
139+
throw new Error(`Unknown scheme: ${scheme}`);
141140
}
142141
}
143142

144-
145143
const types ={
146144
Node,
147145
Relationship,

src/v1/internal/connection-providers.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import RoutingTable from './routing-table';
2525
import Rediscovery from './rediscovery';
2626
import hasFeature from './features';
2727
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
28-
import GetServersUtil from './get-servers-util';
28+
import RoutingUtil from './routing-util';
2929

3030
class ConnectionProvider {
3131

@@ -66,7 +66,7 @@ export class LoadBalancer extends ConnectionProvider {
6666
super();
6767
this._seedRouter = address;
6868
this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter]));
69-
this._rediscovery = new Rediscovery(new GetServersUtil(routingContext));
69+
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
7070
this._connectionPool = connectionPool;
7171
this._driverOnErrorCallback = driverOnErrorCallback;
7272
this._hostNameResolver = LoadBalancer._createHostNameResolver();
@@ -172,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider {
172172

173173
_createSessionForRediscovery(routerAddress) {
174174
const connection = this._connectionPool.acquire(routerAddress);
175-
const connectionPromise = Promise.resolve(connection);
176-
const connectionProvider = new SingleConnectionProvider(connectionPromise);
175+
// initialized connection is required for routing procedure call
176+
// server version needs to be known to decide which routing procedure to use
177+
const initializedConnectionPromise = connection.initializationCompleted();
178+
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
177179
return new Session(READ, connectionProvider);
178180
}
179181

src/v1/internal/connector.js

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../gra
2626
import {newError} from './../error';
2727
import ChannelConfig from './ch-config';
2828
import {parseHost, parsePort} from './util';
29+
import StreamObserver from './stream-observer';
2930

3031
let Channel;
3132
if( NodeChannel.available ) {
@@ -183,6 +184,8 @@ class Connection {
183184
this._isHandlingFailure = false;
184185
this._currentFailure = null;
185186

187+
this._state = new ConnectionState(this);
188+
186189
// Set to true on fatal errors, to get this out of session pool.
187190
this._isBroken = false;
188191

@@ -330,7 +333,8 @@ class Connection {
330333
/** Queue an INIT-message to be sent to the database */
331334
initialize( clientName, token, observer ) {
332335
log("C", "INIT", clientName, token);
333-
this._queueObserver(observer);
336+
const initObserver = this._state.wrap(observer);
337+
this._queueObserver(initObserver);
334338
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
335339
(err) => this._handleFatalError(err) );
336340
this._chunker.messageBoundary();
@@ -416,6 +420,15 @@ class Connection {
416420
}
417421
}
418422

423+
/**
424+
* Get promise resolved when connection initialization succeed or rejected when it fails.
425+
* Connection is initialized using {@link initialize} function.
426+
* @return {Promise<Connection>} the result of connection initialization.
427+
*/
428+
initializationCompleted() {
429+
return this._state.initializationCompleted();
430+
}
431+
419432
/**
420433
* Synchronize - flush all queued outgoing messages and route their responses
421434
* to their respective handlers.
@@ -450,6 +463,59 @@ class Connection {
450463
}
451464
}
452465

466+
class ConnectionState {
467+
468+
/**
469+
* @constructor
470+
* @param {Connection} connection the connection to track state for.
471+
*/
472+
constructor(connection) {
473+
this._connection = connection;
474+
this._resolvePromise = null;
475+
this._promise = new Promise(resolve => {
476+
this._resolvePromise = resolve;
477+
});
478+
}
479+
480+
/**
481+
* Wrap the given observer to track connection's initialization state.
482+
* @param {StreamObserver} observer the observer used for INIT message.
483+
* @return {StreamObserver} updated observer.
484+
*/
485+
wrap(observer) {
486+
return {
487+
onNext: record => {
488+
if (observer && observer.onNext) {
489+
observer.onNext(record);
490+
}
491+
},
492+
onError: error => {
493+
this._resolvePromise(Promise.reject(error));
494+
if (observer && observer.onError) {
495+
observer.onError(error);
496+
}
497+
},
498+
onCompleted: metaData => {
499+
if (metaData && metaData.server) {
500+
this._connection.setServerVersion(metaData.server);
501+
}
502+
this._resolvePromise(this._connection);
503+
if (observer && observer.onCompleted) {
504+
observer.onCompleted(metaData);
505+
}
506+
}
507+
};
508+
}
509+
510+
/**
511+
* Get promise resolved when connection initialization succeed or rejected when it fails.
512+
* @return {Promise<Connection>} the result of connection initialization.
513+
*/
514+
initializationCompleted() {
515+
return this._promise;
516+
}
517+
}
518+
453519
/**
454520
* Crete new connection to the provided url.
455521
* @access private

src/v1/internal/rediscovery.js

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@
1717
* limitations under the License.
1818
*/
1919

20-
import GetServersUtil from "./get-servers-util";
21-
import RoutingTable from "./routing-table";
22-
import {newError, PROTOCOL_ERROR} from "../error";
20+
import RoutingTable from './routing-table';
21+
import {newError, PROTOCOL_ERROR} from '../error';
2322

2423
export default class Rediscovery {
2524

26-
constructor(getServersUtil) {
27-
this._getServersUtil = getServersUtil;
25+
/**
26+
* @constructor
27+
* @param {RoutingUtil} routingUtil the util to use.
28+
*/
29+
constructor(routingUtil) {
30+
this._routingUtil = routingUtil;
2831
}
2932

33+
/**
34+
* Try to fetch new routing table from the given router.
35+
* @param {Session} session the session to use.
36+
* @param {string} routerAddress the URL of the router.
37+
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
38+
*/
3039
lookupRoutingTableOnRouter(session, routerAddress) {
31-
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
40+
return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => {
3241
if (records === null) {
3342
// connection error happened, unable to retrieve routing table from this router, next one should be queried
3443
return null;
@@ -42,8 +51,8 @@ export default class Rediscovery {
4251

4352
const record = records[0];
4453

45-
const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
46-
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
54+
const expirationTime = this._routingUtil.parseTtl(record, routerAddress);
55+
const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress);
4756

4857
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
4958
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);

src/v1/internal/get-servers-util.js renamed to src/v1/internal/routing-util.js

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,39 @@
2020
import RoundRobinArray from './round-robin-array';
2121
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
2222
import Integer, {int} from '../integer';
23-
import {ServerVersion, VERSION3_2} from './server-version-util'
23+
import {ServerVersion, VERSION_3_2_0} from './server-version';
2424

2525
const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers';
26-
const GET_ROUTING_TABLE_PARAM = "context";
27-
const CALL_GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable({"
28-
+ GET_ROUTING_TABLE_PARAM + "})";
26+
const GET_ROUTING_TABLE_PARAM = 'context';
27+
const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable({' + GET_ROUTING_TABLE_PARAM + '})';
2928
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
3029

31-
export default class GetServersUtil {
30+
export default class RoutingUtil {
3231

33-
constructor(routingContext={}) {
32+
constructor(routingContext) {
3433
this._routingContext = routingContext;
3534
}
3635

37-
callGetServers(session, routerAddress) {
38-
session.run("RETURN 1").then(result=>{
39-
let statement = {text:CALL_GET_SERVERS};
40-
41-
if(ServerVersion.fromString(result.summary.server.version).compare(VERSION3_2)>=0)
42-
{
43-
statement = {
44-
text:CALL_GET_ROUTING_TABLE,
45-
parameters:{GET_ROUTING_TABLE_PARAM: this._routingContext}};
36+
/**
37+
* Invoke routing procedure using the given session.
38+
* @param {Session} session the session to use.
39+
* @param {string} routerAddress the URL of the router.
40+
* @return {Promise<Record[]>} promise resolved with records returned by the procedure call or null if
41+
* connection error happened.
42+
*/
43+
callRoutingProcedure(session, routerAddress) {
44+
return this._callAvailableRoutingProcedure(session).then(result => {
45+
session.close();
46+
return result.records;
47+
}).catch(error => {
48+
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
49+
// throw when getServers procedure not found because this is clearly a configuration issue
50+
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
51+
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
4652
}
47-
48-
return session.run(statement).then(result => {
49-
session.close();
50-
return result.records;
51-
}).catch(error => {
52-
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
53-
// throw when getServers procedure not found because this is clearly a configuration issue
54-
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
55-
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
56-
}
57-
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
58-
// different session towards a different router
59-
return null;
60-
});
53+
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
54+
// different session towards a different router
55+
return null;
6156
});
6257
}
6358

@@ -111,4 +106,18 @@ export default class GetServersUtil {
111106
PROTOCOL_ERROR);
112107
}
113108
}
109+
110+
_callAvailableRoutingProcedure(session) {
111+
return session._run(null, null, (connection, streamObserver) => {
112+
const serverVersionString = connection.server.version;
113+
const serverVersion = ServerVersion.fromString(serverVersionString);
114+
115+
if (serverVersion.compareTo(VERSION_3_2_0) >= 0) {
116+
const params = {[GET_ROUTING_TABLE_PARAM]: this._routingContext};
117+
connection.run(CALL_GET_ROUTING_TABLE, params, streamObserver);
118+
} else {
119+
connection.run(CALL_GET_SERVERS, {}, streamObserver);
120+
}
121+
});
122+
}
114123
}

src/v1/session.js

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,28 +59,34 @@ class Session {
5959
* @return {Result} - New Result
6060
*/
6161
run(statement, parameters = {}) {
62-
if(typeof statement === 'object' && statement.text) {
62+
if (typeof statement === 'object' && statement.text) {
6363
parameters = statement.parameters || {};
6464
statement = statement.text;
6565
}
66-
assertString(statement, "Cypher statement");
66+
assertString(statement, 'Cypher statement');
6767

68+
return this._run(statement, parameters, (connection, streamObserver) =>
69+
connection.run(statement, parameters, streamObserver)
70+
);
71+
}
72+
73+
_run(statement, parameters, statementRunner) {
6874
const streamObserver = new _RunObserver(this._onRunFailure());
6975
const connectionHolder = this._connectionHolderWithMode(this._mode);
7076
if (!this._hasTx) {
7177
connectionHolder.initializeConnection();
7278
connectionHolder.getConnection().then(connection => {
7379
streamObserver.resolveConnection(connection);
74-
connection.run(statement, parameters, streamObserver);
80+
statementRunner(connection, streamObserver);
7581
connection.pullAll(streamObserver);
7682
connection.sync();
7783
}).catch(error => streamObserver.onError(error));
7884
} else {
79-
streamObserver.onError(newError("Statements cannot be run directly on a "
80-
+ "session with an open transaction; either run from within the "
81-
+ "transaction or use a different session."));
85+
streamObserver.onError(newError('Statements cannot be run directly on a ' +
86+
'session with an open transaction; either run from within the ' +
87+
'transaction or use a different session.'));
8288
}
83-
return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder );
89+
return new Result(streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder);
8490
}
8591

8692
/**

test/internal/connection-providers.test.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,10 @@ class FakeConnection {
11021102
static create(address, release) {
11031103
return new FakeConnection(address, release);
11041104
}
1105+
1106+
initializationCompleted() {
1107+
return Promise.resolve(this);
1108+
}
11051109
}
11061110

11071111
class FakeRediscovery {

0 commit comments

Comments
 (0)