33import java .util .Collection ;
44import java .util .HashMap ;
55import java .util .HashSet ;
6+ import java .util .LinkedList ;
67import java .util .Map ;
78import java .util .Set ;
8- import java .util .concurrent .BlockingQueue ;
9- import java .util .concurrent .LinkedBlockingQueue ;
9+ import java .util .concurrent .Semaphore ;
10+ import java .util .concurrent .atomic . AtomicInteger ;
1011
1112/**
1213 * This is a generic implementation of the <q>Channels</q> specification
6970public class WorkPool <K , W > {
7071 private static final int MAX_QUEUE_LENGTH = 1000 ;
7172
73+ // This is like a LinkedBlockingQueue of limited length except you can turn the limit
74+ // on and off. And it only has the methods we need.
75+ //
76+ // This class is partly synchronised because:
77+ //
78+ // a) we cannot make put(T) synchronised as it may block indefinitely. Therefore we
79+ // only lock before modifying the list.
80+ // b) we don't want to make setUnlimited() synchronised as it is called frequently by
81+ // the channel.
82+ // c) anyway the issue with setUnlimited() is not that it be synchronised itself but
83+ // that calls to it should alternate between false and true. We assert this, but
84+ // it should not be able to go wrong because the RPC calls in AMQChannel and
85+ // ChannelN are all protected by the _channelMutex; we can't have more than one
86+ // outstanding RPC or finish the same RPC twice.
87+
88+ private class WorkQueue {
89+ private LinkedList <W > list ;
90+ private boolean unlimited ;
91+ private int maxLengthWhenLimited ;
92+
93+ private WorkQueue (int maxLengthWhenLimited ) {
94+ this .list = new LinkedList <W >();
95+ this .unlimited = false ; // Just for assertions
96+ this .maxLengthWhenLimited = maxLengthWhenLimited ;
97+ }
98+
99+ public void put (W w ) throws InterruptedException {
100+ if (list .size () > maxLengthWhenLimited ) {
101+ acquireSemaphore ();
102+ }
103+ synchronized (this ) {
104+ list .add (w );
105+ }
106+ }
107+
108+ public synchronized W poll () {
109+ W res = list .poll ();
110+
111+ if (list .size () <= maxLengthWhenLimited ) {
112+ releaseSemaphore ();
113+ }
114+
115+ return res ;
116+ }
117+
118+ public void setUnlimited (boolean unlimited ) {
119+ assert this .unlimited != unlimited ;
120+ this .unlimited = unlimited ;
121+ if (unlimited ) {
122+ increaseUnlimited ();
123+ }
124+ else {
125+ decreaseUnlimited ();
126+ }
127+ }
128+
129+ public boolean isEmpty () {
130+ return list .isEmpty ();
131+ }
132+ }
133+
72134 /** An injective queue of <i>ready</i> clients. */
73135 private final SetQueue <K > ready = new SetQueue <K >();
74136 /** The set of clients which have work <i>in progress</i>. */
75137 private final Set <K > inProgress = new HashSet <K >();
76138 /** The pool of registered clients, with their work queues. */
77- private final Map <K , BlockingQueue <W >> pool = new HashMap <K , BlockingQueue <W >>();
139+ private final Map <K , WorkQueue > pool = new HashMap <K , WorkQueue >();
140+
141+ // The semaphore should only be used when unlimitedQueues == 0, otherwise we ignore it and
142+ // thus don't block the connection.
143+ private Semaphore semaphore = new Semaphore (1 );
144+ private AtomicInteger unlimitedQueues = new AtomicInteger (0 );
145+
146+ private void acquireSemaphore () throws InterruptedException {
147+ if (unlimitedQueues .get () == 0 ) {
148+ semaphore .acquire ();
149+ }
150+ }
151+
152+ private void releaseSemaphore () {
153+ semaphore .release ();
154+ }
155+
156+ private void increaseUnlimited () {
157+ unlimitedQueues .getAndIncrement ();
158+ semaphore .release ();
159+ }
160+
161+ private void decreaseUnlimited () {
162+ unlimitedQueues .getAndDecrement ();
163+ }
78164
79165 /**
80166 * Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -86,7 +172,16 @@ public class WorkPool<K, W> {
86172 public void registerKey (K key ) {
87173 synchronized (this ) {
88174 if (!this .pool .containsKey (key )) {
89- this .pool .put (key , new LinkedBlockingQueue <W >(MAX_QUEUE_LENGTH ));
175+ this .pool .put (key , new WorkQueue (MAX_QUEUE_LENGTH ));
176+ }
177+ }
178+ }
179+
180+ public void unlimit (K key , boolean unlimited ) {
181+ synchronized (this ) {
182+ WorkQueue queue = this .pool .get (key );
183+ if (queue != null ) {
184+ queue .setUnlimited (unlimited );
90185 }
91186 }
92187 }
@@ -128,7 +223,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
128223 synchronized (this ) {
129224 K nextKey = readyToInProgress ();
130225 if (nextKey != null ) {
131- BlockingQueue < W > queue = this .pool .get (nextKey );
226+ WorkQueue queue = this .pool .get (nextKey );
132227 drainTo (queue , to , size );
133228 }
134229 return nextKey ;
@@ -137,13 +232,12 @@ public K nextWorkBlock(Collection<W> to, int size) {
137232
138233 /**
139234 * Private implementation of <code><b>drainTo</b></code> (not implemented for <code><b>LinkedList<W></b></code>s).
140- * @param <W> element type
141235 * @param deList to take (poll) elements from
142236 * @param c to add elements to
143237 * @param maxElements to take from deList
144238 * @return number of elements actually taken
145239 */
146- private static < W > int drainTo (BlockingQueue < W > deList , Collection <W > c , int maxElements ) {
240+ private int drainTo (WorkQueue deList , Collection <W > c , int maxElements ) {
147241 int n = 0 ;
148242 while (n < maxElements ) {
149243 W first = deList .poll ();
@@ -165,7 +259,7 @@ private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int max
165259 * — <i>as a result of this work item</i>
166260 */
167261 public boolean addWorkItem (K key , W item ) {
168- BlockingQueue < W > queue ;
262+ WorkQueue queue ;
169263 synchronized (this ) {
170264 queue = this .pool .get (key );
171265 }
@@ -213,7 +307,7 @@ public boolean finishWorkBlock(K key) {
213307 }
214308
215309 private boolean moreWorkItems (K key ) {
216- BlockingQueue < W > leList = this .pool .get (key );
310+ WorkQueue leList = this .pool .get (key );
217311 return leList != null && !leList .isEmpty ();
218312 }
219313
0 commit comments