Skip to content

Commit deb987a

Browse files
committed
#354 | add implementation for ack, nack and unrouted publishes
1 parent 843db2b commit deb987a

File tree

5 files changed

+147
-2
lines changed

5 files changed

+147
-2
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ public interface MetricsCollector {
3838

3939
void basicPublishFailure(Channel channel, Throwable cause);
4040

41+
void basicPublishAck(Channel channel);
42+
43+
void basicPublishNack(Channel channel);
44+
45+
void basicPublishUnrouted(Channel channel);
46+
4147
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
4248

4349
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,21 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
7575

7676
}
7777

78+
@Override
79+
public void basicPublishAck(Channel channel) {
80+
81+
}
82+
83+
@Override
84+
public void basicPublishNack(Channel channel) {
85+
86+
}
87+
88+
@Override
89+
public void basicPublishUnrouted(Channel channel) {
90+
91+
}
92+
7893
@Override
7994
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
8095

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,33 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
111111
}
112112
}
113113

114+
@Override
115+
public void basicPublishAck(Channel channel) {
116+
try {
117+
markMessagePublishAcknowledged();
118+
} catch(Exception e) {
119+
LOGGER.info("Error while computing metrics in basicPublishAck: " + e.getMessage());
120+
}
121+
}
122+
123+
@Override
124+
public void basicPublishNack(Channel channel) {
125+
try {
126+
markMessagePublishNotAcknowledged();
127+
} catch(Exception e) {
128+
LOGGER.info("Error while computing metrics in basicPublishNack: " + e.getMessage());
129+
}
130+
}
131+
132+
@Override
133+
public void basicPublishUnrouted(Channel channel) {
134+
try {
135+
markPublishedMessageNotRouted();
136+
} catch(Exception e) {
137+
LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage());
138+
}
139+
}
140+
114141
@Override
115142
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
116143
try {
@@ -360,6 +387,17 @@ private ChannelState(Channel channel) {
360387
*/
361388
protected abstract void markRejectedMessage();
362389

390+
/**
391+
* Marks the event of a message publishing acknowledgement.
392+
*/
393+
protected abstract void markMessagePublishAcknowledged();
363394

364-
395+
/**
396+
* Marks the event of a message publishing not being acknowledged.
397+
*/
398+
protected abstract void markMessagePublishNotAcknowledged();
399+
/**
400+
* Marks the event of a published message not being routed.
401+
*/
402+
protected abstract void markPublishedMessageNotRouted();
365403
}

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
5151

5252
private final Counter failedToPublishMessages;
5353

54+
private final Counter ackedPublishedMessages;
55+
56+
private final Counter nackedPublishedMessages;
57+
58+
private final Counter unroutedPublishedMessages;
59+
5460
private final Counter consumedMessages;
5561

5662
private final Counter acknowledgedMessages;
@@ -81,6 +87,9 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
8187
this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES);
8288
this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES);
8389
this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES);
90+
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
91+
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
92+
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
8493
}
8594

8695
@Override
@@ -128,6 +137,21 @@ protected void markRejectedMessage() {
128137
rejectedMessages.increment();
129138
}
130139

140+
@Override
141+
protected void markMessagePublishAcknowledged() {
142+
ackedPublishedMessages.increment();
143+
}
144+
145+
@Override
146+
protected void markMessagePublishNotAcknowledged() {
147+
nackedPublishedMessages.increment();
148+
}
149+
150+
@Override
151+
protected void markPublishedMessageNotRouted() {
152+
unroutedPublishedMessages.increment();
153+
}
154+
131155
public AtomicLong getConnections() {
132156
return connections;
133157
}
@@ -144,6 +168,18 @@ public Counter getFailedToPublishMessages() {
144168
return failedToPublishMessages;
145169
}
146170

171+
public Counter getAckedPublishedMessages() {
172+
return ackedPublishedMessages;
173+
}
174+
175+
public Counter getNackedPublishedMessages() {
176+
return nackedPublishedMessages;
177+
}
178+
179+
public Counter getUnroutedPublishedMessages() {
180+
return unroutedPublishedMessages;
181+
}
182+
147183
public Counter getConsumedMessages() {
148184
return consumedMessages;
149185
}
@@ -198,6 +234,24 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
198234
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
199235
return registry.counter(prefix + ".failed_to_publish", tags);
200236
}
237+
},
238+
ACKED_PUBLISHED_MESSAGES {
239+
@Override
240+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
241+
return registry.counter(prefix + ".acknowledged_published", tags);
242+
}
243+
},
244+
NACKED_PUBLISHED_MESSAGES {
245+
@Override
246+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
247+
return registry.counter(prefix + ".not_acknowledged_published", tags);
248+
}
249+
},
250+
UNROUTED_PUBLISHED_MESSAGES {
251+
@Override
252+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
253+
return registry.counter(prefix + ".unrouted_published", tags);
254+
}
201255
};
202256

203257
/**

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
4444
private final Meter failedToPublishMessages;
45+
private final Meter publishAcknowledgedMessages;
46+
private final Meter publishNacknowledgedMessages;
47+
private final Meter publishUnroutedMessages;
4548

4649

4750
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
@@ -50,6 +53,9 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5053
this.channels = registry.counter(metricsPrefix+".channels");
5154
this.publishedMessages = registry.meter(metricsPrefix+".published");
5255
this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish");
56+
this.publishAcknowledgedMessages = registry.meter(metricsPrefix+".publish_ack");
57+
this.publishNacknowledgedMessages = registry.meter(metricsPrefix+".publish_nack");
58+
this.publishUnroutedMessages = registry.meter(metricsPrefix+".publish_unrouted");
5359
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
5460
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
5561
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
@@ -108,8 +114,21 @@ protected void markRejectedMessage() {
108114
rejectedMessages.mark();
109115
}
110116

117+
@Override
118+
protected void markMessagePublishAcknowledged() {
119+
acknowledgedMessages.mark();
120+
}
121+
122+
@Override
123+
protected void markMessagePublishNotAcknowledged() {
124+
publishNacknowledgedMessages.mark();
125+
}
126+
127+
@Override
128+
protected void markPublishedMessageNotRouted() {
129+
publishUnroutedMessages.mark();
130+
}
111131

112-
113132
public MetricRegistry getMetricRegistry() {
114133
return registry;
115134
}
@@ -141,4 +160,17 @@ public Meter getRejectedMessages() {
141160
public Meter getFailedToPublishMessages() {
142161
return failedToPublishMessages;
143162
}
163+
164+
public Meter getPublishAcknowledgedMessages() {
165+
return publishAcknowledgedMessages;
166+
}
167+
168+
public Meter getPublishNotAcknowledgedMessages() {
169+
return publishNacknowledgedMessages;
170+
}
171+
172+
public Meter getPublishUnroutedMessages() {
173+
return publishUnroutedMessages;
174+
}
175+
144176
}

0 commit comments

Comments
 (0)