Skip to content

Commit 6b912ea

Browse files
authored
Improve subscription process, part 2 (#51)
* add supporting replacing subscriptions * improve error handling during subscribing * update tests * add validation packet flags * update validation message flags * extend tests for subscription process
1 parent e871b2e commit 6b912ea

File tree

99 files changed

+1521
-650
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+1521
-650
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[versions]
22
# https://gitlab.com/JavaSaBr/maven-repo/-/packages
3-
rlib = "10.0.alpha6"
3+
rlib = "10.0.alpha8"
44
# https://mvnrepository.com/artifact/org.projectlombok/lombok
55
lombok = "1.18.38"
66
# https://mvnrepository.com/artifact/org.jspecify/jspecify

model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,29 @@ public record MqttClientConnectionConfig(
1010
int topicAliasMaxValue,
1111
int keepAlive,
1212
boolean requestResponseInformation,
13-
boolean requestProblemInformation) {}
13+
boolean requestProblemInformation) {
14+
15+
public boolean subscriptionIdAvailable() {
16+
return server.subscriptionIdAvailable();
17+
}
18+
19+
public boolean retainAvailable() {
20+
return server.retainAvailable();
21+
}
22+
23+
public boolean wildcardSubscriptionAvailable() {
24+
return server.wildcardSubscriptionAvailable();
25+
}
26+
27+
public boolean sharedSubscriptionAvailable() {
28+
return server.sharedSubscriptionAvailable();
29+
}
30+
31+
public boolean sessionsEnabled() {
32+
return server.sessionsEnabled();
33+
}
34+
35+
public int maxTopicLevels() {
36+
return server.maxTopicLevels();
37+
}
38+
}

model/src/main/java/javasabr/mqtt/model/MqttProperties.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,7 @@ public interface MqttProperties {
4545
int TOPIC_ALIAS_MAX = 0xFFFF;
4646
int TOPIC_ALIAS_NOT_SET = 0;
4747

48-
int SUBSCRIPTION_ID_UNDEFINED = 0;
49-
50-
int MESSAGE_ID_UNDEFINED = -1;
48+
int SUBSCRIPTION_ID_IS_NOT_SET = 0;
5149
int MESSAGE_ID_IS_NOT_SET = 0;
5250

5351
boolean SESSIONS_ENABLED_DEFAULT = true;

model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,23 @@ public MqttServerConnectionConfig withSharedSubscriptionAvailable(boolean shared
126126
subscriptionIdAvailable,
127127
sharedSubscriptionAvailable);
128128
}
129+
130+
public MqttServerConnectionConfig withSubscriptionIdAvailable(boolean subscriptionIdAvailable) {
131+
return new MqttServerConnectionConfig(
132+
maxQos,
133+
maxMessageSize,
134+
maxStringLength,
135+
maxBinarySize,
136+
maxTopicLevels,
137+
minKeepAliveTime,
138+
receiveMaxPublishes,
139+
topicAliasMaxValue,
140+
defaultSessionExpiryInterval,
141+
keepAliveEnabled,
142+
sessionsEnabled,
143+
retainAvailable,
144+
wildcardSubscriptionAvailable,
145+
subscriptionIdAvailable,
146+
sharedSubscriptionAvailable);
147+
}
129148
}
Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package javasabr.mqtt.model;
22

33
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
4+
import javasabr.rlib.common.util.NumberedEnum;
5+
import javasabr.rlib.common.util.NumberedEnumMap;
46
import lombok.AccessLevel;
57
import lombok.Getter;
68
import lombok.RequiredArgsConstructor;
@@ -11,22 +13,28 @@
1113
@RequiredArgsConstructor
1214
@Accessors(fluent = true, chain = false)
1315
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
14-
public enum QoS {
16+
public enum QoS implements NumberedEnum<QoS> {
1517
AT_MOST_ONCE(0, SubscribeAckReasonCode.GRANTED_QOS_0),
1618
AT_LEAST_ONCE(1, SubscribeAckReasonCode.GRANTED_QOS_1),
1719
EXACTLY_ONCE(2, SubscribeAckReasonCode.GRANTED_QOS_2),
1820
INVALID(3, SubscribeAckReasonCode.IMPLEMENTATION_SPECIFIC_ERROR);
1921

20-
private static final QoS[] VALUES = values();
22+
private static final NumberedEnumMap<QoS> NUMBERED_MAP =
23+
new NumberedEnumMap<>(QoS.class);
2124

2225
public static QoS ofCode(int level) {
23-
if (level < 0 || level > EXACTLY_ONCE.ordinal()) {
24-
return INVALID;
25-
} else {
26-
return VALUES[level];
27-
}
26+
return NUMBERED_MAP.resolve(level, QoS.INVALID);
2827
}
2928

3029
int level;
3130
SubscribeAckReasonCode subscribeAckReasonCode;
31+
32+
@Override
33+
public int number() {
34+
return level;
35+
}
36+
37+
public QoS lower(QoS alternative) {
38+
return level > alternative.level ? alternative : this;
39+
}
3240
}

model/src/main/java/javasabr/mqtt/model/subscribtion/Subscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public record Subscription(
4141
public static Subscription minimal(TopicFilter topicFilter, QoS qos) {
4242
return new Subscription(
4343
topicFilter,
44-
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
44+
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
4545
qos,
4646
SubscribeRetainHandling.SEND,
4747
true,

model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import javasabr.rlib.common.ThreadSafe;
1111
import lombok.AccessLevel;
1212
import lombok.experimental.FieldDefaults;
13+
import org.jspecify.annotations.Nullable;
1314

1415
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
1516
public class ConcurrentTopicTree implements ThreadSafe {
@@ -20,8 +21,9 @@ public ConcurrentTopicTree() {
2021
this.rootNode = new TopicNode();
2122
}
2223

23-
public void subscribe(SubscriptionOwner owner, Subscription subscription) {
24-
rootNode.subscribe(0, owner, subscription, subscription.topicFilter());
24+
@Nullable
25+
public SingleSubscriber subscribe(SubscriptionOwner owner, Subscription subscription) {
26+
return rootNode.subscribe(0, owner, subscription, subscription.topicFilter());
2527
}
2628

2729
public boolean unsubscribe(SubscriptionOwner owner, TopicFilter topicFilter) {

model/src/main/java/javasabr/mqtt/model/topic/tree/TopicNode.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,16 @@ class TopicNode extends TopicTreeBase {
3535
@Nullable
3636
volatile LockableArray<Subscriber> subscribers;
3737

38-
public void subscribe(int level, SubscriptionOwner owner, Subscription subscription, TopicFilter topicFilter) {
38+
/**
39+
* @return the previous subscription from the same owner
40+
*/
41+
@Nullable
42+
public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscription subscription, TopicFilter topicFilter) {
3943
if (level == topicFilter.levelsCount()) {
40-
addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
41-
return;
44+
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
4245
}
4346
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
44-
childNode.subscribe(level + 1, owner, subscription, topicFilter);
47+
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
4548
}
4649

4750
public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {

model/src/main/java/javasabr/mqtt/model/topic/tree/TopicTreeBase.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
2121
abstract class TopicTreeBase {
2222

23-
protected static void addSubscriber(
23+
/**
24+
* @return previous subscriber with the same owner
25+
*/
26+
@Nullable
27+
protected static SingleSubscriber addSubscriber(
2428
LockableArray<Subscriber> subscribers,
2529
SubscriptionOwner owner,
2630
Subscription subscription,
@@ -29,14 +33,30 @@ protected static void addSubscriber(
2933
try {
3034
if (topicFilter instanceof SharedTopicFilter stf) {
3135
addSharedSubscriber(subscribers, owner, subscription, stf);
36+
return null;
3237
} else {
38+
SingleSubscriber previous = removePreviousIfExist(subscribers, owner);
3339
subscribers.add(new SingleSubscriber(owner, subscription));
40+
return previous;
3441
}
3542
} finally {
3643
subscribers.writeUnlock(stamp);
3744
}
3845
}
3946

47+
@Nullable
48+
private static SingleSubscriber removePreviousIfExist(
49+
LockableArray<Subscriber> subscribers,
50+
SubscriptionOwner owner) {
51+
int index = subscribers.indexOf(Subscriber::resolveOwner, owner);
52+
if (index < 0) {
53+
return null;
54+
}
55+
return subscribers
56+
.remove(index)
57+
.resolveSingle();
58+
}
59+
4060
private static void addSharedSubscriber(
4161
LockableArray<Subscriber> subscribers,
4262
SubscriptionOwner owner,

model/src/test/groovy/javasabr/mqtt/model/topic/tree/TopicTreeTest.groovy

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,14 +539,94 @@ class TopicTreeTest extends UnitSpecification {
539539
!id3WasUnsubscribed
540540
}
541541

542+
def "should replace the same subscriptions"() {
543+
given:
544+
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
545+
def owner1 = makeOwner("id1")
546+
def originalSub = makeSubscription('topic/name1')
547+
def replacementSub = makeSubscription('topic/name1')
548+
topicTree.subscribe(makeOwner("id2"), makeSubscription('topic/name1'))
549+
topicTree.subscribe(makeOwner("id3"), makeSubscription('topic/name1'))
550+
when:
551+
def previous = topicTree.subscribe(owner1, originalSub)
552+
def matched = topicTree
553+
.matches(TopicName.valueOf("topic/name1"))
554+
.toSet()
555+
then:
556+
matched.size() == 3
557+
previous == null;
558+
when:
559+
previous = topicTree.subscribe(owner1, replacementSub)
560+
matched = topicTree
561+
.matches(TopicName.valueOf("topic/name1"))
562+
.toSet()
563+
then:
564+
matched.size() == 3
565+
matched.first().subscription() == replacementSub
566+
previous != null
567+
previous.subscription() == originalSub
568+
}
569+
570+
def "should extend shared subscription group on multiply subscribing by the same topic"() {
571+
given:
572+
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
573+
def owner1 = makeOwner("id1")
574+
def owner2 = makeOwner("id2")
575+
topicTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1'))
576+
topicTree.subscribe(owner2, makeSharedSubscription('$share/group1/topic/name1'))
577+
when:
578+
def matched = topicTree
579+
.matches(TopicName.valueOf("topic/name1"))
580+
.toSet()
581+
then:
582+
matched.size() == 1
583+
matched.first().owner() == owner2
584+
when:
585+
matched = topicTree
586+
.matches(TopicName.valueOf("topic/name1"))
587+
.toSet()
588+
then:
589+
matched.size() == 1
590+
matched.first().owner() == owner1
591+
when:
592+
matched = topicTree
593+
.matches(TopicName.valueOf("topic/name1"))
594+
.toSet()
595+
then:
596+
matched.size() == 1
597+
matched.first().owner() == owner2
598+
when:
599+
topicTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1'))
600+
matched = topicTree
601+
.matches(TopicName.valueOf("topic/name1"))
602+
.toSet()
603+
then:
604+
matched.size() == 1
605+
matched.first().owner() == owner2
606+
when:
607+
matched = topicTree
608+
.matches(TopicName.valueOf("topic/name1"))
609+
.toSet()
610+
then:
611+
matched.size() == 1
612+
matched.first().owner() == owner1
613+
when:
614+
matched = topicTree
615+
.matches(TopicName.valueOf("topic/name1"))
616+
.toSet()
617+
then:
618+
matched.size() == 1
619+
matched.first().owner() == owner1
620+
}
621+
542622
static def makeOwner(String id) {
543623
return new TestSubscriptionOwner(id)
544624
}
545625

546626
static def makeSubscription(String topicFilter) {
547627
return new Subscription(
548628
TopicFilter.valueOf(topicFilter),
549-
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
629+
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
550630
QoS.AT_LEAST_ONCE,
551631
SubscribeRetainHandling.SEND,
552632
true,
@@ -556,7 +636,7 @@ class TopicTreeTest extends UnitSpecification {
556636
static def makeSharedSubscription(String topicFilter) {
557637
return new Subscription(
558638
SharedTopicFilter.valueOf(topicFilter),
559-
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
639+
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
560640
QoS.AT_LEAST_ONCE,
561641
SubscribeRetainHandling.SEND,
562642
true,
@@ -566,7 +646,7 @@ class TopicTreeTest extends UnitSpecification {
566646
static def makeSubscription(String topicFilter, int qos) {
567647
return new Subscription(
568648
TopicFilter.valueOf(topicFilter),
569-
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
649+
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
570650
QoS.ofCode(qos),
571651
SubscribeRetainHandling.SEND,
572652
true,

0 commit comments

Comments
 (0)