diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java index da4dd14..81d50f1 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import jakarta.jms.JMSException; import jakarta.jms.Queue; import jakarta.jms.QueueConnection; -import jakarta.jms.QueueSession; import jakarta.jms.ServerSessionPool; import jakarta.jms.Session; import jakarta.jms.Topic; @@ -155,8 +154,8 @@ int getNumberOfMessagesToPrefetch() { * transaction and acknowledge mode. */ @Override - public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { - return (QueueSession) createSession(transacted, acknowledgeMode); + public SQSSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { + return createSession(transacted, acknowledgeMode); } /** @@ -176,7 +175,7 @@ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) * transaction and acknowledge mode. */ @Override - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + public SQSSession createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosed(); actionOnConnectionTaken = true; if (transacted || acknowledgeMode == Session.SESSION_TRANSACTED) @@ -216,12 +215,12 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS } @Override - public Session createSession(int sessionMode) throws JMSException { + public SQSSession createSession(int sessionMode) throws JMSException { return createSession(false, sessionMode); } @Override - public Session createSession() throws JMSException { + public SQSSession createSession() throws JMSException { throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java index 99c3add..8324d7e 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import jakarta.jms.JMSContext; import jakarta.jms.JMSException; import jakarta.jms.JMSRuntimeException; -import jakarta.jms.QueueConnection; import jakarta.jms.QueueConnectionFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; @@ -156,12 +155,12 @@ private SQSConnection createConnection(SqsClient amazonSQS, AwsCredentialsProvid } @Override - public QueueConnection createQueueConnection() throws JMSException { + public SQSConnection createQueueConnection() throws JMSException { return createConnection(); } @Override - public QueueConnection createQueueConnection(String userName, String password) throws JMSException { + public SQSConnection createQueueConnection(String userName, String password) throws JMSException { return createConnection(userName, password); } } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java index cfbb897..d930e9c 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,12 +17,12 @@ import com.amazon.sqs.javamessaging.acknowledge.Acknowledger; import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger; import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier; +import com.amazon.sqs.javamessaging.message.SQSMessage; import jakarta.jms.IllegalStateException; import jakarta.jms.JMSException; import jakarta.jms.Message; import jakarta.jms.MessageConsumer; import jakarta.jms.MessageListener; -import jakarta.jms.Queue; import jakarta.jms.QueueReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +104,7 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver { * @return a queue destination */ @Override - public Queue getQueue() throws JMSException { + public SQSQueueDestination getQueue() throws JMSException { return sqsDestination; } @@ -143,7 +143,7 @@ public void setMessageListener(MessageListener listener) throws JMSException { * On internal error */ @Override - public Message receive() throws JMSException { + public SQSMessage receive() throws JMSException { checkClosed(); return sqsMessageConsumerPrefetch.receive(); } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java index 7ea5258..24a400c 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -273,7 +273,7 @@ protected void processReceivedMessages(List messages) { List messageManagers = new ArrayList<>(); for (Message message : messages) { try { - jakarta.jms.Message jmsMessage = convertToJMSMessage(message); + SQSMessage jmsMessage = convertToJMSMessage(message); messageManagers.add(new MessageManager(this, jmsMessage)); } catch (JMSException e) { LOG.warn("Caught exception while converting received messages", e); @@ -337,10 +337,10 @@ protected void waitForPrefetch() throws InterruptedException { * @return Converted JMS message * @throws JMSException */ - protected jakarta.jms.Message convertToJMSMessage(Message message) throws JMSException { + protected SQSMessage convertToJMSMessage(Message message) throws JMSException { MessageAttributeValue messageTypeAttribute = message.messageAttributes().get( SQSMessage.JMS_SQS_MESSAGE_TYPE); - jakarta.jms.Message jmsMessage; + SQSMessage jmsMessage; if (messageTypeAttribute == null) { jmsMessage = new SQSTextMessage(acknowledger, queueUrl, message); } else { @@ -457,15 +457,15 @@ private void unrequestMessage() { } } - public record MessageManager(PrefetchManager prefetchManager, jakarta.jms.Message message) { + public record MessageManager(PrefetchManager prefetchManager, SQSMessage message) { } - jakarta.jms.Message receive() throws JMSException { + SQSMessage receive() throws JMSException { return receive(0); } - jakarta.jms.Message receive(long timeout) throws JMSException { + SQSMessage receive(long timeout) throws JMSException { if (cannotDeliver()) { return null; } @@ -516,7 +516,7 @@ protected void notifyStateChange() { } } - jakarta.jms.Message receiveNoWait() throws JMSException { + SQSMessage receiveNoWait() throws JMSException { if (cannotDeliver()) { return null; } @@ -571,11 +571,11 @@ void close() { /** * Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge */ - private jakarta.jms.Message messageHandler(MessageManager messageManager) throws JMSException { + private SQSMessage messageHandler(MessageManager messageManager) throws JMSException { if (messageManager == null) { return null; } - jakarta.jms.Message message = messageManager.message(); + SQSMessage message = messageManager.message(); // Notify PrefetchThread that message is dispatched this.messageDispatched(); @@ -616,7 +616,7 @@ List purgePrefetchedMessagesWithGroups(Set affecte Iterator managerIterator = messageQueue.iterator(); while (managerIterator.hasNext()) { MessageManager messageManager = managerIterator.next(); - SQSMessage prefetchedMessage = (SQSMessage)messageManager.message(); + SQSMessage prefetchedMessage = messageManager.message(); SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(prefetchedMessage); //is the prefetch entry for one of the affected group ids? diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java index b192099..224f4c9 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -165,7 +165,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep } @Override - public Queue getQueue() throws JMSException { + public SQSQueueDestination getQueue() throws JMSException { return sqsDestination; } @@ -307,7 +307,7 @@ public void send(Queue queue, Message message, int deliveryMode, int priority, l * @return this producer's queue destination */ @Override - public Destination getDestination() throws JMSException { + public SQSQueueDestination getDestination() throws JMSException { return sqsDestination; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java index ed81646..b385c46 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import com.amazon.sqs.javamessaging.message.SQSObjectMessage; import com.amazon.sqs.javamessaging.message.SQSTextMessage; import com.amazon.sqs.javamessaging.util.SQSMessagingClientThreadFactory; -import jakarta.jms.BytesMessage; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; import jakarta.jms.JMSException; @@ -32,17 +31,13 @@ import jakarta.jms.MessageConsumer; import jakarta.jms.MessageListener; import jakarta.jms.MessageProducer; -import jakarta.jms.ObjectMessage; import jakarta.jms.Queue; import jakarta.jms.QueueBrowser; -import jakarta.jms.QueueReceiver; -import jakarta.jms.QueueSender; import jakarta.jms.QueueSession; import jakarta.jms.Session; import jakarta.jms.StreamMessage; import jakarta.jms.TemporaryQueue; import jakarta.jms.TemporaryTopic; -import jakarta.jms.TextMessage; import jakarta.jms.Topic; import jakarta.jms.TopicSubscriber; import org.slf4j.Logger; @@ -224,8 +219,8 @@ boolean isActiveCallbackSessionThread() { * @throws JMSException If session is closed */ @Override - public QueueReceiver createReceiver(Queue queue) throws JMSException { - return (QueueReceiver) createConsumer(queue); + public SQSMessageConsumer createReceiver(Queue queue) throws JMSException { + return createConsumer(queue); } /** @@ -238,7 +233,7 @@ public QueueReceiver createReceiver(Queue queue) throws JMSException { * @throws JMSException If session is closed */ @Override - public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { + public SQSMessageConsumer createReceiver(Queue queue, String messageSelector) throws JMSException { return createReceiver(queue); } @@ -250,8 +245,8 @@ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws * @throws JMSException If session is closed */ @Override - public QueueSender createSender(Queue queue) throws JMSException { - return (QueueSender) createProducer(queue); + public SQSMessageProducer createSender(Queue queue) throws JMSException { + return createProducer(queue); } /** @@ -261,7 +256,7 @@ public QueueSender createSender(Queue queue) throws JMSException { * @throws JMSException If session is closed or internal error */ @Override - public BytesMessage createBytesMessage() throws JMSException { + public SQSBytesMessage createBytesMessage() throws JMSException { checkClosed(); return new SQSBytesMessage(); } @@ -283,7 +278,7 @@ public Message createMessage() throws JMSException { * @throws JMSException If session is closed or internal error */ @Override - public ObjectMessage createObjectMessage() throws JMSException { + public SQSObjectMessage createObjectMessage() throws JMSException { checkClosed(); return new SQSObjectMessage(); } @@ -296,7 +291,7 @@ public ObjectMessage createObjectMessage() throws JMSException { * @throws JMSException If session is closed or internal error */ @Override - public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + public SQSObjectMessage createObjectMessage(Serializable object) throws JMSException { checkClosed(); return new SQSObjectMessage(object); } @@ -308,7 +303,7 @@ public ObjectMessage createObjectMessage(Serializable object) throws JMSExceptio * @throws JMSException If session is closed or internal error */ @Override - public TextMessage createTextMessage() throws JMSException { + public SQSTextMessage createTextMessage() throws JMSException { checkClosed(); return new SQSTextMessage(); } @@ -321,7 +316,7 @@ public TextMessage createTextMessage() throws JMSException { * @throws JMSException If session is closed or internal error */ @Override - public TextMessage createTextMessage(String text) throws JMSException { + public SQSTextMessage createTextMessage(String text) throws JMSException { checkClosed(); return new SQSTextMessage(text); } @@ -469,7 +464,7 @@ public void recover() throws JMSException { Map> queueToGroupsMapping = getAffectedGroupsPerQueueUrl(unAckedMessages); for (SQSMessageConsumer consumer : this.messageConsumers) { - SQSQueueDestination sqsQueue = (SQSQueueDestination) consumer.getQueue(); + SQSQueueDestination sqsQueue = consumer.getQueue(); Set affectedGroups = queueToGroupsMapping.get(sqsQueue.getQueueUrl()); if (affectedGroups != null) { unAckedMessages.addAll(consumer.purgePrefetchedMessagesWithGroups(affectedGroups)); @@ -509,7 +504,7 @@ public void run() { * @throws JMSException If session is closed or queue destination is not used */ @Override - public MessageProducer createProducer(Destination destination) throws JMSException { + public SQSMessageProducer createProducer(Destination destination) throws JMSException { checkClosed(); if (destination != null && !(destination instanceof SQSQueueDestination)) { throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination"); @@ -532,7 +527,7 @@ public MessageProducer createProducer(Destination destination) throws JMSExcepti * @throws JMSException If session is closed or queue destination is not used */ @Override - public MessageConsumer createConsumer(Destination destination) throws JMSException { + public SQSMessageConsumer createConsumer(Destination destination) throws JMSException { checkClosed(); if (!(destination instanceof SQSQueueDestination sqsQueueDestination)) { throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination"); @@ -568,7 +563,7 @@ SQSMessageConsumer createSQSMessageConsumer(SQSQueueDestination destination) { */ @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + public SQSMessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { if (messageSelector != null) { throw new JMSException("SQSSession does not support MessageSelector. This should be null."); } @@ -587,7 +582,7 @@ public MessageConsumer createConsumer(Destination destination, String messageSel * @throws JMSException If session is closed or queue destination is not used */ @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException { + public SQSMessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException { if (messageSelector != null) { throw new JMSException("SQSSession does not support MessageSelector. This should be null."); } @@ -613,7 +608,7 @@ public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscripti * @throws JMSException If session is closed or invalid queue is provided */ @Override - public Queue createQueue(String queueName) throws JMSException { + public SQSQueueDestination createQueue(String queueName) throws JMSException { checkClosed(); return new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName).queueUrl()); } @@ -628,7 +623,7 @@ public Queue createQueue(String queueName) throws JMSException { * @return a queue destination * @throws JMSException If session is closed or invalid queue is provided */ - public Queue createQueue(String queueName, String ownerAccountId) throws JMSException { + public SQSQueueDestination createQueue(String queueName, String ownerAccountId) throws JMSException { checkClosed(); return new SQSQueueDestination( queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl()); diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java index cbb3848..e4971fc 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -111,7 +111,7 @@ public void run() { MessageListener messageListener = callbackEntry.messageListener(); MessageManager messageManager = callbackEntry.messageManager(); - SQSMessage message = (SQSMessage) messageManager.message(); + SQSMessage message = messageManager.message(); SQSMessageConsumer messageConsumer = messageManager.prefetchManager().getMessageConsumer(); if (messageConsumer.isClosed()) { nackReceivedMessage(message); @@ -186,7 +186,7 @@ public void run() { } } finally { if (callbackEntry != null) { - nackReceivedMessage((SQSMessage) callbackEntry.messageManager().message()); + nackReceivedMessage(callbackEntry.messageManager().message()); } nackQueuedMessages(); } @@ -214,7 +214,7 @@ void nackQueuedMessages() { try { List nackMessageIdentifiers = new ArrayList<>(); while (!callbackQueue.isEmpty()) { - SQSMessage nackMessage = (SQSMessage) callbackQueue.pollFirst().messageManager().message(); + SQSMessage nackMessage = callbackQueue.pollFirst().messageManager().message(); nackMessageIdentifiers.add(SQSMessageIdentifier.fromSQSMessage(nackMessage)); } @@ -254,7 +254,7 @@ List purgeScheduledCallbacksForQueuesAndGroups(Map callbackIterator = callbackQueue.iterator(); while (callbackIterator.hasNext()) { CallbackEntry callbackEntry = callbackIterator.next(); - SQSMessageIdentifier pendingCallbackIdentifier = SQSMessageIdentifier.fromSQSMessage((SQSMessage) callbackEntry.messageManager().message()); + SQSMessageIdentifier pendingCallbackIdentifier = SQSMessageIdentifier.fromSQSMessage(callbackEntry.messageManager().message()); //is the callback entry for one of the affected queues? Set affectedGroupsInQueue = queueToGroupsMapping.get(pendingCallbackIdentifier.getQueueUrl()); diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java index 4b4b581..c9a168c 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager; import com.amazon.sqs.javamessaging.SQSMessagingClientConstants; -import com.amazon.sqs.javamessaging.message.SQSMessage; import jakarta.jms.JMSException; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; @@ -59,7 +58,7 @@ public NegativeAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient) { public void bulkAction(ArrayDeque messageQueue, String queueUrl) throws JMSException { List receiptHandles = new ArrayList<>(); while (!messageQueue.isEmpty()) { - receiptHandles.add(((SQSMessage) (messageQueue.pollFirst().message())).getReceiptHandle()); + receiptHandles.add(messageQueue.pollFirst().message().getReceiptHandle()); // If there is more than 10 stop can call action if (receiptHandles.size() == SQSMessagingClientConstants.MAX_BATCH) { diff --git a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java index 045e776..b36b013 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -126,7 +126,7 @@ public void verify() { // Test session close operation with create producer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, closeSessionOperation); connection.close(); @@ -135,7 +135,7 @@ public void verify() { // Test session close operation with create consumer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, closeSessionOperation); connection.close(); @@ -170,7 +170,7 @@ public void verify() { // Test connection start operation with create producer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, startConnectionOperation); connection.close(); @@ -179,7 +179,7 @@ public void verify() { // Test connection start operation with create consumer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, startConnectionOperation); connection.close(); @@ -212,7 +212,7 @@ public void verify() { // Test connection close operation with create producer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, closeConnectionOperation); } @@ -220,7 +220,7 @@ public void verify() { // Test connection close operation with create consumer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, closeConnectionOperation); } @@ -255,7 +255,7 @@ public void verify() { // Test connection stop operation with create producer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, stopConnectionOperation); connection.close(); @@ -264,7 +264,7 @@ public void verify() { // Test connection stop operation with create consumer operation for (int i = 0; i < 10; ++i) { connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, stopConnectionOperation); connection.close(); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java index c2a7b43..be92dc5 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -667,25 +667,25 @@ public void testCreateSessionUnknownAckMode() { public void testCreateSessionWhenConnectionRunning() throws JMSException { sqsConnection.setRunning(true); - SQSSession session = (SQSSession) sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + SQSSession session = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertEquals(Session.AUTO_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertTrue(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + session = sqsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); assertEquals(Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertTrue(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + session = sqsConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); assertEquals(Session.DUPS_OK_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertTrue(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); + session = sqsConnection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); assertTrue(session.isRunning()); assertEquals(SQSSession.UNORDERED_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); @@ -705,25 +705,25 @@ public void testCreateSessionWhenConnectionRunning() throws JMSException { public void testCreateSessionWhenConnectionStopped() throws JMSException { sqsConnection.setRunning(false); - SQSSession session = (SQSSession) sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + SQSSession session = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertEquals(Session.AUTO_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertFalse(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + session = sqsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); assertEquals(Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertFalse(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + session = sqsConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); assertEquals(Session.DUPS_OK_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); assertTrue(sqsConnection.getSessions().contains(session)); assertFalse(session.isRunning()); - session = (SQSSession) sqsConnection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); + session = sqsConnection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); assertFalse(session.isRunning()); assertEquals(SQSSession.UNORDERED_ACKNOWLEDGE, session.getAcknowledgeMode()); assertEquals(sqsConnection, session.getParentConnection()); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java index a6d079d..ff48f63 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -150,7 +150,7 @@ public boolean matches(Object argument) { int index = 0; for (SQSMessageConsumerPrefetch.MessageManager messageManager : consumerPrefetch.messageQueue) { Message mockedMessage = messages.get(index); - SQSMessage sqsMessage = (SQSMessage) messageManager.message(); + SQSMessage sqsMessage = messageManager.message(); assertEquals(mockedMessage.receiptHandle(), sqsMessage.getReceiptHandle(), "Receipt handle is the same"); assertEquals( @@ -193,7 +193,7 @@ public void testConvertToJMSMessageNoTypeAttribute(int numberOfMessagesToPrefetc /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jmsMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jmsMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -234,7 +234,7 @@ public void testConvertToJMSMessageByteTypeAttribute(int numberOfMessagesToPrefe /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jmsMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jmsMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -315,7 +315,7 @@ public void testConvertToJMSMessageObjectTypeAttribute(int numberOfMessagesToPre /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jmsMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jmsMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -389,7 +389,7 @@ public void testConvertToJMSMessageTextTypeAttribute(int numberOfMessagesToPrefe /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jmsMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jmsMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java index 4d52c70..4654f19 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -133,7 +133,7 @@ public void testEndToEnd(int numberOfMessagesToPrefetch) throws InterruptedExcep //after we return 'isClosed() == true' we will empty the prefetch queue while nacking messages assertEquals(numMessages, consumerPrefetch.messageQueue.size()); for (MessageManager messageManager : consumerPrefetch.messageQueue) { - SQSMessage sqsMessage = (SQSMessage) messageManager.message(); + SQSMessage sqsMessage = messageManager.message(); assertTrue(receipt.contains(sqsMessage.getReceiptHandle())); } @@ -169,7 +169,7 @@ public void testEndToEnd(int numberOfMessagesToPrefetch) throws InterruptedExcep // Ensure message queue was filled with expected messages assertEquals(numMessages, consumerPrefetch.messageQueue.size()); for (SQSMessageConsumerPrefetch.MessageManager messageManager : consumerPrefetch.messageQueue) { - SQSMessage sqsMessage = (SQSMessage) messageManager.message(); + SQSMessage sqsMessage = messageManager.message(); assertTrue(receipt.contains(sqsMessage.getReceiptHandle())); } } @@ -484,8 +484,8 @@ public void testSetNullMessageListener(int numberOfMessagesToPrefetch) { @MethodSource("prefetchParameters") public void testSetMessageListener(int numberOfMessagesToPrefetch) { init(numberOfMessagesToPrefetch); - SQSMessageConsumerPrefetch.MessageManager msgManager1 = new SQSMessageConsumerPrefetch.MessageManager(null, mock(jakarta.jms.Message.class)); - SQSMessageConsumerPrefetch.MessageManager msgManager2 = new SQSMessageConsumerPrefetch.MessageManager(null, mock(jakarta.jms.Message.class)); + SQSMessageConsumerPrefetch.MessageManager msgManager1 = new SQSMessageConsumerPrefetch.MessageManager(null, mock(SQSMessage.class)); + SQSMessageConsumerPrefetch.MessageManager msgManager2 = new SQSMessageConsumerPrefetch.MessageManager(null, mock(SQSMessage.class)); consumerPrefetch.messageQueue.add(msgManager1); consumerPrefetch.messageQueue.add(msgManager2); @@ -798,7 +798,7 @@ public void testConvertToJMSMessageNoTypeAttribute(int numberOfMessagesToPrefetc /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jsmMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jsmMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -839,7 +839,7 @@ public void testConvertToJMSMessageByteTypeAttribute(int numberOfMessagesToPrefe /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jsmMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jsmMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -928,7 +928,7 @@ public void testConvertToJMSMessageObjectTypeAttribute(int numberOfMessagesToPre /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jsmMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jsmMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -1009,7 +1009,7 @@ public void testConvertToJMSMessageTextTypeAttribute(int numberOfMessagesToPrefe /* * Convert the SQS message to JMS Message */ - jakarta.jms.Message jsmMessage = consumerPrefetch.convertToJMSMessage(message); + SQSMessage jsmMessage = consumerPrefetch.convertToJMSMessage(message); /* * Verify results @@ -1082,7 +1082,7 @@ public void testReceiveMessagePrefetch(int numberOfMessagesToPrefetch) throws JM /* * Call receive messages */ - SQSMessage msg = (SQSMessage) consumerPrefetch.receive(); + SQSMessage msg = consumerPrefetch.receive(); /* * Verify results @@ -1148,7 +1148,7 @@ public void testReceiveMessageEmptyThenClosed(int numberOfMessagesToPrefetch) th executorService.execute(() -> { try { beforeReceiveCall.countDown(); - jakarta.jms.Message msg = consumerPrefetch.receive(0); + SQSMessage msg = consumerPrefetch.receive(0); if (msg == null) { noMessageReturned.set(true); } @@ -1198,7 +1198,7 @@ public void testReceiveMessageEmptyThenAddMessage(int numberOfMessagesToPrefetch executorService.execute(() -> { try { beforeReceiveCall.countDown(); - SQSMessage msg = (SQSMessage) consumerPrefetch.receive(0); + SQSMessage msg = consumerPrefetch.receive(0); if ((msg != null) && (msg.getReceiptHandle().equals(receiptHandle))) { messageReceived.set(true); } @@ -1245,7 +1245,7 @@ public void testReceiveMessageTimeout(int numberOfMessagesToPrefetch) throws JMS /* * Call receive messages */ - SQSMessage msg = (SQSMessage) consumerPrefetch.receive(waitTime); + SQSMessage msg = consumerPrefetch.receive(waitTime); assertNull(msg); @@ -1321,7 +1321,7 @@ public void testProcessReceivedMessages(int numberOfMessagesToPrefetch) throws J while (!consumerPrefetch.messageQueue.isEmpty()) { SQSMessageConsumerPrefetch.MessageManager msgManager = consumerPrefetch.messageQueue.pollFirst(); - SQSMessage msg = (SQSMessage) msgManager.message(); + SQSMessage msg = msgManager.message(); assertTrue(receiptHandlers.contains(msg.getReceiptHandle())); } @@ -1810,10 +1810,10 @@ public void testRequestedMessageTracking(int numberOfMessagesToPrefetch) throws }).when(consumerPrefetch).requestMessage(); // Close the prefetcher immediately after completing one loop - final List> receivedMessageFutures = new ArrayList<>(); + final List> receivedMessageFutures = new ArrayList<>(); doAnswer((Answer) invocation -> { invocation.callRealMethod(); - for (Future messageFuture : receivedMessageFutures) { + for (Future messageFuture : receivedMessageFutures) { assertNotNull(messageFuture.get()); } consumerPrefetch.close(); @@ -1867,7 +1867,7 @@ private void addMessagesToQueue(List receiptHandlers) throws JMSExceptio .receiptHandle(receiptHandler) .attributes(mapAttributes) .build(); - jakarta.jms.Message m1 = consumerPrefetch.convertToJMSMessage(message); + SQSMessage m1 = consumerPrefetch.convertToJMSMessage(message); SQSMessageConsumerPrefetch.MessageManager msgManager = new SQSMessageConsumerPrefetch.MessageManager(null, m1); consumerPrefetch.messageQueue.add(msgManager); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java index ace019a..d2d1a56 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -256,7 +256,7 @@ public void internalTestPropertyToMessageAttribute(SQSMessage sqsText) throws JM */ @Test public void testSendInternalNonSQSMessage() { - jakarta.jms.Message msg = mock(jakarta.jms.Message.class); + SQSMessage msg = mock(SQSMessage.class); assertThatThrownBy(() -> producer.sendInternal(destination, msg)).isInstanceOf(JMSException.class); } diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionCallbackSchedulerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionCallbackSchedulerTest.java index 8ba1b94..2cee70b 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionCallbackSchedulerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionCallbackSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -552,7 +552,7 @@ public void testWhenListenerThrowsWhenAutoAckThenCallbackQueuePurgedFromMessages MessageListener messageListener = mock(MessageListener.class); doThrow(RuntimeException.class) - .when(messageListener).onMessage(any(jakarta.jms.Message.class)); + .when(messageListener).onMessage(any(SQSMessage.class)); List messages = List.of( createFifoMessageManager("queue1", "group1", "message1", "handle1"), diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java index 5dc4b51..b23900f 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import jakarta.jms.MessageConsumer; import jakarta.jms.MessageProducer; import jakarta.jms.ObjectMessage; -import jakarta.jms.Queue; import jakarta.jms.TextMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -659,14 +658,13 @@ public void testCreateQueue() throws JMSException { /* * Create queue */ - Queue queue = sqsSession.createQueue(QUEUE_NAME); + SQSQueueDestination queue = sqsSession.createQueue(QUEUE_NAME); /* * Verify results */ - assert (queue instanceof SQSQueueDestination); assertEquals(QUEUE_NAME, queue.getQueueName()); - assertEquals(QUEUE_URL, ((SQSQueueDestination) queue).getQueueUrl()); + assertEquals(QUEUE_URL, queue.getQueueUrl()); } /** @@ -681,14 +679,13 @@ public void testCreateQueueWithOwnerAccountId() throws JMSException { /* * Create queue */ - Queue queue = sqsSession.createQueue(QUEUE_NAME, OWNER_ACCOUNT_ID); + SQSQueueDestination queue = sqsSession.createQueue(QUEUE_NAME, OWNER_ACCOUNT_ID); /* * Verify results */ - assert (queue instanceof SQSQueueDestination); assertEquals(QUEUE_NAME, queue.getQueueName()); - assertEquals(QUEUE_URL, ((SQSQueueDestination) queue).getQueueUrl()); + assertEquals(QUEUE_URL, queue.getQueueUrl()); } /** diff --git a/src/test/java/com/amazon/sqs/javamessaging/message/SQSBytesMessageTest.java b/src/test/java/com/amazon/sqs/javamessaging/message/SQSBytesMessageTest.java index 7e73473..d96c914 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/message/SQSBytesMessageTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/message/SQSBytesMessageTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -68,7 +68,7 @@ public void setUp() { @Test public void testReadWrite() throws JMSException { when(mockSQSSession.createBytesMessage()).thenReturn(new SQSBytesMessage()); - SQSBytesMessage msg = (SQSBytesMessage) mockSQSSession.createBytesMessage(); + SQSBytesMessage msg = mockSQSSession.createBytesMessage(); byte[] byteArray = new byte[]{1, 0, 'a', 65}; byte byteData = 'a'; @@ -348,7 +348,7 @@ public void testNotWriteable() throws JMSException { @Test public void testReadable() throws JMSException { when(mockSQSSession.createBytesMessage()).thenReturn(new SQSBytesMessage()); - SQSBytesMessage msg = (SQSBytesMessage) mockSQSSession.createBytesMessage(); + SQSBytesMessage msg = mockSQSSession.createBytesMessage(); byte[] byteArray = new byte[]{'a', 0, 34, 65}; msg.writeBytes(byteArray);