Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,12 @@
public class ProviderConfiguration {
private int numberOfMessagesToPrefetch;

private boolean cacheQueues;

public ProviderConfiguration() {
// Set default numberOfMessagesToPrefetch to MIN_BATCH.
this.numberOfMessagesToPrefetch = SQSMessagingClientConstants.MIN_BATCH;
this.numberOfMessagesToPrefetch = SQSMessagingClientConstants.MIN_BATCH;
this.cacheQueues = false;
}

public int getNumberOfMessagesToPrefetch() {
Expand All @@ -38,4 +41,17 @@ public ProviderConfiguration withNumberOfMessagesToPrefetch(int numberOfMessages
return this;
}

public boolean isCacheQueues() {
return this.cacheQueues;
}

public void setCacheQueues(boolean cacheQueues) {
this.cacheQueues = cacheQueues;
}

public ProviderConfiguration withCacheQueues(boolean cacheQueues) {
setCacheQueues(cacheQueues);
return this;
}

}
21 changes: 15 additions & 6 deletions src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,6 +92,14 @@ public class SQSConnection implements Connection, QueueConnection {
* but it will make multiple calls as necessary.
*/
private final int numberOfMessagesToPrefetch;

/**
* Configures sessions to cache queues created JMS Queue objects.
* @see SQSSession#createQueue(String)
* @see SQSSession#createQueue(String, String)
*/
private final boolean cacheQueues;

private volatile boolean closed = false;
private volatile boolean closing = false;

Expand All @@ -106,10 +114,11 @@ public class SQSConnection implements Connection, QueueConnection {

private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<>());

SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) {
SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch,
boolean cacheQueues) {
amazonSQSClient = amazonSQSClientJMSWrapper;
this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;

this.cacheQueues = cacheQueues;
}

/**
Expand Down Expand Up @@ -184,11 +193,11 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS

SQSSession sqsSession;
if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) {
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_AUTO.withOriginalAcknowledgeMode(acknowledgeMode));
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_AUTO.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues);
} else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_RANGE.withOriginalAcknowledgeMode(acknowledgeMode));
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_RANGE.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues);
} else if (acknowledgeMode == SQSSession.UNORDERED_ACKNOWLEDGE) {
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_UNORDERED.withOriginalAcknowledgeMode(acknowledgeMode));
sqsSession = new SQSSession(this, AcknowledgeMode.ACK_UNORDERED.withOriginalAcknowledgeMode(acknowledgeMode), this.cacheQueues);
} else {
LOG.error("Unrecognized acknowledgeMode. Cannot create Session.");
throw new JMSException("Unrecognized acknowledgeMode. Cannot create Session.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -152,7 +152,8 @@ public SQSConnection createConnection(AwsCredentialsProvider awsCredentialsProvi

private SQSConnection createConnection(SqsClient amazonSQS, AwsCredentialsProvider awsCredentialsProvider) throws JMSException {
AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = new AmazonSQSMessagingClientWrapper(amazonSQS, awsCredentialsProvider);
return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch());
return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch(),
providerConfiguration.isCacheQueues());
}

@Override
Expand Down
35 changes: 24 additions & 11 deletions src/main/java/com/amazon/sqs/javamessaging/SQSSession.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -181,15 +181,18 @@ public class SQSSession implements Session, QueueSession {
*/
private SQSMessageConsumer activeConsumerInCallback = null;

SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode) throws JMSException {
this(parentSQSConnection, acknowledgeMode,
Collections.newSetFromMap(new ConcurrentHashMap<>()),
Collections.newSetFromMap(new ConcurrentHashMap<>()));
private final boolean cacheQueues;

private final Map<String, SQSQueueDestination> queueCache = new ConcurrentHashMap<>();

SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode, boolean cacheQueues) throws JMSException {
this(parentSQSConnection, acknowledgeMode, Collections.newSetFromMap(new ConcurrentHashMap<>()),
Collections.newSetFromMap(new ConcurrentHashMap<>()), cacheQueues);
}

SQSSession(SQSConnection parentSQSConnection, AcknowledgeMode acknowledgeMode,
Set<SQSMessageConsumer> messageConsumers,
Set<SQSMessageProducer> messageProducers) throws JMSException {
Set<SQSMessageConsumer> messageConsumers, Set<SQSMessageProducer> messageProducers, boolean cacheQueues)
throws JMSException {
this.parentSQSConnection = parentSQSConnection;
this.amazonSQSClient = parentSQSConnection.getWrappedAmazonSQSClient();
this.acknowledgeMode = acknowledgeMode;
Expand All @@ -199,6 +202,7 @@ public class SQSSession implements Session, QueueSession {
this.executor = Executors.newSingleThreadExecutor(SESSION_THREAD_FACTORY);
this.messageConsumers = messageConsumers;
this.messageProducers = messageProducers;
this.cacheQueues = cacheQueues;

executor.execute(sqsSessionRunnable);
}
Expand Down Expand Up @@ -614,8 +618,7 @@ public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscripti
*/
@Override
public Queue createQueue(String queueName) throws JMSException {
checkClosed();
return new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName).queueUrl());
return createQueue(queueName, null);
}

/**
Expand All @@ -630,8 +633,18 @@ public Queue createQueue(String queueName) throws JMSException {
*/
public Queue createQueue(String queueName, String ownerAccountId) throws JMSException {
checkClosed();
return new SQSQueueDestination(
queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl());
SQSQueueDestination queue = null;
String cacheKey = (ownerAccountId != null) ? (ownerAccountId + "/" + queueName) : queueName;
if (this.cacheQueues) {
queue = this.queueCache.get(cacheKey);
}
if (queue == null) {
queue = new SQSQueueDestination(queueName, amazonSQSClient.getQueueUrl(queueName, ownerAccountId).queueUrl());
}
if (this.cacheQueues) {
this.queueCache.put(cacheKey, queue);
}
return queue;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,8 +84,8 @@ public void Setup() throws JMSException {
SQSQueueDestination sqsDestination = new SQSQueueDestination(QUEUE_NAME, QUEUE_URL);
amazonSQSClient = mock(AmazonSQSMessagingClientWrapper.class);

connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
session = new SQSSession(connection, AcknowledgeMode.ACK_AUTO);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = new SQSSession(connection, AcknowledgeMode.ACK_AUTO, false);
SQSSessionCallbackScheduler sqsSessionRunnable = new SQSSessionCallbackScheduler(session,
AcknowledgeMode.ACK_AUTO, acknowledger, negativeAcknowledger);

Expand Down Expand Up @@ -125,7 +125,7 @@ public void verify() {

// Test session close operation with create producer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesProducer, closeSessionOperation);
Expand All @@ -134,7 +134,7 @@ public void verify() {

// Test session close operation with create consumer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesConsumer, closeSessionOperation);
Expand Down Expand Up @@ -169,7 +169,7 @@ public void verify() {

// Test connection start operation with create producer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesProducer, startConnectionOperation);
Expand All @@ -178,7 +178,7 @@ public void verify() {

// Test connection start operation with create consumer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesConsumer, startConnectionOperation);
Expand Down Expand Up @@ -211,15 +211,15 @@ public void verify() {

// Test connection close operation with create producer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesProducer, closeConnectionOperation);
}

// Test connection close operation with create consumer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesConsumer, closeConnectionOperation);
Expand Down Expand Up @@ -254,7 +254,7 @@ public void verify() {

// Test connection stop operation with create producer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesProducer, stopConnectionOperation);
Expand All @@ -263,7 +263,7 @@ public void verify() {

// Test connection stop operation with create consumer operation
for (int i = 0; i < 10; ++i) {
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH);
connection = new SQSConnection(amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH, false);
session = (SQSSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

testConcurrentExecution(msgListenerCreatesConsumer, stopConnectionOperation);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@ public void setup() throws JMSException {

int numberOfMessagesToPrefetch = 10;
AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class);
sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch));
sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch, false));

session1 = mock(SQSSession.class);
session2 = mock(SQSSession.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public class SQSMessageConsumerTest {
public void setup() throws JMSException {
sqsConnection = mock(SQSConnection.class);

sqsSession = spy(new SQSSession(sqsConnection, AcknowledgeMode.ACK_AUTO));//mock(SQSSession.class);
sqsSession = spy(new SQSSession(sqsConnection, AcknowledgeMode.ACK_AUTO, false));//mock(SQSSession.class);
sqsSessionRunnable = mock(SQSSessionCallbackScheduler.class);

acknowledger = mock(Acknowledger.class);
Expand Down
Loading