Skip to content

Commit 510d527

Browse files
committed
[broker-15] fix for code review
1 parent 3bd1076 commit 510d527

File tree

3 files changed

+35
-33
lines changed

3 files changed

+35
-33
lines changed

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,4 @@ public interface MqttPropertyConstants {
5252
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = false;
5353

5454
int PACKET_ID_FOR_QOS_0 = 0;
55-
int PACKET_ID_MAX = 0xFFFF;
5655
}

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -81,38 +81,9 @@ public boolean hasPendingPackets() {
8181

8282
@Override
8383
public void removeExpiredPackets() {
84-
85-
if (pendingPublishes.isEmpty()) {
86-
return;
84+
if (!pendingPublishes.isEmpty()) {
85+
pendingPublishes.runInWriteLock(this::removeExpiredPackets);
8786
}
88-
89-
pendingPublishes.runInWriteLock(publishes -> {
90-
91-
var currentTime = System.currentTimeMillis();
92-
var array = publishes.array();
93-
94-
for (int i = 0, length = publishes.size(); i < length; i++) {
95-
96-
var pendingPublish = array[i];
97-
98-
var publish = pendingPublish.publish;
99-
var messageExpiryInterval = publish.getMessageExpiryInterval();
100-
101-
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
102-
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
103-
continue;
104-
}
105-
106-
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
107-
108-
if (expiredTime < currentTime) {
109-
log.debug("Remove pending publish {} by expiration reason", publish);
110-
publishes.fastRemove(i);
111-
i--;
112-
length--;
113-
}
114-
}
115-
});
11687
}
11788

11889
@Override
@@ -167,4 +138,32 @@ public void updatePendingPacket(
167138
public void clear() {
168139
pendingPublishes.runInWriteLock(Collection::clear);
169140
}
141+
142+
private void removeExpiredPackets(@NotNull Array<PendingPublish> publishes) {
143+
144+
var currentTime = System.currentTimeMillis();
145+
var array = publishes.array();
146+
147+
for (int i = 0, length = publishes.size(); i < length; i++) {
148+
149+
var pendingPublish = array[i];
150+
151+
var publish = pendingPublish.publish;
152+
var messageExpiryInterval = publish.getMessageExpiryInterval();
153+
154+
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
155+
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
156+
continue;
157+
}
158+
159+
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
160+
161+
if (expiredTime < currentTime) {
162+
log.debug("Remove pending publish {} by expiration reason", publish);
163+
publishes.fastRemove(i);
164+
i--;
165+
length--;
166+
}
167+
}
168+
}
170169
}

src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ public InMemoryMqttSessionService(int cleanInterval) {
6464
}
6565

6666
@Override
67-
public @NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) {
67+
public @NotNull Mono<Boolean> store(
68+
@NotNull String clientId,
69+
@NotNull MqttSession session,
70+
long expiryInterval
71+
) {
6872

6973
var unsafe = (UnsafeMqttSession) session;
7074
unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000));

0 commit comments

Comments
 (0)