Skip to content

Commit c368552

Browse files
committed
[broker-22] implement QoS 2
1 parent cd9d0f3 commit c368552

16 files changed

+411
-125
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.6.0"
35+
rlibVersion = "9.7.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ private interface ChannelFactory extends
110110
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
111111
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
112112
handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler();
113+
handlers[PacketType.PUBLISH_RECEIVED.ordinal()] = new PublishReceiveInPacketHandler();
114+
handlers[PacketType.PUBLISH_RELEASED.ordinal()] = new PublishReleaseInPacketHandler();
115+
handlers[PacketType.PUBLISH_COMPLETED.ordinal()] = new PublishCompleteInPacketHandler();
113116

114117
return handlers;
115118
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ public class PublishAckInPacketHandler extends AbstractPacketHandler<UnsafeMqttC
1010

1111
@Override
1212
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) {
13-
client.getSession().updatePendingPacket(client, packet);
13+
var session = client.getSession();
14+
if (session != null) {
15+
session.updateOutPendingPacket(client, packet);
16+
}
1417
}
1518
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
5+
import com.ss.mqtt.broker.network.packet.in.PublishCompleteInPacket;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
public class PublishCompleteInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishCompleteInPacket> {
11+
12+
@Override
13+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishCompleteInPacket packet) {
14+
var session = client.getSession();
15+
if (session != null) {
16+
session.updateOutPendingPacket(client, packet);
17+
}
18+
}
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
5+
import com.ss.mqtt.broker.network.packet.in.PublishReceivedInPacket;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
public class PublishReceiveInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishReceivedInPacket> {
11+
12+
@Override
13+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishReceivedInPacket packet) {
14+
var session = client.getSession();
15+
if (session != null) {
16+
session.updateOutPendingPacket(client, packet);
17+
}
18+
}
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
5+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
public class PublishReleaseInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishReleaseInPacket> {
11+
12+
@Override
13+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishReleaseInPacket packet) {
14+
var session = client.getSession();
15+
if (session != null) {
16+
session.updateInPendingPacket(client, packet);
17+
}
18+
}
19+
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.MqttSession;
5+
import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode;
6+
import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode;
47
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.HasPacketId;
59
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
10+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket;
611
import com.ss.mqtt.broker.service.SubscriptionService;
712
import org.jetbrains.annotations.NotNull;
813

9-
public class Qos2PublishInHandler extends AbstractPublishInHandler {
14+
public class Qos2PublishInHandler extends AbstractPublishInHandler implements MqttSession.PendingPacketHandler {
1015

1116
public Qos2PublishInHandler(
1217
@NotNull SubscriptionService subscriptionService,
@@ -17,6 +22,56 @@ public Qos2PublishInHandler(
1722

1823
@Override
1924
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
throw new UnsupportedOperationException();
25+
26+
var session = client.getSession();
27+
28+
// it means this client was already closed
29+
if (session == null) {
30+
return;
31+
}
32+
33+
// if this packet is re-try from client
34+
if (packet.isDuplicate()) {
35+
// if this packet was accepted before then we can skip it
36+
if (session.hasInPending(packet.getPacketId())) {
37+
return;
38+
}
39+
}
40+
41+
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
42+
43+
for (var subscriber : subscribers) {
44+
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
45+
}
46+
47+
var reasonCode = subscribers.isEmpty() ?
48+
PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS : PublishReceivedReasonCode.SUCCESS;
49+
50+
var packetId = packet.getPacketId();
51+
session.registerInPublish(packet, this, packetId);
52+
53+
var packetOutFactory = client.getPacketOutFactory();
54+
client.send(packetOutFactory.newPublishReceived(
55+
client,
56+
packet.getPacketId(),
57+
reasonCode
58+
));
59+
}
60+
61+
@Override
62+
public boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response) {
63+
64+
if (!(response instanceof PublishReleaseInPacket)) {
65+
throw new IllegalStateException("Unexpected response " + response);
66+
}
67+
68+
var packetOutFactory = client.getPacketOutFactory();
69+
client.send(packetOutFactory.newPublishCompleted(
70+
client,
71+
response.getPacketId(),
72+
PublishCompletedReasonCode.SUCCESS
73+
));
74+
75+
return true;
2176
}
2277
}
Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,57 @@
11
package com.ss.mqtt.broker.handler.publish.out;
22

3-
abstract class AbstractPublishOutHandler implements PublishOutHandler {}
3+
import com.ss.mqtt.broker.model.MqttPropertyConstants;
4+
import com.ss.mqtt.broker.model.MqttSession;
5+
import com.ss.mqtt.broker.model.QoS;
6+
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
9+
import org.jetbrains.annotations.NotNull;
10+
11+
abstract class AbstractPublishOutHandler implements PublishOutHandler {
12+
13+
@Override
14+
public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
15+
16+
var client = subscriber.getMqttClient();
17+
var session = client.getSession();
18+
19+
// if session is null it means this client was already closed
20+
if (session != null) {
21+
handleImpl(packet, subscriber, client, session);
22+
}
23+
}
24+
25+
protected abstract void handleImpl(
26+
@NotNull PublishInPacket packet,
27+
@NotNull Subscriber subscriber,
28+
@NotNull MqttClient client,
29+
@NotNull MqttSession session
30+
);
31+
32+
protected abstract @NotNull QoS getQoS();
33+
34+
protected void sendPublish(
35+
@NotNull MqttClient client,
36+
@NotNull PublishInPacket packet,
37+
int packetId,
38+
boolean duplicate
39+
) {
40+
41+
var packetOutFactory = client.getPacketOutFactory();
42+
client.send(packetOutFactory.newPublish(
43+
client,
44+
packetId,
45+
getQoS(),
46+
packet.isRetained(),
47+
duplicate,
48+
packet.getTopicName(),
49+
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
50+
packet.getPayload(),
51+
packet.isPayloadFormatIndicator(),
52+
packet.getResponseTopic(),
53+
packet.getCorrelationData(),
54+
packet.getUserProperties()
55+
));
56+
}
57+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.ss.mqtt.broker.handler.publish.out;
2+
3+
import com.ss.mqtt.broker.model.MqttSession;
4+
import com.ss.mqtt.broker.model.Subscriber;
5+
import com.ss.mqtt.broker.network.client.MqttClient;
6+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
public abstract class PersistentPublishOutHandler extends AbstractPublishOutHandler implements
10+
MqttSession.PendingPacketHandler {
11+
12+
@Override
13+
protected void handleImpl(
14+
@NotNull PublishInPacket packet,
15+
@NotNull Subscriber subscriber,
16+
@NotNull MqttClient client,
17+
@NotNull MqttSession session
18+
) {
19+
// generate new uniq packet id per client
20+
var packetId = session.nextPacketId();
21+
22+
// register waiting async response
23+
session.registerOutPublish(packet, this, packetId);
24+
25+
// send publish
26+
sendPublish(client, packet, packetId, false);
27+
}
28+
29+
@Override
30+
public void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {
31+
sendPublish(client, packet, packetId, true);
32+
}
33+
}
Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,26 @@
11
package com.ss.mqtt.broker.handler.publish.out;
22

3-
import com.ss.mqtt.broker.model.MqttPropertyConstants;
3+
import com.ss.mqtt.broker.model.MqttSession;
44
import com.ss.mqtt.broker.model.QoS;
55
import com.ss.mqtt.broker.model.Subscriber;
6+
import com.ss.mqtt.broker.network.client.MqttClient;
67
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
78
import org.jetbrains.annotations.NotNull;
89

910
public class Qos0PublishOutHandler extends AbstractPublishOutHandler {
1011

1112
@Override
12-
public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
13-
14-
var client = subscriber.getMqttClient();
15-
var packetOutFactory = client.getPacketOutFactory();
13+
protected @NotNull QoS getQoS() {
14+
return QoS.AT_MOST_ONCE_DELIVERY;
15+
}
1616

17-
client.send(packetOutFactory.newPublish(
18-
client,
19-
MqttPropertyConstants.PACKET_ID_FOR_QOS_0,
20-
QoS.AT_MOST_ONCE_DELIVERY,
21-
packet.isRetained(),
22-
false,
23-
packet.getTopicName(),
24-
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
25-
packet.getPayload(),
26-
packet.isPayloadFormatIndicator(),
27-
packet.getResponseTopic(),
28-
packet.getCorrelationData(),
29-
packet.getUserProperties()
30-
));
17+
@Override
18+
protected void handleImpl(
19+
@NotNull PublishInPacket packet,
20+
@NotNull Subscriber subscriber,
21+
@NotNull MqttClient client,
22+
@NotNull MqttSession session
23+
) {
24+
sendPublish(client, packet, 0, false);
3125
}
3226
}

0 commit comments

Comments
 (0)