Skip to content

Commit dce0c41

Browse files
author
Matthias Radestock
committed
merge bug22423 into default
2 parents b140705 + 44ea2e3 commit dce0c41

File tree

3 files changed

+27
-0
lines changed

3 files changed

+27
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.client.AMQP.Exchange;
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
40+
import com.rabbitmq.client.impl.AMQImpl.Channel.FlowOk;
4041

4142
/**
4243
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -90,6 +91,14 @@ public interface Channel extends ShutdownNotifier {
9091
* @throws java.io.IOException if an error is encountered
9192
*/
9293
void close(int closeCode, String closeMessage) throws IOException;
94+
95+
/**
96+
* Set flow on the channel
97+
*
98+
* @param active if true, the server is asked to start sending. If false, the server is asked to stop sending.
99+
* @throws IOException
100+
*/
101+
FlowOk flow(boolean active) throws IOException;
93102

94103
/**
95104
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,4 +695,9 @@ public Tx.RollbackOk txRollback()
695695
{
696696
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
697697
}
698+
699+
/** Public API - {@inheritDoc} */
700+
public Channel.FlowOk flow(final boolean a) throws IOException {
701+
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
702+
}
698703
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,19 @@ public void testLimitInheritsUnackedCount()
359359
drain(c, 1);
360360
}
361361

362+
public void testFlow() throws IOException
363+
{
364+
QueueingConsumer c = new QueueingConsumer(channel);
365+
declareBindConsume(c);
366+
fill(1);
367+
drain(c, 1);
368+
channel.flow(false);
369+
fill(1);
370+
drain(c, 0);
371+
channel.flow(true);
372+
drain(c, 1);
373+
}
374+
362375
protected void runLimitTests(int limit,
363376
boolean multiAck,
364377
boolean txMode,

0 commit comments

Comments
 (0)