Skip to content

Commit 0a1cc6d

Browse files
author
lichengsheng
committed
1,修复偏移量提交延迟的bug
2,添加实时消息。 3,添加报表 4,优化topic删除功能
1 parent 6e37654 commit 0a1cc6d

File tree

261 files changed

+12228
-6764
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

261 files changed

+12228
-6764
lines changed

doc/MQ3.0多语言方案v6.pptx

161 KB
Binary file not shown.

doc/Mq3.0设计v7.pptx

339 KB
Binary file not shown.

doc/Mq3.pptx

308 KB
Binary file not shown.

doc/mq_basic.sql

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ CREATE TABLE `consumer_group` (
8888
`meta_update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据更新时间',
8989
`mode` int(11) NOT NULL DEFAULT '1' COMMENT '1,为集群模式,2,为广播模式,3,为代理模式',
9090
`origin_name` varchar(50) DEFAULT NULL COMMENT '原始的消费者组名',
91+
`sub_env` varchar(45) NOT NULL DEFAULT 'default' COMMENT '子环境名称',
92+
`push_flag` int(11) NOT NULL DEFAULT '0' COMMENT '1,表示实时推送,0,表示非实时推送',
9193
PRIMARY KEY (`id`),
9294
UNIQUE KEY `name_index` (`name`)
9395
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@@ -192,16 +194,6 @@ CREATE TABLE `dic` (
192194
UNIQUE KEY `key_index` (`key1`)
193195
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
194196

195-
-- ----------------------------
196-
-- Table structure for group_topic
197-
-- ----------------------------
198-
DROP TABLE IF EXISTS `group_topic`;
199-
CREATE TABLE `group_topic` (
200-
`group_id` int(11) DEFAULT NULL,
201-
`group_name` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
202-
`topic_id` int(11) DEFAULT NULL,
203-
`topic_name` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL
204-
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
205197

206198
-- ----------------------------
207199
-- Table structure for message_01
@@ -329,6 +321,7 @@ CREATE TABLE `queue_offset` (
329321
`meta_update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据更新时间',
330322
`origin_consumer_group_name` varchar(50) DEFAULT NULL COMMENT '原始的消费者组名',
331323
`consumer_group_mode` int(11) NOT NULL DEFAULT '1' COMMENT '1,为集群模式,2,为广播模式,3,为代理模式',
324+
`sub_env` varchar(45) NOT NULL DEFAULT 'default' COMMENT '子环境名称',
332325
PRIMARY KEY (`id`),
333326
UNIQUE KEY `uq_group_id_topic_id` (`consumer_group_id`,`topic_id`,`queue_id`),
334327
KEY `consumer_group_id_idx` (`consumer_group_id`),
@@ -346,12 +339,14 @@ CREATE TABLE `server` (
346339
`heart_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
347340
`server_type` tinyint(4) DEFAULT '1' COMMENT '1 表示broker,0 表示portal。当值为0时,这个是用在做批量清理时使用。',
348341
`status_flag` tinyint(4) DEFAULT '1' COMMENT '1 表示状态为up,0 表示状态为down,此状态在系统灰度平滑发布时使用。默认是1 表示up',
342+
`server_version` varchar(100) DEFAULT NULL COMMENT 'broker 版本号',
349343
`insert_by` varchar(100) DEFAULT NULL COMMENT '操作人',
350344
`insert_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
351345
`update_by` varchar(100) DEFAULT NULL COMMENT '操作人',
352346
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
353347
`is_active` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除',
354-
PRIMARY KEY (`id`),
348+
PRIMARY KEY (`id`),
349+
UNIQUE KEY `ip_port_uq` (`ip`,`port`),
355350
KEY `ip_port_idx` (`ip`,`port`)
356351
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
357352

doc/sql.txt

Lines changed: 312 additions & 0 deletions
Large diffs are not rendered by default.

doc/使用说明.md

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# 接入说明:
2+
3+
1. [各环境地址](#sp1)
4+
2. [客户端接入](#sp2)
5+
3. [Mq3模型](#sp7)
6+
4. [2.0升级方案](#sp8)
7+
8+
##<span id="sp1">各环境地址</span>
9+
**测试客户端broker地址**http://fat-mqbroker4.ppdapi.com
10+
**测试客户端portal地址**http://fat-mqportal4.ppdaicorp.com
11+
**UAT客户端broker地址**http://uat-mqbroker4.ppdapi.com
12+
**UAT客户端portal地址**http://uat-mqportal4.ppdaicorp.com
13+
**生产客户端broker地址**http://mqbroker4.ppdapi.com
14+
**生产客户端portal地址**http://mqportal4.ppdaicorp.com
15+
完整的host如下:
16+
```
17+
10.114.26.227 fat-mqportal4.ppdaicorp.com
18+
10.114.26.227 fat-mqbroker4.ppdapi.com
19+
10.0.23.22 uat-mqportal4.ppdaicorp.com
20+
```
21+
22+
23+
## <span id='sp2'>客户端接入</span>
24+
25+
####说明
26+
* 客户端接入分为`producer`接入和`consumer`接入
27+
* 客户端实例既可以发消息也可以订阅消息
28+
29+
## <span id='sp2'>`spring boot`接入</span>
30+
31+
- 添加maven引用
32+
33+
```
34+
<dependency>
35+
<groupId>com.ppdai.infrastructure</groupId>
36+
<artifactId>mq-client-springboot</artifactId>
37+
<version>*****</version>(推荐最新版)
38+
</dependency>
39+
```
40+
41+
- producer和consumer实例properties 配置(也可yml)
42+
43+
```
44+
mq.broker.url=http://fat-mqbroker4.ppdapi.com #//broker地址必须
45+
```
46+
47+
##producer发送端使用方法
48+
49+
```
50+
MqClient.publish("test", env.getProperty("test-token", ""), new ProducerDataDto(""));
51+
```
52+
## 发送端使用注意事项
53+
54+
* publish方法有多个重载方法,可以批量发送。
55+
* 消息支持批量发送,但是一次不能超过20条。
56+
* 调用发送和消费接口都是在ContextRefreshedEvent 事件之后,完成初始化才能使用,否则会报 <font color="red">MqNotInitException</font>。这时需要手动调用 MqClient.start(broker 地址)。
57+
58+
##consumer消费端使用方法
59+
consumer消费端订阅有两种方式,一种是基于配置订阅,一种基于代码订阅。
60+
61+
##配置订阅
62+
```
63+
<?xml version="1.0" encoding="UTF-8" ?>
64+
<messageQueue>
65+
<consumer groupName="lorgine">
66+
<topics>
67+
<topic name="test" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
68+
<topic name="test2" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
69+
</topics>
70+
</consumer>
71+
</messageQueue>
72+
```
73+
74+
对应的路径为 <font color="red">/src/main/resources/messageQueue/messageQueue.xml</font>
75+
76+
##代码订阅
77+
```
78+
ConsumerGroupVo consumerGroup = new ConsumerGroupVo("ffSub");
79+
ConsumerGroupTopicVo topicVo = new ConsumerGroupTopicVo();
80+
topicVo.setName("test");
81+
topicVo.setSubscriber(new ISubscriber() {
82+
@Override
83+
public List<Long> onMessageReceived(List<MessageDto> messages) {
84+
//do somthing
85+
return 失败的id列表;
86+
}
87+
});
88+
consumerGroup.addTopic(topicVo);
89+
MqClient.registerConsumerGroup(consumerGroup);
90+
91+
```
92+
93+
注意事项
94+
* 可以订阅多个topic,一个应用可以订阅多个消费者组,同时用户比如订阅topic组里面所有的成功topic,注意消费者组中失败的topic不用订阅。
95+
* 订阅的时候,必须要订阅消费者下所有的topic,不能只订阅部分的topic,否则会报错。
96+
* 消息3.0自动在客户宿主机上进行了cat埋点监控,用户可以实时查看客户端cat情况
97+
98+
99+
## <span id='sp7'>Mq模型</span>
100+
101+
<img src="assets/mq_m.png" width = "800" alt="Mq模型" align=center />
102+
103+
## <span id='sp8'>消息2.0升级消息3.0方案</span>
104+
105+
* 注意mq4完全兼容mq2,mq4是基于最新的mq2客户端 1.1.7.8.5版本做兼容处理的。所以如果mq2客户端支持mq2 客户端1.1.7.8.5版本,则只需要修改pom即可。
106+
* 为了平滑升级与降级,消息客户端需要接入apollo,使得用户可以通过修改配置进行平滑升级与降级。
107+
* 接入新版客户端后,客户端有三个配置项。
108+
109+
配置项 |默认值|含义
110+
---------|----------|----------
111+
mq3.pb.loc|1 |1:表示采用mq3模式发送消息,0表示采用mq2模式发送消息。默认采用mq3发送消息。
112+
mq3.cs.loc|0 |1:表示采用mq3模式消费消息,0表示采用mq2模式消费消息。默认采用mq2消费消息。
113+
mq3.fc.loc|1 |1:表示采用mq3模式获取消息数量,0表示采用mq2模式获取消息数量。默认采用mq3模式。
114+
115+
在新版的消息客户端中,为了实现平滑迁移,内置了mq2与mq3的客户端,可以根据上面的配置说明,添加修改相应的配置项,来切换不同的使用模式。注意客户端只有在mq3消费模式下,才有消息3.0动态重平衡,动态修改偏移等高级功能,否则还是跟mq2一样,无这些高级功能。
116+
117+
详细文档请参考
118+
http://confluence.ppdai.com/pages/viewpage.action?pageId=24549662

doc/升级sql.txt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
ALTER TABLE `consumer_group`
2+
ADD COLUMN `sub_env` VARCHAR(45) NULL DEFAULT 'default' AFTER `origin_name`;
3+
4+
5+
ALTER TABLE `queue_offset`
6+
ADD COLUMN `sub_env` VARCHAR(45) NULL DEFAULT 'default' AFTER `consumer_group_mode`;
7+
8+
9+
ALTER TABLE `server`
10+
ADD UNIQUE INDEX `ip_port_uq` (`ip` ASC, `port` ASC);
11+
12+
13+
ALTER TABLE `server`
14+
ADD COLUMN `server_version` VARCHAR(100) NULL COMMENT 'broker 版本号' AFTER `status_flag`;
15+
801 KB
Binary file not shown.
368 KB
Binary file not shown.

doc/消费者广播模式.pptx

38.6 KB
Binary file not shown.

0 commit comments

Comments
 (0)