Skip to content

Commit 208f27c

Browse files
Merge pull request #81 from JaidenAshmore/issue/77_message_broker_as_runnable
refs 77: Changed message broker to be a runnable
2 parents 4128d4e + 0518c7d commit 208f27c

File tree

26 files changed

+134
-1085
lines changed

26 files changed

+134
-1085
lines changed

examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import com.jashmore.sqs.argument.payload.Payload;
1414
import com.jashmore.sqs.argument.payload.mapper.JacksonPayloadMapper;
1515
import com.jashmore.sqs.argument.payload.mapper.PayloadMapper;
16+
import com.jashmore.sqs.broker.concurrent.CachingConcurrentMessageBrokerProperties;
1617
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
17-
import com.jashmore.sqs.broker.concurrent.properties.CachingConcurrentMessageBrokerProperties;
18-
import com.jashmore.sqs.broker.concurrent.properties.ConcurrentMessageBrokerProperties;
18+
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
1919
import com.jashmore.sqs.container.MessageListenerContainer;
2020
import com.jashmore.sqs.container.SimpleMessageListenerContainer;
2121
import com.jashmore.sqs.processor.DefaultMessageProcessor;
@@ -120,7 +120,6 @@ public static void main(final String[] args) throws Exception {
120120
final ConcurrentMessageBroker concurrentMessageBroker = new ConcurrentMessageBroker(
121121
messageRetriever,
122122
messageProcessor,
123-
executorService,
124123
// Represents a concurrent implementation that will fluctuate between 0 and 10 threads all processing messages
125124
new CachingConcurrentMessageBrokerProperties(10000, new ConcurrentMessageBrokerProperties() {
126125
private final Random random = new Random(1);

java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/broker/MessageBroker.java

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.jashmore.sqs.processor.MessageProcessor;
44
import com.jashmore.sqs.retriever.MessageRetriever;
55

6-
import java.util.concurrent.Future;
76
import javax.annotation.concurrent.ThreadSafe;
87

98
/**
@@ -16,59 +15,6 @@
1615
* threads. Therefore, implementations of this class must be thread safe.
1716
*/
1817
@ThreadSafe
19-
public interface MessageBroker {
20-
/**
21-
* Start the brokerage of messages from the queue to the processor in a background thread.
22-
*
23-
* <p>Requirements for this method include:
24-
*
25-
* <ul>
26-
* <li>This method must be non-blocking and return once the background thread has started.</li>
27-
* <li>If this broker has already been started when this method is called again a {@link IllegalStateException} will be thrown until a subsequent
28-
* {@link #stop()} or {@link #stopWithChildrenThreadsInterrupted()} has been called.</li>
29-
* <li>If this broker is being stopped by calling {@link #stop()} or {@link #stopWithChildrenThreadsInterrupted()}, a call to this
30-
* method should not be blocked by the previous thread and should start a new thread.</li>
31-
* </ul>
32-
*
33-
* @throws IllegalStateException if the broker has already been started
34-
*/
35-
void start();
18+
public interface MessageBroker extends Runnable {
3619

37-
/**
38-
* Stop the brokerage of messages from the queue to the processor, returning the future that will be resolved when that shutdown is complete.
39-
*
40-
* <p>Requirements for this method include:
41-
*
42-
* <ul>
43-
* <li>The broker background thread and any threads created by this background thread must be completed before the {@link Future} returned
44-
* is resolved.</li>
45-
* <li>The children threads processing the messages will not be interrupted and therefore will fully process the message before the {@link Future}
46-
* is resolved.</li>
47-
* <li>If this broker has not been started or has already been stopped, any calls to this method will throw an {@link IllegalStateException}.</li>
48-
* <li>The returned {@link Future} does not have any requirements for the value resolved and therefore should not be relied upon.</li>
49-
* </ul>
50-
*
51-
* @return a future that will resolve when the message broker background thread and all child threads have been stopped/completed
52-
* @throws IllegalStateException if the broker has not been started or has already stopped
53-
*/
54-
Future<Object> stop();
55-
56-
/**
57-
* Stop the brokerage of messages from the queue to the processor, returning the future that will be resolved when that shutdown is complete.
58-
*
59-
* <p>Requirements for this method include:
60-
*
61-
* <ul>
62-
* <li>The broker background thread and any threads created by this background thread must be completed before the {@link Future} returned
63-
* is resolved.</li>
64-
* <li>The children threads processing the messages will be interrupted and therefore may not fully complete processing the messages before the
65-
* {@link Future} is resolved.</li>
66-
* <li>If this broker has not been started or has already been stopped, any calls to this method will throw an {@link IllegalStateException}.</li>
67-
* <li>The returned {@link Future} does not have any requirements for the value resolved and therefore should not be relied upon.</li>
68-
* </ul>
69-
*
70-
* @return a future that will resolve when the message broker and any child threads have been stopped
71-
* @throws IllegalStateException if the broker has not been started or has already stopped
72-
*/
73-
Future<Object> stopWithChildrenThreadsInterrupted();
7420
}

java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/container/MessageListenerContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
* Container used to handle the entire lifecycle of a wrapped method in the spring context.
1010
*
1111
* <p>The responsibility of this container is being able to start and stop the processing of messages for the wrapped method. This
12-
* may only need to call down to the underlying {@link MessageBroker#start()} but there could be more complicated actions needing
13-
* to be made, such as starting an {@link AsyncMessageRetriever}.
12+
* may will start any background threads needed for any component of the library.
1413
*
1514
* <p>These containers must be thread safe as there could be multiple threads starting and stopping these containers.
1615
*/

java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/resolver/MessageResolvedResponse.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/AbstractMessageBroker.java

Lines changed: 0 additions & 82 deletions
This file was deleted.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.jashmore.sqs.broker.concurrent.properties;
1+
package com.jashmore.sqs.broker.concurrent;
22

33
import com.google.common.cache.CacheBuilder;
44
import com.google.common.cache.CacheLoader;

0 commit comments

Comments
 (0)