Skip to content

Commit 48e5b95

Browse files
author
Simon MacMullen
committed
Make some synchronisation changes and explain them.
1 parent f6e086c commit 48e5b95

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,14 +1188,19 @@ public AMQCommand rpc(Method method) throws IOException {
11881188

11891189
@Override
11901190
public void enqueueRpc(RpcContinuation k) {
1191-
super.enqueueRpc(k);
1192-
dispatcher.setLimited(false);
1191+
synchronized (_channelMutex) {
1192+
super.enqueueRpc(k);
1193+
dispatcher.setLimited(false);
1194+
}
11931195
}
11941196

11951197
@Override
11961198
public RpcContinuation nextOutstandingRpc() {
1197-
dispatcher.setLimited(true);
1198-
return super.nextOutstandingRpc();
1199+
synchronized (_channelMutex) {
1200+
RpcContinuation res = super.nextOutstandingRpc();
1201+
if (res != null) dispatcher.setLimited(true);
1202+
return res;
1203+
}
11991204
}
12001205

12011206
private void handleAckNack(long seqNo, boolean multiple, boolean nack) {

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,19 @@ public class WorkPool<K, W> {
7171

7272
// This is like a LinkedBlockingQueue of limited length except you can turn the limit
7373
// on and off. And it only has the methods we need.
74-
// TODO synchronised how?
74+
//
75+
// This class is partly synchronised because:
76+
//
77+
// a) we cannot make put(T) synchronised as it may block indefinitely. Therefore we
78+
// only lock before modifying the list.
79+
// b) we don't want to make setLimited() synchronised as it is called frequently by
80+
// the channel.
81+
// c) anyway the issue with setLimited() is not that it be synchronised itself but
82+
// that calls to it should alternate between false and true. We assert this, but
83+
// it should not be able to go wrong because the RPC calls in AMQChannel and
84+
// ChannelN are all protected by the _channelMutex; we can't have more than one
85+
// outstanding RPC or finish the same RPC twice.
86+
7587
private static class WorkQueue<T> {
7688
private Semaphore semaphore;
7789
private LinkedList<T> list;
@@ -90,10 +102,12 @@ public void put(T t) throws InterruptedException {
90102
assert !semaphore.hasQueuedThreads();
91103
semaphore.acquire();
92104
}
93-
list.add(t);
105+
synchronized (this) {
106+
list.add(t);
107+
}
94108
}
95109

96-
public T poll() {
110+
public synchronized T poll() {
97111
T res = list.poll();
98112

99113
if (list.size() <= maxLengthWhenLimited && semaphore.hasQueuedThreads()) {
@@ -104,6 +118,7 @@ public T poll() {
104118
}
105119

106120
public void setLimited(boolean limited) {
121+
assert this.limited != limited;
107122
this.limited = limited;
108123
if (!limited && semaphore.hasQueuedThreads()) {
109124
semaphore.release();

0 commit comments

Comments
 (0)