Skip to content

Commit 93b32c2

Browse files
author
Steve Powell
committed
Merge in default
2 parents e8437e4 + 8b478f1 commit 93b32c2

File tree

10 files changed

+167
-124
lines changed

10 files changed

+167
-124
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,40 +111,61 @@ public interface Channel extends ShutdownNotifier {
111111
void abort(int closeCode, String closeMessage) throws IOException;
112112

113113
/**
114-
* Return the current {@link ReturnListener}.
115-
* @return an interface to the current return listener
114+
* Add a {@link ReturnListener}.
115+
* @param listener the listener to add
116116
*/
117-
ReturnListener getReturnListener();
117+
void addReturnListener(ReturnListener listener);
118118

119119
/**
120-
* Set the current {@link ReturnListener}.
121-
* @param listener the listener to use, or null indicating "don't use one".
120+
* Remove a {@link ReturnListener}.
121+
* @param listener the listener to remove
122+
* @return <code><b>true</b></code> if the listener was found and removed,
123+
* <code><b>false</b></code> otherwise
122124
*/
123-
void setReturnListener(ReturnListener listener);
125+
boolean removeReturnListener(ReturnListener listener);
124126

125127
/**
126-
* Return the current {@link FlowListener}.
127-
* @return an interface to the current flow listener.
128+
* Remove all {@link ReturnListener}s.
128129
*/
129-
FlowListener getFlowListener();
130+
void clearReturnListeners();
130131

131132
/**
132-
* Set the current {@link FlowListener}.
133-
* @param listener the listener to use, or null indicating "don't use one".
133+
* Add a {@link FlowListener}.
134+
* @param listener the listener to add
134135
*/
135-
void setFlowListener(FlowListener listener);
136+
void addFlowListener(FlowListener listener);
136137

137138
/**
138-
* Return the current {@link ConfirmListener}.
139-
* @return an interface to the current ack listener.
139+
* Remove a {@link FlowListener}.
140+
* @param listener the listener to remove
141+
* @return <code><b>true</b></code> if the listener was found and removed,
142+
* <code><b>false</b></code> otherwise
140143
*/
141-
ConfirmListener getConfirmListener();
144+
boolean removeFlowListener(FlowListener listener);
142145

143146
/**
144-
* Set the current {@link ConfirmListener}.
145-
* @param listener the listener to use, or null indicating "don't use one".
147+
* Remove all {@link FlowListener}s.
146148
*/
147-
void setConfirmListener(ConfirmListener listener);
149+
void clearFlowListeners();
150+
151+
/**
152+
* Add a {@link ConfirmListener}.
153+
* @param listener the listener to add
154+
*/
155+
void addConfirmListener(ConfirmListener listener);
156+
157+
/**
158+
* Remove a {@link ConfirmListener}.
159+
* @param listener the listener to remove
160+
* @return <code><b>true</b></code> if the listener was found and removed,
161+
* <code><b>false</b></code> otherwise
162+
*/
163+
boolean removeConfirmListener(ConfirmListener listener);
164+
165+
/**
166+
* Remove all {@link ConfirmListener}s.
167+
*/
168+
void clearConfirmListeners();
148169

149170
/**
150171
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 84 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import java.io.IOException;
21+
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.Map;
25+
import java.util.concurrent.CopyOnWriteArrayList;
2426
import java.util.concurrent.TimeoutException;
2527

2628
import com.rabbitmq.client.AMQP;
@@ -78,25 +80,18 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7880
public final Map<String, Consumer> _consumers =
7981
Collections.synchronizedMap(new HashMap<String, Consumer>());
8082

81-
/** Reference to the currently-active ReturnListener, or null if there is none.
82-
*/
83-
public volatile ReturnListener returnListener = null;
84-
85-
/** Reference to the currently-active FlowListener, or null if there is none.
86-
*/
87-
public volatile FlowListener flowListener = null;
83+
/* All listeners collections are in CopyOnWriteArrayList objects */
84+
/** The ReturnListener collection. */
85+
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
86+
/** The FlowListener collection. */
87+
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
88+
/** The ConfirmListener collection. */
89+
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
8890

89-
/** Reference to the currently-active ConfirmListener, or null if there is none.
90-
*/
91-
public volatile ConfirmListener confirmListener = null;
92-
93-
/** Sequence number of next published message requiring confirmation.
94-
*/
91+
/** Sequence number of next published message requiring confirmation. */
9592
private long nextPublishSeqNo = 0L;
9693

97-
/** Reference to the currently-active default consumer, or null if there is
98-
* none.
99-
*/
94+
/** The current default consumer, or null if there is none. */
10095
public volatile Consumer defaultConsumer = null;
10196

10297
/**
@@ -123,43 +118,40 @@ public void open() throws IOException {
123118
Utility.use(openOk);
124119
}
125120

126-
/** Returns the current ReturnListener. */
127-
public ReturnListener getReturnListener() {
128-
return returnListener;
121+
public void addReturnListener(ReturnListener listener) {
122+
returnListeners.add(listener);
129123
}
130124

131-
/**
132-
* Sets the current ReturnListener.
133-
* A null argument is interpreted to mean "do not use a return listener".
134-
*/
135-
public void setReturnListener(ReturnListener listener) {
136-
returnListener = listener;
125+
public boolean removeReturnListener(ReturnListener listener) {
126+
return returnListeners.remove(listener);
137127
}
138128

139-
/** Returns the current {@link FlowListener}. */
140-
public FlowListener getFlowListener() {
141-
return flowListener;
129+
public void clearReturnListeners() {
130+
returnListeners.clear();
142131
}
143132

144-
/**
145-
* Sets the current {@link FlowListener}.
146-
* A null argument is interpreted to mean "do not use a flow listener".
147-
*/
148-
public void setFlowListener(FlowListener listener) {
149-
flowListener = listener;
133+
public void addFlowListener(FlowListener listener) {
134+
flowListeners.add(listener);
150135
}
151136

152-
/** Returns the current {@link ConfirmListener}. */
153-
public ConfirmListener getConfirmListener() {
154-
return confirmListener;
137+
public boolean removeFlowListener(FlowListener listener) {
138+
return flowListeners.remove(listener);
155139
}
156140

157-
/**
158-
* Sets the current {@link ConfirmListener}.
159-
* A null argument is interpreted to mean "do not use a confirm listener".
160-
*/
161-
public void setConfirmListener(ConfirmListener listener) {
162-
confirmListener = listener;
141+
public void clearFlowListeners() {
142+
flowListeners.clear();
143+
}
144+
145+
public void addConfirmListener(ConfirmListener listener) {
146+
confirmListeners.add(listener);
147+
}
148+
149+
public boolean removeConfirmListener(ConfirmListener listener) {
150+
return confirmListeners.remove(listener);
151+
}
152+
153+
public void clearConfirmListeners() {
154+
confirmListeners.clear();
163155
}
164156

165157
/** Returns the current default consumer. */
@@ -275,22 +267,7 @@ public void releaseChannelNumber() {
275267
}
276268
return true;
277269
} else if (method instanceof Basic.Return) {
278-
ReturnListener l = getReturnListener();
279-
if (l != null) {
280-
Basic.Return basicReturn = (Basic.Return) method;
281-
try {
282-
l.handleReturn(basicReturn.getReplyCode(),
283-
basicReturn.getReplyText(),
284-
basicReturn.getExchange(),
285-
basicReturn.getRoutingKey(),
286-
(BasicProperties)
287-
command.getContentHeader(),
288-
command.getContentBody());
289-
} catch (Throwable ex) {
290-
_connection.getExceptionHandler().handleReturnListenerException(this,
291-
ex);
292-
}
293-
}
270+
callReturnListeners(command, (Basic.Return) method);
294271
return true;
295272
} else if (method instanceof Channel.Flow) {
296273
Channel.Flow channelFlow = (Channel.Flow) method;
@@ -299,36 +276,13 @@ public void releaseChannelNumber() {
299276
transmit(new Channel.FlowOk(!_blockContent));
300277
_channelMutex.notifyAll();
301278
}
302-
FlowListener l = getFlowListener();
303-
if (l != null) {
304-
try {
305-
l.handleFlow(channelFlow.getActive());
306-
} catch (Throwable ex) {
307-
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
308-
}
309-
}
279+
callFlowListeners(command, channelFlow);
310280
return true;
311281
} else if (method instanceof Basic.Ack) {
312-
Basic.Ack ack = (Basic.Ack) method;
313-
ConfirmListener l = getConfirmListener();
314-
if (l != null) {
315-
try {
316-
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
317-
} catch (Throwable ex) {
318-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
319-
}
320-
}
282+
callConfirmListeners(command, (Basic.Ack) method);
321283
return true;
322284
} else if (method instanceof Basic.Nack) {
323-
Basic.Nack nack = (Basic.Nack) method;
324-
ConfirmListener l = getConfirmListener();
325-
if (l != null) {
326-
try {
327-
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
328-
} catch (Throwable ex) {
329-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
330-
}
331-
}
285+
callConfirmListeners(command, (Basic.Nack) method);
332286
return true;
333287
} else if (method instanceof Basic.RecoverOk) {
334288
for (Consumer callback: _consumers.values()) {
@@ -379,6 +333,51 @@ public void releaseChannelNumber() {
379333
}
380334
}
381335

336+
private void callReturnListeners(Command command, Basic.Return basicReturn) {
337+
try {
338+
for (ReturnListener l : this.returnListeners) {
339+
l.handleReturn(basicReturn.getReplyCode(),
340+
basicReturn.getReplyText(),
341+
basicReturn.getExchange(),
342+
basicReturn.getRoutingKey(),
343+
(BasicProperties) command.getContentHeader(),
344+
command.getContentBody());
345+
}
346+
} catch (Throwable ex) {
347+
_connection.getExceptionHandler().handleReturnListenerException(this, ex);
348+
}
349+
}
350+
351+
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
352+
try {
353+
for (FlowListener l : this.flowListeners) {
354+
l.handleFlow(channelFlow.getActive());
355+
}
356+
} catch (Throwable ex) {
357+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
358+
}
359+
}
360+
361+
private void callConfirmListeners(Command command, Basic.Ack ack) {
362+
try {
363+
for (ConfirmListener l : this.confirmListeners) {
364+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
365+
}
366+
} catch (Throwable ex) {
367+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
368+
}
369+
}
370+
371+
private void callConfirmListeners(Command command, Basic.Nack nack) {
372+
try {
373+
for (ConfirmListener l : this.confirmListeners) {
374+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
375+
}
376+
} catch (Throwable ex) {
377+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
378+
}
379+
}
380+
382381
private void asyncShutdown(Command command) throws IOException {
383382
releaseChannelNumber();
384383
ShutdownSignalException signal = new ShutdownSignalException(false,

test/src/com/rabbitmq/client/test/ConfirmBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ConfirmBase extends BrokerTestCase {
3232
protected void setUp() throws IOException {
3333
super.setUp();
3434
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35-
channel.setConfirmListener(new ConfirmListener() {
35+
channel.addConfirmListener(new ConfirmListener() {
3636
public void handleAck(long seqNo, boolean multiple) {
3737
if (!unconfirmedSet.contains(seqNo)) {
3838
fail("got duplicate ack: " + seqNo);

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private static boolean[] expected(String key) {
5757

5858
@Override protected void setUp() throws IOException {
5959
super.setUp();
60-
channel.setReturnListener(new ReturnListener() {
60+
channel.addReturnListener(new ReturnListener() {
6161
public void handleReturn(int replyCode,
6262
String replyText,
6363
String exchange,

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.rabbitmq.client.test.functional;
1919

20-
import com.rabbitmq.client.test.BrokerTestCase;
2120
import com.rabbitmq.client.AMQP;
2221
import com.rabbitmq.client.Channel;
2322
import com.rabbitmq.client.DefaultConsumer;

test/src/com/rabbitmq/client/test/performance/ScalabilityTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ private float timeRouting(Channel channel, String[] routingKeys)
284284
boolean mandatory = true;
285285
boolean immdediate = true;
286286
final CountDownLatch latch = new CountDownLatch(params.messageCount);
287-
channel.setReturnListener(new ReturnListener() {
287+
channel.addReturnListener(new ReturnListener() {
288288
public void handleReturn(int replyCode, String replyText,
289289
String exchange, String routingKey,
290290
AMQP.BasicProperties properties, byte[] body) throws IOException {

test/src/com/rabbitmq/client/test/server/Firehose.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ private GetResponse publishGet(String key) throws IOException {
5454

5555
private void checkHeaders(Map<String, Object> pubHeaders) {
5656
assertEquals("test", pubHeaders.get("exchange_name").toString());
57+
@SuppressWarnings("unchecked")
5758
List<Object> routing_keys = (List<Object>) pubHeaders.get("routing_keys");
5859
assertEquals("traced", routing_keys.get(0).toString());
5960
}

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void run() {
6161
Connection conn = connectionFactory.newConnection();
6262
Channel ch = conn.createChannel();
6363
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
64-
ch.setConfirmListener(new ConfirmListener() {
64+
ch.addConfirmListener(new ConfirmListener() {
6565
public void handleAck(long seqNo, boolean multiple) {
6666
if (multiple) {
6767
unconfirmedSet.headSet(seqNo+1).clear();

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,13 @@ public static void main(String[] args) {
117117
}
118118
Thread[] producerThreads = new Thread[producerCount];
119119
Connection[] producerConnections = new Connection[producerCount];
120+
Channel[] producerChannels = new Channel[producerCount];
120121
for (int i = 0; i < producerCount; i++) {
121122
System.out.println("starting producer #" + i);
122123
Connection conn = factory.newConnection();
123124
producerConnections[i] = conn;
124125
Channel channel = conn.createChannel();
126+
producerChannels[i] = channel;
125127
if (producerTxSize > 0) channel.txSelect();
126128
if (confirm >= 0) channel.confirmSelect();
127129
channel.exchangeDeclare(exchangeName, exchangeType);
@@ -130,15 +132,17 @@ public static void main(String[] args) {
130132
1000L * samplingInterval,
131133
rateLimit, minMsgSize, timeLimit,
132134
confirm);
133-
channel.setReturnListener(p);
134-
channel.setConfirmListener(p);
135+
channel.addReturnListener(p);
136+
channel.addConfirmListener(p);
135137
Thread t = new Thread(p);
136138
producerThreads[i] = t;
137139
t.start();
138140
}
139141

140142
for (int i = 0; i < producerCount; i++) {
141143
producerThreads[i].join();
144+
producerChannels[i].clearReturnListeners();
145+
producerChannels[i].clearConfirmListeners();
142146
producerConnections[i].close();
143147
}
144148

0 commit comments

Comments
 (0)