Skip to content

Commit da33f93

Browse files
Record exchanges declared with Channel#exchangeDeclareNowait
1 parent 96fa456 commit da33f93

File tree

2 files changed

+44
-0
lines changed

2 files changed

+44
-0
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boo
192192

193193
@Override
194194
public void exchangeDeclareNowait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
195+
RecordedExchange x = new RecordedExchange(this, exchange).
196+
type(type).
197+
durable(durable).
198+
autoDelete(autoDelete).
199+
arguments(arguments);
200+
recordExchange(exchange, x);
195201
delegate.exchangeDeclareNowait(exchange, type, durable, autoDelete, internal, arguments);
196202
}
197203

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,26 @@ public void handleNack(long deliveryTag, boolean multiple) throws IOException {
146146
wait(latch);
147147
}
148148

149+
public void testExchangeRecovery() throws IOException, InterruptedException, TimeoutException {
150+
Channel ch = connection.createChannel();
151+
String x = "java-client.test.recovery.x1";
152+
declareExchange(ch, x);
153+
closeAndWaitForRecovery();
154+
expectChannelRecovery(ch);
155+
expectExchangeRecovery(ch, x);
156+
ch.exchangeDelete(x);
157+
}
158+
159+
public void testExchangeRecoveryWithNoWait() throws IOException, InterruptedException, TimeoutException {
160+
Channel ch = connection.createChannel();
161+
String x = "java-client.test.recovery.x1-nowait";
162+
declareExchangeNoWait(ch, x);
163+
closeAndWaitForRecovery();
164+
expectChannelRecovery(ch);
165+
expectExchangeRecovery(ch, x);
166+
ch.exchangeDelete(x);
167+
}
168+
149169
public void testClientNamedQueueRecovery() throws IOException, InterruptedException, TimeoutException {
150170
Channel ch = connection.createChannel();
151171
String q = "java-client.test.recovery.q1";
@@ -357,6 +377,14 @@ private AMQP.Queue.DeclareOk declareClientNamedQueue(Channel ch, String q) throw
357377
return ch.queueDeclare(q, true, false, false, null);
358378
}
359379

380+
private AMQP.Exchange.DeclareOk declareExchange(Channel ch, String x) throws IOException {
381+
return ch.exchangeDeclare(x, "fanout", false);
382+
}
383+
384+
private void declareExchangeNoWait(Channel ch, String x) throws IOException {
385+
ch.exchangeDeclareNowait(x, "fanout", false, false, false, null);
386+
}
387+
360388
private void expectQueueRecovery(Channel ch, String q) throws IOException, InterruptedException, TimeoutException {
361389
ch.confirmSelect();
362390
ch.queuePurge(q);
@@ -368,6 +396,16 @@ private void expectQueueRecovery(Channel ch, String q) throws IOException, Inter
368396
assertEquals(1, ok2.getMessageCount());
369397
}
370398

399+
private void expectExchangeRecovery(Channel ch, String x) throws IOException, InterruptedException, TimeoutException {
400+
ch.confirmSelect();
401+
String q = ch.queueDeclare().getQueue();
402+
final String rk = "routing-key";
403+
ch.queueBind(q, x, rk);
404+
ch.basicPublish(x, rk, null, "msg".getBytes());
405+
waitForConfirms(ch);
406+
ch.exchangeDeclarePassive(x);
407+
}
408+
371409
private CountDownLatch prepareForRecovery(Connection conn) {
372410
final CountDownLatch latch = new CountDownLatch(1);
373411
((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() {

0 commit comments

Comments
 (0)