Skip to content

Commit b5e5f31

Browse files
author
Matthew Sackman
committed
Merging bug 22272 into default
2 parents d4c4b73 + 36caffb commit b5e5f31

File tree

6 files changed

+432
-34
lines changed

6 files changed

+432
-34
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
8787
};
8888

8989
/** Object that manages a set of channels */
90-
public final ChannelManager _channelManager = new ChannelManager();
90+
public ChannelManager _channelManager = new ChannelManager(0);
9191

9292
/** Frame source/sink */
9393
public final FrameHandler _frameHandler;
@@ -263,7 +263,7 @@ public void start(boolean insist)
263263
int channelMax =
264264
negotiatedMaxValue(getParameters().getRequestedChannelMax(),
265265
connTune.getChannelMax());
266-
setChannelMax(channelMax);
266+
_channelManager = new ChannelManager(channelMax);
267267

268268
int frameMax =
269269
negotiatedMaxValue(getParameters().getRequestedFrameMax(),
@@ -306,13 +306,6 @@ public int getChannelMax() {
306306
return _channelManager.getChannelMax();
307307
}
308308

309-
/**
310-
* Protected API - set the max channel <b>number</b>
311-
*/
312-
public void setChannelMax(int value) {
313-
_channelManager.setChannelMax(value);
314-
}
315-
316309
/** {@inheritDoc} */
317310
public int getFrameMax() {
318311
return _frameMax;

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Set;
3939

4040
import com.rabbitmq.client.ShutdownSignalException;
41+
import com.rabbitmq.utility.IntAllocator;
4142

4243
/**
4344
* Manages a set of channels, indexed by channel number.
@@ -46,18 +47,26 @@
4647
public class ChannelManager {
4748
/** Mapping from channel number to AMQChannel instance */
4849
private final Map<Integer, ChannelN> _channelMap = Collections.synchronizedMap(new HashMap<Integer, ChannelN>());
50+
private final IntAllocator channelNumberAllocator;
4951

5052
/** Maximum channel number available on this connection. */
51-
public int _channelMax = 0;
53+
public final int _channelMax;
5254

53-
public synchronized int getChannelMax() {
54-
return _channelMax;
55+
public int getChannelMax(){
56+
return _channelMax;
5557
}
5658

57-
public synchronized void setChannelMax(int value) {
58-
_channelMax = value;
59+
public ChannelManager(int channelMax){
60+
if(channelMax == 0){
61+
// The framing encoding only allows for unsigned 16-bit integers for the channel number
62+
channelMax = (1 << 16) - 1;
63+
}
64+
65+
_channelMax = channelMax;
66+
channelNumberAllocator = new IntAllocator(1, channelMax);
5967
}
6068

69+
6170
/**
6271
* Public API - Looks up an existing channel associated with this connection.
6372
* @param channelNumber the number of the required channel
@@ -83,43 +92,41 @@ public void handleSignal(ShutdownSignalException signal) {
8392
}
8493

8594
public synchronized ChannelN createChannel(AMQConnection connection) throws IOException {
86-
int channelNumber = allocateChannelNumber(getChannelMax());
95+
int channelNumber = channelNumberAllocator.allocate();
8796
if (channelNumber == -1) {
8897
return null;
8998
}
90-
return createChannel(connection, channelNumber);
99+
return createChannelInternal(connection, channelNumber);
91100
}
92101

93102
public synchronized ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
94-
ChannelN ch = new ChannelN(connection, channelNumber);
103+
if(channelNumberAllocator.reserve(channelNumber))
104+
return createChannelInternal(connection, channelNumber);
105+
else
106+
return null;
107+
}
108+
109+
private synchronized ChannelN createChannelInternal(AMQConnection connection, int channelNumber) throws IOException {
95110
if (_channelMap.containsKey(channelNumber)) {
96-
return null; // That number's already allocated! Can't do it
111+
// That number's already allocated! Can't do it
112+
// This should never happen unless something has gone
113+
// badly wrong with our implementation.
114+
throw new IllegalStateException("We have attempted to"
115+
+ "create a channel with a number that is already in"
116+
+ "use. This should never happen. Please report this as a bug.");
97117
}
118+
ChannelN ch = new ChannelN(connection, channelNumber);
98119
addChannel(ch);
99120
ch.open(); // now that it's been added to our internal tables
100121
return ch;
101122
}
102123

103-
public synchronized int allocateChannelNumber(int maxChannels) {
104-
if (maxChannels == 0) {
105-
// The framing encoding only allows for unsigned 16-bit integers for the channel number
106-
maxChannels = (1 << 16) - 1;
107-
}
108-
int channelNumber = -1;
109-
for (int candidate = 1; candidate <= maxChannels; candidate++) {
110-
if (!_channelMap.containsKey(candidate)) {
111-
channelNumber = candidate;
112-
break;
113-
}
114-
}
115-
return channelNumber;
116-
}
117-
118124
private void addChannel(ChannelN chan) {
119125
_channelMap.put(chan.getChannelNumber(), chan);
120126
}
121127

122-
public void disconnectChannel(int channelNumber) {
128+
public synchronized void disconnectChannel(int channelNumber) {
123129
_channelMap.remove(channelNumber);
130+
channelNumberAllocator.free(channelNumber);
124131
}
125132
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
package com.rabbitmq.utility;
32+
33+
import java.util.*;
34+
35+
/**
36+
* A class for allocating integer IDs in a given range.
37+
*/
38+
public class IntAllocator{
39+
40+
// Invariant: Sorted in order of first element. Non-overlapping, non-adjacent.
41+
// This could really use being a balanced binary tree. However for normal usages
42+
// it doesn't actually matter.
43+
private LinkedList<Interval> intervals = new LinkedList<Interval>();
44+
45+
private final int[] unsorted;
46+
private int unsortedCount = 0;
47+
48+
/**
49+
* A class representing an inclusive interval from start to end.
50+
*/
51+
private static class Interval{
52+
Interval(int start, int end){
53+
this.start = start;
54+
this.end = end;
55+
}
56+
57+
int start;
58+
int end;
59+
60+
int length(){ return end - start + 1; }
61+
}
62+
63+
/**
64+
* Creates an IntAllocator allocating integer IDs within the inclusive range [start, end]
65+
*/
66+
public IntAllocator(int start, int end){
67+
if(start > end) throw new IllegalArgumentException("illegal range [" + start +", " + end + "]");
68+
69+
// Fairly arbitrary heuristic for a good size for the unsorted set.
70+
unsorted = new int[Math.max(32, (int)Math.sqrt(end - start))];
71+
intervals.add(new Interval(start, end));
72+
}
73+
74+
/**
75+
* Allocate a fresh integer from the range, or return -1 if no more integers
76+
* are available. This operation is guaranteed to run in O(1)
77+
*/
78+
public int allocate(){
79+
if(unsortedCount > 0){
80+
return unsorted[--unsortedCount];
81+
} else if (!intervals.isEmpty()) {
82+
Interval first = intervals.getFirst();
83+
if(first.length() == 1) intervals.removeFirst();
84+
return first.start++;
85+
} else {
86+
return -1;
87+
}
88+
}
89+
90+
/**
91+
* Make the provided integer available for allocation again. This operation
92+
* runs in amortized O(sqrt(range size)) time: About every sqrt(range size)
93+
* operations will take O(range_size + number of intervals) to complete and
94+
* the rest run in constant time.
95+
*
96+
* No error checking is performed, so if you double free or free an integer
97+
* that was not originally allocated the results are undefined. Sorry.
98+
*/
99+
public void free(int id){
100+
if(unsortedCount >= unsorted.length){
101+
flush();
102+
}
103+
unsorted[unsortedCount++] = id;
104+
}
105+
106+
/**
107+
* Attempt to reserve the provided ID as if it had been allocated. Returns true
108+
* if it is available, false otherwise.
109+
*
110+
* This operation runs in O(id) in the worst case scenario, though it can usually
111+
* be expected to perform better than that unless a great deal of fragmentation
112+
* has occurred.
113+
*/
114+
public boolean reserve(int id){
115+
flush();
116+
ListIterator<Interval> it = intervals.listIterator();
117+
118+
while(it.hasNext()){
119+
Interval i = it.next();
120+
if(i.start <= id && id <= i.end){
121+
if(i.length() == 1) it.remove();
122+
else if(i.start == id) i.start++;
123+
else if(i.end == id) i.end--;
124+
else {
125+
it.add(new Interval(id + 1, i.end));
126+
i.end = id - 1;
127+
}
128+
return true;
129+
}
130+
}
131+
132+
return false;
133+
}
134+
135+
private void flush(){
136+
if(unsortedCount == 0) return;
137+
138+
Arrays.sort(unsorted);
139+
140+
ListIterator<Interval> it = intervals.listIterator();
141+
142+
int i = 0;
143+
while(i < unsorted.length){
144+
int start = i;
145+
while((i < unsorted.length - 1) && (unsorted[i + 1] == unsorted[i] + 1))
146+
i++;
147+
148+
Interval interval = new Interval(start, i);
149+
150+
// Scan to an appropriate point in the list to insert this interval
151+
// this may well be the end
152+
while(it.hasNext()){
153+
if(it.next().start > interval.end){
154+
it.previous();
155+
break;
156+
}
157+
}
158+
159+
it.add(interval);
160+
i++;
161+
}
162+
163+
normalize();
164+
unsortedCount = 0;
165+
}
166+
167+
private void normalize(){
168+
if(intervals.isEmpty()) return;
169+
Iterator<Interval> it = intervals.iterator();
170+
171+
Interval trailing, leading;
172+
leading = it.next();
173+
while(it.hasNext()){
174+
trailing = leading;
175+
leading = it.next();
176+
177+
if(leading.start == trailing.end + 1) {
178+
it.remove();
179+
trailing.end = leading.end;
180+
}
181+
}
182+
}
183+
184+
@Override public String toString(){
185+
StringBuilder builder = new StringBuilder();
186+
187+
builder.append("IntAllocator{");
188+
189+
builder.append("intervals = [");
190+
Iterator<Interval> it = intervals.iterator();
191+
while(it.hasNext()){
192+
Interval i = it.next();
193+
builder.append(i.start).append("..").append(i.end);
194+
if(it.hasNext()) builder.append(", ");
195+
}
196+
builder.append("]");
197+
198+
builder.append(", unsorted = [");
199+
for(int i = 0; i < unsortedCount; i++){
200+
builder.append(unsorted[i]);
201+
if( i < unsortedCount - 1) builder.append(", ");
202+
}
203+
builder.append("]");
204+
205+
206+
builder.append("}");
207+
return builder.toString();
208+
}
209+
}

0 commit comments

Comments
 (0)