Skip to content

Commit 2e0fc1e

Browse files
committed
Split routing logic into multiple components
Previously it was mostly located in routing driver which owned the cluster view and all logic to call `getServers` and interpret it's output/failures. This introduces two new internal components: * `GetServersUtil` - responsible for calling `getServers` procedure and processing it's output * `Rediscovery` - responsible for fetching a routing table from the specified server Routing driver now owns a routing table and an instance of rediscovery. It uses the later one to update the routing table when needed. Added dev dependency to lolex(https://github.com/sinonjs/lolex) to be able to mock system time and test things better.
1 parent 9a277b2 commit 2e0fc1e

File tree

12 files changed

+753
-172
lines changed

12 files changed

+753
-172
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"gulp-util": "^3.0.6",
5050
"gulp-watch": "^4.3.5",
5151
"jasmine-reporters": "^2.0.7",
52+
"lolex": "^1.5.2",
5253
"merge-stream": "^1.0.0",
5354
"minimist": "^1.2.0",
5455
"phantomjs-prebuilt": "^2.1.7 ",

src/v1/driver.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {connect} from "./internal/connector";
2424
import StreamObserver from './internal/stream-observer';
2525
import {newError, SERVICE_UNAVAILABLE} from "./error";
2626

27-
let READ = 'READ', WRITE = 'WRITE';
27+
const READ = 'READ', WRITE = 'WRITE';
2828
/**
2929
* A driver maintains one or more {@link Session sessions} with a remote
3030
* Neo4j instance. Through the {@link Session sessions} you can send statements

src/v1/error.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
// A common place for constructing error objects, to keep them
2121
// uniform across the driver surface.
2222

23-
let SERVICE_UNAVAILABLE = 'ServiceUnavailable';
24-
let SESSION_EXPIRED = 'SessionExpired';
23+
const SERVICE_UNAVAILABLE = 'ServiceUnavailable';
24+
const SESSION_EXPIRED = 'SessionExpired';
25+
const PROTOCOL_ERROR = 'ProtocolError';
26+
2527
function newError(message, code="N/A") {
2628
// TODO: Idea is that we can check the code here and throw sub-classes
2729
// of Neo4jError as appropriate
@@ -40,5 +42,6 @@ export {
4042
newError,
4143
Neo4jError,
4244
SERVICE_UNAVAILABLE,
43-
SESSION_EXPIRED
45+
SESSION_EXPIRED,
46+
PROTOCOL_ERROR
4447
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import RoundRobinArray from "./round-robin-array";
21+
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error";
22+
import Integer, {int} from "../integer";
23+
24+
const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
25+
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
26+
27+
export default class GetServersUtil {
28+
29+
callGetServers(session, routerAddress) {
30+
return session.run(PROCEDURE_CALL).then(result => {
31+
return result.records;
32+
}).catch(error => {
33+
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
34+
// throw when getServers procedure not found because this is clearly a configuration issue
35+
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
36+
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
37+
}
38+
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
39+
// different session towards a different router
40+
return null;
41+
});
42+
}
43+
44+
parseTtl(record, routerAddress) {
45+
try {
46+
const now = int(Date.now());
47+
const expires = record.get('ttl').multiply(1000).add(now);
48+
// if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed
49+
if (expires.lessThan(now)) {
50+
return Integer.MAX_VALUE;
51+
}
52+
return expires;
53+
} catch (error) {
54+
throw newError(
55+
'Unable to parse TTL entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record),
56+
PROTOCOL_ERROR);
57+
}
58+
}
59+
60+
parseServers(record, routerAddress) {
61+
try {
62+
const servers = record.get('servers');
63+
64+
const routers = new RoundRobinArray();
65+
const readers = new RoundRobinArray();
66+
const writers = new RoundRobinArray();
67+
68+
servers.forEach(server => {
69+
const role = server['role'];
70+
const addresses = server['addresses'];
71+
72+
if (role === 'ROUTE') {
73+
routers.pushAll(addresses);
74+
} else if (role === 'WRITE') {
75+
writers.pushAll(addresses);
76+
} else if (role === 'READ') {
77+
readers.pushAll(addresses);
78+
} else {
79+
throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR);
80+
}
81+
});
82+
83+
return {
84+
routers: routers,
85+
readers: readers,
86+
writers: writers
87+
}
88+
} catch (ignore) {
89+
throw newError(
90+
'Unable to parse servers entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record),
91+
PROTOCOL_ERROR);
92+
}
93+
}
94+
}

src/v1/internal/rediscovery.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import GetServersUtil from "./get-servers-util";
21+
import RoutingTable from "./routing-table";
22+
import {newError, PROTOCOL_ERROR} from "../error";
23+
24+
export default class Rediscovery {
25+
26+
constructor(getServersUtil) {
27+
this._getServersUtil = getServersUtil || new GetServersUtil();
28+
}
29+
30+
lookupRoutingTableOnRouter(session, routerAddress) {
31+
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
32+
if (records === null) {
33+
// connection error happened, unable to retrieve routing table from this router, next one should be queried
34+
return null;
35+
}
36+
37+
if (records.length !== 1) {
38+
throw newError('Illegal response from router "' + routerAddress + '". ' +
39+
'Received ' + records.length + ' records but expected only one.\n' + JSON.stringify(records),
40+
PROTOCOL_ERROR);
41+
}
42+
43+
const record = records[0];
44+
45+
const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
46+
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
47+
48+
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
49+
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);
50+
51+
if (writers.isEmpty()) {
52+
// retrieved routing table has no writers, next router should be queried
53+
return null;
54+
}
55+
56+
return new RoutingTable(routers, readers, writers, expirationTime);
57+
});
58+
}
59+
60+
static _assertNonEmpty(serversRoundRobinArray, serversName, routerAddress) {
61+
if (serversRoundRobinArray.isEmpty()) {
62+
throw newError('Received no ' + serversName + ' from router ' + routerAddress, PROTOCOL_ERROR);
63+
}
64+
}
65+
}

src/v1/internal/round-robin-array.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
/**
2121
* An array that lets you hop through the elements endlessly.
2222
*/
23-
class RoundRobinArray {
23+
export default class RoundRobinArray {
24+
2425
constructor(items) {
2526
this._items = items || [];
2627
this._offset = 0;
@@ -36,6 +37,10 @@ class RoundRobinArray {
3637
}
3738

3839
pushAll(elems) {
40+
if (!Array.isArray(elems)) {
41+
throw new TypeError('Array expected but got: ' + elems);
42+
}
43+
3944
Array.prototype.push.apply(this._items, elems);
4045
}
4146

@@ -55,5 +60,3 @@ class RoundRobinArray {
5560
this._items = this._items.filter(element => element !== item);
5661
}
5762
}
58-
59-
export default RoundRobinArray

src/v1/internal/routing-table.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* limitations under the License.
1818
*/
1919
import {int} from "../integer";
20+
import RoundRobinArray from "./round-robin-array";
2021

2122
const MIN_ROUTERS = 1;
2223

@@ -26,8 +27,7 @@ export default class RoutingTable {
2627
this.routers = routers || new RoundRobinArray();
2728
this.readers = readers || new RoundRobinArray();
2829
this.writers = writers || new RoundRobinArray();
29-
30-
this._expirationTime = expirationTime || int(0);
30+
this.expirationTime = expirationTime || int(0);
3131
}
3232

3333
forget(address) {
@@ -46,7 +46,7 @@ export default class RoutingTable {
4646
}
4747

4848
isStale() {
49-
return this._expirationTime.lessThan(Date.now()) ||
49+
return this.expirationTime.lessThan(Date.now()) ||
5050
this.routers.size() <= MIN_ROUTERS ||
5151
this.readers.isEmpty() ||
5252
this.writers.isEmpty();

0 commit comments

Comments
 (0)