Skip to content

Commit 2330f71

Browse files
author
Simon MacMullen
committed
Disable the workpool capacity limit when we are awaiting an RPC reply; we must read from the socket in order to receive it!
1 parent 0485811 commit 2330f71

File tree

4 files changed

+83
-8
lines changed

4 files changed

+83
-8
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,18 @@ public AMQCommand rpc(Method method) throws IOException {
11861186
return exnWrappingRpc(method);
11871187
}
11881188

1189+
@Override
1190+
public void enqueueRpc(RpcContinuation k) {
1191+
super.enqueueRpc(k);
1192+
dispatcher.setLimited(false);
1193+
}
1194+
1195+
@Override
1196+
public RpcContinuation nextOutstandingRpc() {
1197+
dispatcher.setLimited(true);
1198+
return super.nextOutstandingRpc();
1199+
}
1200+
11891201
private void handleAckNack(long seqNo, boolean multiple, boolean nack) {
11901202
if (multiple) {
11911203
unconfirmedSet.headSet(seqNo + 1).clear();

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public void quiesce() {
6262
this.shuttingDown = true;
6363
}
6464

65+
public void setLimited(boolean limited) {
66+
this.workService.limit(channel, limited);
67+
}
68+
6569
public void handleConsumeOk(final Consumer delegate,
6670
final String consumerTag) {
6771
executeUnlessShuttingDown(

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public void registerKey(Channel channel) {
5858
this.workPool.registerKey(channel);
5959
}
6060

61+
public void limit(Channel channel, boolean limited) {
62+
this.workPool.limit(channel, limited);
63+
}
64+
6165
public void addWork(Channel channel, Runnable runnable) {
6266
if (this.workPool.addWorkItem(channel, runnable)) {
6367
this.executor.execute(new WorkPoolRunnable());

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import java.util.Collection;
44
import java.util.HashMap;
55
import java.util.HashSet;
6+
import java.util.LinkedList;
67
import java.util.Map;
78
import java.util.Set;
8-
import java.util.concurrent.BlockingQueue;
9-
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.Semaphore;
1010

1111
/**
1212
* This is a generic implementation of the <q>Channels</q> specification
@@ -69,12 +69,58 @@
6969
public class WorkPool<K, W> {
7070
private static final int MAX_QUEUE_LENGTH = 1000;
7171

72+
// This is like a LinkedBlockingQueue of limited length except you can turn the limit
73+
// on and off. And it only has the methods we need.
74+
// TODO synchronised how?
75+
private static class WorkQueue<T> {
76+
private Semaphore semaphore;
77+
private LinkedList<T> list;
78+
private boolean limited;
79+
private int maxLengthWhenLimited;
80+
81+
private WorkQueue(int maxLengthWhenLimited) {
82+
this.semaphore = new Semaphore(1);
83+
this.list = new LinkedList<T>();
84+
this.limited = true;
85+
this.maxLengthWhenLimited = maxLengthWhenLimited;
86+
}
87+
88+
public boolean put(T t) throws InterruptedException {
89+
if (limited && list.size() > maxLengthWhenLimited) {
90+
assert !semaphore.hasQueuedThreads();
91+
semaphore.acquire();
92+
}
93+
return list.offer(t);
94+
}
95+
96+
public T poll() {
97+
T res = list.poll();
98+
99+
if (list.size() <= maxLengthWhenLimited && semaphore.hasQueuedThreads()) {
100+
semaphore.release();
101+
}
102+
103+
return res;
104+
}
105+
106+
public void setLimited(boolean limited) {
107+
this.limited = limited;
108+
if (!limited && semaphore.hasQueuedThreads()) {
109+
semaphore.release();
110+
}
111+
}
112+
113+
public boolean isEmpty() {
114+
return list.isEmpty();
115+
}
116+
}
117+
72118
/** An injective queue of <i>ready</i> clients. */
73119
private final SetQueue<K> ready = new SetQueue<K>();
74120
/** The set of clients which have work <i>in progress</i>. */
75121
private final Set<K> inProgress = new HashSet<K>();
76122
/** The pool of registered clients, with their work queues. */
77-
private final Map<K, BlockingQueue<W>> pool = new HashMap<K, BlockingQueue<W>>();
123+
private final Map<K, WorkQueue<W>> pool = new HashMap<K, WorkQueue<W>>();
78124

79125
/**
80126
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -86,7 +132,16 @@ public class WorkPool<K, W> {
86132
public void registerKey(K key) {
87133
synchronized (this) {
88134
if (!this.pool.containsKey(key)) {
89-
this.pool.put(key, new LinkedBlockingQueue<W>(MAX_QUEUE_LENGTH));
135+
this.pool.put(key, new WorkQueue<W>(MAX_QUEUE_LENGTH));
136+
}
137+
}
138+
}
139+
140+
public void limit(K key, boolean limited) {
141+
synchronized (this) {
142+
WorkQueue<W> queue = this.pool.get(key);
143+
if (queue != null) {
144+
queue.setLimited(limited);
90145
}
91146
}
92147
}
@@ -128,7 +183,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
128183
synchronized (this) {
129184
K nextKey = readyToInProgress();
130185
if (nextKey != null) {
131-
BlockingQueue<W> queue = this.pool.get(nextKey);
186+
WorkQueue<W> queue = this.pool.get(nextKey);
132187
drainTo(queue, to, size);
133188
}
134189
return nextKey;
@@ -143,7 +198,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
143198
* @param maxElements to take from deList
144199
* @return number of elements actually taken
145200
*/
146-
private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int maxElements) {
201+
private static <W> int drainTo(WorkQueue<W> deList, Collection<W> c, int maxElements) {
147202
int n = 0;
148203
while (n < maxElements) {
149204
W first = deList.poll();
@@ -165,7 +220,7 @@ private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int max
165220
* &mdash; <i>as a result of this work item</i>
166221
*/
167222
public boolean addWorkItem(K key, W item) {
168-
BlockingQueue<W> queue;
223+
WorkQueue<W> queue;
169224
synchronized (this) {
170225
queue = this.pool.get(key);
171226
}
@@ -213,7 +268,7 @@ public boolean finishWorkBlock(K key) {
213268
}
214269

215270
private boolean moreWorkItems(K key) {
216-
BlockingQueue<W> leList = this.pool.get(key);
271+
WorkQueue<W> leList = this.pool.get(key);
217272
return leList != null && !leList.isEmpty();
218273
}
219274

0 commit comments

Comments
 (0)