diff --git a/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java index 5f12e24..327538b 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java +++ b/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,9 +17,12 @@ public class ProviderConfiguration { private int numberOfMessagesToPrefetch; + private boolean cacheQueues; + public ProviderConfiguration() { // Set default numberOfMessagesToPrefetch to MIN_BATCH. - this.numberOfMessagesToPrefetch = SQSMessagingClientConstants.MIN_BATCH; + this.numberOfMessagesToPrefetch = SQSMessagingClientConstants.MIN_BATCH; + this.cacheQueues = false; } public int getNumberOfMessagesToPrefetch() { @@ -38,4 +41,17 @@ public ProviderConfiguration withNumberOfMessagesToPrefetch(int numberOfMessages return this; } + public boolean isCacheQueues() { + return this.cacheQueues; + } + + public void setCacheQueues(boolean cacheQueues) { + this.cacheQueues = cacheQueues; + } + + public ProviderConfiguration withCacheQueues(boolean cacheQueues) { + setCacheQueues(cacheQueues); + return this; + } + } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java index da4dd14..746b3c3 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -92,6 +92,14 @@ public class SQSConnection implements Connection, QueueConnection { * but it will make multiple calls as necessary. */ private final int numberOfMessagesToPrefetch; + + /** + * Configures sessions to cache queues created JMS Queue objects. + * @see SQSSession#createQueue(String) + * @see SQSSession#createQueue(String, String) + */ + private final boolean cacheQueues; + private volatile boolean closed = false; private volatile boolean closing = false; @@ -106,10 +114,11 @@ public class SQSConnection implements Connection, QueueConnection { private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap<>()); - SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) { + SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch, + boolean cacheQueues) { amazonSQSClient = amazonSQSClientJMSWrapper; this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; - + this.cacheQueues = cacheQueues; } /** @@ -184,11 +193,11 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS SQSSession sqsSession; if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) { - sqsSession = new SQSSession(this, AcknowledgeMode.ACK_AUTO.withOriginalAcknowledgeMode(acknowledgeMode)); + sqsSession = new SQSSession(this, AcknowledgeMode.ACK_AUTO.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues); } else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) { - sqsSession = new SQSSession(this, AcknowledgeMode.ACK_RANGE.withOriginalAcknowledgeMode(acknowledgeMode)); + sqsSession = new SQSSession(this, AcknowledgeMode.ACK_RANGE.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues); } else if (acknowledgeMode == SQSSession.UNORDERED_ACKNOWLEDGE) { - sqsSession = new SQSSession(this, AcknowledgeMode.ACK_UNORDERED.withOriginalAcknowledgeMode(acknowledgeMode)); + sqsSession = new SQSSession(this, AcknowledgeMode.ACK_UNORDERED.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues); } else { LOG.error("Unrecognized acknowledgeMode. Cannot create Session."); throw new JMSException("Unrecognized acknowledgeMode. Cannot create Session."); diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java index 99c3add..52e1abc 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -152,7 +152,8 @@ public SQSConnection createConnection(AwsCredentialsProvider awsCredentialsProvi private SQSConnection createConnection(SqsClient amazonSQS, AwsCredentialsProvider awsCredentialsProvider) throws JMSException { AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = new AmazonSQSMessagingClientWrapper(amazonSQS, awsCredentialsProvider); - return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch()); + return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch(), + providerConfiguration.isCacheQueues()); } @Override diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java index ed81646..ebc3994 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -181,15 +181,18 @@ public class SQSSession implements Session, QueueSession { */ private SQSMessageConsumer activeConsumerInCallback = null; - SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode) throws JMSException { - this(parentSQSConnection, acknowledgeMode, - Collections.newSetFromMap(new ConcurrentHashMap<>()), - Collections.newSetFromMap(new ConcurrentHashMap<>())); + private final boolean cacheQueues; + + private final Map queueCache = new ConcurrentHashMap<>(); + + SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode, boolean cacheQueues) throws JMSException { + this(parentSQSConnection, acknowledgeMode, Collections.newSetFromMap(new ConcurrentHashMap<>()), + Collections.newSetFromMap(new ConcurrentHashMap<>()), cacheQueues); } SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode, - Set messageConsumers, - Set messageProducers) throws JMSException { + Set messageConsumers, Set messageProducers, boolean cacheQueues) + throws JMSException { this.parentSQSConnection = parentSQSConnection; this.amazonSQSClient = parentSQSConnection.getWrappedAmazonSQSClient(); this.acknowledgeMode = acknowledgeMode; @@ -199,6 +202,7 @@ public class SQSSession implements Session, QueueSession { this.executor = Executors.newSingleThreadExecutor(SESSION_THREAD_FACTORY); this.messageConsumers = messageConsumers; this.messageProducers = messageProducers; + this.cacheQueues = cacheQueues; executor.execute(sqsSessionRunnable); } @@ -614,8 +618,7 @@ public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscripti */ @Override public Queue createQueue(String queueName) throws JMSException { - checkClosed(); - return new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName).queueUrl()); + return createQueue(queueName, null); } /** @@ -630,8 +633,18 @@ public Queue createQueue(String queueName) throws JMSException { */ public Queue createQueue(String queueName, String ownerAccountId) throws JMSException { checkClosed(); - return new SQSQueueDestination( - queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl()); + SQSQueueDestination queue = null; + String cacheKey = (ownerAccountId != null) ? (ownerAccountId + "/" + queueName) : queueName; + if (this.cacheQueues) { + queue = this.queueCache.get(cacheKey); + } + if (queue == null) { + queue = new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl()); + } + if (this.cacheQueues) { + this.queueCache.put(cacheKey, queue); + } + return queue; } /** diff --git a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java index 045e776..d99d135 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -84,8 +84,8 @@ public void Setup() throws JMSException { SQSQueueDestination sqsDestination = new SQSQueueDestination(QUEUE_NAME, QUEUE_URL); amazonSQSClient = mock(AmazonSQSMessagingClientWrapper.class); - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); - session = new SQSSession(connection, AcknowledgeMode.ACK_AUTO); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); + session = new SQSSession(connection, AcknowledgeMode.ACK_AUTO, false); SQSSessionCallbackScheduler sqsSessionRunnable = new SQSSessionCallbackScheduler(session, AcknowledgeMode.ACK_AUTO, acknowledger, negativeAcknowledger); @@ -125,7 +125,7 @@ public void verify() { // Test session close operation with create producer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, closeSessionOperation); @@ -134,7 +134,7 @@ public void verify() { // Test session close operation with create consumer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, closeSessionOperation); @@ -169,7 +169,7 @@ public void verify() { // Test connection start operation with create producer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, startConnectionOperation); @@ -178,7 +178,7 @@ public void verify() { // Test connection start operation with create consumer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, startConnectionOperation); @@ -211,7 +211,7 @@ public void verify() { // Test connection close operation with create producer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, closeConnectionOperation); @@ -219,7 +219,7 @@ public void verify() { // Test connection close operation with create consumer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, closeConnectionOperation); @@ -254,7 +254,7 @@ public void verify() { // Test connection stop operation with create producer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesProducer, stopConnectionOperation); @@ -263,7 +263,7 @@ public void verify() { // Test connection stop operation with create consumer operation for (int i = 0; i < 10; ++i) { - connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH); + connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false); session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); testConcurrentExecution(msgListenerCreatesConsumer, stopConnectionOperation); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java index c2a7b43..e9ecd68 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ public void setup() throws JMSException { int numberOfMessagesToPrefetch = 10; AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class); - sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch)); + sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch, false)); session1 = mock(SQSSession.class); session2 = mock(SQSSession.class); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerTest.java index bbda3b7..bcf689a 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ public class SQSMessageConsumerTest { public void setup() throws JMSException { sqsConnection = mock(SQSConnection.class); - sqsSession = spy(new SQSSession(sqsConnection, AcknowledgeMode.ACK_AUTO));//mock(SQSSession.class); + sqsSession = spy(new SQSSession(sqsConnection, AcknowledgeMode.ACK_AUTO, false));//mock(SQSSession.class); sqsSessionRunnable = mock(SQSSessionCallbackScheduler.class); acknowledger = mock(Acknowledger.class); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java index 5dc4b51..ad28948 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -105,7 +105,7 @@ public void setup() throws JMSException { messageProducers = new HashSet<>(Set.of(producer1, producer2)); sqsSession = spy(new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, - messageConsumers, messageProducers)); + messageConsumers, messageProducers, false)); } /** @@ -116,7 +116,7 @@ public void testStopNoOpIfAlreadyClosed() throws JMSException { /* * Set up session */ - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers, false); sqsSession.close(); /* @@ -189,7 +189,7 @@ public void testStartNoOpIfAlreadyClosed() throws JMSException { /* * Set up session */ - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers, false); sqsSession.close(); SQSMessageConsumer consumer1 = mock(SQSMessageConsumer.class); SQSMessageConsumer consumer2 = mock(SQSMessageConsumer.class); @@ -397,7 +397,7 @@ public void testWaitForAllCallbackCompleteBlocksOnStateLock() throws Interrupted final CountDownLatch beforeSessionWaitCall = new CountDownLatch(1); final CountDownLatch passedSessionWaitCall = new CountDownLatch(1); - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers, false); sqsSession.start(); PrefetchManager prefetchManager = new PrefetchManager() { @@ -456,7 +456,7 @@ public void testWaitForAllCallbackComplete() throws InterruptedException, JMSExc /* * Set up session and mocks */ - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers, false); sqsSession.start(); sqsSession.startingCallback(consumer1); final CountDownLatch beforeWaitCall = new CountDownLatch(1); @@ -653,7 +653,7 @@ public void testCreateQueueWhenAlreadyClosed() { @Test public void testCreateQueue() throws JMSException { GetQueueUrlResponse result = GetQueueUrlResponse.builder().queueUrl(QUEUE_URL).build(); - when(sqsClientJMSWrapper.getQueueUrl(QUEUE_NAME)) + when(sqsClientJMSWrapper.getQueueUrl(QUEUE_NAME, null)) .thenReturn(result); /* @@ -904,10 +904,10 @@ public void testCreateProducer() throws JMSException { */ @Test public void testRecover() throws JMSException, InterruptedException { - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_UNORDERED); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_UNORDERED, false); when(parentSQSConnection.getNumberOfMessagesToPrefetch()).thenReturn(4); - when(sqsClientJMSWrapper.getQueueUrl("queue1")) + when(sqsClientJMSWrapper.getQueueUrl("queue1", null)) .thenReturn(GetQueueUrlResponse.builder().queueUrl("queueUrl1").build()); when(sqsClientJMSWrapper.receiveMessage(argThat(new ReceiveRequestMatcher("queueUrl1")))) .thenReturn(ReceiveMessageResponse.builder().messages(createFifoMessage("group1", "message1", "queue1-group1-message1")).build()) @@ -918,7 +918,7 @@ public void testRecover() throws JMSException, InterruptedException { .thenReturn(ReceiveMessageResponse.builder().messages(createFifoMessage("group3", "message6", "queue1-group3-message6")).build()) .thenReturn(ReceiveMessageResponse.builder().build()); - when(sqsClientJMSWrapper.getQueueUrl("queue2")) + when(sqsClientJMSWrapper.getQueueUrl("queue2", null)) .thenReturn(GetQueueUrlResponse.builder().queueUrl("queueUrl2").build()); when(sqsClientJMSWrapper.receiveMessage(argThat(new ReceiveRequestMatcher("queueUrl2")))) .thenReturn(ReceiveMessageResponse.builder().messages(createFifoMessage("group1", "message1", "queue2-group1-message1")).build()) @@ -1109,7 +1109,7 @@ public void testDoCloseWhenClosing() throws InterruptedException { */ @Test public void testDoClose() throws JMSException { - sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers); + sqsSession = new SQSSession(parentSQSConnection, AcknowledgeMode.ACK_AUTO, messageConsumers, messageProducers, false); /* * Do close */