Skip to content

Commit 79d27e5

Browse files
author
Steve Powell
committed
Modify WorkPool so as not to complain so loudly if the channel is not in the pool.
Modify WorkPoolTests to check this correctly. Small JavaDoc updates in the area.
1 parent c9942b1 commit 79d27e5

File tree

4 files changed

+20
-24
lines changed

4 files changed

+20
-24
lines changed

src/com/rabbitmq/client/impl/SetQueue.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/** A generic queue-like implementation (supporting operations <code>addIfNotPresent</code>,
99
* <code>poll</code>, <code>contains</code>, and <code>isEmpty</code>)
1010
* which restricts a queue element to appear at most once.
11-
* If the element is already present {@link #addIfNotPresent(T)} returns <code><b>false</b></code>.
11+
* If the element is already present {@link #addIfNotPresent} returns <code><b>false</b></code>.
1212
* <p/>
1313
* Elements must not be <code><b>null</b></code>.
1414
* <p/><b>Concurrent Semantics</b><br/>
@@ -43,7 +43,8 @@ public T poll() {
4343
return item;
4444
}
4545

46-
/** @return <code><b>true</b></code> if and only if <b>item</b> is in the queue.*/
46+
/** @param item to look for in queue
47+
* @return <code><b>true</b></code> if and only if <b>item</b> is in the queue.*/
4748
public boolean contains(T item) {
4849
return this.members.contains(item);
4950
}
@@ -54,6 +55,7 @@ public boolean isEmpty() {
5455
}
5556

5657
/** Remove item from queue, if present.
58+
* @param item to remove
5759
* @return <code><b>true</b></code> if and only if item was initially present and was removed.
5860
*/
5961
public boolean remove(T item) {

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@
6363
* </pre>
6464
* <i>dormant</i> is not represented in the implementation state, and adding items
6565
* when the client is <i>in progress</i> or <i>ready</i> does not change its state.
66+
* @param <K> Key -- type of client
67+
* @param <W> Work -- type of work item
6668
*/
6769
public class WorkPool<K, W> {
6870

6971
/** protecting <code>ready</code>, <code>inProgress</code> and <code>pool</code> */
7072
private final Object monitor = new Object();
71-
/** An ordered queue of <i>ready</i> clients. */
73+
/** An injective queue of <i>ready</i> clients. */
7274
private final SetQueue<K> ready = new SetQueue<K>();
7375
/** The set of clients which have work <i>in progress</i>. */
7476
private final Set<K> inProgress = new HashSet<K>();
@@ -79,7 +81,7 @@ public class WorkPool<K, W> {
7981
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
8082
* A client is initially <i>dormant</i>.
8183
* <p/>
82-
* No-op if <code><b>key</b></code> already present.
84+
* No-op if <code><b>key</b></code> already present.
8385
* @param key client to add to pool
8486
*/
8587
public void registerKey(K key) {
@@ -162,18 +164,16 @@ private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxEle
162164
* @param item the work item to add to the client queue
163165
* @return <code><b>true</b></code> if and only if the client is marked <i>ready</i>
164166
* &mdash; <i>as a result of this work item</i>
165-
* @throws IllegalArgumentException if key not registered.
166167
*/
167168
public boolean addWorkItem(K key, W item) {
168169
synchronized (this.monitor) {
169170
Queue<W> queue = this.pool.get(key);
170-
if (queue == null) {
171-
throw new IllegalArgumentException("Client " + key + " not registered");
172-
}
173-
queue.offer(item);
174-
if (isDormant(key)) {
175-
dormantToReady(key);
176-
return true;
171+
if (queue != null) {
172+
queue.offer(item);
173+
if (isDormant(key)) {
174+
dormantToReady(key);
175+
return true;
176+
}
177177
}
178178
return false;
179179
}
@@ -208,7 +208,7 @@ private boolean moreWorkItems(K key) {
208208
LinkedList<W> leList = this.pool.get(key);
209209
return (leList==null ? false : !leList.isEmpty());
210210
}
211-
211+
212212
/* State identification functions */
213213
private boolean isInProgress(K key){ return this.inProgress.contains(key); }
214214
private boolean isReady(K key){ return this.ready.contains(key); }

test/src/com/rabbitmq/client/test/functional/RequeueOnClose.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
// Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
1515
//
1616

17-
1817
package com.rabbitmq.client.test.functional;
1918

2019
import com.rabbitmq.client.test.BrokerTestCase;
@@ -27,9 +26,9 @@
2726
public abstract class RequeueOnClose
2827
extends BrokerTestCase
2928
{
30-
public static final String Q = "RequeueOnClose";
31-
public static final int GRATUITOUS_DELAY = 100;
32-
public static final int MESSAGE_COUNT = 2000;
29+
private static final String Q = "RequeueOnClose";
30+
private static final int GRATUITOUS_DELAY = 100;
31+
private static final int MESSAGE_COUNT = 2000;
3332

3433
protected abstract void open() throws IOException;
3534

test/src/com/rabbitmq/client/test/impl/WorkPoolTests.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,8 @@ public class WorkPoolTests extends TestCase {
1212

1313
private WorkPool<String, Object> pool = new WorkPool<String, Object>();
1414

15-
public void testUnkownKey() {
16-
try {
17-
this.pool.addWorkItem("test", new Object());
18-
fail("Expected IllegalArgumentException");
19-
} catch (IllegalArgumentException e) {
20-
// expected
21-
}
15+
public void testUnknownKey() {
16+
assertFalse(this.pool.addWorkItem("test", new Object()));
2217
}
2318

2419
public void testBasicInOut() throws InterruptedException {

0 commit comments

Comments
 (0)