Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,32 @@ class AddressGroup extends Base {
return weight;
}

async getFilterConnection(req) {
try {
return await this.getConnectionDefault(req, true);
} catch (error) {
// 兜底,connect异常,说明host不可用
if (error.message === "Cannot read property 'host' of undefined" && error.name === 'TypeError') {
this.logger.info('[AddressGroup] getFilterConnection error, will retry getConnectionDefault', error);
return null;
}
throw error;
}
}

async getConnection(req) {
const hasFilter = typeof this.options.balancerFilter === 'function';
// 增加一层filter判断,若正常则直接返回,否则走原 getConnection 逻辑
const filterConnection = await hasFilter ? await this.getFilterConnection(req) : null;
if (filterConnection) return filterConnection;
return this.getConnectionDefault(req);
}

async getConnectionDefault(req, needFilter) {
const meta = req.meta;
meta.connectionGroup = this.key;

const address = this._loadbalancer.select(req);
const address = this._loadbalancer.select(req, needFilter);
if (!address) return null;

const { connectionOpts, connectionClass } = this.options;
Expand Down
16 changes: 15 additions & 1 deletion lib/client/loadbalancer/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,21 @@ class LoadBalancer extends Base {
return this.addressGroup.logger;
}

select(request) {
select(request, needFilter) {
// 需要时才需要过滤
const hasFilter = needFilter && typeof this.addressGroup.options.balancerFilter === 'function';
// 透出addressList,供外部进行一次优先筛选
if (hasFilter) {
const list = this.addressGroup.options.balancerFilter(
this.addressList
);
if (Array.isArray(list)) {
// 若优先筛选后无可用,重新尝试全部地址
if (list.length === 0) return this._doSelect(request, this.addressList);
// list.length === 1 也进行一次 doSelect,有可能getWeight异常
return this._doSelect(request, list);
}
}
if (this.size === 0) return null;
if (this.size === 1) return this.addressList[0];

Expand Down
4 changes: 2 additions & 2 deletions lib/client/loadbalancer/weight_rr.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer {

_rr(request, addressList) {
const address = addressList[this._offset];
this._offset = (this._offset + 1) % this.size;
this._offset = (this._offset + 1) % addressList.length;

const weight = this.getWeight(address);
if (weight === DEFAULT_WEIGHT) return address;
Expand All @@ -29,7 +29,7 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer {

_doSelect(request, addressList) {
let address;
let count = this.size;
let count = addressList.length;
while (count--) {
address = this._rr(request, addressList);
if (address) return address;
Expand Down
33 changes: 33 additions & 0 deletions test/client/address_group.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,39 @@ describe('test/client/address_group.test.js', () => {
await utils.closeAll();
});

it('balancerFilter 优先匹配', async function() {
await Promise.all([
utils.startServer(13201),
utils.startServer(13202),
]);

const addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
balancerFilter: addressList => {
return addressList.filter(v => {
return v.host === '127.0.0.1:13202';
});
},
});
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13201', true),
urlparse('bolt://127.0.0.1:13202', true),
];
let count = 10;
while (count--) {
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
// 优先匹配
assert(connection.url === 'bolt://127.0.0.1:13202');
}

addressGroup.close();
await connectionManager.closeAllConnections();
await utils.closeAll();
});

describe('对于连不上地址的处理', () => {
const mod = 2;
const count = 10;
Expand Down