Skip to content

Commit cd9d0f3

Browse files
authored
Merge pull request #21 from JavaSaBr/feature-broker-15
Implement QoS 1
2 parents eaf0130 + 269e264 commit cd9d0f3

File tree

56 files changed

+947
-197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+947
-197
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.5.0"
35+
rlibVersion = "9.6.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package com.ss.mqtt.broker.config;
22

3+
import com.ss.mqtt.broker.handler.client.DefaultMqttClientReleaseHandler;
4+
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
35
import com.ss.mqtt.broker.handler.packet.in.*;
6+
import com.ss.mqtt.broker.handler.publish.in.PublishInHandler;
7+
import com.ss.mqtt.broker.handler.publish.in.Qos0PublishInHandler;
8+
import com.ss.mqtt.broker.handler.publish.in.Qos1PublishInHandler;
9+
import com.ss.mqtt.broker.handler.publish.in.Qos2PublishInHandler;
10+
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
11+
import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler;
12+
import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler;
13+
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
414
import com.ss.mqtt.broker.model.MqttPropertyConstants;
515
import com.ss.mqtt.broker.model.QoS;
616
import com.ss.mqtt.broker.network.MqttConnection;
7-
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
8-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
917
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
10-
import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler;
18+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1119
import com.ss.mqtt.broker.network.packet.PacketType;
1220
import com.ss.mqtt.broker.service.*;
1321
import com.ss.mqtt.broker.service.impl.*;
@@ -86,29 +94,33 @@ private interface ChannelFactory extends
8694
@NotNull ClientIdRegistry clientIdRegistry,
8795
@NotNull SubscriptionService subscriptionService,
8896
@NotNull PublishingService publishingService,
89-
@NotNull MqttSessionService mqttSessionService
97+
@NotNull MqttSessionService mqttSessionService,
98+
@NotNull PublishRetryService publishRetryService
9099
) {
91100

92101
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
93102
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
94103
clientIdRegistry,
95104
authenticationService,
96-
mqttSessionService
105+
mqttSessionService,
106+
publishRetryService
97107
);
98108
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
99109
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
100110
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
101111
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
112+
handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler();
102113

103114
return handlers;
104115
}
105116

106117
@Bean
107-
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler(
118+
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
108119
@NotNull ClientIdRegistry clientIdRegistry,
109-
@NotNull MqttSessionService mqttSessionService
120+
@NotNull MqttSessionService mqttSessionService,
121+
@NotNull PublishRetryService publishRetryService
110122
) {
111-
return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
123+
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
112124
}
113125

114126
@Bean
@@ -130,6 +142,14 @@ private interface ChannelFactory extends
130142
);
131143
}
132144

145+
@Bean
146+
@NotNull PublishRetryService publishRetryService() {
147+
return new DefaultPublishRetryService(
148+
env.getProperty("publish.pending.check.interval", int.class, 60 * 1000),
149+
env.getProperty("publish.retry.interval", int.class, 60 * 1000)
150+
);
151+
}
152+
133153
@Bean
134154
@NotNull InetSocketAddress deviceNetworkAddress(
135155
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,
@@ -150,8 +170,29 @@ private interface ChannelFactory extends
150170
}
151171

152172
@Bean
153-
@NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) {
154-
return new SimplePublishingService(subscriptionService);
173+
@NotNull PublishOutHandler[] publishOutHandlers() {
174+
return new PublishOutHandler[] {
175+
new Qos0PublishOutHandler(),
176+
new Qos1PublishOutHandler(),
177+
new Qos2PublishOutHandler(),
178+
};
179+
}
180+
181+
@Bean
182+
@NotNull PublishInHandler[] publishInHandlers(
183+
@NotNull SubscriptionService subscriptionService,
184+
@NotNull PublishOutHandler[] publishOutHandlers
185+
) {
186+
return new PublishInHandler[] {
187+
new Qos0PublishInHandler(subscriptionService, publishOutHandlers),
188+
new Qos1PublishInHandler(subscriptionService, publishOutHandlers),
189+
new Qos2PublishInHandler(subscriptionService, publishOutHandlers),
190+
};
191+
}
192+
193+
@Bean
194+
@NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) {
195+
return new DefaultPublishingService(publishInHandlers);
155196
}
156197

157198
@Bean

src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.ss.mqtt.broker.config;
22

33
import com.ss.mqtt.broker.model.QoS;
4+
import lombok.Data;
45
import lombok.Getter;
6+
import lombok.NoArgsConstructor;
57
import lombok.RequiredArgsConstructor;
68
import org.jetbrains.annotations.NotNull;
79

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
3030
}
3131

3232
@Override
33-
public @NotNull MqttWritablePacket newPublish(
33+
public @NotNull PublishOutPacket newPublish(
3434
@NotNull MqttClient client,
3535
int packetId,
3636
@NotNull QoS qos,

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
4444
}
4545

4646
@Override
47-
public @NotNull MqttWritablePacket newPublish(
47+
public @NotNull PublishOutPacket newPublish(
4848
@NotNull MqttClient client,
4949
int packetId,
5050
@NotNull QoS qos,

src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.ss.mqtt.broker.model.reason.code.*;
66
import com.ss.mqtt.broker.network.client.MqttClient;
77
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
8+
import com.ss.mqtt.broker.network.packet.out.PublishOutPacket;
89
import com.ss.rlib.common.util.ArrayUtils;
910
import com.ss.rlib.common.util.StringUtils;
1011
import com.ss.rlib.common.util.array.Array;
@@ -76,7 +77,7 @@ public abstract class MqttPacketOutFactory {
7677
}
7778

7879

79-
public @NotNull MqttWritablePacket newPublish(
80+
public @NotNull PublishOutPacket newPublish(
8081
@NotNull MqttClient client,
8182
int packetId,
8283
@NotNull QoS qos,
@@ -101,7 +102,7 @@ public abstract class MqttPacketOutFactory {
101102
);
102103
}
103104

104-
public abstract @NotNull MqttWritablePacket newPublish(
105+
public abstract @NotNull PublishOutPacket newPublish(
105106
@NotNull MqttClient client,
106107
int packetId,
107108
@NotNull QoS qos,

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
7+
import com.ss.mqtt.broker.service.PublishRetryService;
78
import com.ss.rlib.common.util.StringUtils;
89
import lombok.RequiredArgsConstructor;
910
import lombok.extern.log4j.Log4j2;
@@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1718

1819
private final @NotNull ClientIdRegistry clientIdRegistry;
1920
private final @NotNull MqttSessionService sessionService;
21+
private final @NotNull PublishRetryService publishRetryService;
2022

2123
@Override
2224
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -25,6 +27,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
2527
}
2628

2729
protected @NotNull Mono<?> releaseImpl(@NotNull T client) {
30+
publishRetryService.unregister(client);
2831

2932
var clientId = client.getClientId();
3033
client.setClientId(StringUtils.EMPTY);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.ss.mqtt.broker.handler.client;
2+
3+
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
4+
import com.ss.mqtt.broker.service.ClientIdRegistry;
5+
import com.ss.mqtt.broker.service.MqttSessionService;
6+
import com.ss.mqtt.broker.service.PublishRetryService;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
10+
11+
public DefaultMqttClientReleaseHandler(
12+
@NotNull ClientIdRegistry clientIdRegistry,
13+
@NotNull MqttSessionService sessionService,
14+
@NotNull PublishRetryService publishRetryService
15+
) {
16+
super(clientIdRegistry, sessionService, publishRetryService);
17+
}
18+
}

src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.ss.mqtt.broker.service.AuthenticationService;
1414
import com.ss.mqtt.broker.service.ClientIdRegistry;
1515
import com.ss.mqtt.broker.service.MqttSessionService;
16+
import com.ss.mqtt.broker.service.PublishRetryService;
1617
import com.ss.rlib.common.util.StringUtils;
1718
import lombok.RequiredArgsConstructor;
1819
import org.jetbrains.annotations.NotNull;
@@ -21,9 +22,10 @@
2122
@RequiredArgsConstructor
2223
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {
2324

24-
private final ClientIdRegistry clientIdRegistry;
25-
private final AuthenticationService authenticationService;
26-
private final MqttSessionService mqttSessionService;
25+
private final @NotNull ClientIdRegistry clientIdRegistry;
26+
private final @NotNull AuthenticationService authenticationService;
27+
private final @NotNull MqttSessionService mqttSessionService;
28+
private final @NotNull PublishRetryService publishRetryService;
2729

2830
@Override
2931
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -135,6 +137,8 @@ private Mono<Boolean> onConnected(
135137
packet.getReceiveMax()
136138
));
137139

140+
publishRetryService.register(client);
141+
138142
return Mono.just(Boolean.TRUE);
139143
}
140144

0 commit comments

Comments
 (0)