Skip to content

Commit 1a1883a

Browse files
author
Simon MacMullen
committed
Move the Semaphore up to be scoped to the WorkPool. If any WorkQueue aka channel is unlimited then they all are.
1 parent 48e5b95 commit 1a1883a

File tree

4 files changed

+63
-39
lines changed

4 files changed

+63
-39
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,15 +1190,15 @@ public AMQCommand rpc(Method method) throws IOException {
11901190
public void enqueueRpc(RpcContinuation k) {
11911191
synchronized (_channelMutex) {
11921192
super.enqueueRpc(k);
1193-
dispatcher.setLimited(false);
1193+
dispatcher.setUnlimited(true);
11941194
}
11951195
}
11961196

11971197
@Override
11981198
public RpcContinuation nextOutstandingRpc() {
11991199
synchronized (_channelMutex) {
12001200
RpcContinuation res = super.nextOutstandingRpc();
1201-
if (res != null) dispatcher.setLimited(true);
1201+
if (res != null) dispatcher.setUnlimited(false);
12021202
return res;
12031203
}
12041204
}

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public void quiesce() {
6262
this.shuttingDown = true;
6363
}
6464

65-
public void setLimited(boolean limited) {
66-
this.workService.limit(channel, limited);
65+
public void setUnlimited(boolean unlimited) {
66+
this.workService.unlimit(channel, unlimited);
6767
}
6868

6969
public void handleConsumeOk(final Consumer delegate,

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public void registerKey(Channel channel) {
5858
this.workPool.registerKey(channel);
5959
}
6060

61-
public void limit(Channel channel, boolean limited) {
62-
this.workPool.limit(channel, limited);
61+
public void unlimit(Channel channel, boolean unlimited) {
62+
this.workPool.unlimit(channel, unlimited);
6363
}
6464

6565
public void addWork(Channel channel, Runnable runnable) {

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Map;
88
import java.util.Set;
99
import java.util.concurrent.Semaphore;
10+
import java.util.concurrent.atomic.AtomicInteger;
1011

1112
/**
1213
* This is a generic implementation of the <q>Channels</q> specification
@@ -76,52 +77,52 @@ public class WorkPool<K, W> {
7677
//
7778
// a) we cannot make put(T) synchronised as it may block indefinitely. Therefore we
7879
// only lock before modifying the list.
79-
// b) we don't want to make setLimited() synchronised as it is called frequently by
80+
// b) we don't want to make setUnlimited() synchronised as it is called frequently by
8081
// the channel.
81-
// c) anyway the issue with setLimited() is not that it be synchronised itself but
82+
// c) anyway the issue with setUnlimited() is not that it be synchronised itself but
8283
// that calls to it should alternate between false and true. We assert this, but
8384
// it should not be able to go wrong because the RPC calls in AMQChannel and
8485
// ChannelN are all protected by the _channelMutex; we can't have more than one
8586
// outstanding RPC or finish the same RPC twice.
8687

87-
private static class WorkQueue<T> {
88-
private Semaphore semaphore;
89-
private LinkedList<T> list;
90-
private boolean limited;
88+
private class WorkQueue {
89+
private LinkedList<W> list;
90+
private boolean unlimited;
9191
private int maxLengthWhenLimited;
9292

9393
private WorkQueue(int maxLengthWhenLimited) {
94-
this.semaphore = new Semaphore(1);
95-
this.list = new LinkedList<T>();
96-
this.limited = true;
94+
this.list = new LinkedList<W>();
95+
this.unlimited = false; // Just for assertions
9796
this.maxLengthWhenLimited = maxLengthWhenLimited;
9897
}
9998

100-
public void put(T t) throws InterruptedException {
101-
if (limited && list.size() > maxLengthWhenLimited) {
102-
assert !semaphore.hasQueuedThreads();
103-
semaphore.acquire();
99+
public void put(W w) throws InterruptedException {
100+
if (list.size() > maxLengthWhenLimited) {
101+
acquireSemaphore();
104102
}
105103
synchronized (this) {
106-
list.add(t);
104+
list.add(w);
107105
}
108106
}
109107

110-
public synchronized T poll() {
111-
T res = list.poll();
108+
public synchronized W poll() {
109+
W res = list.poll();
112110

113-
if (list.size() <= maxLengthWhenLimited && semaphore.hasQueuedThreads()) {
114-
semaphore.release();
111+
if (list.size() <= maxLengthWhenLimited) {
112+
releaseSemaphore();
115113
}
116114

117115
return res;
118116
}
119117

120-
public void setLimited(boolean limited) {
121-
assert this.limited != limited;
122-
this.limited = limited;
123-
if (!limited && semaphore.hasQueuedThreads()) {
124-
semaphore.release();
118+
public void setUnlimited(boolean unlimited) {
119+
assert this.unlimited != unlimited;
120+
this.unlimited = unlimited;
121+
if (unlimited) {
122+
increaseUnlimited();
123+
}
124+
else {
125+
decreaseUnlimited();
125126
}
126127
}
127128

@@ -135,7 +136,31 @@ public boolean isEmpty() {
135136
/** The set of clients which have work <i>in progress</i>. */
136137
private final Set<K> inProgress = new HashSet<K>();
137138
/** The pool of registered clients, with their work queues. */
138-
private final Map<K, WorkQueue<W>> pool = new HashMap<K, WorkQueue<W>>();
139+
private final Map<K, WorkQueue> pool = new HashMap<K, WorkQueue>();
140+
141+
// The semaphore should only be used when unlimitedQueues == 0, otherwise we ignore it and
142+
// thus don't block the connection.
143+
private Semaphore semaphore = new Semaphore(1);
144+
private AtomicInteger unlimitedQueues = new AtomicInteger(0);
145+
146+
private void acquireSemaphore() throws InterruptedException {
147+
if (unlimitedQueues.get() == 0) {
148+
semaphore.acquire();
149+
}
150+
}
151+
152+
private void releaseSemaphore() {
153+
semaphore.release();
154+
}
155+
156+
private void increaseUnlimited() {
157+
unlimitedQueues.getAndIncrement();
158+
semaphore.release();
159+
}
160+
161+
private void decreaseUnlimited() {
162+
unlimitedQueues.getAndDecrement();
163+
}
139164

140165
/**
141166
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -147,16 +172,16 @@ public boolean isEmpty() {
147172
public void registerKey(K key) {
148173
synchronized (this) {
149174
if (!this.pool.containsKey(key)) {
150-
this.pool.put(key, new WorkQueue<W>(MAX_QUEUE_LENGTH));
175+
this.pool.put(key, new WorkQueue(MAX_QUEUE_LENGTH));
151176
}
152177
}
153178
}
154179

155-
public void limit(K key, boolean limited) {
180+
public void unlimit(K key, boolean unlimited) {
156181
synchronized (this) {
157-
WorkQueue<W> queue = this.pool.get(key);
182+
WorkQueue queue = this.pool.get(key);
158183
if (queue != null) {
159-
queue.setLimited(limited);
184+
queue.setUnlimited(unlimited);
160185
}
161186
}
162187
}
@@ -198,7 +223,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
198223
synchronized (this) {
199224
K nextKey = readyToInProgress();
200225
if (nextKey != null) {
201-
WorkQueue<W> queue = this.pool.get(nextKey);
226+
WorkQueue queue = this.pool.get(nextKey);
202227
drainTo(queue, to, size);
203228
}
204229
return nextKey;
@@ -207,13 +232,12 @@ public K nextWorkBlock(Collection<W> to, int size) {
207232

208233
/**
209234
* Private implementation of <code><b>drainTo</b></code> (not implemented for <code><b>LinkedList&lt;W&gt;</b></code>s).
210-
* @param <W> element type
211235
* @param deList to take (poll) elements from
212236
* @param c to add elements to
213237
* @param maxElements to take from deList
214238
* @return number of elements actually taken
215239
*/
216-
private static <W> int drainTo(WorkQueue<W> deList, Collection<W> c, int maxElements) {
240+
private int drainTo(WorkQueue deList, Collection<W> c, int maxElements) {
217241
int n = 0;
218242
while (n < maxElements) {
219243
W first = deList.poll();
@@ -235,7 +259,7 @@ private static <W> int drainTo(WorkQueue<W> deList, Collection<W> c, int maxElem
235259
* &mdash; <i>as a result of this work item</i>
236260
*/
237261
public boolean addWorkItem(K key, W item) {
238-
WorkQueue<W> queue;
262+
WorkQueue queue;
239263
synchronized (this) {
240264
queue = this.pool.get(key);
241265
}
@@ -283,7 +307,7 @@ public boolean finishWorkBlock(K key) {
283307
}
284308

285309
private boolean moreWorkItems(K key) {
286-
WorkQueue<W> leList = this.pool.get(key);
310+
WorkQueue leList = this.pool.get(key);
287311
return leList != null && !leList.isEmpty();
288312
}
289313

0 commit comments

Comments
 (0)