Skip to content

Commit 269e264

Browse files
committed
[broker-15] fix for code review
1 parent 510d527 commit 269e264

File tree

1 file changed

+29
-29
lines changed

1 file changed

+29
-29
lines changed

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,34 @@ private static class PendingPublish {
3131
private volatile long lastAttemptTime;
3232
}
3333

34+
private static void removeExpiredPackets(@NotNull Array<PendingPublish> publishes) {
35+
36+
var currentTime = System.currentTimeMillis();
37+
var array = publishes.array();
38+
39+
for (int i = 0, length = publishes.size(); i < length; i++) {
40+
41+
var pendingPublish = array[i];
42+
43+
var publish = pendingPublish.publish;
44+
var messageExpiryInterval = publish.getMessageExpiryInterval();
45+
46+
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
47+
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
48+
continue;
49+
}
50+
51+
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
52+
53+
if (expiredTime < currentTime) {
54+
log.debug("Remove pending publish {} by expiration reason", publish);
55+
publishes.fastRemove(i);
56+
i--;
57+
length--;
58+
}
59+
}
60+
}
61+
3462
private final @NotNull String clientId;
3563
private final @NotNull ConcurrentArray<PendingPublish> pendingPublishes;
3664
private final @NotNull AtomicInteger packetIdGenerator;
@@ -82,7 +110,7 @@ public boolean hasPendingPackets() {
82110
@Override
83111
public void removeExpiredPackets() {
84112
if (!pendingPublishes.isEmpty()) {
85-
pendingPublishes.runInWriteLock(this::removeExpiredPackets);
113+
pendingPublishes.runInWriteLock(DefaultMqttSession::removeExpiredPackets);
86114
}
87115
}
88116

@@ -138,32 +166,4 @@ public void updatePendingPacket(
138166
public void clear() {
139167
pendingPublishes.runInWriteLock(Collection::clear);
140168
}
141-
142-
private void removeExpiredPackets(@NotNull Array<PendingPublish> publishes) {
143-
144-
var currentTime = System.currentTimeMillis();
145-
var array = publishes.array();
146-
147-
for (int i = 0, length = publishes.size(); i < length; i++) {
148-
149-
var pendingPublish = array[i];
150-
151-
var publish = pendingPublish.publish;
152-
var messageExpiryInterval = publish.getMessageExpiryInterval();
153-
154-
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
155-
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
156-
continue;
157-
}
158-
159-
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
160-
161-
if (expiredTime < currentTime) {
162-
log.debug("Remove pending publish {} by expiration reason", publish);
163-
publishes.fastRemove(i);
164-
i--;
165-
length--;
166-
}
167-
}
168-
}
169169
}

0 commit comments

Comments
 (0)