Skip to content

Commit ea4efc9

Browse files
committed
Merge branch '4.x.x-stable' into 5.1.x-stable
2 parents a925c0a + 46a7d86 commit ea4efc9

File tree

5 files changed

+83
-14
lines changed

5 files changed

+83
-14
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ public void publish(AMQP.BasicProperties props, byte[] message)
209209
}
210210

211211
public Response doCall(AMQP.BasicProperties props, byte[] message)
212+
throws IOException, TimeoutException {
213+
return doCall(props, message, _timeout);
214+
}
215+
216+
public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
212217
throws IOException, ShutdownSignalException, TimeoutException {
213218
checkConsumer();
214219
BlockingCell<Object> k = new BlockingCell<Object>();
@@ -220,7 +225,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
220225
_continuationMap.put(replyId, k);
221226
}
222227
publish(props, message);
223-
Object reply = k.uninterruptibleGet(_timeout);
228+
Object reply = k.uninterruptibleGet(timeout);
224229
if (reply instanceof ShutdownSignalException) {
225230
ShutdownSignalException sig = (ShutdownSignalException) reply;
226231
ShutdownSignalException wrapper =
@@ -238,7 +243,13 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
238243
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
239244
throws IOException, ShutdownSignalException, TimeoutException
240245
{
241-
return doCall(props, message).getBody();
246+
return primitiveCall(props, message, _timeout);
247+
}
248+
249+
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message, int timeout)
250+
throws IOException, ShutdownSignalException, TimeoutException
251+
{
252+
return doCall(props, message, timeout).getBody();
242253
}
243254

244255
/**
@@ -266,7 +277,23 @@ public byte[] primitiveCall(byte[] message)
266277
* @throws TimeoutException if a response is not received within the configured timeout
267278
*/
268279
public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException {
269-
return doCall(null, message);
280+
return responseCall(message, _timeout);
281+
}
282+
283+
/**
284+
* Perform a simple byte-array-based RPC roundtrip
285+
*
286+
* Useful if you need to get at more than just the body of the message
287+
*
288+
* @param message the byte array request message to send
289+
* @param timeout milliseconds before timing out on wait for response
290+
* @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
291+
* @throws ShutdownSignalException if the connection dies during our wait
292+
* @throws IOException if an error is encountered
293+
* @throws TimeoutException if a response is not received within the configured timeout
294+
*/
295+
public Response responseCall(byte[] message, int timeout) throws IOException, ShutdownSignalException, TimeoutException {
296+
return doCall(null, message, timeout);
270297
}
271298

272299
/**

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,8 +768,8 @@ public void automaticallyRecover(AutorecoveringConnection connection, Connection
768768
final RecoveryAwareChannelN newChannel = (RecoveryAwareChannelN) connDelegate.createChannel(this.getChannelNumber());
769769
if (newChannel == null)
770770
throw new IOException("Failed to create new channel for channel number=" + this.getChannelNumber() + " during recovery");
771+
newChannel.inheritOffsetFrom(defunctChannel);
771772
this.delegate = newChannel;
772-
this.delegate.inheritOffsetFrom(defunctChannel);
773773

774774
this.notifyRecoveryListenersStarted();
775775
this.recoverShutdownListeners();

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,28 @@ void deleteRecordedQueue(String queue) {
779779
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
780780
}
781781
}
782+
783+
/**
784+
* Exclude the queue from the list of queues to recover after connection failure.
785+
* Intended to be used in usecases where you want to remove the queue from this connection's recovery list but don't want to delete the queue from the server.
786+
*
787+
* @param queue queue name to exclude from recorded recovery queues
788+
* @param ifUnused if true, the RecordedQueue will only be excluded if no local consumers are using it.
789+
*/
790+
public void excludeQueueFromRecovery(final String queue, final boolean ifUnused) {
791+
if (ifUnused) {
792+
// Note: This is basically the same as maybeDeleteRecordedAutoDeleteQueue except it works for non auto-delete queues as well.
793+
synchronized (this.consumers) {
794+
synchronized (this.recordedQueues) {
795+
if (!hasMoreConsumersOnQueue(this.consumers.values(), queue)) {
796+
deleteRecordedQueue(queue);
797+
}
798+
}
799+
}
800+
} else {
801+
deleteRecordedQueue(queue);
802+
}
803+
}
782804

783805
void recordExchange(String exchange, RecordedExchange x) {
784806
this.recordedExchanges.put(exchange, x);
@@ -807,7 +829,7 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
807829
RecordedQueue q = this.recordedQueues.get(queue);
808830
// last consumer on this connection is gone, remove recorded queue
809831
// if it is auto-deleted. See bug 26364.
810-
if((q != null) && q.isAutoDelete()) {
832+
if(q != null && q.isAutoDelete()) {
811833
deleteRecordedQueue(queue);
812834
}
813835
}
@@ -822,7 +844,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
822844
RecordedExchange x = this.recordedExchanges.get(exchange);
823845
// last binding where this exchange is the source is gone, remove recorded exchange
824846
// if it is auto-deleted. See bug 26364.
825-
if((x != null) && x.isAutoDelete()) {
847+
if(x != null && x.isAutoDelete()) {
826848
deleteRecordedExchange(exchange);
827849
}
828850
}

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
* @since 3.3.0
3535
*/
3636
public class RecoveryAwareChannelN extends ChannelN {
37-
private long maxSeenDeliveryTag = 0;
38-
private long activeDeliveryTagOffset = 0;
37+
private volatile long maxSeenDeliveryTag = 0;
38+
private volatile long activeDeliveryTagOffset = 0;
3939

4040
/**
4141
* Construct a new channel on the given connection with the given
@@ -83,21 +83,19 @@ private AMQImpl.Basic.Deliver offsetDeliveryTag(AMQImpl.Basic.Deliver method) {
8383

8484
@Override
8585
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
86-
// FIXME no check if deliveryTag = 0 (ack all)
8786
long realTag = deliveryTag - activeDeliveryTagOffset;
88-
// 0 tag means ack all
89-
if (realTag >= 0) {
87+
// 0 tag means ack all when multiple is set
88+
if (realTag > 0 || (multiple && realTag == 0)) {
9089
transmit(new Basic.Ack(realTag, multiple));
9190
metricsCollector.basicAck(this, deliveryTag, multiple);
9291
}
9392
}
9493

9594
@Override
9695
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
97-
// FIXME no check if deliveryTag = 0 (nack all)
9896
long realTag = deliveryTag - activeDeliveryTagOffset;
99-
// 0 tag means nack all
100-
if (realTag >= 0) {
97+
// 0 tag means nack all when multiple is set
98+
if (realTag > 0 || (multiple && realTag == 0)) {
10199
transmit(new Basic.Nack(realTag, multiple, requeue));
102100
metricsCollector.basicNack(this, deliveryTag);
103101
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,28 @@ public void queueRecovered(String oldName, String newName) {
559559
// expected
560560
}
561561
}
562+
563+
@Test public void thatExcludedQueueDoesNotReappearOnRecover() throws IOException, InterruptedException {
564+
final String q = "java-client.test.recovery.excludedQueue1";
565+
channel.queueDeclare(q, true, false, false, null);
566+
// now delete it using the delegate so AutorecoveringConnection and AutorecoveringChannel are not aware of it
567+
((AutorecoveringChannel)channel).getDelegate().queueDelete(q);
568+
assertNotNull(((AutorecoveringConnection)connection).getRecordedQueues().get(q));
569+
// exclude the queue from recovery
570+
((AutorecoveringConnection)connection).excludeQueueFromRecovery(q, true);
571+
// verify its not there
572+
assertNull(((AutorecoveringConnection)connection).getRecordedQueues().get(q));
573+
// reconnect
574+
closeAndWaitForRecovery();
575+
expectChannelRecovery(channel);
576+
// verify queue was not recreated
577+
try {
578+
channel.queueDeclarePassive(q);
579+
fail("Expected passive declare to fail");
580+
} catch (IOException ioe) {
581+
// expected
582+
}
583+
}
562584

563585
@Test public void thatCancelledConsumerDoesNotReappearOnRecover() throws IOException, InterruptedException {
564586
String q = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)