Skip to content

Commit 176a9a5

Browse files
committed
Create abstract class for metrics collection
Fixes #222
1 parent b2c251e commit 176a9a5

File tree

2 files changed

+390
-245
lines changed

2 files changed

+390
-245
lines changed
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl;
17+
18+
import com.rabbitmq.client.*;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import java.util.*;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ConcurrentMap;
25+
import java.util.concurrent.locks.Lock;
26+
import java.util.concurrent.locks.ReentrantLock;
27+
28+
/**
29+
* Base class for {@link MetricsCollector}.
30+
* Implements tricky logic such as keeping track of acknowledged and
31+
* rejected messages. Sub-classes just need to implement
32+
* the logic to increment their metrics.
33+
* Note transactions are not supported (see {@link MetricsCollector}.
34+
*
35+
* @see MetricsCollector
36+
*/
37+
public abstract class AbstractMetricsCollector implements MetricsCollector {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMetricsCollector.class);
40+
41+
private final ConcurrentMap<String, ConnectionState> connectionState = new ConcurrentHashMap<String, ConnectionState>();
42+
43+
private final Runnable markAcknowledgedMessageAction = new Runnable() {
44+
@Override
45+
public void run() {
46+
markAcknowledgedMessage();
47+
}
48+
};
49+
50+
private final Runnable markRejectedMessageAction = new Runnable() {
51+
@Override
52+
public void run() {
53+
markRejectedMessage();
54+
}
55+
};
56+
57+
@Override
58+
public void newConnection(final Connection connection) {
59+
try {
60+
if(connection.getId() == null) {
61+
connection.setId(UUID.randomUUID().toString());
62+
}
63+
incrementConnectionCount(connection);
64+
connectionState.put(connection.getId(), new ConnectionState(connection));
65+
connection.addShutdownListener(new ShutdownListener() {
66+
@Override
67+
public void shutdownCompleted(ShutdownSignalException cause) {
68+
closeConnection(connection);
69+
}
70+
});
71+
} catch(Exception e) {
72+
LOGGER.info("Error while computing metrics in newConnection: " + e.getMessage());
73+
}
74+
}
75+
76+
@Override
77+
public void closeConnection(Connection connection) {
78+
try {
79+
ConnectionState removed = connectionState.remove(connection.getId());
80+
if(removed != null) {
81+
decrementConnectionCount(connection);
82+
}
83+
} catch(Exception e) {
84+
LOGGER.info("Error while computing metrics in closeConnection: " + e.getMessage());
85+
}
86+
}
87+
88+
@Override
89+
public void newChannel(final Channel channel) {
90+
try {
91+
incrementChannelCount(channel);
92+
channel.addShutdownListener(new ShutdownListener() {
93+
@Override
94+
public void shutdownCompleted(ShutdownSignalException cause) {
95+
closeChannel(channel);
96+
}
97+
});
98+
connectionState(channel.getConnection()).channelState.put(channel.getChannelNumber(), new ChannelState(channel));
99+
} catch(Exception e) {
100+
LOGGER.info("Error while computing metrics in newChannel: " + e.getMessage());
101+
}
102+
}
103+
104+
@Override
105+
public void closeChannel(Channel channel) {
106+
try {
107+
ChannelState removed = connectionState(channel.getConnection()).channelState.remove(channel.getChannelNumber());
108+
if(removed != null) {
109+
decrementChannelCount(channel);
110+
}
111+
} catch(Exception e) {
112+
LOGGER.info("Error while computing metrics in closeChannel: " + e.getMessage());
113+
}
114+
}
115+
116+
@Override
117+
public void basicPublish(Channel channel) {
118+
try {
119+
markPublishedMessage();
120+
} catch(Exception e) {
121+
LOGGER.info("Error while computing metrics in basicPublish: " + e.getMessage());
122+
}
123+
}
124+
125+
@Override
126+
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
127+
try {
128+
if(!autoAck) {
129+
ChannelState channelState = channelState(channel);
130+
channelState.lock.lock();
131+
try {
132+
channelState(channel).consumersWithManualAck.add(consumerTag);
133+
} finally {
134+
channelState.lock.unlock();
135+
}
136+
}
137+
} catch(Exception e) {
138+
LOGGER.info("Error while computing metrics in basicConsume: " + e.getMessage());
139+
}
140+
}
141+
142+
@Override
143+
public void basicCancel(Channel channel, String consumerTag) {
144+
try {
145+
ChannelState channelState = channelState(channel);
146+
channelState.lock.lock();
147+
try {
148+
channelState(channel).consumersWithManualAck.remove(consumerTag);
149+
} finally {
150+
channelState.lock.unlock();
151+
}
152+
} catch(Exception e) {
153+
LOGGER.info("Error while computing metrics in basicCancel: " + e.getMessage());
154+
}
155+
}
156+
157+
@Override
158+
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
159+
try {
160+
markConsumedMessage();
161+
if(!autoAck) {
162+
ChannelState channelState = channelState(channel);
163+
channelState.lock.lock();
164+
try {
165+
channelState(channel).unackedMessageDeliveryTags.add(deliveryTag);
166+
} finally {
167+
channelState.lock.unlock();
168+
}
169+
}
170+
} catch(Exception e) {
171+
LOGGER.info("Error while computing metrics in consumedMessage: " + e.getMessage());
172+
}
173+
}
174+
175+
@Override
176+
public void consumedMessage(Channel channel, long deliveryTag, String consumerTag) {
177+
try {
178+
markConsumedMessage();
179+
ChannelState channelState = channelState(channel);
180+
channelState.lock.lock();
181+
try {
182+
if(channelState.consumersWithManualAck.contains(consumerTag)) {
183+
channelState.unackedMessageDeliveryTags.add(deliveryTag);
184+
}
185+
} finally {
186+
channelState.lock.unlock();
187+
}
188+
} catch(Exception e) {
189+
LOGGER.info("Error while computing metrics in consumedMessage: " + e.getMessage());
190+
}
191+
}
192+
193+
@Override
194+
public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
195+
try {
196+
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, markAcknowledgedMessageAction);
197+
} catch(Exception e) {
198+
LOGGER.info("Error while computing metrics in basicAck: " + e.getMessage());
199+
}
200+
}
201+
202+
@Override
203+
public void basicNack(Channel channel, long deliveryTag) {
204+
try {
205+
updateChannelStateAfterAckReject(channel, deliveryTag, true, markRejectedMessageAction);
206+
} catch(Exception e) {
207+
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
208+
}
209+
}
210+
211+
@Override
212+
public void basicReject(Channel channel, long deliveryTag) {
213+
try {
214+
updateChannelStateAfterAckReject(channel, deliveryTag, false, markRejectedMessageAction);
215+
} catch(Exception e) {
216+
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
217+
}
218+
}
219+
220+
private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag, boolean multiple, Runnable action) {
221+
ChannelState channelState = channelState(channel);
222+
channelState.lock.lock();
223+
try {
224+
if(multiple) {
225+
Iterator<Long> iterator = channelState.unackedMessageDeliveryTags.iterator();
226+
while(iterator.hasNext()) {
227+
long messageDeliveryTag = iterator.next();
228+
if(messageDeliveryTag <= deliveryTag) {
229+
iterator.remove();
230+
action.run();
231+
}
232+
}
233+
} else {
234+
channelState.unackedMessageDeliveryTags.remove(deliveryTag);
235+
action.run();
236+
}
237+
} finally {
238+
channelState.lock.unlock();
239+
}
240+
}
241+
242+
private ConnectionState connectionState(Connection connection) {
243+
return connectionState.get(connection.getId());
244+
}
245+
246+
private ChannelState channelState(Channel channel) {
247+
return connectionState(channel.getConnection()).channelState.get(channel.getChannelNumber());
248+
}
249+
250+
/**
251+
* Clean inner state for close connections and channels.
252+
* Inner state is automatically cleaned on connection
253+
* and channel closing.
254+
* Thus, this method is provided as a safety net, to be externally
255+
* called periodically if closing of resources wouldn't work
256+
* properly for some corner cases.
257+
*/
258+
public void cleanStaleState() {
259+
try {
260+
Iterator<Map.Entry<String, ConnectionState>> connectionStateIterator = connectionState.entrySet().iterator();
261+
while(connectionStateIterator.hasNext()) {
262+
Map.Entry<String, ConnectionState> connectionEntry = connectionStateIterator.next();
263+
Connection connection = connectionEntry.getValue().connection;
264+
if(connection.isOpen()) {
265+
Iterator<Map.Entry<Integer, ChannelState>> channelStateIterator = connectionEntry.getValue().channelState.entrySet().iterator();
266+
while(channelStateIterator.hasNext()) {
267+
Map.Entry<Integer, ChannelState> channelStateEntry = channelStateIterator.next();
268+
Channel channel = channelStateEntry.getValue().channel;
269+
if(!channel.isOpen()) {
270+
channelStateIterator.remove();
271+
decrementChannelCount(channel);
272+
LOGGER.info("Ripped off state of channel {} of connection {}. This is abnormal, please report.",
273+
channel.getChannelNumber(), connection.getId());
274+
}
275+
}
276+
} else {
277+
connectionStateIterator.remove();
278+
decrementConnectionCount(connection);
279+
for(int i = 0; i < connectionEntry.getValue().channelState.size(); i++) {
280+
decrementChannelCount(null);
281+
}
282+
LOGGER.info("Ripped off state of connection {}. This is abnormal, please report.",
283+
connection.getId());
284+
}
285+
}
286+
} catch(Exception e) {
287+
LOGGER.info("Error during periodic clean of metricsCollector: "+e.getMessage());
288+
}
289+
}
290+
291+
private static class ConnectionState {
292+
293+
final ConcurrentMap<Integer, ChannelState> channelState = new ConcurrentHashMap<Integer, ChannelState>();
294+
final Connection connection;
295+
296+
private ConnectionState(Connection connection) {
297+
this.connection = connection;
298+
}
299+
}
300+
301+
private static class ChannelState {
302+
303+
final Lock lock = new ReentrantLock();
304+
305+
final Set<Long> unackedMessageDeliveryTags = new HashSet<Long>();
306+
final Set<String> consumersWithManualAck = new HashSet<String>();
307+
308+
final Channel channel;
309+
310+
private ChannelState(Channel channel) {
311+
this.channel = channel;
312+
}
313+
314+
}
315+
316+
/**
317+
* Increments connection count.
318+
* The connection object is passed in as complementary information
319+
* and without any guarantee of not being null.
320+
* @param connection the connection that has been created (can be null)
321+
*/
322+
protected abstract void incrementConnectionCount(Connection connection);
323+
324+
/**
325+
* Decrements connection count.
326+
* The connection object is passed in as complementary information
327+
* and without any guarantee of not being null.
328+
* @param connection the connection that has been closed (can be null)
329+
*/
330+
protected abstract void decrementConnectionCount(Connection connection);
331+
332+
/**
333+
* Increments channel count.
334+
* The channel object is passed in as complementary information
335+
* and without any guarantee of not being null.
336+
* @param channel the channel that has been created (can be null)
337+
*/
338+
protected abstract void incrementChannelCount(Channel channel);
339+
340+
/**
341+
* Decrements channel count.
342+
* The channel object is passed in as complementary information
343+
* and without any guarantee of not being null.
344+
* @param channel
345+
*/
346+
protected abstract void decrementChannelCount(Channel channel);
347+
348+
/**
349+
* Marks the event of a published message.
350+
*/
351+
protected abstract void markPublishedMessage();
352+
353+
/**
354+
* Marks the event of a consumed message.
355+
*/
356+
protected abstract void markConsumedMessage();
357+
358+
/**
359+
* Marks the event of an acknowledged message.
360+
*/
361+
protected abstract void markAcknowledgedMessage();
362+
363+
/**
364+
* Marks the event of a rejected message.
365+
*/
366+
protected abstract void markRejectedMessage();
367+
368+
369+
370+
}

0 commit comments

Comments
 (0)