Skip to content

Commit cb55ac3

Browse files
refs #77: Made AsyncMessageRetriever a Runnable instead of using start/stop methods.
1 parent 2e126b4 commit cb55ac3

File tree

14 files changed

+427
-675
lines changed

14 files changed

+427
-675
lines changed

examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
1717
import com.jashmore.sqs.broker.concurrent.properties.CachingConcurrentMessageBrokerProperties;
1818
import com.jashmore.sqs.broker.concurrent.properties.ConcurrentMessageBrokerProperties;
19+
import com.jashmore.sqs.container.MessageListenerContainer;
20+
import com.jashmore.sqs.container.SimpleMessageListenerContainer;
1921
import com.jashmore.sqs.processor.DefaultMessageProcessor;
2022
import com.jashmore.sqs.processor.MessageProcessor;
2123
import com.jashmore.sqs.resolver.AsyncMessageResolver;
@@ -95,13 +97,9 @@ public static void main(final String[] args) throws Exception {
9597
.desiredMinPrefetchedMessages(10)
9698
.maxPrefetchedMessages(20)
9799
.maxWaitTimeInSecondsToObtainMessagesFromServer(10)
98-
.build(),
99-
executorService
100+
.build()
100101
);
101102

102-
// As this retrieves messages asynchronously we need to start the background thread
103-
messageRetriever.start();
104-
105103
// Creates the class that will deal with taking messages and getting them processed by the message consumer
106104
final MessageConsumer messageConsumer = new MessageConsumer();
107105
final Method messageReceivedMethod = MessageConsumer.class.getMethod("method", Request.class, String.class);
@@ -139,11 +137,9 @@ public Integer getConcurrencyLevel() {
139137
})
140138
);
141139

142-
// As the BatchingMessageResolver uses a background thread to delete the messages in batches we need to start it in a background thread
143-
executorService.submit(messageResolver);
144-
145-
// When we start listening it will receive messages from SQS and pass them to the MessageConsumer for processing
146-
concurrentMessageBroker.start();
140+
final MessageListenerContainer messageListenerContainer
141+
= new SimpleMessageListenerContainer(messageRetriever, concurrentMessageBroker, messageResolver);
142+
messageListenerContainer.start();
147143

148144
// Create some producers of messages
149145
final Future<?> producerFuture = executorService.submit(new Producer(sqsAsyncClient, queueUrl));

examples/java-dynamic-sqs-listener-spring-starter-examples/src/main/java/com/jashmore/sqs/examples/MessageListeners.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class MessageListeners {
1414
*
1515
* @param payload the payload of the SQS Message
1616
*/
17-
@QueueListener(value = "test")
17+
@QueueListener(value = "test", identifier = "test")
1818
public void method(@Payload final String payload) {
1919
log.info("Message Received: {}", payload);
2020
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.jashmore.sqs.examples;
2+
3+
import com.jashmore.sqs.spring.QueueContainerService;
4+
import lombok.RequiredArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.BeanFactory;
7+
import org.springframework.scheduling.annotation.Scheduled;
8+
import org.springframework.stereotype.Component;
9+
10+
@Slf4j
11+
@Component
12+
@RequiredArgsConstructor
13+
public class ScheduledQueueListenerEnabler {
14+
// TODO: #79: Allow for dependency injection of QueueContainerService
15+
private final BeanFactory beanFactory;
16+
17+
/**
18+
* Just a scheduled job that shows that there are methods of turning off the container when necessary.
19+
*
20+
* @throws InterruptedException if the thread was interrupted while sleeping
21+
*/
22+
@Scheduled(initialDelay = 10_000, fixedDelay = 30_000)
23+
public void turnOfSqsListener() throws InterruptedException {
24+
log.info("Turning off SQS Listener for a short period");
25+
26+
final QueueContainerService queueContainerService = beanFactory.getBean(QueueContainerService.class);
27+
queueContainerService.stopContainer("test");
28+
Thread.sleep(5_000);
29+
log.info("Turning SQS Listener back om");
30+
queueContainerService.startContainer("test");
31+
}
32+
}
Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,25 @@
11
package com.jashmore.sqs.retriever;
22

3-
import java.util.concurrent.Future;
4-
53
/**
64
* {@link MessageRetriever} that obtains messages in the background for consumption.
75
*
86
* <p>For example, this could be used when messages are pulled from the queue and cached locally to improve performance by reducing the number
97
* of times and latency of pulling messages from the queue.
8+
*
9+
* <p>To utilise this retriever, the class must run on a separate thread. For example:
10+
*
11+
* <pre class="code">
12+
* final AsyncMessageRetriever messageRetriever = new SomeAsyncMessageRetrieverImpl(...);
13+
* // start it on a background thread
14+
* Future&lt;?&gt; retrieverFuture = Executors.newCachedThreadPool().submit(messageRetriever);
15+
*
16+
* // Now messages can be retrieved
17+
* Message message = retrieverFuture.retrieveMessage(message);
18+
*
19+
* // Stop the message retriever when you are done
20+
* retrieverFuture.cancel(true);
21+
* </pre>
1022
*/
11-
public interface AsyncMessageRetriever extends MessageRetriever {
12-
/**
13-
* Start retrieve messages from the queue in a background thread.
14-
*
15-
* <p>Requirements for this method include:
16-
*
17-
* <ul>
18-
* <li>This method must be non-blocking and return once the background thread has started.</li>
19-
* <li>If this retriever has already been started, any calls to this method will throw an {@link IllegalStateException}.</li>
20-
* <li>If this retriever is being stopped by calling {@link #stop()}, a call to this method should <b>not</b> be blocked by the previous thread and
21-
* should start a new thread.</li>
22-
* </ul>
23-
*
24-
* @throws IllegalStateException if the retriever has already been started
25-
*/
26-
void start();
23+
public interface AsyncMessageRetriever extends MessageRetriever, Runnable {
2724

28-
/**
29-
* Stop the retrieval of messages.
30-
*
31-
* <p>Requirements for this method include:
32-
*
33-
* <ul>
34-
* <li>This method must be non-blocking and return once the background thread has been triggered to stop.</li>
35-
* <li>If this retriever has not been started or has already been stopped, any calls to this method will return a resolved {@link Future}</li>
36-
* <li>The returned {@link Future} does not have any requirements for the value resolved and therefore should not be relied upon.</li>
37-
* </ul>
38-
*
39-
* @return future that will resolve when the background message retriever thread has stopped
40-
* @throws IllegalStateException if the retriever has not been started or has already stopped
41-
*/
42-
Future<Object> stop();
4325
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/container/SimpleMessageListenerContainer.java

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import com.jashmore.sqs.resolver.MessageResolver;
99
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
1010
import com.jashmore.sqs.retriever.MessageRetriever;
11+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1112
import lombok.extern.slf4j.Slf4j;
1213

1314
import java.util.concurrent.ExecutionException;
1415
import java.util.concurrent.ExecutorService;
1516
import java.util.concurrent.Executors;
1617
import java.util.concurrent.Future;
18+
import java.util.concurrent.TimeUnit;
1719
import javax.annotation.concurrent.GuardedBy;
1820

1921
/**
@@ -45,18 +47,8 @@ public class SimpleMessageListenerContainer implements MessageListenerContainer
4547
*/
4648
private final MessageResolver messageResolver;
4749

48-
private final ExecutorService executorService;
49-
50-
/**
51-
* Stores whether the container is currently running.
52-
*
53-
* <p>This is kept thread safe by making sure all methods for this container are synchronized.
54-
*/
55-
@GuardedBy("this")
56-
private volatile boolean isRunning;
57-
5850
@GuardedBy("this")
59-
private Future<?> messageResolverCompletableFuture;
51+
private ExecutorService executorService;
6052

6153
/**
6254
* Container that can be built when the {@link MessageBroker} is using an {@link AsyncMessageRetriever}. This takes the {@link AsyncMessageRetriever} so
@@ -72,64 +64,62 @@ public SimpleMessageListenerContainer(final MessageRetriever messageRetriever,
7264
this.messageRetriever = messageRetriever;
7365
this.messageBroker = messageBroker;
7466
this.messageResolver = messageResolver;
75-
this.executorService = Executors.newCachedThreadPool();
7667

77-
this.messageResolverCompletableFuture = null;
78-
}
79-
80-
@VisibleForTesting
81-
SimpleMessageListenerContainer(final MessageRetriever messageRetriever,
82-
final MessageBroker messageBroker,
83-
final MessageResolver messageResolver,
84-
final ExecutorService executorService) {
85-
this.messageRetriever = messageRetriever;
86-
this.messageBroker = messageBroker;
87-
this.messageResolver = messageResolver;
88-
this.executorService = executorService;
89-
90-
this.messageResolverCompletableFuture = null;
68+
this.executorService = null;
9169
}
9270

71+
@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
9372
@Override
9473
public synchronized void start() {
95-
if (isRunning) {
74+
if (executorService != null) {
9675
return;
9776
}
9877

78+
executorService = getNewExecutorService();
79+
9980
if (messageRetriever instanceof AsyncMessageRetriever) {
100-
((AsyncMessageRetriever)messageRetriever).start();
81+
executorService.submit((AsyncMessageRetriever) messageRetriever);
10182
}
10283

10384
if (messageResolver instanceof AsyncMessageResolver) {
104-
messageResolverCompletableFuture = executorService.submit((AsyncMessageResolver) messageResolver);
85+
executorService.submit((AsyncMessageResolver) messageResolver);
10586
}
10687

10788
messageBroker.start();
108-
109-
isRunning = true;
11089
}
11190

11291
@Override
11392
public synchronized void stop() {
114-
if (!isRunning) {
93+
if (executorService == null) {
11594
return;
11695
}
11796

11897
try {
119-
final Future<?> messageBrokerStoppedFuture = messageBroker.stop();
120-
if (messageRetriever instanceof AsyncMessageRetriever) {
121-
((AsyncMessageRetriever)messageRetriever).stop().get();
122-
}
98+
executorService.shutdownNow();
12399

124-
if (messageResolverCompletableFuture != null) {
125-
messageResolverCompletableFuture.cancel(true);
100+
try {
101+
final Future<?> messageBrokerStoppedFuture = messageBroker.stop();
102+
103+
messageBrokerStoppedFuture.get();
104+
} catch (final InterruptedException interruptedException) {
105+
Thread.currentThread().interrupt();
106+
} catch (final ExecutionException executionException) {
107+
log.error("Error waiting for container to stop", executionException.getCause());
126108
}
127109

128-
messageBrokerStoppedFuture.get();
129-
} catch (final InterruptedException | ExecutionException exception) {
130-
log.error("Error waiting for container to stop", exception.getCause());
110+
try {
111+
executorService.awaitTermination(1, TimeUnit.MINUTES);
112+
} catch (final InterruptedException interruptedException) {
113+
Thread.currentThread().interrupt();
114+
}
131115
} finally {
132-
isRunning = false;
116+
// Reset so we can start the container again in the future
117+
executorService = null;
133118
}
134119
}
120+
121+
@VisibleForTesting
122+
ExecutorService getNewExecutorService() {
123+
return Executors.newCachedThreadPool();
124+
}
135125
}

0 commit comments

Comments
 (0)