4646
4747public class ChannelManager {
4848 /** Mapping from channel number to AMQChannel instance */
49- private final Map <Integer , ChannelN > _channelMap = Collections .synchronizedMap (new HashMap <Integer , ChannelN >());
49+ private final Map <Integer , ChannelN > _channelMap =
50+ Collections .synchronizedMap (new HashMap <Integer , ChannelN >());
5051 private final IntAllocator channelNumberAllocator ;
5152
5253 /** Maximum channel number available on this connection. */
@@ -56,22 +57,22 @@ public int getChannelMax(){
5657 return _channelMax ;
5758 }
5859
59- public ChannelManager (int channelMax ){
60- if (channelMax == 0 ){
61- // The framing encoding only allows for unsigned 16-bit integers for the channel number
62- channelMax = ( 1 << 16 ) - 1 ;
63- }
64-
65- _channelMax = channelMax ;
66- channelNumberAllocator = new IntAllocator (1 , channelMax );
60+ public ChannelManager (int channelMax ) {
61+ if (channelMax == 0 ) {
62+ // The framing encoding only allows for unsigned 16-bit integers
63+ // for the channel number
64+ channelMax = ( 1 << 16 ) - 1 ;
65+ }
66+ _channelMax = channelMax ;
67+ channelNumberAllocator = new IntAllocator (1 , channelMax );
6768 }
6869
69-
70+
7071 /**
7172 * Public API - Looks up an existing channel associated with this connection.
7273 * @param channelNumber the number of the required channel
7374 * @return the relevant channel descriptor
74- * @throws UnknownChannelException if there is no Channel associated with the
75+ * @throws UnknownChannelException if there is no Channel associated with the
7576 * required channel number.
7677 */
7778 public ChannelN getChannel (int channelNumber ) {
@@ -85,8 +86,8 @@ public void handleSignal(ShutdownSignalException signal) {
8586 synchronized (_channelMap ) {
8687 channels = new HashSet <ChannelN >(_channelMap .values ());
8788 }
88- for (AMQChannel channel : channels ) {
89- disconnectChannel (channel . getChannelNumber () );
89+ for (ChannelN channel : channels ) {
90+ disconnectChannel (channel );
9091 channel .processShutdownSignal (signal , true , true );
9192 }
9293 }
@@ -100,7 +101,7 @@ public synchronized ChannelN createChannel(AMQConnection connection) throws IOEx
100101 }
101102
102103 public synchronized ChannelN createChannel (AMQConnection connection , int channelNumber ) throws IOException {
103- if (channelNumberAllocator .reserve (channelNumber ))
104+ if (channelNumberAllocator .reserve (channelNumber ))
104105 return createChannelInternal (connection , channelNumber );
105106 else
106107 return null ;
@@ -110,10 +111,10 @@ private synchronized ChannelN createChannelInternal(AMQConnection connection, in
110111 if (_channelMap .containsKey (channelNumber )) {
111112 // That number's already allocated! Can't do it
112113 // This should never happen unless something has gone
113- // badly wrong with our implementation.
114- throw new IllegalStateException ("We have attempted to"
115- + "create a channel with a number that is already in"
116- + "use. This should never happen. Please report this as a bug." );
114+ // badly wrong with our implementation.
115+ throw new IllegalStateException ("We have attempted to "
116+ + "create a channel with a number that is already in "
117+ + "use. This should never happen. Please report this as a bug." );
117118 }
118119 ChannelN ch = new ChannelN (connection , channelNumber );
119120 addChannel (ch );
@@ -125,8 +126,36 @@ private void addChannel(ChannelN chan) {
125126 _channelMap .put (chan .getChannelNumber (), chan );
126127 }
127128
128- public synchronized void disconnectChannel (int channelNumber ) {
129- _channelMap .remove (channelNumber );
130- channelNumberAllocator .free (channelNumber );
129+ /**
130+ * Remove the argument channel from the channel map.
131+ * This method must be safe to call multiple times on the same channel. If
132+ * it is not then things go badly wrong.
133+ */
134+ public synchronized void disconnectChannel (ChannelN channel ) {
135+ int channelNumber = channel .getChannelNumber ();
136+
137+ // Warning, here be dragons. Not great big ones, but little baby ones
138+ // which will nibble on your toes and occasionally trip you up when
139+ // you least expect it.
140+ // Basically, there's a race that can end us up here. It almost never
141+ // happens, but it's easier to repair it when it does than prevent it
142+ // from happening in the first place.
143+ // If we end up doing a Channel.close in one thread and a Channel.open
144+ // with the same channel number in another, the two can overlap in such
145+ // a way as to cause disconnectChannel on the old channel to try to
146+ // remove the new one. Ideally we would fix this race at the source,
147+ // but it's much easier to just catch it here.
148+ synchronized (_channelMap ) {
149+ ChannelN existing = _channelMap .remove (channelNumber );
150+ // Nothing to do here. Move along.
151+ if (existing == null ) return ;
152+ // Oops, we've gone and stomped on someone else's channel. Put it back
153+ // and pretend we didn't touch it.
154+ else if (existing != channel ) {
155+ _channelMap .put (channelNumber , existing );
156+ return ;
157+ }
158+ channelNumberAllocator .free (channelNumber );
159+ }
131160 }
132161}
0 commit comments