@@ -139,6 +139,49 @@ protected void releaseResources() throws IOException {
139139 }
140140 }
141141
142+ @ Test public void metricsPublisherUnrouted () throws IOException , TimeoutException , InterruptedException {
143+ StandardMetricsCollector metrics = new StandardMetricsCollector ();
144+ connectionFactory .setMetricsCollector (metrics );
145+ Connection connection = null ;
146+ try {
147+ connection = connectionFactory .newConnection ();
148+ Channel channel = connection .createChannel ();
149+ channel .confirmSelect ();
150+ assertThat (metrics .getPublishUnroutedMessages (), is (1L ));
151+ // when
152+ channel .basicPublish (
153+ "any-exchange" ,
154+ "any-routing-key" ,
155+ MessageProperties .MINIMAL_BASIC ,
156+ "any-message" .getBytes ()
157+ );
158+ channel .waitForConfirms (30 * 60 * 1000 );
159+ // then
160+ assertThat (metrics .getPublishUnroutedMessages (), is (1L ));
161+ } finally {
162+ safeClose (connection );
163+ }
164+ }
165+
166+ @ Test public void metricsPublisherAck () throws IOException , TimeoutException , InterruptedException {
167+ StandardMetricsCollector metrics = new StandardMetricsCollector ();
168+ connectionFactory .setMetricsCollector (metrics );
169+ Connection connection = null ;
170+ try {
171+ connection = connectionFactory .newConnection ();
172+ Channel channel = connection .createChannel ();
173+ channel .confirmSelect ();
174+ assertThat (metrics .getPublishAcknowledgedMessages (), is (0L ));
175+ channel .basicConsume (QUEUE , false , new MultipleAckConsumer (channel , false ));
176+ // when
177+ sendMessage (channel );
178+ channel .waitForConfirms (30 * 60 * 1000 );
179+ // then
180+ assertThat (metrics .getPublishAcknowledgedMessages (), is (1L ));
181+ } finally {
182+ safeClose (connection );
183+ }
184+ }
142185
143186 @ Test public void metricsAck () throws IOException , TimeoutException {
144187 StandardMetricsCollector metrics = new StandardMetricsCollector ();
0 commit comments