Skip to content

Commit 704fa1e

Browse files
committed
#354 | call metrics (acks and nacks have to be corrected for multiples)
1 parent deb987a commit 704fa1e

File tree

4 files changed

+12
-6
lines changed

4 files changed

+12
-6
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public interface MetricsCollector {
3838

3939
void basicPublishFailure(Channel channel, Throwable cause);
4040

41-
void basicPublishAck(Channel channel);
41+
void basicPublishAck(Channel channel, long deliveryTag, boolean multiple);
4242

43-
void basicPublishNack(Channel channel);
43+
void basicPublishNack(Channel channel, long deliveryTag, boolean multiple);
4444

4545
void basicPublishUnrouted(Channel channel);
4646

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
7676
}
7777

7878
@Override
79-
public void basicPublishAck(Channel channel) {
79+
public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
8080

8181
}
8282

8383
@Override
84-
public void basicPublishNack(Channel channel) {
84+
public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
8585

8686
}
8787

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
112112
}
113113

114114
@Override
115-
public void basicPublishAck(Channel channel) {
115+
public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
116116
try {
117117
markMessagePublishAcknowledged();
118118
} catch(Exception e) {
@@ -121,7 +121,7 @@ public void basicPublishAck(Channel channel) {
121121
}
122122

123123
@Override
124-
public void basicPublishNack(Channel channel) {
124+
public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
125125
try {
126126
markMessagePublishNotAcknowledged();
127127
} catch(Exception e) {

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
481481
}
482482
} catch (Throwable ex) {
483483
getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
484+
} finally {
485+
metricsCollector.basicPublishUnrouted(this);
484486
}
485487
}
486488

@@ -491,6 +493,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B
491493
}
492494
} catch (Throwable ex) {
493495
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
496+
} finally {
497+
metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
494498
}
495499
}
496500

@@ -501,6 +505,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B
501505
}
502506
} catch (Throwable ex) {
503507
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
508+
} finally {
509+
metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
504510
}
505511
}
506512

0 commit comments

Comments
 (0)