Skip to content

Commit cfadcfa

Browse files
author
Steve Powell
committed
Replaced set/get*Listener on Channel interface with add/remove*Listener;
added 'collection' of listeners of three types: Return, Flow and Confirm; single private monitor manages these collections; copy of collection is made under lock before making listener calls.
1 parent a5a75b8 commit cfadcfa

File tree

8 files changed

+144
-110
lines changed

8 files changed

+144
-110
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,40 +111,46 @@ public interface Channel extends ShutdownNotifier {
111111
void abort(int closeCode, String closeMessage) throws IOException;
112112

113113
/**
114-
* Return the current {@link ReturnListener}.
115-
* @return an interface to the current return listener
114+
* Add a {@link ReturnListener}.
115+
* @param listener the listener to add
116116
*/
117-
ReturnListener getReturnListener();
117+
void addReturnListener(ReturnListener listener);
118118

119119
/**
120-
* Set the current {@link ReturnListener}.
121-
* @param listener the listener to use, or null indicating "don't use one".
120+
* Remove a {@link ReturnListener}.
121+
* @param listener the listener to remove
122+
* @return <code><b>true</b></code> if the listener was found and removed,
123+
* <code><b>false</b></code> otherwise
122124
*/
123-
void setReturnListener(ReturnListener listener);
125+
boolean removeReturnListener(ReturnListener listener);
124126

125127
/**
126-
* Return the current {@link FlowListener}.
127-
* @return an interface to the current flow listener.
128+
* Add a {@link FlowListener}.
129+
* @param listener the listener to add
128130
*/
129-
FlowListener getFlowListener();
131+
void addFlowListener(FlowListener listener);
130132

131133
/**
132-
* Set the current {@link FlowListener}.
133-
* @param listener the listener to use, or null indicating "don't use one".
134+
* Remove a {@link FlowListener}.
135+
* @param listener the listener to remove
136+
* @return <code><b>true</b></code> if the listener was found and removed,
137+
* <code><b>false</b></code> otherwise
134138
*/
135-
void setFlowListener(FlowListener listener);
139+
boolean removeFlowListener(FlowListener listener);
136140

137141
/**
138-
* Return the current {@link ConfirmListener}.
139-
* @return an interface to the current ack listener.
142+
* Add a {@link ConfirmListener}.
143+
* @param listener the listener to add
140144
*/
141-
ConfirmListener getConfirmListener();
145+
void addConfirmListener(ConfirmListener listener);
142146

143147
/**
144-
* Set the current {@link ConfirmListener}.
145-
* @param listener the listener to use, or null indicating "don't use one".
148+
* Remove a {@link ConfirmListener}.
149+
* @param listener the listener to remove
150+
* @return <code><b>true</b></code> if the listener was found and removed,
151+
* <code><b>false</b></code> otherwise
146152
*/
147-
void setConfirmListener(ConfirmListener listener);
153+
boolean removeConfirmListener(ConfirmListener listener);
148154

149155
/**
150156
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 113 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
2123
import java.util.Collections;
2224
import java.util.HashMap;
25+
import java.util.LinkedList;
2326
import java.util.Map;
2427
import java.util.concurrent.TimeoutException;
2528

@@ -78,25 +81,19 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7881
public final Map<String, Consumer> _consumers =
7982
Collections.synchronizedMap(new HashMap<String, Consumer>());
8083

81-
/** Reference to the currently-active ReturnListener, or null if there is none.
82-
*/
83-
public volatile ReturnListener returnListener = null;
84-
85-
/** Reference to the currently-active FlowListener, or null if there is none.
86-
*/
87-
public volatile FlowListener flowListener = null;
88-
89-
/** Reference to the currently-active ConfirmListener, or null if there is none.
90-
*/
91-
public volatile ConfirmListener confirmListener = null;
84+
/** Monitor protecting all listeners collections */
85+
private final Object listenersMonitor = new Object();
86+
/** The ReturnListener collection. */
87+
private final Collection<ReturnListener> returnListeners = new LinkedList<ReturnListener>();
88+
/** The FlowListener collection. */
89+
private final Collection<FlowListener> flowListeners = new LinkedList<FlowListener>();
90+
/** The ConfirmListener collection. */
91+
private final Collection<ConfirmListener> confirmListeners = new LinkedList<ConfirmListener>();
9292

93-
/** Sequence number of next published message requiring confirmation.
94-
*/
93+
/** Sequence number of next published message requiring confirmation. */
9594
private long nextPublishSeqNo = 0L;
9695

97-
/** Reference to the currently-active default consumer, or null if there is
98-
* none.
99-
*/
96+
/** The current default consumer, or null if there is none. */
10097
public volatile Consumer defaultConsumer = null;
10198

10299
/**
@@ -123,43 +120,67 @@ public void open() throws IOException {
123120
Utility.use(openOk);
124121
}
125122

126-
/** Returns the current ReturnListener. */
127-
public ReturnListener getReturnListener() {
128-
return returnListener;
123+
/** Returns a readable copy of the current ReturnListener collection. */
124+
private Collection<ReturnListener> getReturnListeners() {
125+
synchronized (listenersMonitor) {
126+
if (returnListeners.isEmpty())
127+
return Collections.emptyList();
128+
return new ArrayList<ReturnListener>(returnListeners);
129+
}
129130
}
130131

131-
/**
132-
* Sets the current ReturnListener.
133-
* A null argument is interpreted to mean "do not use a return listener".
134-
*/
135-
public void setReturnListener(ReturnListener listener) {
136-
returnListener = listener;
132+
public void addReturnListener(ReturnListener listener) {
133+
synchronized (listenersMonitor) {
134+
returnListeners.add(listener);
135+
}
137136
}
138137

139-
/** Returns the current {@link FlowListener}. */
140-
public FlowListener getFlowListener() {
141-
return flowListener;
138+
public boolean removeReturnListener(ReturnListener listener) {
139+
synchronized (listenersMonitor) {
140+
return returnListeners.remove(listener);
141+
}
142142
}
143143

144-
/**
145-
* Sets the current {@link FlowListener}.
146-
* A null argument is interpreted to mean "do not use a flow listener".
147-
*/
148-
public void setFlowListener(FlowListener listener) {
149-
flowListener = listener;
144+
/** Returns a readable copy of the current FlowListener collection. */
145+
private Collection<FlowListener> getFlowListeners() {
146+
synchronized (listenersMonitor) {
147+
if (flowListeners.isEmpty())
148+
return Collections.emptyList();
149+
return new ArrayList<FlowListener>(flowListeners);
150+
}
150151
}
151152

152-
/** Returns the current {@link ConfirmListener}. */
153-
public ConfirmListener getConfirmListener() {
154-
return confirmListener;
153+
public void addFlowListener(FlowListener listener) {
154+
synchronized (listenersMonitor) {
155+
flowListeners.add(listener);
156+
}
155157
}
156158

157-
/**
158-
* Sets the current {@link ConfirmListener}.
159-
* A null argument is interpreted to mean "do not use a confirm listener".
160-
*/
161-
public void setConfirmListener(ConfirmListener listener) {
162-
confirmListener = listener;
159+
public boolean removeFlowListener(FlowListener listener) {
160+
synchronized (listenersMonitor) {
161+
return flowListeners.remove(listener);
162+
}
163+
}
164+
165+
/** Returns a readable copy of the current ConfirmListener collection. */
166+
private Collection<ConfirmListener> getConfirmListeners() {
167+
synchronized (listenersMonitor) {
168+
if (confirmListeners.isEmpty())
169+
return Collections.emptyList();
170+
return new ArrayList<ConfirmListener>(confirmListeners);
171+
}
172+
}
173+
174+
public void addConfirmListener(ConfirmListener listener) {
175+
synchronized (listenersMonitor) {
176+
confirmListeners.add(listener);
177+
}
178+
}
179+
180+
public boolean removeConfirmListener(ConfirmListener listener) {
181+
synchronized (listenersMonitor) {
182+
return confirmListeners.remove(listener);
183+
}
163184
}
164185

165186
/** Returns the current default consumer. */
@@ -275,22 +296,7 @@ public void releaseChannelNumber() {
275296
}
276297
return true;
277298
} else if (method instanceof Basic.Return) {
278-
ReturnListener l = getReturnListener();
279-
if (l != null) {
280-
Basic.Return basicReturn = (Basic.Return) method;
281-
try {
282-
l.handleReturn(basicReturn.getReplyCode(),
283-
basicReturn.getReplyText(),
284-
basicReturn.getExchange(),
285-
basicReturn.getRoutingKey(),
286-
(BasicProperties)
287-
command.getContentHeader(),
288-
command.getContentBody());
289-
} catch (Throwable ex) {
290-
_connection.getExceptionHandler().handleReturnListenerException(this,
291-
ex);
292-
}
293-
}
299+
callReturnListeners(command, (Basic.Return) method);
294300
return true;
295301
} else if (method instanceof Channel.Flow) {
296302
Channel.Flow channelFlow = (Channel.Flow) method;
@@ -299,36 +305,13 @@ public void releaseChannelNumber() {
299305
transmit(new Channel.FlowOk(!_blockContent));
300306
_channelMutex.notifyAll();
301307
}
302-
FlowListener l = getFlowListener();
303-
if (l != null) {
304-
try {
305-
l.handleFlow(channelFlow.getActive());
306-
} catch (Throwable ex) {
307-
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
308-
}
309-
}
308+
callFlowListeners(command, channelFlow);
310309
return true;
311310
} else if (method instanceof Basic.Ack) {
312-
Basic.Ack ack = (Basic.Ack) method;
313-
ConfirmListener l = getConfirmListener();
314-
if (l != null) {
315-
try {
316-
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
317-
} catch (Throwable ex) {
318-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
319-
}
320-
}
311+
callConfirmListeners(command, (Basic.Ack) method);
321312
return true;
322313
} else if (method instanceof Basic.Nack) {
323-
Basic.Nack nack = (Basic.Nack) method;
324-
ConfirmListener l = getConfirmListener();
325-
if (l != null) {
326-
try {
327-
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
328-
} catch (Throwable ex) {
329-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
330-
}
331-
}
314+
callConfirmListeners(command, (Basic.Nack) method);
332315
return true;
333316
} else if (method instanceof Basic.RecoverOk) {
334317
for (Consumer callback: _consumers.values()) {
@@ -379,6 +362,51 @@ public void releaseChannelNumber() {
379362
}
380363
}
381364

365+
private void callReturnListeners(Command command, Basic.Return basicReturn) {
366+
try {
367+
for (ReturnListener l : getReturnListeners()) {
368+
l.handleReturn(basicReturn.getReplyCode(),
369+
basicReturn.getReplyText(),
370+
basicReturn.getExchange(),
371+
basicReturn.getRoutingKey(),
372+
(BasicProperties) command.getContentHeader(),
373+
command.getContentBody());
374+
}
375+
} catch (Throwable ex) {
376+
_connection.getExceptionHandler().handleReturnListenerException(this, ex);
377+
}
378+
}
379+
380+
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
381+
try {
382+
for (FlowListener l : getFlowListeners()) {
383+
l.handleFlow(channelFlow.getActive());
384+
}
385+
} catch (Throwable ex) {
386+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
387+
}
388+
}
389+
390+
private void callConfirmListeners(Command command, Basic.Ack ack) {
391+
try {
392+
for (ConfirmListener l : getConfirmListeners()) {
393+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
394+
}
395+
} catch (Throwable ex) {
396+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
397+
}
398+
}
399+
400+
private void callConfirmListeners(Command command, Basic.Nack nack) {
401+
try {
402+
for (ConfirmListener l : getConfirmListeners()) {
403+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
404+
}
405+
} catch (Throwable ex) {
406+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
407+
}
408+
}
409+
382410
private void asyncShutdown(Command command) throws IOException {
383411
releaseChannelNumber();
384412
ShutdownSignalException signal = new ShutdownSignalException(false,

test/src/com/rabbitmq/client/test/ConfirmBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ConfirmBase extends BrokerTestCase {
3232
protected void setUp() throws IOException {
3333
super.setUp();
3434
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35-
channel.setConfirmListener(new ConfirmListener() {
35+
channel.addConfirmListener(new ConfirmListener() {
3636
public void handleAck(long seqNo, boolean multiple) {
3737
if (!unconfirmedSet.contains(seqNo)) {
3838
fail("got duplicate ack: " + seqNo);

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private static boolean[] expected(String key) {
5757

5858
@Override protected void setUp() throws IOException {
5959
super.setUp();
60-
channel.setReturnListener(new ReturnListener() {
60+
channel.addReturnListener(new ReturnListener() {
6161
public void handleReturn(int replyCode,
6262
String replyText,
6363
String exchange,

test/src/com/rabbitmq/client/test/performance/ScalabilityTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ private float timeRouting(Channel channel, String[] routingKeys)
284284
boolean mandatory = true;
285285
boolean immdediate = true;
286286
final CountDownLatch latch = new CountDownLatch(params.messageCount);
287-
channel.setReturnListener(new ReturnListener() {
287+
channel.addReturnListener(new ReturnListener() {
288288
public void handleReturn(int replyCode, String replyText,
289289
String exchange, String routingKey,
290290
AMQP.BasicProperties properties, byte[] body) throws IOException {

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void run() {
6161
Connection conn = connectionFactory.newConnection();
6262
Channel ch = conn.createChannel();
6363
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
64-
ch.setConfirmListener(new ConfirmListener() {
64+
ch.addConfirmListener(new ConfirmListener() {
6565
public void handleAck(long seqNo, boolean multiple) {
6666
if (multiple) {
6767
unconfirmedSet.headSet(seqNo+1).clear();

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ public static void main(String[] args) {
135135
1000L * samplingInterval,
136136
rateLimit, minMsgSize, timeLimit,
137137
confirm);
138-
channel.setReturnListener(p);
139-
channel.setConfirmListener(p);
138+
channel.addReturnListener(p);
139+
channel.addConfirmListener(p);
140140
Thread t = new Thread(p);
141141
producerThreads[i] = t;
142142
t.start();

test/src/com/rabbitmq/examples/TestMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public void run() throws IOException {
230230

231231
_ch1 = createChannel();
232232

233-
_ch1.setReturnListener(new ReturnListener() {
233+
_ch1.addReturnListener(new ReturnListener() {
234234
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body)
235235
throws IOException {
236236
Method method = new AMQImpl.Basic.Return(replyCode, replyText, exchange, routingKey);

0 commit comments

Comments
 (0)