Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,6 @@
import jakarta.jms.JMSException;
import jakarta.jms.Queue;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueSession;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import jakarta.jms.Topic;
Expand Down Expand Up @@ -155,8 +154,8 @@ int getNumberOfMessagesToPrefetch() {
* transaction and acknowledge mode.
*/
@Override
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
return (QueueSession) createSession(transacted, acknowledgeMode);
public SQSSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
return createSession(transacted, acknowledgeMode);
}

/**
Expand All @@ -176,7 +175,7 @@ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
* transaction and acknowledge mode.
*/
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
public SQSSession createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
actionOnConnectionTaken = true;
if (transacted || acknowledgeMode == Session.SESSION_TRANSACTED)
Expand Down Expand Up @@ -216,12 +215,12 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
}

@Override
public Session createSession(int sessionMode) throws JMSException {
public SQSSession createSession(int sessionMode) throws JMSException {
return createSession(false, sessionMode);
}

@Override
public Session createSession() throws JMSException {
public SQSSession createSession() throws JMSException {
throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueConnectionFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
Expand Down Expand Up @@ -156,12 +155,12 @@ private SQSConnection createConnection(SqsClient amazonSQS, AwsCredentialsProvid
}

@Override
public QueueConnection createQueueConnection() throws JMSException {
public SQSConnection createQueueConnection() throws JMSException {
return createConnection();
}

@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
public SQSConnection createQueueConnection(String userName, String password) throws JMSException {
return createConnection(userName, password);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,12 @@
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import jakarta.jms.QueueReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,7 +104,7 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver {
* @return a queue destination
*/
@Override
public Queue getQueue() throws JMSException {
public SQSQueueDestination getQueue() throws JMSException {
return sqsDestination;
}

Expand Down Expand Up @@ -143,7 +143,7 @@ public void setMessageListener(MessageListener listener) throws JMSException {
* On internal error
*/
@Override
public Message receive() throws JMSException {
public SQSMessage receive() throws JMSException {
checkClosed();
return sqsMessageConsumerPrefetch.receive();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -273,7 +273,7 @@ protected void processReceivedMessages(List<Message> messages) {
List<MessageManager> messageManagers = new ArrayList<>();
for (Message message : messages) {
try {
jakarta.jms.Message jmsMessage = convertToJMSMessage(message);
SQSMessage jmsMessage = convertToJMSMessage(message);
messageManagers.add(new MessageManager(this, jmsMessage));
} catch (JMSException e) {
LOG.warn("Caught exception while converting received messages", e);
Expand Down Expand Up @@ -337,10 +337,10 @@ protected void waitForPrefetch() throws InterruptedException {
* @return Converted JMS message
* @throws JMSException
*/
protected jakarta.jms.Message convertToJMSMessage(Message message) throws JMSException {
protected SQSMessage convertToJMSMessage(Message message) throws JMSException {
MessageAttributeValue messageTypeAttribute = message.messageAttributes().get(
SQSMessage.JMS_SQS_MESSAGE_TYPE);
jakarta.jms.Message jmsMessage;
SQSMessage jmsMessage;
if (messageTypeAttribute == null) {
jmsMessage = new SQSTextMessage(acknowledger, queueUrl, message);
} else {
Expand Down Expand Up @@ -457,15 +457,15 @@ private void unrequestMessage() {
}
}

public record MessageManager(PrefetchManager prefetchManager, jakarta.jms.Message message) {
public record MessageManager(PrefetchManager prefetchManager, SQSMessage message) {

}

jakarta.jms.Message receive() throws JMSException {
SQSMessage receive() throws JMSException {
return receive(0);
}

jakarta.jms.Message receive(long timeout) throws JMSException {
SQSMessage receive(long timeout) throws JMSException {
if (cannotDeliver()) {
return null;
}
Expand Down Expand Up @@ -516,7 +516,7 @@ protected void notifyStateChange() {
}
}

jakarta.jms.Message receiveNoWait() throws JMSException {
SQSMessage receiveNoWait() throws JMSException {
if (cannotDeliver()) {
return null;
}
Expand Down Expand Up @@ -571,11 +571,11 @@ void close() {
/**
* Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge
*/
private jakarta.jms.Message messageHandler(MessageManager messageManager) throws JMSException {
private SQSMessage messageHandler(MessageManager messageManager) throws JMSException {
if (messageManager == null) {
return null;
}
jakarta.jms.Message message = messageManager.message();
SQSMessage message = messageManager.message();

// Notify PrefetchThread that message is dispatched
this.messageDispatched();
Expand Down Expand Up @@ -616,7 +616,7 @@ List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affecte
Iterator<MessageManager> managerIterator = messageQueue.iterator();
while (managerIterator.hasNext()) {
MessageManager messageManager = managerIterator.next();
SQSMessage prefetchedMessage = (SQSMessage)messageManager.message();
SQSMessage prefetchedMessage = messageManager.message();
SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(prefetchedMessage);

//is the prefetch entry for one of the affected group ids?
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -165,7 +165,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
}

@Override
public Queue getQueue() throws JMSException {
public SQSQueueDestination getQueue() throws JMSException {
return sqsDestination;
}

Expand Down Expand Up @@ -307,7 +307,7 @@ public void send(Queue queue, Message message, int deliveryMode, int priority, l
* @return this producer's queue destination
*/
@Override
public Destination getDestination() throws JMSException {
public SQSQueueDestination getDestination() throws JMSException {
return sqsDestination;
}

Expand Down
41 changes: 18 additions & 23 deletions src/main/java/com/amazon/sqs/javamessaging/SQSSession.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazon.sqs.javamessaging.util.SQSMessagingClientThreadFactory;
import jakarta.jms.BytesMessage;
import jakarta.jms.Destination;
import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
Expand All @@ -32,17 +31,13 @@
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Queue;
import jakarta.jms.QueueBrowser;
import jakarta.jms.QueueReceiver;
import jakarta.jms.QueueSender;
import jakarta.jms.QueueSession;
import jakarta.jms.Session;
import jakarta.jms.StreamMessage;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.TemporaryTopic;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import org.slf4j.Logger;
Expand Down Expand Up @@ -224,8 +219,8 @@ boolean isActiveCallbackSessionThread() {
* @throws JMSException If session is closed
*/
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return (QueueReceiver) createConsumer(queue);
public SQSMessageConsumer createReceiver(Queue queue) throws JMSException {
return createConsumer(queue);
}

/**
Expand All @@ -238,7 +233,7 @@ public QueueReceiver createReceiver(Queue queue) throws JMSException {
* @throws JMSException If session is closed
*/
@Override
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
public SQSMessageConsumer createReceiver(Queue queue, String messageSelector) throws JMSException {
return createReceiver(queue);
}

Expand All @@ -250,8 +245,8 @@ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws
* @throws JMSException If session is closed
*/
@Override
public QueueSender createSender(Queue queue) throws JMSException {
return (QueueSender) createProducer(queue);
public SQSMessageProducer createSender(Queue queue) throws JMSException {
return createProducer(queue);
}

/**
Expand All @@ -261,7 +256,7 @@ public QueueSender createSender(Queue queue) throws JMSException {
* @throws JMSException If session is closed or internal error
*/
@Override
public BytesMessage createBytesMessage() throws JMSException {
public SQSBytesMessage createBytesMessage() throws JMSException {
checkClosed();
return new SQSBytesMessage();
}
Expand All @@ -283,7 +278,7 @@ public Message createMessage() throws JMSException {
* @throws JMSException If session is closed or internal error
*/
@Override
public ObjectMessage createObjectMessage() throws JMSException {
public SQSObjectMessage createObjectMessage() throws JMSException {
checkClosed();
return new SQSObjectMessage();
}
Expand All @@ -296,7 +291,7 @@ public ObjectMessage createObjectMessage() throws JMSException {
* @throws JMSException If session is closed or internal error
*/
@Override
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
public SQSObjectMessage createObjectMessage(Serializable object) throws JMSException {
checkClosed();
return new SQSObjectMessage(object);
}
Expand All @@ -308,7 +303,7 @@ public ObjectMessage createObjectMessage(Serializable object) throws JMSExceptio
* @throws JMSException If session is closed or internal error
*/
@Override
public TextMessage createTextMessage() throws JMSException {
public SQSTextMessage createTextMessage() throws JMSException {
checkClosed();
return new SQSTextMessage();
}
Expand All @@ -321,7 +316,7 @@ public TextMessage createTextMessage() throws JMSException {
* @throws JMSException If session is closed or internal error
*/
@Override
public TextMessage createTextMessage(String text) throws JMSException {
public SQSTextMessage createTextMessage(String text) throws JMSException {
checkClosed();
return new SQSTextMessage(text);
}
Expand Down Expand Up @@ -469,7 +464,7 @@ public void recover() throws JMSException {
Map<String, Set<String>> queueToGroupsMapping = getAffectedGroupsPerQueueUrl(unAckedMessages);

for (SQSMessageConsumer consumer : this.messageConsumers) {
SQSQueueDestination sqsQueue = (SQSQueueDestination) consumer.getQueue();
SQSQueueDestination sqsQueue = consumer.getQueue();
Set<String> affectedGroups = queueToGroupsMapping.get(sqsQueue.getQueueUrl());
if (affectedGroups != null) {
unAckedMessages.addAll(consumer.purgePrefetchedMessagesWithGroups(affectedGroups));
Expand Down Expand Up @@ -509,7 +504,7 @@ public void run() {
* @throws JMSException If session is closed or queue destination is not used
*/
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
public SQSMessageProducer createProducer(Destination destination) throws JMSException {
checkClosed();
if (destination != null && !(destination instanceof SQSQueueDestination)) {
throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination");
Expand All @@ -532,7 +527,7 @@ public MessageProducer createProducer(Destination destination) throws JMSExcepti
* @throws JMSException If session is closed or queue destination is not used
*/
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
public SQSMessageConsumer createConsumer(Destination destination) throws JMSException {
checkClosed();
if (!(destination instanceof SQSQueueDestination sqsQueueDestination)) {
throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination");
Expand Down Expand Up @@ -568,7 +563,7 @@ SQSMessageConsumer createSQSMessageConsumer(SQSQueueDestination destination) {
*/

@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
public SQSMessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
if (messageSelector != null) {
throw new JMSException("SQSSession does not support MessageSelector. This should be null.");
}
Expand All @@ -587,7 +582,7 @@ public MessageConsumer createConsumer(Destination destination, String messageSel
* @throws JMSException If session is closed or queue destination is not used
*/
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
public SQSMessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
if (messageSelector != null) {
throw new JMSException("SQSSession does not support MessageSelector. This should be null.");
}
Expand All @@ -613,7 +608,7 @@ public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscripti
* @throws JMSException If session is closed or invalid queue is provided
*/
@Override
public Queue createQueue(String queueName) throws JMSException {
public SQSQueueDestination createQueue(String queueName) throws JMSException {
checkClosed();
return new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName).queueUrl());
}
Expand All @@ -628,7 +623,7 @@ public Queue createQueue(String queueName) throws JMSException {
* @return a queue destination
* @throws JMSException If session is closed or invalid queue is provided
*/
public Queue createQueue(String queueName, String ownerAccountId) throws JMSException {
public SQSQueueDestination createQueue(String queueName, String ownerAccountId) throws JMSException {
checkClosed();
return new SQSQueueDestination(
queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl());
Expand Down
Loading