From da191680bf05010f077f9429a03efcf29c74461c Mon Sep 17 00:00:00 2001 From: Moritz Becker Date: Thu, 19 Sep 2019 12:54:13 +0200 Subject: [PATCH] fix #81 add async send methods to SQSMessageProducer --- .../sqs/javamessaging/CompletionListener.java | 30 ++++ .../sqs/javamessaging/SQSMessageProducer.java | 142 +++++++++++++++--- 2 files changed, 148 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/amazon/sqs/javamessaging/CompletionListener.java diff --git a/src/main/java/com/amazon/sqs/javamessaging/CompletionListener.java b/src/main/java/com/amazon/sqs/javamessaging/CompletionListener.java new file mode 100644 index 0000000..c86707e --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/CompletionListener.java @@ -0,0 +1,30 @@ +package com.amazon.sqs.javamessaging; + +import javax.jms.Message; + +/** + * Mimics {@link javax.jms.CompletionListener} from JMS 2.0 API + */ +public interface CompletionListener { + + /** + * Notifies the application that the message has been successfully sent + * + * @param message + * the message that was sent. + */ + void onCompletion(Message message); + + /** + * Notifies user that the specified exception was thrown while attempting to + * send the specified message. If an exception occurs it is undefined + * whether or not the message was successfully sent. + * + * @param message + * the message that was sent. + * @param exception + * the exception + * + */ + void onException(Message message, Exception exception); +} diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java index 8093f04..eae0459 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java @@ -17,6 +17,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,6 +31,8 @@ import javax.jms.Queue; import javax.jms.QueueSender; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sqs.AmazonSQSAsync; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -90,13 +93,75 @@ public class SQSMessageProducer implements MessageProducer, QueueSender { void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSException { checkClosed(); - String sqsMessageBody = null; - String messageType = null; if (!(rawMessage instanceof SQSMessage)) { throw new MessageFormatException( - "Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage"); + "Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage"); + } + SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage); + SQSMessage message = (SQSMessage) rawMessage; + + SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest); + String messageId = sendMessageResult.getMessageId(); + LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId); + applySendMessageResult(sendMessageResult, message); + } + + private void sendInternalAsync(SQSQueueDestination queue, Message rawMessage, final CompletionListener completionListener) throws JMSException { + checkClosed(); + if (!(amazonSQSClient.getAmazonSQSClient() instanceof AmazonSQSAsync)) { + throw new UnsupportedOperationException("Expected instance of " + SQSMessageProducer.class.getName() + " to be backed by an instance of " + + AmazonSQSAsync.class.getName() + + " but was: " + amazonSQSClient.getAmazonSQSClient().getClass().getName()); + } + if (!(rawMessage instanceof SQSMessage)) { + throw new MessageFormatException( + "Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage"); + } + AmazonSQSAsync amazonSQSAsync = (AmazonSQSAsync) amazonSQSClient.getAmazonSQSClient(); + + SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage); + final SQSMessage message = (SQSMessage) rawMessage; + + amazonSQSAsync.sendMessageAsync(sendMessageRequest, new AsyncHandler() { + @Override + public void onError(Exception e) { + if (completionListener != null) { + completionListener.onException(message, e); + } + } + + @Override + public void onSuccess(SendMessageRequest request, SendMessageResult sendMessageResult) { + String messageId = sendMessageResult.getMessageId(); + LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId); + try { + applySendMessageResult(sendMessageResult, message); + } catch (JMSException e) { + throw new RuntimeException(e); + } + if (completionListener != null) { + completionListener.onCompletion(message); + } + } + }); + } + + private void applySendMessageResult(SendMessageResult sendMessageResult, SQSMessage message) throws JMSException { + /** TODO: Do not support disableMessageID for now. */ + message.setSQSMessageId(sendMessageResult.getMessageId()); + + // if the message was sent to FIFO queue, the sequence number will be + // set in the response + // pass it to JMS user through provider specific JMS property + if (sendMessageResult.getSequenceNumber() != null) { + message.setSequenceNumber(sendMessageResult.getSequenceNumber()); } - + } + + private SendMessageRequest createSendMessageRequest(SQSQueueDestination queue, Message rawMessage) throws JMSException { + String sqsMessageBody = null; + String messageType = null; + SQSMessage message = (SQSMessage)rawMessage; message.setJMSDestination(queue); if (message instanceof SQSBytesMessage) { @@ -105,11 +170,11 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep } else if (message instanceof SQSObjectMessage) { sqsMessageBody = ((SQSObjectMessage) message).getMessageBody(); messageType = SQSMessage.OBJECT_MESSAGE_TYPE; - } else if (message instanceof SQSTextMessage) { + } else if (message instanceof SQSTextMessage) { sqsMessageBody = ((SQSTextMessage) message).getText(); messageType = SQSMessage.TEXT_MESSAGE_TYPE; } - + if (sqsMessageBody == null || sqsMessageBody.isEmpty()) { throw new JMSException("Message body cannot be null or empty"); } @@ -139,19 +204,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep sendMessageRequest.setMessageGroupId(message.getSQSMessageGroupId()); sendMessageRequest.setMessageDeduplicationId(message.getSQSMessageDeduplicationId()); } - - SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest); - String messageId = sendMessageResult.getMessageId(); - LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId); - /** TODO: Do not support disableMessageID for now. */ - message.setSQSMessageId(messageId); - - // if the message was sent to FIFO queue, the sequence number will be - // set in the response - // pass it to JMS user through provider specific JMS property - if (sendMessageResult.getSequenceNumber() != null) { - message.setSequenceNumber(sendMessageResult.getSequenceNumber()); - } + return sendMessageRequest; } @Override @@ -187,6 +240,15 @@ public void send(Queue queue, Message message) throws JMSException { sendInternal((SQSQueueDestination)queue, message); } + public void sendAsync(Queue queue, Message message, CompletionListener completionListener) throws JMSException { + if (!(queue instanceof SQSQueueDestination)) { + throw new InvalidDestinationException( + "Incompatible implementation of Queue. Please use SQSQueueDestination implementation."); + } + checkIfDestinationAlreadySet(); + sendInternalAsync((SQSQueueDestination)queue, message, completionListener); + } + /** * Not verified on the client side, but SQS Attribute names must be valid * letter or digit on the basic multilingual plane in addition to allowing @@ -314,7 +376,12 @@ public void send(Queue queue, Message message, int deliveryMode, int priority, l throws JMSException { send(queue, message); } - + + public void sendAsync(Queue queue, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) + throws JMSException { + sendAsync(queue, message, completionListener); + } + /** * Gets the destination associated with this MessageProducer. * @@ -358,19 +425,27 @@ public void send(Message message) throws JMSException { } sendInternal(sqsDestination, message); } - + + public void sendAsync(Message message, CompletionListener completionListener) throws JMSException { + if (sqsDestination == null) { + throw new UnsupportedOperationException( + "MessageProducer has to specify a destination at creation time."); + } + sendInternalAsync(sqsDestination, message, completionListener); + } + /** * Sends a message to a destination created during the creation time of this * message producer. *

* Send does not support deliveryMode, priority, and timeToLive. It will * ignore anything in deliveryMode, priority, and timeToLive. - * + * * @param message * the message to send * @param deliveryMode * @param priority - * @param timeToLive + * @param timeToLive * @throws MessageFormatException * If an invalid message is specified. * @throws UnsupportedOperationException @@ -383,7 +458,11 @@ public void send(Message message) throws JMSException { public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { send(message); } - + + public void sendAsync(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { + sendAsync(message, completionListener); + } + /** * Sends a message to a queue destination. * @@ -414,6 +493,17 @@ public void send(Destination destination, Message message) throws JMSException { } } + public void sendAsync(Destination destination, Message message, CompletionListener completionListener) throws JMSException { + if (destination == null) { + throw new InvalidDestinationException("Destination cannot be null"); + } + if (destination instanceof SQSQueueDestination) { + sendAsync((Queue) destination, message, completionListener); + } else { + throw new InvalidDestinationException("Incompatible implementation of Destination. Please use SQSQueueDestination implementation."); + } + } + /** * Sends a message to a queue destination. *

@@ -443,6 +533,10 @@ public void send(Destination destination, Message message, int deliveryMode, int send(destination, message); } + public void sendAsync(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { + sendAsync(destination, message, completionListener); + } + /** This method is not supported. */ @Override public void setDisableMessageID(boolean value) throws JMSException {