Skip to content

Commit 0eacae1

Browse files
committed
#354 | track publishing failures
1 parent 1f7b2c8 commit 0eacae1

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface MetricsCollector {
3636

3737
void basicPublish(Channel channel);
3838

39+
void basicPublishFailure(Channel channel);
40+
3941
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
4042

4143
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);
@@ -49,4 +51,5 @@ public interface MetricsCollector {
4951
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
5052

5153
void basicCancel(Channel channel, String consumerTag);
54+
5255
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -684,22 +684,25 @@ public void basicPublish(String exchange, String routingKey,
684684
unconfirmedSet.add(getNextPublishSeqNo());
685685
nextPublishSeqNo++;
686686
}
687-
BasicProperties useProps = props;
688687
if (props == null) {
689-
useProps = MessageProperties.MINIMAL_BASIC;
688+
props = MessageProperties.MINIMAL_BASIC;
689+
}
690+
AMQCommand command = new AMQCommand(
691+
new Basic.Publish.Builder()
692+
.exchange(exchange)
693+
.routingKey(routingKey)
694+
.mandatory(mandatory)
695+
.immediate(immediate)
696+
.build(), props, body);
697+
try {
698+
transmit(command);
699+
} catch (IOException e) {
700+
metricsCollector.basicPublishFailure(this);
701+
throw e;
690702
}
691-
transmit(new AMQCommand(new Basic.Publish.Builder()
692-
.exchange(exchange)
693-
.routingKey(routingKey)
694-
.mandatory(mandatory)
695-
.immediate(immediate)
696-
.build(),
697-
useProps, body));
698703
metricsCollector.basicPublish(this);
699704
}
700705

701-
702-
703706
/** Public API - {@inheritDoc} */
704707
@Override
705708
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,

0 commit comments

Comments
 (0)