diff --git a/lib/client/address_group.js b/lib/client/address_group.js index 6e9d56f..f6e1ab9 100644 --- a/lib/client/address_group.js +++ b/lib/client/address_group.js @@ -341,10 +341,25 @@ class AddressGroup extends Base { } async getConnection(req) { + const hasFilter = typeof this.options.balancerFilter === 'function'; + // 增加一层filter判断,若正常则直接返回,否则走原 getConnection 逻辑 + if (hasFilter) { + try { + const filterConnection = await this.getConnectionDefault(req, true); + if (filterConnection) return filterConnection; + } catch (error) { + // filter 地址都失败后,多走一次原 getConnection 逻辑 + this.logger.info('[AddressGroup] filterConnection error', error); + } + } + return await this.getConnectionDefault(req); + } + + async getConnectionDefault(req, needFilter) { const meta = req.meta; meta.connectionGroup = this.key; - const address = this._loadbalancer.select(req); + const address = this._loadbalancer.select(req, needFilter); if (!address) return null; const { connectionOpts, connectionClass } = this.options; diff --git a/lib/client/loadbalancer/base.js b/lib/client/loadbalancer/base.js index a99fea0..c8600a4 100644 --- a/lib/client/loadbalancer/base.js +++ b/lib/client/loadbalancer/base.js @@ -23,7 +23,23 @@ class LoadBalancer extends Base { return this.addressGroup.logger; } - select(request) { + select(request, needFilter) { + // 需要时才需要过滤 + const hasFilter = needFilter && typeof this.addressGroup.options.balancerFilter === 'function'; + this.inBalancerFilterFilter = false; + // 透出addressList,供外部进行一次优先筛选 + if (hasFilter) { + const list = this.addressGroup.options.balancerFilter( + this.addressList + ); + if (Array.isArray(list) && list.length > 0) { + this.inBalancerFilterFilter = true; + const address = this._doSelect(request, list); + this.inBalancerFilterFilter = false; + // 没有数据使用this.addressList重试 + if (address) return address; + } + } if (this.size === 0) return null; if (this.size === 1) return this.addressList[0]; diff --git a/lib/client/loadbalancer/weight_rr.js b/lib/client/loadbalancer/weight_rr.js index fe4f7a9..5f6ecc2 100644 --- a/lib/client/loadbalancer/weight_rr.js +++ b/lib/client/loadbalancer/weight_rr.js @@ -28,6 +28,8 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer { } _doSelect(request, addressList) { + // 存在balancerFilterFilter标识使用filter_doSelect + if (this.inBalancerFilterFilter) return this.filter_doSelect(request, addressList); let address; let count = this.size; while (count--) { @@ -37,6 +39,31 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer { // 直接返回兜底 return addressList[this._offset]; } + + // 原逻辑this.size this.offset使用了this.addressList, 在这里新增一个使用内部addressList的方法 + filter_doSelect(request, addressList) { + let address; + let count = addressList.length; + this.filter_offset = utility.random(addressList.length); + while (count--) { + address = this.filter_rr(request, addressList); + if (address) return address; + } + // 全失败 + return null; + } + + filter_rr(request, addressList) { + const address = addressList[this.filter_offset]; + this._offset = (this.filter_offset + 1) % addressList.length; + + const weight = this.getWeight(address); + if (weight === DEFAULT_WEIGHT) return address; + if (weight === 0) return null; + + const randNum = utility.random(DEFAULT_WEIGHT); + return weight >= randNum ? address : null; + } } module.exports = WeightRoundRobinLoadBalancer; diff --git a/test/client/address_group.test.js b/test/client/address_group.test.js index 003d36d..f05bb25 100644 --- a/test/client/address_group.test.js +++ b/test/client/address_group.test.js @@ -203,6 +203,90 @@ describe('test/client/address_group.test.js', () => { await utils.closeAll(); }); + it('balancerFilter 优先匹配', async function() { + await Promise.all([ + utils.startServer(13201), + utils.startServer(13202), + utils.startServer(13203), + utils.startServer(13204), + ]); + + const addressGroup = new AddressGroup({ + key: 'xxx', + logger, + connectionManager, + balancerFilter: addressList => { + return addressList.filter(v => { + return v.host === '127.0.0.1:13202'; + }); + }, + }); + addressGroup.addressList = [ + urlparse('bolt://127.0.0.1:13201', true), + urlparse('bolt://127.0.0.1:13202', true), + urlparse('bolt://127.0.0.1:13203', true), + urlparse('bolt://127.0.0.1:13204', true), + ]; + await addressGroup.ready(); + let count = 3; + while (count--) { + const connection = await addressGroup.getConnection(req); + assert(connection && connection.isConnected); + // 优先匹配 + assert(connection.url === 'bolt://127.0.0.1:13202'); + } + // 匹配到的数据不可用时,会调用其他地址 + addressGroup.addressList = [ + urlparse('bolt://127.0.0.1:13201', true), + ]; + count = 3; + while (count--) { + const connection = await addressGroup.getConnection(req); + assert(connection && connection.isConnected); + assert(connection.url === 'bolt://127.0.0.1:13201'); + } + addressGroup.addressList = [ + urlparse('bolt://127.0.0.1:13202', true), + ]; + addressGroup._weightMap.set('127.0.0.1:13202', 20); + addressGroup._maxIdleWindow = addressGroup._maxIdleWindow + Date.now(); + const connection = await addressGroup.getConnection(req); + assert(connection && connection.isConnected); + assert(connection.url === 'bolt://127.0.0.1:13202'); + addressGroup.close(); + await connectionManager.closeAllConnections(); + await utils.closeAll(); + }); + + it('balancerFilter 优先匹配 异常', async function() { + await Promise.all([ + utils.startServer(13201), + utils.startServer(13202), + ]); + + const addressGroup = new AddressGroup({ + key: 'xxx', + logger, + connectionManager, + balancerFilter: addressList => { + return addressList.map(() => [ 1 ]); + }, + }); + // 使用错误地址 + addressGroup.addressList = [ + urlparse('bolt://127.0.0.1:132011', true), + urlparse('bolt://127.0.0.1:132022', true), + ]; + try { + await addressGroup.getConnection(req); + } catch (error) { + assert(error.code === 'ERR_SOCKET_BAD_PORT'); + } + addressGroup.close(); + await connectionManager.closeAllConnections(); + await utils.closeAll(); + }); + describe('对于连不上地址的处理', () => { const mod = 2; const count = 10;