Skip to content

Commit 89177a8

Browse files
authored
Merge pull request #357 from slayful/rabbitmq-java-354
Track publishing failures in metrics References #354
2 parents 37b8776 + 843db2b commit 89177a8

File tree

7 files changed

+115
-21
lines changed

7 files changed

+115
-21
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface MetricsCollector {
3636

3737
void basicPublish(Channel channel);
3838

39+
void basicPublishFailure(Channel channel, Throwable cause);
40+
3941
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
4042

4143
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);
@@ -49,4 +51,5 @@ public interface MetricsCollector {
4951
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
5052

5153
void basicCancel(Channel channel, String consumerTag);
54+
5255
}

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public void basicPublish(Channel channel) {
7070

7171
}
7272

73+
@Override
74+
public void basicPublishFailure(Channel channel, Throwable cause) {
75+
76+
}
77+
7378
@Override
7479
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
7580

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ public void basicPublish(Channel channel) {
102102
}
103103
}
104104

105+
@Override
106+
public void basicPublishFailure(Channel channel, Throwable cause) {
107+
try {
108+
markMessagePublishFailed();
109+
} catch(Exception e) {
110+
LOGGER.info("Error while computing metrics in basicPublishFailure: " + e.getMessage());
111+
}
112+
}
113+
105114
@Override
106115
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
107116
try {
@@ -331,6 +340,11 @@ private ChannelState(Channel channel) {
331340
*/
332341
protected abstract void markPublishedMessage();
333342

343+
/**
344+
* Marks the event of a message publishing failure.
345+
*/
346+
protected abstract void markMessagePublishFailed();
347+
334348
/**
335349
* Marks the event of a consumed message.
336350
*/

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -684,22 +684,25 @@ public void basicPublish(String exchange, String routingKey,
684684
unconfirmedSet.add(getNextPublishSeqNo());
685685
nextPublishSeqNo++;
686686
}
687-
BasicProperties useProps = props;
688687
if (props == null) {
689-
useProps = MessageProperties.MINIMAL_BASIC;
688+
props = MessageProperties.MINIMAL_BASIC;
689+
}
690+
AMQCommand command = new AMQCommand(
691+
new Basic.Publish.Builder()
692+
.exchange(exchange)
693+
.routingKey(routingKey)
694+
.mandatory(mandatory)
695+
.immediate(immediate)
696+
.build(), props, body);
697+
try {
698+
transmit(command);
699+
} catch (IOException e) {
700+
metricsCollector.basicPublishFailure(this, e);
701+
throw e;
690702
}
691-
transmit(new AMQCommand(new Basic.Publish.Builder()
692-
.exchange(exchange)
693-
.routingKey(routingKey)
694-
.mandatory(mandatory)
695-
.immediate(immediate)
696-
.build(),
697-
useProps, body));
698703
metricsCollector.basicPublish(this);
699704
}
700705

701-
702-
703706
/** Public API - {@inheritDoc} */
704707
@Override
705708
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicLong;
2828
import java.util.function.Function;
2929

30-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES;
31-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS;
32-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS;
33-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES;
34-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES;
35-
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES;
30+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.*;
3631

3732
/**
3833
* Micrometer implementation of {@link MetricsCollector}.
@@ -54,6 +49,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
5449

5550
private final Counter publishedMessages;
5651

52+
private final Counter failedToPublishMessages;
53+
5754
private final Counter consumedMessages;
5855

5956
private final Counter acknowledgedMessages;
@@ -83,6 +80,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
8380
this.consumedMessages = (Counter) metricsCreator.apply(CONSUMED_MESSAGES);
8481
this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES);
8582
this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES);
83+
this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES);
8684
}
8785

8886
@Override
@@ -110,6 +108,11 @@ protected void markPublishedMessage() {
110108
publishedMessages.increment();
111109
}
112110

111+
@Override
112+
protected void markMessagePublishFailed() {
113+
failedToPublishMessages.increment();
114+
}
115+
113116
@Override
114117
protected void markConsumedMessage() {
115118
consumedMessages.increment();
@@ -137,6 +140,10 @@ public Counter getPublishedMessages() {
137140
return publishedMessages;
138141
}
139142

143+
public Counter getFailedToPublishMessages() {
144+
return failedToPublishMessages;
145+
}
146+
140147
public Counter getConsumedMessages() {
141148
return consumedMessages;
142149
}
@@ -185,14 +192,19 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
185192
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
186193
return registry.counter(prefix + ".rejected", tags);
187194
}
195+
},
196+
FAILED_TO_PUBLISH_MESSAGES {
197+
@Override
198+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
199+
return registry.counter(prefix + ".failed_to_publish", tags);
200+
}
188201
};
189202

190203
/**
191204
*
192205
* @param registry
193206
* @param prefix
194207
* @deprecated will be removed in 6.0.0
195-
* @return
196208
*/
197209
@Deprecated
198210
Object create(MeterRegistry registry, String prefix) {

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4141
private final Meter consumedMessages;
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
44+
private final Meter failedToPublishMessages;
4445

4546

4647
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
4748
this.registry = registry;
4849
this.connections = registry.counter(metricsPrefix+".connections");
4950
this.channels = registry.counter(metricsPrefix+".channels");
5051
this.publishedMessages = registry.meter(metricsPrefix+".published");
52+
this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish");
5153
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
5254
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
5355
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
@@ -86,6 +88,11 @@ protected void markPublishedMessage() {
8688
publishedMessages.mark();
8789
}
8890

91+
@Override
92+
protected void markMessagePublishFailed() {
93+
failedToPublishMessages.mark();
94+
}
95+
8996
@Override
9097
protected void markConsumedMessage() {
9198
consumedMessages.mark();
@@ -130,4 +137,8 @@ public Meter getAcknowledgedMessages() {
130137
public Meter getRejectedMessages() {
131138
return rejectedMessages;
132139
}
140+
141+
public Meter getFailedToPublishMessages() {
142+
return failedToPublishMessages;
143+
}
133144
}

src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import org.junit.runner.RunWith;
2727
import org.junit.runners.Parameterized;
2828

29-
import static org.hamcrest.Matchers.*;
30-
import static org.junit.Assert.*;
31-
import static org.mockito.Mockito.*;
29+
import java.io.IOException;
30+
31+
import static org.hamcrest.Matchers.is;
32+
import static org.junit.Assert.assertThat;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
3235

3336
/**
3437
*
@@ -116,7 +119,34 @@ public void basicGetAndAck() {
116119

117120
metrics.basicAck(channel, 10, true);
118121
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
122+
}
123+
124+
@Test public void publishingAndPublishingFailures() {
125+
AbstractMetricsCollector metrics = factory.create();
126+
Channel channel = mock(Channel.class);
127+
128+
assertThat(failedToPublishMessages(metrics), is(0L));
129+
assertThat(publishedMessages(metrics), is(0L));
119130

131+
metrics.basicPublishFailure(channel, new IOException());
132+
assertThat(failedToPublishMessages(metrics), is(1L));
133+
assertThat(publishedMessages(metrics), is(0L));
134+
135+
metrics.basicPublish(channel);
136+
assertThat(failedToPublishMessages(metrics), is(1L));
137+
assertThat(publishedMessages(metrics), is(1L));
138+
139+
metrics.basicPublishFailure(channel, new IOException());
140+
assertThat(failedToPublishMessages(metrics), is(2L));
141+
assertThat(publishedMessages(metrics), is(1L));
142+
143+
metrics.basicPublish(channel);
144+
assertThat(failedToPublishMessages(metrics), is(2L));
145+
assertThat(publishedMessages(metrics), is(2L));
146+
147+
metrics.cleanStaleState();
148+
assertThat(failedToPublishMessages(metrics), is(2L));
149+
assertThat(publishedMessages(metrics), is(2L));
120150
}
121151

122152
@Test public void cleanStaleState() {
@@ -159,6 +189,22 @@ public void basicGetAndAck() {
159189
assertThat(channels(metrics), is(1L));
160190
}
161191

192+
long publishedMessages(MetricsCollector metrics) {
193+
if (metrics instanceof StandardMetricsCollector) {
194+
return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount();
195+
} else {
196+
return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count();
197+
}
198+
}
199+
200+
long failedToPublishMessages(MetricsCollector metrics) {
201+
if (metrics instanceof StandardMetricsCollector) {
202+
return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount();
203+
} else {
204+
return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count();
205+
}
206+
}
207+
162208
long consumedMessages(MetricsCollector metrics) {
163209
if (metrics instanceof StandardMetricsCollector) {
164210
return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();

0 commit comments

Comments
 (0)