Skip to content

Commit 54b292b

Browse files
author
Steve Powell
committed
Move WorkPoolTests.java and fix JavaDoc, sundry files
ConsumerDispatcher.java -- reference convention ConsumerWorkService.java -- minor refactor RequeueOnClose.java -- some privatisation and JavaDoc
1 parent f076346 commit 54b292b

File tree

10 files changed

+215
-175
lines changed

10 files changed

+215
-175
lines changed

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* ch1.{@link Channel#queueBind queueBind}(queueName, exchangeName, queueName);
4343
*
4444
* // Create the QueueingConsumer and have it consume from the queue
45-
* QueueingConsumer consumer = new {@link QueueingConsumer#QueueingConsumer QueueingConsumer}(ch1);
45+
* QueueingConsumer consumer = new {@link QueueingConsumer#QueueingConsumer(Channel) QueueingConsumer}(ch1);
4646
* ch1.{@link Channel#basicConsume basicConsume}(queueName, false, consumer);
4747
*
4848
* // Process deliveries
@@ -54,7 +54,7 @@
5454
* </pre>
5555
*
5656
*
57-
* <p>For a more complete example, see LogTail in the test/src/com/rabbitmq/examples
57+
* <p>For a more complete example, see LogTail in the <code>test/src/com/rabbitmq/examples</code>
5858
* directory of the source distribution.</p>
5959
* <p/>
6060
* <b>deprecated</b> <i><code>QueueingConsumer</code> was introduced to allow
@@ -64,7 +64,7 @@
6464
* <code>Connection's</code> thread. This had two main drawbacks. Firstly, the
6565
* <code>Consumer</code> could stall the processing of all
6666
* <code>Channels</code> on the <code>Connection</code>. Secondly, if a
67-
* <code>Consumer</code> made a recursive synchronous call into its
67+
* <code>Consumer</code> made a recursive synchronous call into its
6868
* <code>Channel</code> the client would deadlock.
6969
* <p/>
7070
* <code>QueueingConsumer</code> provided client code with an easy way to
@@ -207,6 +207,7 @@ private Delivery handle(Delivery delivery) {
207207
* @return the next message
208208
* @throws InterruptedException if an interrupt is received while waiting
209209
* @throws ShutdownSignalException if the connection is shut down while waiting
210+
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
210211
*/
211212
public Delivery nextDelivery()
212213
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
@@ -220,6 +221,7 @@ public Delivery nextDelivery()
220221
* @return the next message or null if timed out
221222
* @throws InterruptedException if an interrupt is received while waiting
222223
* @throws ShutdownSignalException if the connection is shut down while waiting
224+
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
223225
*/
224226
public Delivery nextDelivery(long timeout)
225227
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -968,14 +968,14 @@ public Consumer transformReply(AMQCommand replyCommand) {
968968
}
969969

970970

971-
/** Public API - {@inheritDoc} */
971+
/** Public API - {@inheritDoc} */
972972
public Basic.RecoverOk basicRecover()
973973
throws IOException
974974
{
975975
return basicRecover(true);
976976
}
977977

978-
/** Public API - {@inheritDoc} */
978+
/** Public API - {@inheritDoc} */
979979
public Basic.RecoverOk basicRecover(boolean requeue)
980980
throws IOException
981981
{

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ final class ConsumerDispatcher {
4545
private volatile boolean shutdownConsumersDriven = false;
4646
private volatile CountDownLatch shutdownConsumersComplete;
4747

48-
private volatile ShutdownSignalException shutdownSignal;
48+
private volatile ShutdownSignalException shutdownSignal = null;
4949

5050
public ConsumerDispatcher(AMQConnection connection,
5151
Channel channel,
@@ -144,7 +144,7 @@ public CountDownLatch handleShutdownSignal(final Map<String, Consumer> consumers
144144
// Execute shutdown processing even if there are no consumers.
145145
execute(new Runnable() {
146146
public void run() {
147-
notifyConsumersOfShutdown(consumers, signal);
147+
ConsumerDispatcher.this.notifyConsumersOfShutdown(consumers, signal);
148148
ConsumerDispatcher.this.shutdown(signal);
149149
ConsumerDispatcher.this.workService.stopWork(ConsumerDispatcher.this.channel);
150150
latch.countDown();

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,9 @@ final class ConsumerWorkService {
3030
private final WorkPool<Channel, Runnable> workPool;
3131

3232
public ConsumerWorkService(ExecutorService executor) {
33-
if (executor == null) {
34-
privateExecutor = true;
35-
this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
36-
} else {
37-
privateExecutor = false;
38-
this.executor = executor;
39-
}
33+
this.privateExecutor = (executor == null);
34+
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS)
35+
: executor;
4036
this.workPool = new WorkPool<Channel, Runnable>();
4137
}
4238

src/com/rabbitmq/client/impl/SetQueue.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/** A generic queue-like implementation (supporting operations <code>addIfNotPresent</code>,
99
* <code>poll</code>, <code>contains</code>, and <code>isEmpty</code>)
1010
* which restricts a queue element to appear at most once.
11-
* If the element is already present {@link #addIfNotPresent(T)} returns <code><b>false</b></code>.
11+
* If the element is already present {@link #addIfNotPresent} returns <code><b>false</b></code>.
1212
* <p/>
1313
* Elements must not be <code><b>null</b></code>.
1414
* <p/><b>Concurrent Semantics</b><br/>
@@ -43,7 +43,8 @@ public T poll() {
4343
return item;
4444
}
4545

46-
/** @return <code><b>true</b></code> if and only if <b>item</b> is in the queue.*/
46+
/** @param item to look for in queue
47+
* @return <code><b>true</b></code> if and only if <b>item</b> is in the queue.*/
4748
public boolean contains(T item) {
4849
return this.members.contains(item);
4950
}
@@ -54,6 +55,7 @@ public boolean isEmpty() {
5455
}
5556

5657
/** Remove item from queue, if present.
58+
* @param item to remove
5759
* @return <code><b>true</b></code> if and only if item was initially present and was removed.
5860
*/
5961
public boolean remove(T item) {

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@
6363
* </pre>
6464
* <i>dormant</i> is not represented in the implementation state, and adding items
6565
* when the client is <i>in progress</i> or <i>ready</i> does not change its state.
66+
* @param <K> Key -- type of client
67+
* @param <W> Work -- type of work item
6668
*/
6769
public class WorkPool<K, W> {
6870

6971
/** protecting <code>ready</code>, <code>inProgress</code> and <code>pool</code> */
7072
private final Object monitor = new Object();
71-
/** An ordered queue of <i>ready</i> clients. */
73+
/** An injective queue of <i>ready</i> clients. */
7274
private final SetQueue<K> ready = new SetQueue<K>();
7375
/** The set of clients which have work <i>in progress</i>. */
7476
private final Set<K> inProgress = new HashSet<K>();
@@ -79,7 +81,7 @@ public class WorkPool<K, W> {
7981
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
8082
* A client is initially <i>dormant</i>.
8183
* <p/>
82-
* No-op if <code><b>key</b></code> already present.
84+
* No-op if <code><b>key</b></code> already present.
8385
* @param key client to add to pool
8486
*/
8587
public void registerKey(K key) {
@@ -208,7 +210,7 @@ private boolean moreWorkItems(K key) {
208210
LinkedList<W> leList = this.pool.get(key);
209211
return (leList==null ? false : !leList.isEmpty());
210212
}
211-
213+
212214
/* State identification functions */
213215
private boolean isInProgress(K key){ return this.inProgress.contains(key); }
214216
private boolean isReady(K key){ return this.ready.contains(key); }
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import junit.framework.TestCase;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
8+
/**
9+
* Unit tests for {@link WorkPool}
10+
*/
11+
public class WorkPoolTests extends TestCase {
12+
13+
private WorkPool<String, Object> pool = new WorkPool<String, Object>();
14+
15+
/**
16+
* Test that an unknown key is rejected.
17+
*/
18+
public void testUnkownKey() {
19+
try {
20+
this.pool.addWorkItem("test", new Object());
21+
fail("Expected IllegalArgumentException");
22+
} catch (IllegalArgumentException e) {
23+
// expected
24+
}
25+
}
26+
27+
/**
28+
* Test basic add work and remove work.
29+
* @throws Exception untested
30+
*/
31+
public void testBasicInOut() throws Exception {
32+
Object one = new Object();
33+
Object two = new Object();
34+
35+
this.pool.registerKey("test");
36+
assertTrue(this.pool.addWorkItem("test", one));
37+
assertFalse(this.pool.addWorkItem("test", two));
38+
39+
List<Object> workList = new ArrayList<Object>(16);
40+
String key = this.pool.nextWorkBlock(workList, 1);
41+
assertEquals("test", key);
42+
assertEquals(1, workList.size());
43+
assertEquals(one, workList.get(0));
44+
45+
assertTrue("Should be made ready", this.pool.finishWorkBlock(key));
46+
47+
workList.clear();
48+
key = this.pool.nextWorkBlock(workList, 1);
49+
assertEquals("Work client key wrong", "test", key);
50+
assertEquals("Wrong work delivered", two, workList.get(0));
51+
52+
assertFalse("Should not be made ready after this.", this.pool.finishWorkBlock(key));
53+
54+
assertNull("Shouldn't be more work", this.pool.nextWorkBlock(workList, 1));
55+
}
56+
57+
/**
58+
* Test add work when work in progress.
59+
* @throws Exception untested
60+
*/
61+
public void testWorkInWhileInProgress() throws Exception {
62+
Object one = new Object();
63+
Object two = new Object();
64+
65+
this.pool.registerKey("test");
66+
assertTrue(this.pool.addWorkItem("test", one));
67+
68+
List<Object> workList = new ArrayList<Object>(16);
69+
String key = this.pool.nextWorkBlock(workList, 1);
70+
assertEquals("test", key);
71+
assertEquals(1, workList.size());
72+
assertEquals(one, workList.get(0));
73+
74+
assertFalse(this.pool.addWorkItem("test", two));
75+
76+
assertTrue(this.pool.finishWorkBlock(key));
77+
78+
workList.clear();
79+
key = this.pool.nextWorkBlock(workList, 1);
80+
assertEquals("test", key);
81+
assertEquals(1, workList.size());
82+
assertEquals(two, workList.get(0));
83+
}
84+
85+
/**
86+
* Test multiple work keys.
87+
* @throws Exception untested
88+
*/
89+
public void testInterleavingKeys() throws Exception {
90+
Object one = new Object();
91+
Object two = new Object();
92+
Object three = new Object();
93+
94+
this.pool.registerKey("test1");
95+
this.pool.registerKey("test2");
96+
97+
assertTrue(this.pool.addWorkItem("test1", one));
98+
assertTrue(this.pool.addWorkItem("test2", two));
99+
assertFalse(this.pool.addWorkItem("test1", three));
100+
101+
List<Object> workList = new ArrayList<Object>(16);
102+
String key = this.pool.nextWorkBlock(workList, 3);
103+
assertEquals("test1", key);
104+
assertEquals(2, workList.size());
105+
assertEquals(one, workList.get(0));
106+
assertEquals(three, workList.get(1));
107+
108+
workList.clear();
109+
110+
key = this.pool.nextWorkBlock(workList, 2);
111+
assertEquals("test2", key);
112+
assertEquals(1, workList.size());
113+
assertEquals(two, workList.get(0));
114+
}
115+
116+
/**
117+
* Test removal of key (with work)
118+
* @throws Exception untested
119+
*/
120+
public void testUnregisterKey() throws Exception {
121+
Object one = new Object();
122+
Object two = new Object();
123+
Object three = new Object();
124+
125+
this.pool.registerKey("test1");
126+
this.pool.registerKey("test2");
127+
128+
assertTrue(this.pool.addWorkItem("test1", one));
129+
assertTrue(this.pool.addWorkItem("test2", two));
130+
assertFalse(this.pool.addWorkItem("test1", three));
131+
132+
this.pool.unregisterKey("test1");
133+
134+
List<Object> workList = new ArrayList<Object>(16);
135+
String key = this.pool.nextWorkBlock(workList, 3);
136+
assertEquals("test2", key);
137+
assertEquals(1, workList.size());
138+
assertEquals(two, workList.get(0));
139+
}
140+
141+
/**
142+
* Test removal of all keys (with work).
143+
* @throws Exception untested
144+
*/
145+
public void testUnregisterAllKeys() throws Exception {
146+
Object one = new Object();
147+
Object two = new Object();
148+
Object three = new Object();
149+
150+
this.pool.registerKey("test1");
151+
this.pool.registerKey("test2");
152+
153+
assertTrue(this.pool.addWorkItem("test1", one));
154+
assertTrue(this.pool.addWorkItem("test2", two));
155+
assertFalse(this.pool.addWorkItem("test1", three));
156+
157+
this.pool.unregisterAllKeys();
158+
159+
List<Object> workList = new ArrayList<Object>(16);
160+
assertNull(this.pool.nextWorkBlock(workList, 1));
161+
}
162+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package com.rabbitmq.client.test.functional;
1919

20+
import com.rabbitmq.client.impl.WorkPoolTests;
2021
import com.rabbitmq.client.test.Bug20004Test;
21-
import com.rabbitmq.client.test.impl.WorkPoolTests;
2222

2323
import junit.framework.TestCase;
2424
import junit.framework.TestSuite;

0 commit comments

Comments
 (0)