Skip to content

Commit e6f9fa7

Browse files
authored
Merge pull request #17 from JavaSaBr/feature-broker-14
[broker-14] improve publishing process
2 parents cd9d0f3 + ce5098d commit e6f9fa7

38 files changed

+985
-285
lines changed

build.gradle

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.6.0"
35+
rlibVersion = "9.8.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'
@@ -86,6 +86,15 @@ allprojects {
8686
tasks.withType(Test) {
8787
maxParallelForks = 2
8888
forkEvery = 100
89+
jvmArgs += "--enable-preview"
90+
}
91+
92+
tasks.withType(JavaCompile) {
93+
options.compilerArgs += "--enable-preview"
94+
}
95+
96+
tasks.withType(GroovyCompile) {
97+
options.forkOptions.jvmArgs += "--enable-preview"
8998
}
9099

91100
processResources {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ private interface ChannelFactory extends
103103
clientIdRegistry,
104104
authenticationService,
105105
mqttSessionService,
106-
publishRetryService
106+
publishRetryService,
107+
subscriptionService
107108
);
108109
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
109110
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -118,9 +119,15 @@ private interface ChannelFactory extends
118119
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
119120
@NotNull ClientIdRegistry clientIdRegistry,
120121
@NotNull MqttSessionService mqttSessionService,
121-
@NotNull PublishRetryService publishRetryService
122+
@NotNull PublishRetryService publishRetryService,
123+
@NotNull SubscriptionService subscriptionService
122124
) {
123-
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
125+
return new DefaultMqttClientReleaseHandler(
126+
clientIdRegistry,
127+
mqttSessionService,
128+
publishRetryService,
129+
subscriptionService
130+
);
124131
}
125132

126133
@Bean
@@ -166,7 +173,7 @@ private interface ChannelFactory extends
166173

167174
@Bean
168175
@NotNull SubscriptionService subscriptionService() {
169-
return new SimpleSubscriptionService(new SimpleSubscriptions());
176+
return new SimpleSubscriptionService();
170177
}
171178

172179
@Bean
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class InconsistentSubscriptionStateException extends RuntimeException {
6+
7+
public InconsistentSubscriptionStateException(@NotNull String message) {
8+
super(message);
9+
}
10+
11+
public InconsistentSubscriptionStateException(@NotNull Throwable cause) {
12+
super(cause);
13+
}
14+
}

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.ss.mqtt.broker.handler.client;
22

3-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
43
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
4+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
77
import com.ss.mqtt.broker.service.PublishRetryService;
8+
import com.ss.mqtt.broker.service.SubscriptionService;
89
import com.ss.rlib.common.util.StringUtils;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.log4j.Log4j2;
@@ -19,6 +20,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1920
private final @NotNull ClientIdRegistry clientIdRegistry;
2021
private final @NotNull MqttSessionService sessionService;
2122
private final @NotNull PublishRetryService publishRetryService;
23+
private final @NotNull SubscriptionService subscriptionService;
2224

2325
@Override
2426
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -41,9 +43,12 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
4143

4244
Mono<?> asyncActions = null;
4345

44-
if (session != null && client.getConnectionConfig().isSessionsEnabled()) {
45-
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
46-
client.setSession(null);
46+
if (session != null) {
47+
subscriptionService.cleanSubscriptions(client, session);
48+
if (client.getConnectionConfig().isSessionsEnabled()) {
49+
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
50+
client.setSession(null);
51+
}
4752
}
4853

4954
if (asyncActions != null) {

src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@
44
import com.ss.mqtt.broker.service.ClientIdRegistry;
55
import com.ss.mqtt.broker.service.MqttSessionService;
66
import com.ss.mqtt.broker.service.PublishRetryService;
7+
import com.ss.mqtt.broker.service.SubscriptionService;
78
import org.jetbrains.annotations.NotNull;
89

910
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
1011

1112
public DefaultMqttClientReleaseHandler(
1213
@NotNull ClientIdRegistry clientIdRegistry,
1314
@NotNull MqttSessionService sessionService,
14-
@NotNull PublishRetryService publishRetryService
15+
@NotNull PublishRetryService publishRetryService,
16+
@NotNull SubscriptionService subscriptionService
1517
) {
16-
super(clientIdRegistry, sessionService, publishRetryService);
18+
super(clientIdRegistry, sessionService, publishRetryService, subscriptionService);
1719
}
1820
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
package com.ss.mqtt.broker.handler.packet.in;
22

3+
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
34
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
45
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
5-
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
66
import static com.ss.mqtt.broker.util.ReactorUtils.ifTrue;
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
9-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
109
import com.ss.mqtt.broker.model.MqttSession;
10+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1111
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1212
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.AuthenticationService;
14-
import com.ss.mqtt.broker.service.ClientIdRegistry;
15-
import com.ss.mqtt.broker.service.MqttSessionService;
16-
import com.ss.mqtt.broker.service.PublishRetryService;
13+
import com.ss.mqtt.broker.service.*;
1714
import com.ss.rlib.common.util.StringUtils;
1815
import lombok.RequiredArgsConstructor;
1916
import org.jetbrains.annotations.NotNull;
@@ -26,6 +23,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
2623
private final @NotNull AuthenticationService authenticationService;
2724
private final @NotNull MqttSessionService mqttSessionService;
2825
private final @NotNull PublishRetryService publishRetryService;
26+
private final @NotNull SubscriptionService subscriptionService;
2927

3028
@Override
3129
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -139,6 +137,8 @@ private Mono<Boolean> onConnected(
139137

140138
publishRetryService.register(client);
141139

140+
subscriptionService.restoreSubscriptions(client, session);
141+
142142
return Mono.just(Boolean.TRUE);
143143
}
144144

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.QoS;
6+
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
59
import com.ss.mqtt.broker.service.SubscriptionService;
610
import lombok.RequiredArgsConstructor;
711
import org.jetbrains.annotations.NotNull;
@@ -12,7 +16,31 @@ abstract class AbstractPublishInHandler implements PublishInHandler {
1216
protected final @NotNull SubscriptionService subscriptionService;
1317
protected final @NotNull PublishOutHandler[] publishOutHandlers;
1418

15-
protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
19+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20+
var result = subscriptionService.forEachTopicSubscriber(
21+
packet.getTopicName(),
22+
packet,
23+
this::publish
24+
);
25+
handleResult(client, packet, result);
26+
}
27+
28+
private @NotNull ActionResult publish(
29+
@NotNull Subscriber subscriber,
30+
@NotNull PublishInPacket packet
31+
) {
32+
return publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
33+
}
34+
35+
private @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
1636
return publishOutHandlers[qos.ordinal()];
1737
}
38+
39+
protected void handleResult(
40+
@NotNull MqttClient client,
41+
@NotNull PublishInPacket packet,
42+
@NotNull ActionResult result
43+
) {
44+
// nothing to do
45+
}
1846
}
Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4-
import com.ss.mqtt.broker.network.client.MqttClient;
5-
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
64
import com.ss.mqtt.broker.service.SubscriptionService;
75
import org.jetbrains.annotations.NotNull;
86

@@ -14,14 +12,4 @@ public Qos0PublishInHandler(
1412
) {
1513
super(subscriptionService, publishOutHandlers);
1614
}
17-
18-
@Override
19-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
21-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
22-
23-
for (var subscriber : subscribers) {
24-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25-
}
26-
}
2715
}
Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
56
import com.ss.mqtt.broker.network.client.MqttClient;
67
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
@@ -17,23 +18,21 @@ public Qos1PublishInHandler(
1718
}
1819

1920
@Override
20-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
21-
22-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
23-
24-
for (var subscriber : subscribers) {
25-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
26-
}
27-
28-
var reasonCode = subscribers.isEmpty() ?
29-
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;
30-
21+
protected void handleResult(
22+
@NotNull MqttClient client,
23+
@NotNull PublishInPacket packet,
24+
@NotNull ActionResult result
25+
) {
26+
var reasonCode = switch (result) {
27+
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
28+
case SUCCESS -> PublishAckReasonCode.SUCCESS;
29+
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;
30+
};
3131
var ackPacket = client.getPacketOutFactory().newPublishAck(
3232
client,
3333
packet.getPacketId(),
3434
reasonCode
3535
);
36-
3736
client.send(ackPacket);
3837
}
3938
}

src/main/java/com/ss/mqtt/broker/handler/publish/out/PublishOutHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ss.mqtt.broker.handler.publish.out;
22

3+
import com.ss.mqtt.broker.model.ActionResult;
34
import com.ss.mqtt.broker.model.Subscriber;
45
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
56
import org.jetbrains.annotations.NotNull;
@@ -9,5 +10,5 @@
910
*/
1011
public interface PublishOutHandler {
1112

12-
void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber);
13+
@NotNull ActionResult handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber);
1314
}

0 commit comments

Comments
 (0)