Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,25 @@ class AddressGroup extends Base {
}

async getConnection(req) {
const hasFilter = typeof this.options.balancerFilter === 'function';
// 增加一层filter判断,若正常则直接返回,否则走原 getConnection 逻辑
if (hasFilter) {
try {
const filterConnection = await this.getConnectionDefault(req, true);
if (filterConnection) return filterConnection;
} catch (error) {
// filter 地址都失败后,多走一次原 getConnection 逻辑
this.logger.info('[AddressGroup] filterConnection error', error);
}
}
return await this.getConnectionDefault(req);
}

async getConnectionDefault(req, needFilter) {
const meta = req.meta;
meta.connectionGroup = this.key;

const address = this._loadbalancer.select(req);
const address = this._loadbalancer.select(req, needFilter);
if (!address) return null;

const { connectionOpts, connectionClass } = this.options;
Expand Down
16 changes: 15 additions & 1 deletion lib/client/loadbalancer/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,21 @@ class LoadBalancer extends Base {
return this.addressGroup.logger;
}

select(request) {
select(request, needFilter) {
// 需要时才需要过滤
const hasFilter = needFilter && typeof this.addressGroup.options.balancerFilter === 'function';
// 透出addressList,供外部进行一次优先筛选
if (hasFilter) {
const list = this.addressGroup.options.balancerFilter(
this.addressList
);
if (Array.isArray(list)) {
// 若优先筛选后无可用,重新尝试全部地址
if (list.length === 0) return this._doSelect(request, this.addressList);
// weight_rr 使用了this.addressList,增加了一个使用传参的filter_rr_doSelect方法
return this.filter_rr_doSelect(request, list);
}
}
if (this.size === 0) return null;
if (this.size === 1) return this.addressList[0];

Expand Down
25 changes: 25 additions & 0 deletions lib/client/loadbalancer/weight_rr.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,31 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer {
// 直接返回兜底
return addressList[this._offset];
}

// 有balancerFilter时使用外部传入的addressList
filter_rr_doSelect(request, addressList) {
let address;
let count = addressList.length;
this.filter_offset = utility.random(addressList.length);
while (count--) {
address = this.filter_rr(request, addressList);
if (address) return address;
}
// 直接返回兜底
return addressList[this.filter_offset];
}

filter_rr(request, addressList) {
const address = addressList[this.filter_offset];
this._offset = (this.filter_offset + 1) % addressList.length;

const weight = this.getWeight(address);
if (weight === DEFAULT_WEIGHT) return address;
if (weight === 0) return null;

const randNum = utility.random(DEFAULT_WEIGHT);
return weight >= randNum ? address : null;
}
}

module.exports = WeightRoundRobinLoadBalancer;
83 changes: 83 additions & 0 deletions test/client/address_group.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,89 @@ describe('test/client/address_group.test.js', () => {
await utils.closeAll();
});

it('balancerFilter 优先匹配', async function() {
await Promise.all([
utils.startServer(13201),
utils.startServer(13202),
utils.startServer(13203),
utils.startServer(13204),
]);

const addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
balancerFilter: addressList => {
return addressList.filter(v => {
return v.host === '127.0.0.1:13202';
});
},
});
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13201', true),
urlparse('bolt://127.0.0.1:13202', true),
urlparse('bolt://127.0.0.1:13203', true),
urlparse('bolt://127.0.0.1:13204', true),
];
let count = 3;
while (count--) {
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
// 优先匹配
assert(connection.url === 'bolt://127.0.0.1:13202');
}
// 匹配到的数据不可用时,会调用其他地址
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13201', true),
];
count = 3;
while (count--) {
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
assert(connection.url === 'bolt://127.0.0.1:13201');
}
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13202', true),
];
addressGroup._weightMap.set('127.0.0.1:13202', 20);
addressGroup._maxIdleWindow = addressGroup._maxIdleWindow + Date.now();
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
assert(connection.url === 'bolt://127.0.0.1:13202');
addressGroup.close();
await connectionManager.closeAllConnections();
await utils.closeAll();
});

it('balancerFilter 优先匹配 异常', async function() {
await Promise.all([
utils.startServer(13201),
utils.startServer(13202),
]);

const addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
balancerFilter: addressList => {
return addressList.map(() => [ 1 ]);
},
});
// 使用错误地址
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:132011', true),
urlparse('bolt://127.0.0.1:132022', true),
];
try {
await addressGroup.getConnection(req);
} catch (error) {
assert(error.code === 'ERR_SOCKET_BAD_PORT');
}
addressGroup.close();
await connectionManager.closeAllConnections();
await utils.closeAll();
});

describe('对于连不上地址的处理', () => {
const mod = 2;
const count = 10;
Expand Down