Skip to content

Commit 7b4a6cc

Browse files
author
Simon MacMullen
committed
Move synchronous basic.recover to the Java client.
1 parent 61d6c27 commit 7b4a6cc

File tree

6 files changed

+89
-13
lines changed

6 files changed

+89
-13
lines changed

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<pathelement path="${test.javac.out}"/>
2727
</path>
2828

29-
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json"/>
29+
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json ${codegen.dir}/unbind-0.8.json ${codegen.dir}/recover-0.8.json"/>
3030

3131
<target name="amqp-generate-check" description="check if codegen needs to be run">
3232
<uptodate property="amqp.generate.notRequired">

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -478,12 +478,22 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
478478
* Ask the broker to resend unacknowledged messages. In 0-8
479479
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
480480
* the new, deprecated method basic.recover_async is asynchronous.
481-
* To avoid this API changing, this is named for the latter, and
482-
* will be deprecated.
483481
* @param requeue If true, messages will be requeued and possibly
484482
* delivered to a different consumer. If false, messages will be
485483
* redelivered to the same consumer.
486484
*/
485+
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
486+
487+
/**
488+
* Ask the broker to resend unacknowledged messages. In 0-8
489+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
490+
* the new, deprecated method basic.recover_async is asynchronous
491+
* and deprecated.
492+
* @param requeue If true, messages will be requeued and possibly
493+
* delivered to a different consumer. If false, messages will be
494+
* redelivered to the same consumer.
495+
*/
496+
@Deprecated
487497
void basicRecoverAsync(boolean requeue) throws IOException;
488498

489499
/**

src/com/rabbitmq/client/Consumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public interface Consumer {
7070
*/
7171
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
7272

73+
/**
74+
* Called when the
75+
*/
76+
void handleRecoverOk();
77+
7378
/**
7479
* Called when a delivery appears for this consumer.
7580
* @param consumerTag the defined consumerTag (either client- or server-generated)

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
7474
// no work to do
7575
}
7676

77+
/**
78+
* No-op implementation of {@link Consumer#handleRecoverOk}.
79+
*/
80+
public void handleRecoverOk() {
81+
// no work to do
82+
}
83+
7784
/**
7885
* No-op implementation of {@link Consumer#handleDelivery}.
7986
*/

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,15 @@ public void releaseChannelNumber() {
258258
_channelMutex.notifyAll();
259259
}
260260
return true;
261+
} else if (method instanceof Basic.RecoverOk) {
262+
for (Consumer callback: _consumers.values()) {
263+
callback.handleRecoverOk();
264+
}
265+
266+
// Unlike all the other cases we still want this RecoverOk to
267+
// be handled by whichever RPC continuation invoked Recover,
268+
// so return false
269+
return false;
261270
} else {
262271
return false;
263272
}
@@ -672,11 +681,19 @@ public Consumer transformReply(AMQCommand replyCommand) {
672681
}
673682
}
674683

684+
/** Public API - {@inheritDoc} */
685+
public Basic.RecoverOk basicRecover(boolean requeue)
686+
throws IOException
687+
{
688+
return (Basic.RecoverOk) exnWrappingRpc(new Basic.Recover(requeue)).getMethod();
689+
}
690+
691+
675692
/** Public API - {@inheritDoc} */
676693
public void basicRecoverAsync(boolean requeue)
677694
throws IOException
678695
{
679-
transmit(new Basic.Recover(requeue));
696+
transmit(new Basic.RecoverAsync(requeue));
680697
}
681698

682699
/** Public API - {@inheritDoc} */

test/src/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,48 +36,85 @@
3636

3737
import com.rabbitmq.client.AMQP;
3838
import com.rabbitmq.client.QueueingConsumer;
39+
import com.rabbitmq.client.Channel;
3940

4041
import com.rabbitmq.client.test.BrokerTestCase;
4142

4243
public class Recover extends BrokerTestCase {
4344

4445
String queue;
4546
byte[] body = "message".getBytes();
46-
47+
4748
public void createResources() throws IOException {
4849
AMQP.Queue.DeclareOk ok = channel.queueDeclare();
4950
queue = ok.getQueue();
5051
}
5152

52-
public void testRedeliverOnRecover() throws IOException, InterruptedException {
53+
static interface RecoverCallback {
54+
void recover(Channel channel) throws IOException;
55+
}
56+
57+
// The AMQP specification under-specifies the behaviour when
58+
// requeue=false. So we can't really test any scenarios for
59+
// requeue=false.
60+
61+
void verifyRedeliverOnRecover(RecoverCallback call)
62+
throws IOException, InterruptedException {
5363
QueueingConsumer consumer = new QueueingConsumer(channel);
5464
channel.basicConsume(queue, false, consumer); // require acks.
5565
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
5666
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
5767
assertTrue("consumed message body not as sent",
5868
Arrays.equals(body, delivery.getBody()));
5969
// Don't ack it, and get it redelivered to the same consumer
60-
channel.basicRecoverAsync(true);
70+
call.recover(channel);
6171
QueueingConsumer.Delivery secondDelivery = consumer.nextDelivery(5000);
6272
assertNotNull("timed out waiting for redelivered message", secondDelivery);
6373
assertTrue("consumed (redelivered) message body not as sent",
64-
Arrays.equals(body, delivery.getBody()));
74+
Arrays.equals(body, delivery.getBody()));
6575
}
6676

67-
public void testNoRedeliveryWithAutoAck() throws IOException, InterruptedException {
77+
void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
78+
throws IOException, InterruptedException {
6879
QueueingConsumer consumer = new QueueingConsumer(channel);
6980
channel.basicConsume(queue, true, consumer); // auto ack.
7081
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
7182
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
7283
assertTrue("consumed message body not as sent",
7384
Arrays.equals(body, delivery.getBody()));
74-
channel.basicRecoverAsync(true);
85+
call.recover(channel);
7586
// there's a race here between our recover finishing and the basic.get;
7687
Thread.sleep(500);
7788
assertNull("should be no message available", channel.basicGet(queue, true));
7889
}
7990

80-
// The AMQP specification under-specifies the behaviour when
81-
// requeue=false. So we can't really test any scenarios for
82-
// requeue=false.
91+
RecoverCallback recoverAsync = new RecoverCallback() {
92+
public void recover(Channel channel) throws IOException {
93+
channel.basicRecoverAsync(true);
94+
}
95+
};
96+
97+
RecoverCallback recoverSync = new RecoverCallback() {
98+
public void recover(Channel channel) throws IOException {
99+
channel.basicRecover(true);
100+
}
101+
};
102+
103+
public void testRedeliverOnRecoverAsync() throws IOException, InterruptedException {
104+
verifyRedeliverOnRecover(recoverAsync);
105+
}
106+
107+
public void testRedeliveryOnRecover() throws IOException, InterruptedException {
108+
verifyRedeliverOnRecover(recoverSync);
109+
}
110+
111+
public void testNoRedeliveryWithAutoAckAsync()
112+
throws IOException, InterruptedException {
113+
verifyNoRedeliveryWithAutoAck(recoverAsync);
114+
}
115+
116+
public void testNoRedeliveryWithAutoAck()
117+
throws IOException, InterruptedException {
118+
verifyNoRedeliveryWithAutoAck(recoverSync);
119+
}
83120
}

0 commit comments

Comments
 (0)