Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
import javasabr.mqtt.network.message.in.PublishMqttInMessage
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage
import javasabr.mqtt.network.message.in.SubscribeAckMqttInMessage
import javasabr.mqtt.network.message.out.*
import javasabr.mqtt.network.message.out.ConnectMqtt311OutMessage
import javasabr.mqtt.network.message.out.ConnectMqtt5OutMessage
import javasabr.mqtt.network.message.out.PublishCompleteMqtt311OutMessage
import javasabr.mqtt.network.message.out.PublishCompleteMqtt5OutMessage
import javasabr.mqtt.network.message.out.PublishReceivedMqtt311OutMessage
import javasabr.mqtt.network.message.out.PublishReceivedMqtt5OutMessage
import javasabr.mqtt.network.message.out.SubscribeMqtt311OutMessage
import javasabr.mqtt.network.message.out.SubscribeMqtt5OutMessage
import javasabr.mqtt.service.session.MqttSessionService
import javasabr.rlib.collections.array.Array
import org.springframework.beans.factory.annotation.Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public boolean sessionsEnabled() {
public int maxTopicLevels() {
return server.maxTopicLevels();
}

public int maxStringLength() {
return server.maxStringLength();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package javasabr.mqtt.model;

public interface MqttProtocolErrors {
String NO_ANY_TOPIC_FILTER = "No any topic filters";
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported qos or retain handling";
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import java.util.EnumSet;
import java.util.Set;
import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttMessageProperty;
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.MqttProtocolErrors;
import javasabr.mqtt.model.MqttVersion;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.SubscribeRetainHandling;
Expand All @@ -21,6 +22,7 @@
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

/**
* Subscribe request.
Expand All @@ -30,7 +32,10 @@
@FieldDefaults(level = AccessLevel.PROTECTED)
public class SubscribeMqttInMessage extends TrackableMqttInMessage {

private static final Array<RequestedSubscription> EMPTY_SUBSCRIPTIONS = Array.empty(RequestedSubscription.class);

private static final byte MESSAGE_TYPE = (byte) MqttMessageType.SUBSCRIBE.ordinal();
public static final byte MESSAGE_FLAGS = 0b0000_0010;

static {
DebugUtils.registerIncludedFields("subscriptions");
Expand All @@ -53,14 +58,14 @@ public class SubscribeMqttInMessage extends TrackableMqttInMessage {
*/
MqttMessageProperty.USER_PROPERTY);

final MutableArray<RequestedSubscription> subscriptions;
@Nullable
MutableArray<RequestedSubscription> subscriptions;

// properties
int subscriptionId;

public SubscribeMqttInMessage(byte info) {
super(info);
this.subscriptions = ArrayFactory.mutableArray(RequestedSubscription.class);
this.subscriptionId = MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET;
}

Expand All @@ -71,22 +76,25 @@ public byte messageType() {

@Override
protected boolean validMessageFlags(byte messageFlags) {
return messageFlags == 0b0000_0010;
return messageFlags == MESSAGE_FLAGS;
}

@Override
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
if (buffer.remaining() < 1) {
throw new MalformedProtocolMqttException("No any topic filters");
throw new MalformedProtocolMqttException(MqttProtocolErrors.NO_ANY_TOPIC_FILTER);
}

MqttServerConnectionConfig severConnConfig = connection.serverConnectionConfig();
MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
int maxStringLength = connectionConfig.maxStringLength();
boolean isMqtt5 = connection.isSupported(MqttVersion.MQTT_5);

subscriptions = ArrayFactory.mutableArray(RequestedSubscription.class);

// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718066
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168
while (buffer.hasRemaining()) {
String topicFilter = readString(buffer, severConnConfig.maxStringLength());
String topicFilter = readString(buffer, maxStringLength);

int options = readByteUnsigned(buffer);
int qosLevel = options & 0b0000_0011;
Expand All @@ -105,7 +113,7 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) {

QoS qos = QoS.ofCode(qosLevel);
if (qos == QoS.INVALID || retainHandling == SubscribeRetainHandling.INVALID) {
throw new MalformedProtocolMqttException("Unsupported qos or retain handling");
throw new MalformedProtocolMqttException(MqttProtocolErrors.UNSUPPORTED_QOS_OR_RETAIN_HANDLING);
}

subscriptions.add(new RequestedSubscription(
Expand All @@ -131,11 +139,11 @@ protected void applyProperty(MqttMessageProperty property, long value) {
}

public Array<RequestedSubscription> subscriptions() {
return subscriptions;
return subscriptions == null ? EMPTY_SUBSCRIPTIONS : subscriptions;
}

public int subscriptionsCount() {
return subscriptions.size();
return subscriptions().size();
}

private static void validateMqtt311Options(int options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import java.util.EnumSet;
import java.util.Set;
import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttMessageProperty;
import javasabr.mqtt.model.MqttProtocolErrors;
import javasabr.mqtt.model.exception.MalformedProtocolMqttException;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.message.MqttMessageType;
Expand All @@ -22,14 +24,15 @@
*/
@Getter
@Accessors(fluent = true)
@FieldDefaults(level = AccessLevel.PRIVATE)
@FieldDefaults(level = AccessLevel.PROTECTED)
public class UnsubscribeMqttInMessage extends TrackableMqttInMessage {

static {
DebugUtils.registerIncludedFields("rawTopicFilters");
}

private static final byte MESSAGE_TYPE = (byte) MqttMessageType.UNSUBSCRIBE.ordinal();
public static final byte MESSAGE_FLAGS = 0b0000_0010;

private static final Set<MqttMessageProperty> AVAILABLE_PROPERTIES = EnumSet.of(
/*
Expand Down Expand Up @@ -57,19 +60,27 @@ protected boolean validMessageFlags(byte messageFlags) {

@Override
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
if (buffer.remaining() < 1) {
throw new MalformedProtocolMqttException("No any topic filters.");
if (!buffer.hasRemaining()) {
throw new MalformedProtocolMqttException(MqttProtocolErrors.NO_ANY_TOPIC_FILTER);
}

MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
int maxStringLength = connectionConfig.maxStringLength();

rawTopicFilters = ArrayFactory.mutableArray(String.class);
while (buffer.hasRemaining()) {
rawTopicFilters.add(readString(buffer, Integer.MAX_VALUE));
rawTopicFilters.add(readString(buffer, maxStringLength));
}
}

public Array<String> rawTopicFilters() {
return rawTopicFilters == null ? EMPTY_STRINGS : rawTopicFilters;
}

public int topicFiltersCount() {
return rawTopicFilters == null ? 0 : rawTopicFilters.size();
}

@Override
protected Set<MqttMessageProperty> availableProperties() {
return AVAILABLE_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.message.MqttMessageType;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;

/**
* Unsubscribe acknowledgement.
*/
@Getter
@Accessors(fluent = true)
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
public class UnsubscribeAckMqtt311OutMessage extends MqttOutMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import javasabr.mqtt.network.MqttConnection;
import javasabr.rlib.collections.array.Array;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;

/**
* Unsubscribe acknowledgement.
*/
@Getter
@Accessors(fluent = true)
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class UnsubscribeAckMqtt5OutMessage extends UnsubscribeAckMqtt311OutMessage {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package javasabr.mqtt.network.message.in


import javasabr.mqtt.model.MqttMessageProperty
import javasabr.mqtt.model.MqttProperties
import javasabr.mqtt.model.MqttProtocolErrors
import javasabr.mqtt.model.QoS
import javasabr.mqtt.model.SubscribeRetainHandling
import javasabr.mqtt.model.exception.MalformedProtocolMqttException
Expand Down Expand Up @@ -139,7 +141,7 @@ class SubscribeMqttInMessageTest extends BaseMqttInMessageTest {
then:
!successful2
inMessage2.exception() instanceof MalformedProtocolMqttException
inMessage2.exception().message == 'Unsupported qos or retain handling'
inMessage2.exception().message == MqttProtocolErrors.UNSUPPORTED_QOS_OR_RETAIN_HANDLING
when:
def dataBuffer3 = BufferUtils.prepareBuffer(512) {
it.putShort(messageId)
Expand All @@ -150,6 +152,6 @@ class SubscribeMqttInMessageTest extends BaseMqttInMessageTest {
then:
!successful3
inMessage3.exception() instanceof MalformedProtocolMqttException
inMessage3.exception().message == 'No any topic filters'
inMessage3.exception().message == MqttProtocolErrors.NO_ANY_TOPIC_FILTER
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package javasabr.mqtt.network

import javasabr.mqtt.network.message.MqttMessageType
import javasabr.mqtt.network.message.in.*
import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
import javasabr.mqtt.network.message.in.MqttInMessage
import javasabr.mqtt.network.message.in.PublishMqttInMessage
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage
import javasabr.mqtt.network.message.in.SubscribeAckMqttInMessage
import javasabr.mqtt.network.message.out.DisconnectMqtt311OutMessage
import javasabr.mqtt.network.message.out.MqttOutMessage
import javasabr.mqtt.network.util.MqttDataUtils
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package javasabr.mqtt.network

import javasabr.mqtt.model.*

import javasabr.mqtt.model.MqttClientConnectionConfig
import javasabr.mqtt.model.MqttServerConnectionConfig
import javasabr.mqtt.model.MqttVersion
import javasabr.mqtt.model.QoS
import javasabr.mqtt.model.SubscribeRetainHandling
import javasabr.mqtt.model.data.type.StringPair
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void processReceivedValidMessage(
try {
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
//noinspection DataFlowIssue
messageHandler.processReceivedValidMessage(connection, mqttInMessage);
messageHandler.processValidMessage(connection, mqttInMessage);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
log.warning(mqttInMessage, "Received not supported MQTT message:[%s]"::formatted);
}
Expand All @@ -90,7 +90,7 @@ protected void processReceivedInvalidMessage(
try {
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
//noinspection DataFlowIssue
messageHandler.processReceivedInvalidMessage(connection, mqttInMessage);
messageHandler.processInvalidMessage(connection, mqttInMessage);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
log.warning(mqttInMessage, "Received not supported MQTT message:[%s]"::formatted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface MqttInMessageHandler {

MqttMessageType messageType();

void processReceivedValidMessage(MqttConnection connection, MqttInMessage mqttInMessage);
void processValidMessage(MqttConnection connection, MqttInMessage mqttInMessage);

void processReceivedInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage);
void processInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage);
}
Loading
Loading