Skip to content

Commit 5c31e14

Browse files
authored
Update implementation of unsubscribing process (#52)
Update the implementation of the unsubscribing process
1 parent 6b912ea commit 5c31e14

24 files changed

+562
-107
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@ import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
1212
import javasabr.mqtt.network.message.in.PublishMqttInMessage
1313
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage
1414
import javasabr.mqtt.network.message.in.SubscribeAckMqttInMessage
15-
import javasabr.mqtt.network.message.out.*
15+
import javasabr.mqtt.network.message.out.ConnectMqtt311OutMessage
16+
import javasabr.mqtt.network.message.out.ConnectMqtt5OutMessage
17+
import javasabr.mqtt.network.message.out.PublishCompleteMqtt311OutMessage
18+
import javasabr.mqtt.network.message.out.PublishCompleteMqtt5OutMessage
19+
import javasabr.mqtt.network.message.out.PublishReceivedMqtt311OutMessage
20+
import javasabr.mqtt.network.message.out.PublishReceivedMqtt5OutMessage
21+
import javasabr.mqtt.network.message.out.SubscribeMqtt311OutMessage
22+
import javasabr.mqtt.network.message.out.SubscribeMqtt5OutMessage
1623
import javasabr.mqtt.service.session.MqttSessionService
1724
import javasabr.rlib.collections.array.Array
1825
import org.springframework.beans.factory.annotation.Autowired

model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ public boolean sessionsEnabled() {
3535
public int maxTopicLevels() {
3636
return server.maxTopicLevels();
3737
}
38+
39+
public int maxStringLength() {
40+
return server.maxStringLength();
41+
}
3842
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package javasabr.mqtt.model;
2+
3+
public interface MqttProtocolErrors {
4+
String NO_ANY_TOPIC_FILTER = "No any topic filters";
5+
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported qos or retain handling";
6+
}

network/src/main/java/javasabr/mqtt/network/message/in/SubscribeMqttInMessage.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import java.util.EnumSet;
55
import java.util.Set;
66
import javasabr.mqtt.base.util.DebugUtils;
7+
import javasabr.mqtt.model.MqttClientConnectionConfig;
78
import javasabr.mqtt.model.MqttMessageProperty;
89
import javasabr.mqtt.model.MqttProperties;
9-
import javasabr.mqtt.model.MqttServerConnectionConfig;
10+
import javasabr.mqtt.model.MqttProtocolErrors;
1011
import javasabr.mqtt.model.MqttVersion;
1112
import javasabr.mqtt.model.QoS;
1213
import javasabr.mqtt.model.SubscribeRetainHandling;
@@ -21,6 +22,7 @@
2122
import lombok.Getter;
2223
import lombok.experimental.Accessors;
2324
import lombok.experimental.FieldDefaults;
25+
import org.jspecify.annotations.Nullable;
2426

2527
/**
2628
* Subscribe request.
@@ -30,7 +32,10 @@
3032
@FieldDefaults(level = AccessLevel.PROTECTED)
3133
public class SubscribeMqttInMessage extends TrackableMqttInMessage {
3234

35+
private static final Array<RequestedSubscription> EMPTY_SUBSCRIPTIONS = Array.empty(RequestedSubscription.class);
36+
3337
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.SUBSCRIBE.ordinal();
38+
public static final byte MESSAGE_FLAGS = 0b0000_0010;
3439

3540
static {
3641
DebugUtils.registerIncludedFields("subscriptions");
@@ -53,14 +58,14 @@ public class SubscribeMqttInMessage extends TrackableMqttInMessage {
5358
*/
5459
MqttMessageProperty.USER_PROPERTY);
5560

56-
final MutableArray<RequestedSubscription> subscriptions;
61+
@Nullable
62+
MutableArray<RequestedSubscription> subscriptions;
5763

5864
// properties
5965
int subscriptionId;
6066

6167
public SubscribeMqttInMessage(byte info) {
6268
super(info);
63-
this.subscriptions = ArrayFactory.mutableArray(RequestedSubscription.class);
6469
this.subscriptionId = MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET;
6570
}
6671

@@ -71,22 +76,25 @@ public byte messageType() {
7176

7277
@Override
7378
protected boolean validMessageFlags(byte messageFlags) {
74-
return messageFlags == 0b0000_0010;
79+
return messageFlags == MESSAGE_FLAGS;
7580
}
7681

7782
@Override
7883
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
7984
if (buffer.remaining() < 1) {
80-
throw new MalformedProtocolMqttException("No any topic filters");
85+
throw new MalformedProtocolMqttException(MqttProtocolErrors.NO_ANY_TOPIC_FILTER);
8186
}
8287

83-
MqttServerConnectionConfig severConnConfig = connection.serverConnectionConfig();
88+
MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
89+
int maxStringLength = connectionConfig.maxStringLength();
8490
boolean isMqtt5 = connection.isSupported(MqttVersion.MQTT_5);
8591

92+
subscriptions = ArrayFactory.mutableArray(RequestedSubscription.class);
93+
8694
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718066
8795
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168
8896
while (buffer.hasRemaining()) {
89-
String topicFilter = readString(buffer, severConnConfig.maxStringLength());
97+
String topicFilter = readString(buffer, maxStringLength);
9098

9199
int options = readByteUnsigned(buffer);
92100
int qosLevel = options & 0b0000_0011;
@@ -105,7 +113,7 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
105113

106114
QoS qos = QoS.ofCode(qosLevel);
107115
if (qos == QoS.INVALID || retainHandling == SubscribeRetainHandling.INVALID) {
108-
throw new MalformedProtocolMqttException("Unsupported qos or retain handling");
116+
throw new MalformedProtocolMqttException(MqttProtocolErrors.UNSUPPORTED_QOS_OR_RETAIN_HANDLING);
109117
}
110118

111119
subscriptions.add(new RequestedSubscription(
@@ -131,11 +139,11 @@ protected void applyProperty(MqttMessageProperty property, long value) {
131139
}
132140

133141
public Array<RequestedSubscription> subscriptions() {
134-
return subscriptions;
142+
return subscriptions == null ? EMPTY_SUBSCRIPTIONS : subscriptions;
135143
}
136144

137145
public int subscriptionsCount() {
138-
return subscriptions.size();
146+
return subscriptions().size();
139147
}
140148

141149
private static void validateMqtt311Options(int options) {

network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeMqttInMessage.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import java.util.EnumSet;
55
import java.util.Set;
66
import javasabr.mqtt.base.util.DebugUtils;
7+
import javasabr.mqtt.model.MqttClientConnectionConfig;
78
import javasabr.mqtt.model.MqttMessageProperty;
9+
import javasabr.mqtt.model.MqttProtocolErrors;
810
import javasabr.mqtt.model.exception.MalformedProtocolMqttException;
911
import javasabr.mqtt.network.MqttConnection;
1012
import javasabr.mqtt.network.message.MqttMessageType;
@@ -22,14 +24,15 @@
2224
*/
2325
@Getter
2426
@Accessors(fluent = true)
25-
@FieldDefaults(level = AccessLevel.PRIVATE)
27+
@FieldDefaults(level = AccessLevel.PROTECTED)
2628
public class UnsubscribeMqttInMessage extends TrackableMqttInMessage {
2729

2830
static {
2931
DebugUtils.registerIncludedFields("rawTopicFilters");
3032
}
3133

3234
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.UNSUBSCRIBE.ordinal();
35+
public static final byte MESSAGE_FLAGS = 0b0000_0010;
3336

3437
private static final Set<MqttMessageProperty> AVAILABLE_PROPERTIES = EnumSet.of(
3538
/*
@@ -57,19 +60,27 @@ protected boolean validMessageFlags(byte messageFlags) {
5760

5861
@Override
5962
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
60-
if (buffer.remaining() < 1) {
61-
throw new MalformedProtocolMqttException("No any topic filters.");
63+
if (!buffer.hasRemaining()) {
64+
throw new MalformedProtocolMqttException(MqttProtocolErrors.NO_ANY_TOPIC_FILTER);
6265
}
66+
67+
MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
68+
int maxStringLength = connectionConfig.maxStringLength();
69+
6370
rawTopicFilters = ArrayFactory.mutableArray(String.class);
6471
while (buffer.hasRemaining()) {
65-
rawTopicFilters.add(readString(buffer, Integer.MAX_VALUE));
72+
rawTopicFilters.add(readString(buffer, maxStringLength));
6673
}
6774
}
6875

6976
public Array<String> rawTopicFilters() {
7077
return rawTopicFilters == null ? EMPTY_STRINGS : rawTopicFilters;
7178
}
7279

80+
public int topicFiltersCount() {
81+
return rawTopicFilters == null ? 0 : rawTopicFilters.size();
82+
}
83+
7384
@Override
7485
protected Set<MqttMessageProperty> availableProperties() {
7586
return AVAILABLE_PROPERTIES;

network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
import javasabr.mqtt.network.MqttConnection;
55
import javasabr.mqtt.network.message.MqttMessageType;
66
import lombok.AccessLevel;
7+
import lombok.Getter;
78
import lombok.RequiredArgsConstructor;
9+
import lombok.experimental.Accessors;
810
import lombok.experimental.FieldDefaults;
911

1012
/**
1113
* Unsubscribe acknowledgement.
1214
*/
15+
@Getter
16+
@Accessors(fluent = true)
1317
@RequiredArgsConstructor
1418
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
1519
public class UnsubscribeAckMqtt311OutMessage extends MqttOutMessage {

network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99
import javasabr.mqtt.network.MqttConnection;
1010
import javasabr.rlib.collections.array.Array;
1111
import lombok.AccessLevel;
12+
import lombok.Getter;
13+
import lombok.experimental.Accessors;
1214
import lombok.experimental.FieldDefaults;
1315

1416
/**
1517
* Unsubscribe acknowledgement.
1618
*/
19+
@Getter
20+
@Accessors(fluent = true)
1721
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
1822
public class UnsubscribeAckMqtt5OutMessage extends UnsubscribeAckMqtt311OutMessage {
1923

network/src/test/groovy/javasabr/mqtt/network/message/in/SubscribeMqttInMessageTest.groovy

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package javasabr.mqtt.network.message.in
22

3+
34
import javasabr.mqtt.model.MqttMessageProperty
45
import javasabr.mqtt.model.MqttProperties
6+
import javasabr.mqtt.model.MqttProtocolErrors
57
import javasabr.mqtt.model.QoS
68
import javasabr.mqtt.model.SubscribeRetainHandling
79
import javasabr.mqtt.model.exception.MalformedProtocolMqttException
@@ -139,7 +141,7 @@ class SubscribeMqttInMessageTest extends BaseMqttInMessageTest {
139141
then:
140142
!successful2
141143
inMessage2.exception() instanceof MalformedProtocolMqttException
142-
inMessage2.exception().message == 'Unsupported qos or retain handling'
144+
inMessage2.exception().message == MqttProtocolErrors.UNSUPPORTED_QOS_OR_RETAIN_HANDLING
143145
when:
144146
def dataBuffer3 = BufferUtils.prepareBuffer(512) {
145147
it.putShort(messageId)
@@ -150,6 +152,6 @@ class SubscribeMqttInMessageTest extends BaseMqttInMessageTest {
150152
then:
151153
!successful3
152154
inMessage3.exception() instanceof MalformedProtocolMqttException
153-
inMessage3.exception().message == 'No any topic filters'
155+
inMessage3.exception().message == MqttProtocolErrors.NO_ANY_TOPIC_FILTER
154156
}
155157
}

network/src/testFixtures/groovy/javasabr/mqtt/network/MqttMockClient.groovy

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package javasabr.mqtt.network
22

33
import javasabr.mqtt.network.message.MqttMessageType
4-
import javasabr.mqtt.network.message.in.*
4+
import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
5+
import javasabr.mqtt.network.message.in.MqttInMessage
6+
import javasabr.mqtt.network.message.in.PublishMqttInMessage
7+
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage
8+
import javasabr.mqtt.network.message.in.SubscribeAckMqttInMessage
59
import javasabr.mqtt.network.message.out.DisconnectMqtt311OutMessage
610
import javasabr.mqtt.network.message.out.MqttOutMessage
711
import javasabr.mqtt.network.util.MqttDataUtils

network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package javasabr.mqtt.network
22

3-
import javasabr.mqtt.model.*
3+
4+
import javasabr.mqtt.model.MqttClientConnectionConfig
5+
import javasabr.mqtt.model.MqttServerConnectionConfig
6+
import javasabr.mqtt.model.MqttVersion
7+
import javasabr.mqtt.model.QoS
8+
import javasabr.mqtt.model.SubscribeRetainHandling
49
import javasabr.mqtt.model.data.type.StringPair
510
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode
611
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode

0 commit comments

Comments
 (0)