Skip to content

Commit 27a914a

Browse files
committed
Merge default into bug21949
2 parents 6442179 + e76d8a2 commit 27a914a

File tree

6 files changed

+96
-24
lines changed

6 files changed

+96
-24
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.client.AMQP.Exchange;
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
40+
import com.rabbitmq.client.impl.AMQImpl.Channel.FlowOk;
4041

4142
/**
4243
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -90,6 +91,14 @@ public interface Channel extends ShutdownNotifier {
9091
* @throws java.io.IOException if an error is encountered
9192
*/
9293
void close(int closeCode, String closeMessage) throws IOException;
94+
95+
/**
96+
* Set flow on the channel
97+
*
98+
* @param active if true, the server is asked to start sending. If false, the server is asked to stop sending.
99+
* @throws IOException
100+
*/
101+
FlowOk flow(boolean active) throws IOException;
93102

94103
/**
95104
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code

src/com/rabbitmq/client/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
103103
*/
104104
Address[] getKnownHosts();
105105

106+
/**
107+
* Retrieve the server properties.
108+
* @return a map of the server properties. This typically includes the product name and version of the server.
109+
*/
110+
Map<String, Object> getServerProperties();
111+
106112
/**
107113
* Create a new channel, using an internally allocated channel number.
108114
* @return a new channel descriptor, or null if none is available

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
128128
* Protected API - respond, in the driver thread, to a ShutdownSignal.
129129
* @param channelNumber the number of the channel to disconnect
130130
*/
131-
public final void disconnectChannel(int channelNumber) {
132-
_channelManager.disconnectChannel(channelNumber);
131+
public final void disconnectChannel(ChannelN channel) {
132+
_channelManager.disconnectChannel(channel);
133133
}
134134

135135
public void ensureIsOpen()
@@ -163,6 +163,9 @@ public void ensureIsOpen()
163163
private final int _requestedChannelMax, _requestedFrameMax, _requestedHeartbeat;
164164
private final Map<String, Object> _clientProperties;
165165

166+
/** Saved server properties field from connection.start */
167+
public Map<String, Object> _serverProperties;
168+
166169
/** {@inheritDoc} */
167170
public String getHost() {
168171
return _frameHandler.getHost();
@@ -182,6 +185,11 @@ public FrameHandler getFrameHandler(){
182185
return _frameHandler;
183186
}
184187

188+
/** {@inheritDoc} */
189+
public Map<String, Object> getServerProperties() {
190+
return _serverProperties;
191+
}
192+
185193
/**
186194
* Construct a new connection to a broker.
187195
* @param factory the initialization parameters for a connection
@@ -261,6 +269,8 @@ public void start(boolean insist)
261269
try {
262270
AMQP.Connection.Start connStart =
263271
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
272+
273+
_serverProperties = connStart.getServerProperties();
264274

265275
Version serverVersion =
266276
new Version(connStart.getVersionMajor(),

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646

4747
public class ChannelManager {
4848
/** Mapping from channel number to AMQChannel instance */
49-
private final Map<Integer, ChannelN> _channelMap = Collections.synchronizedMap(new HashMap<Integer, ChannelN>());
49+
private final Map<Integer, ChannelN> _channelMap =
50+
Collections.synchronizedMap(new HashMap<Integer, ChannelN>());
5051
private final IntAllocator channelNumberAllocator;
5152

5253
/** Maximum channel number available on this connection. */
@@ -56,22 +57,22 @@ public int getChannelMax(){
5657
return _channelMax;
5758
}
5859

59-
public ChannelManager(int channelMax){
60-
if(channelMax == 0){
61-
// The framing encoding only allows for unsigned 16-bit integers for the channel number
62-
channelMax = (1 << 16) - 1;
63-
}
64-
65-
_channelMax = channelMax;
66-
channelNumberAllocator = new IntAllocator(1, channelMax);
60+
public ChannelManager(int channelMax) {
61+
if (channelMax == 0) {
62+
// The framing encoding only allows for unsigned 16-bit integers
63+
// for the channel number
64+
channelMax = (1 << 16) - 1;
65+
}
66+
_channelMax = channelMax;
67+
channelNumberAllocator = new IntAllocator(1, channelMax);
6768
}
6869

69-
70+
7071
/**
7172
* Public API - Looks up an existing channel associated with this connection.
7273
* @param channelNumber the number of the required channel
7374
* @return the relevant channel descriptor
74-
* @throws UnknownChannelException if there is no Channel associated with the
75+
* @throws UnknownChannelException if there is no Channel associated with the
7576
* required channel number.
7677
*/
7778
public ChannelN getChannel(int channelNumber) {
@@ -85,8 +86,8 @@ public void handleSignal(ShutdownSignalException signal) {
8586
synchronized(_channelMap) {
8687
channels = new HashSet<ChannelN>(_channelMap.values());
8788
}
88-
for (AMQChannel channel : channels) {
89-
disconnectChannel(channel.getChannelNumber());
89+
for (ChannelN channel : channels) {
90+
disconnectChannel(channel);
9091
channel.processShutdownSignal(signal, true, true);
9192
}
9293
}
@@ -100,7 +101,7 @@ public synchronized ChannelN createChannel(AMQConnection connection) throws IOEx
100101
}
101102

102103
public synchronized ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
103-
if(channelNumberAllocator.reserve(channelNumber))
104+
if(channelNumberAllocator.reserve(channelNumber))
104105
return createChannelInternal(connection, channelNumber);
105106
else
106107
return null;
@@ -110,10 +111,10 @@ private synchronized ChannelN createChannelInternal(AMQConnection connection, in
110111
if (_channelMap.containsKey(channelNumber)) {
111112
// That number's already allocated! Can't do it
112113
// This should never happen unless something has gone
113-
// badly wrong with our implementation.
114-
throw new IllegalStateException("We have attempted to"
115-
+ "create a channel with a number that is already in"
116-
+ "use. This should never happen. Please report this as a bug.");
114+
// badly wrong with our implementation.
115+
throw new IllegalStateException("We have attempted to "
116+
+ "create a channel with a number that is already in "
117+
+ "use. This should never happen. Please report this as a bug.");
117118
}
118119
ChannelN ch = new ChannelN(connection, channelNumber);
119120
addChannel(ch);
@@ -125,8 +126,36 @@ private void addChannel(ChannelN chan) {
125126
_channelMap.put(chan.getChannelNumber(), chan);
126127
}
127128

128-
public synchronized void disconnectChannel(int channelNumber) {
129-
_channelMap.remove(channelNumber);
130-
channelNumberAllocator.free(channelNumber);
129+
/**
130+
* Remove the argument channel from the channel map.
131+
* This method must be safe to call multiple times on the same channel. If
132+
* it is not then things go badly wrong.
133+
*/
134+
public synchronized void disconnectChannel(ChannelN channel) {
135+
int channelNumber = channel.getChannelNumber();
136+
137+
// Warning, here be dragons. Not great big ones, but little baby ones
138+
// which will nibble on your toes and occasionally trip you up when
139+
// you least expect it.
140+
// Basically, there's a race that can end us up here. It almost never
141+
// happens, but it's easier to repair it when it does than prevent it
142+
// from happening in the first place.
143+
// If we end up doing a Channel.close in one thread and a Channel.open
144+
// with the same channel number in another, the two can overlap in such
145+
// a way as to cause disconnectChannel on the old channel to try to
146+
// remove the new one. Ideally we would fix this race at the source,
147+
// but it's much easier to just catch it here.
148+
synchronized (_channelMap) {
149+
ChannelN existing = _channelMap.remove(channelNumber);
150+
// Nothing to do here. Move along.
151+
if (existing == null) return;
152+
// Oops, we've gone and stomped on someone else's channel. Put it back
153+
// and pretend we didn't touch it.
154+
else if (existing != channel) {
155+
_channelMap.put(channelNumber, existing);
156+
return;
157+
}
158+
channelNumberAllocator.free(channelNumber);
159+
}
131160
}
132161
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void broadcastShutdownSignal(ShutdownSignalException signal) {
166166
}
167167

168168
public void releaseChannelNumber() {
169-
_connection.disconnectChannel(_channelNumber);
169+
_connection.disconnectChannel(this);
170170
}
171171

172172
/**
@@ -695,4 +695,9 @@ public Tx.RollbackOk txRollback()
695695
{
696696
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
697697
}
698+
699+
/** Public API - {@inheritDoc} */
700+
public Channel.FlowOk flow(final boolean a) throws IOException {
701+
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
702+
}
698703
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,19 @@ public void testLimitInheritsUnackedCount()
359359
drain(c, 1);
360360
}
361361

362+
public void testFlow() throws IOException
363+
{
364+
QueueingConsumer c = new QueueingConsumer(channel);
365+
declareBindConsume(c);
366+
fill(1);
367+
drain(c, 1);
368+
channel.flow(false);
369+
fill(1);
370+
drain(c, 0);
371+
channel.flow(true);
372+
drain(c, 1);
373+
}
374+
362375
protected void runLimitTests(int limit,
363376
boolean multiAck,
364377
boolean txMode,

0 commit comments

Comments
 (0)