Skip to content

Commit 8fa1f70

Browse files
author
Simon MacMullen
committed
Merge in default
2 parents 61e0996 + 89ec40f commit 8fa1f70

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import java.util.Collection;
44
import java.util.HashMap;
55
import java.util.HashSet;
6-
import java.util.LinkedList;
76
import java.util.Map;
8-
import java.util.Queue;
97
import java.util.Set;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.LinkedBlockingQueue;
1010

1111
/**
1212
* This is a generic implementation of the <q>Channels</q> specification
@@ -67,12 +67,14 @@
6767
* @param <W> Work -- type of work item
6868
*/
6969
public class WorkPool<K, W> {
70+
private static final int MAX_QUEUE_LENGTH = 1000;
71+
7072
/** An injective queue of <i>ready</i> clients. */
7173
private final SetQueue<K> ready = new SetQueue<K>();
7274
/** The set of clients which have work <i>in progress</i>. */
7375
private final Set<K> inProgress = new HashSet<K>();
7476
/** The pool of registered clients, with their work queues. */
75-
private final Map<K, LinkedList<W>> pool = new HashMap<K, LinkedList<W>>();
77+
private final Map<K, BlockingQueue<W>> pool = new HashMap<K, BlockingQueue<W>>();
7678

7779
/**
7880
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -84,7 +86,7 @@ public class WorkPool<K, W> {
8486
public void registerKey(K key) {
8587
synchronized (this) {
8688
if (!this.pool.containsKey(key)) {
87-
this.pool.put(key, new LinkedList<W>());
89+
this.pool.put(key, new LinkedBlockingQueue<W>(MAX_QUEUE_LENGTH));
8890
}
8991
}
9092
}
@@ -126,7 +128,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
126128
synchronized (this) {
127129
K nextKey = readyToInProgress();
128130
if (nextKey != null) {
129-
LinkedList<W> queue = this.pool.get(nextKey);
131+
BlockingQueue<W> queue = this.pool.get(nextKey);
130132
drainTo(queue, to, size);
131133
}
132134
return nextKey;
@@ -141,7 +143,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
141143
* @param maxElements to take from deList
142144
* @return number of elements actually taken
143145
*/
144-
private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxElements) {
146+
private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int maxElements) {
145147
int n = 0;
146148
while (n < maxElements) {
147149
W first = deList.poll();
@@ -164,9 +166,14 @@ private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxEle
164166
*/
165167
public boolean addWorkItem(K key, W item) {
166168
synchronized (this) {
167-
Queue<W> queue = this.pool.get(key);
169+
BlockingQueue<W> queue = this.pool.get(key);
168170
if (queue != null) {
169-
queue.offer(item);
171+
try {
172+
queue.put(item);
173+
} catch (InterruptedException e) {
174+
// ok
175+
}
176+
170177
if (isDormant(key)) {
171178
dormantToReady(key);
172179
return true;
@@ -202,7 +209,7 @@ public boolean finishWorkBlock(K key) {
202209
}
203210

204211
private boolean moreWorkItems(K key) {
205-
LinkedList<W> leList = this.pool.get(key);
212+
BlockingQueue<W> leList = this.pool.get(key);
206213
return leList != null && !leList.isEmpty();
207214
}
208215

0 commit comments

Comments
 (0)