Skip to content

Commit 89ec40f

Browse files
author
Simon MacMullen
committed
Naively use a maximum-length queue in the WorkPool.
1 parent 3f80149 commit 89ec40f

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import java.util.Collection;
44
import java.util.HashMap;
55
import java.util.HashSet;
6-
import java.util.LinkedList;
76
import java.util.Map;
8-
import java.util.Queue;
97
import java.util.Set;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.LinkedBlockingQueue;
1010

1111
/**
1212
* This is a generic implementation of the <q>Channels</q> specification
@@ -75,7 +75,9 @@ public class WorkPool<K, W> {
7575
/** The set of clients which have work <i>in progress</i>. */
7676
private final Set<K> inProgress = new HashSet<K>();
7777
/** The pool of registered clients, with their work queues. */
78-
private final Map<K, LinkedList<W>> pool = new HashMap<K, LinkedList<W>>();
78+
private final Map<K, BlockingQueue<W>> pool = new HashMap<K, BlockingQueue<W>>();
79+
80+
private int MAX_QUEUE_LENGTH = 1000;
7981

8082
/**
8183
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -87,7 +89,7 @@ public class WorkPool<K, W> {
8789
public void registerKey(K key) {
8890
synchronized (this.monitor) {
8991
if (!this.pool.containsKey(key)) {
90-
this.pool.put(key, new LinkedList<W>());
92+
this.pool.put(key, new LinkedBlockingQueue<W>(MAX_QUEUE_LENGTH));
9193
}
9294
}
9395
}
@@ -129,7 +131,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
129131
synchronized (this.monitor) {
130132
K nextKey = readyToInProgress();
131133
if (nextKey != null) {
132-
LinkedList<W> queue = this.pool.get(nextKey);
134+
BlockingQueue<W> queue = this.pool.get(nextKey);
133135
drainTo(queue, to, size);
134136
}
135137
return nextKey;
@@ -144,7 +146,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
144146
* @param maxElements to take from deList
145147
* @return number of elements actually taken
146148
*/
147-
private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxElements) {
149+
private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int maxElements) {
148150
int n = 0;
149151
while (n < maxElements) {
150152
W first = deList.poll();
@@ -167,9 +169,14 @@ private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxEle
167169
*/
168170
public boolean addWorkItem(K key, W item) {
169171
synchronized (this.monitor) {
170-
Queue<W> queue = this.pool.get(key);
172+
BlockingQueue<W> queue = this.pool.get(key);
171173
if (queue != null) {
172-
queue.offer(item);
174+
try {
175+
queue.put(item);
176+
} catch (InterruptedException e) {
177+
// ok
178+
}
179+
173180
if (isDormant(key)) {
174181
dormantToReady(key);
175182
return true;
@@ -205,7 +212,7 @@ public boolean finishWorkBlock(K key) {
205212
}
206213

207214
private boolean moreWorkItems(K key) {
208-
LinkedList<W> leList = this.pool.get(key);
215+
BlockingQueue<W> leList = this.pool.get(key);
209216
return (leList==null ? false : !leList.isEmpty());
210217
}
211218

0 commit comments

Comments
 (0)