Skip to content

Commit 80b6fa8

Browse files
author
Steve Powell
committed
Added clear*Listeners() to Channel i/f, and implement them. Added to test.
2 parents 99b064e + cfadcfa commit 80b6fa8

File tree

11 files changed

+191
-112
lines changed

11 files changed

+191
-112
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,40 +111,64 @@ public interface Channel extends ShutdownNotifier {
111111
void abort(int closeCode, String closeMessage) throws IOException;
112112

113113
/**
114-
* Return the current {@link ReturnListener}.
115-
* @return an interface to the current return listener
114+
* Add a {@link ReturnListener}.
115+
* @param listener the listener to add
116116
*/
117-
ReturnListener getReturnListener();
117+
void addReturnListener(ReturnListener listener);
118118

119119
/**
120-
* Set the current {@link ReturnListener}.
121-
* @param listener the listener to use, or null indicating "don't use one".
120+
* Remove a {@link ReturnListener}.
121+
* @param listener the listener to remove
122+
* @return <code><b>true</b></code> if the listener was found and removed,
123+
* <code><b>false</b></code> otherwise
122124
*/
123-
void setReturnListener(ReturnListener listener);
125+
boolean removeReturnListener(ReturnListener listener);
124126

125127
/**
126-
* Return the current {@link FlowListener}.
127-
* @return an interface to the current flow listener.
128+
* Remove all {@link ReturnListener}s.
129+
* @return the number of listeners removed
128130
*/
129-
FlowListener getFlowListener();
131+
int clearReturnListeners();
130132

131133
/**
132-
* Set the current {@link FlowListener}.
133-
* @param listener the listener to use, or null indicating "don't use one".
134+
* Add a {@link FlowListener}.
135+
* @param listener the listener to add
134136
*/
135-
void setFlowListener(FlowListener listener);
137+
void addFlowListener(FlowListener listener);
136138

137139
/**
138-
* Return the current {@link ConfirmListener}.
139-
* @return an interface to the current ack listener.
140+
* Remove a {@link FlowListener}.
141+
* @param listener the listener to remove
142+
* @return <code><b>true</b></code> if the listener was found and removed,
143+
* <code><b>false</b></code> otherwise
140144
*/
141-
ConfirmListener getConfirmListener();
145+
boolean removeFlowListener(FlowListener listener);
142146

143147
/**
144-
* Set the current {@link ConfirmListener}.
145-
* @param listener the listener to use, or null indicating "don't use one".
148+
* Remove all {@link FlowListener}s.
149+
* @return the number of listeners removed
146150
*/
147-
void setConfirmListener(ConfirmListener listener);
151+
int clearFlowListeners();
152+
153+
/**
154+
* Add a {@link ConfirmListener}.
155+
* @param listener the listener to add
156+
*/
157+
void addConfirmListener(ConfirmListener listener);
158+
159+
/**
160+
* Remove a {@link ConfirmListener}.
161+
* @param listener the listener to remove
162+
* @return <code><b>true</b></code> if the listener was found and removed,
163+
* <code><b>false</b></code> otherwise
164+
*/
165+
boolean removeConfirmListener(ConfirmListener listener);
166+
167+
/**
168+
* Remove all {@link ConfirmListener}s.
169+
* @return the number of listeners removed
170+
*/
171+
int clearConfirmListeners();
148172

149173
/**
150174
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 137 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
2123
import java.util.Collections;
2224
import java.util.HashMap;
25+
import java.util.LinkedList;
2326
import java.util.Map;
2427
import java.util.concurrent.TimeoutException;
2528

@@ -78,25 +81,19 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7881
public final Map<String, Consumer> _consumers =
7982
Collections.synchronizedMap(new HashMap<String, Consumer>());
8083

81-
/** Reference to the currently-active ReturnListener, or null if there is none.
82-
*/
83-
public volatile ReturnListener returnListener = null;
84-
85-
/** Reference to the currently-active FlowListener, or null if there is none.
86-
*/
87-
public volatile FlowListener flowListener = null;
88-
89-
/** Reference to the currently-active ConfirmListener, or null if there is none.
90-
*/
91-
public volatile ConfirmListener confirmListener = null;
84+
/** Monitor protecting all listeners collections */
85+
private final Object listenersMonitor = new Object();
86+
/** The ReturnListener collection. */
87+
private final Collection<ReturnListener> returnListeners = new LinkedList<ReturnListener>();
88+
/** The FlowListener collection. */
89+
private final Collection<FlowListener> flowListeners = new LinkedList<FlowListener>();
90+
/** The ConfirmListener collection. */
91+
private final Collection<ConfirmListener> confirmListeners = new LinkedList<ConfirmListener>();
9292

93-
/** Sequence number of next published message requiring confirmation.
94-
*/
93+
/** Sequence number of next published message requiring confirmation. */
9594
private long nextPublishSeqNo = 0L;
9695

97-
/** Reference to the currently-active default consumer, or null if there is
98-
* none.
99-
*/
96+
/** The current default consumer, or null if there is none. */
10097
public volatile Consumer defaultConsumer = null;
10198

10299
/**
@@ -123,43 +120,91 @@ public void open() throws IOException {
123120
Utility.use(openOk);
124121
}
125122

126-
/** Returns the current ReturnListener. */
127-
public ReturnListener getReturnListener() {
128-
return returnListener;
123+
/** Returns a readable copy of the current ReturnListener collection. */
124+
private Collection<ReturnListener> getReturnListeners() {
125+
synchronized (listenersMonitor) {
126+
if (returnListeners.isEmpty())
127+
return Collections.emptyList();
128+
return new ArrayList<ReturnListener>(returnListeners);
129+
}
129130
}
130131

131-
/**
132-
* Sets the current ReturnListener.
133-
* A null argument is interpreted to mean "do not use a return listener".
134-
*/
135-
public void setReturnListener(ReturnListener listener) {
136-
returnListener = listener;
132+
public void addReturnListener(ReturnListener listener) {
133+
synchronized (listenersMonitor) {
134+
returnListeners.add(listener);
135+
}
137136
}
138137

139-
/** Returns the current {@link FlowListener}. */
140-
public FlowListener getFlowListener() {
141-
return flowListener;
138+
public boolean removeReturnListener(ReturnListener listener) {
139+
synchronized (listenersMonitor) {
140+
return returnListeners.remove(listener);
141+
}
142142
}
143143

144-
/**
145-
* Sets the current {@link FlowListener}.
146-
* A null argument is interpreted to mean "do not use a flow listener".
147-
*/
148-
public void setFlowListener(FlowListener listener) {
149-
flowListener = listener;
144+
public int clearReturnListeners() {
145+
synchronized(listenersMonitor) {
146+
int result = returnListeners.size();
147+
returnListeners.clear();
148+
return result;
149+
}
150150
}
151151

152-
/** Returns the current {@link ConfirmListener}. */
153-
public ConfirmListener getConfirmListener() {
154-
return confirmListener;
152+
/** Returns a readable copy of the current FlowListener collection. */
153+
private Collection<FlowListener> getFlowListeners() {
154+
synchronized (listenersMonitor) {
155+
if (flowListeners.isEmpty())
156+
return Collections.emptyList();
157+
return new ArrayList<FlowListener>(flowListeners);
158+
}
155159
}
156160

157-
/**
158-
* Sets the current {@link ConfirmListener}.
159-
* A null argument is interpreted to mean "do not use a confirm listener".
160-
*/
161-
public void setConfirmListener(ConfirmListener listener) {
162-
confirmListener = listener;
161+
public void addFlowListener(FlowListener listener) {
162+
synchronized (listenersMonitor) {
163+
flowListeners.add(listener);
164+
}
165+
}
166+
167+
public boolean removeFlowListener(FlowListener listener) {
168+
synchronized (listenersMonitor) {
169+
return flowListeners.remove(listener);
170+
}
171+
}
172+
173+
public int clearFlowListeners() {
174+
synchronized(listenersMonitor) {
175+
int result = flowListeners.size();
176+
flowListeners.clear();
177+
return result;
178+
}
179+
}
180+
181+
/** Returns a readable copy of the current ConfirmListener collection. */
182+
private Collection<ConfirmListener> getConfirmListeners() {
183+
synchronized (listenersMonitor) {
184+
if (confirmListeners.isEmpty())
185+
return Collections.emptyList();
186+
return new ArrayList<ConfirmListener>(confirmListeners);
187+
}
188+
}
189+
190+
public void addConfirmListener(ConfirmListener listener) {
191+
synchronized (listenersMonitor) {
192+
confirmListeners.add(listener);
193+
}
194+
}
195+
196+
public boolean removeConfirmListener(ConfirmListener listener) {
197+
synchronized (listenersMonitor) {
198+
return confirmListeners.remove(listener);
199+
}
200+
}
201+
202+
public int clearConfirmListeners() {
203+
synchronized(listenersMonitor) {
204+
int result = confirmListeners.size();
205+
confirmListeners.clear();
206+
return result;
207+
}
163208
}
164209

165210
/** Returns the current default consumer. */
@@ -275,22 +320,7 @@ public void releaseChannelNumber() {
275320
}
276321
return true;
277322
} else if (method instanceof Basic.Return) {
278-
ReturnListener l = getReturnListener();
279-
if (l != null) {
280-
Basic.Return basicReturn = (Basic.Return) method;
281-
try {
282-
l.handleReturn(basicReturn.getReplyCode(),
283-
basicReturn.getReplyText(),
284-
basicReturn.getExchange(),
285-
basicReturn.getRoutingKey(),
286-
(BasicProperties)
287-
command.getContentHeader(),
288-
command.getContentBody());
289-
} catch (Throwable ex) {
290-
_connection.getExceptionHandler().handleReturnListenerException(this,
291-
ex);
292-
}
293-
}
323+
callReturnListeners(command, (Basic.Return) method);
294324
return true;
295325
} else if (method instanceof Channel.Flow) {
296326
Channel.Flow channelFlow = (Channel.Flow) method;
@@ -299,36 +329,13 @@ public void releaseChannelNumber() {
299329
transmit(new Channel.FlowOk(!_blockContent));
300330
_channelMutex.notifyAll();
301331
}
302-
FlowListener l = getFlowListener();
303-
if (l != null) {
304-
try {
305-
l.handleFlow(channelFlow.getActive());
306-
} catch (Throwable ex) {
307-
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
308-
}
309-
}
332+
callFlowListeners(command, channelFlow);
310333
return true;
311334
} else if (method instanceof Basic.Ack) {
312-
Basic.Ack ack = (Basic.Ack) method;
313-
ConfirmListener l = getConfirmListener();
314-
if (l != null) {
315-
try {
316-
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
317-
} catch (Throwable ex) {
318-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
319-
}
320-
}
335+
callConfirmListeners(command, (Basic.Ack) method);
321336
return true;
322337
} else if (method instanceof Basic.Nack) {
323-
Basic.Nack nack = (Basic.Nack) method;
324-
ConfirmListener l = getConfirmListener();
325-
if (l != null) {
326-
try {
327-
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
328-
} catch (Throwable ex) {
329-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
330-
}
331-
}
338+
callConfirmListeners(command, (Basic.Nack) method);
332339
return true;
333340
} else if (method instanceof Basic.RecoverOk) {
334341
for (Consumer callback: _consumers.values()) {
@@ -379,6 +386,51 @@ public void releaseChannelNumber() {
379386
}
380387
}
381388

389+
private void callReturnListeners(Command command, Basic.Return basicReturn) {
390+
try {
391+
for (ReturnListener l : getReturnListeners()) {
392+
l.handleReturn(basicReturn.getReplyCode(),
393+
basicReturn.getReplyText(),
394+
basicReturn.getExchange(),
395+
basicReturn.getRoutingKey(),
396+
(BasicProperties) command.getContentHeader(),
397+
command.getContentBody());
398+
}
399+
} catch (Throwable ex) {
400+
_connection.getExceptionHandler().handleReturnListenerException(this, ex);
401+
}
402+
}
403+
404+
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
405+
try {
406+
for (FlowListener l : getFlowListeners()) {
407+
l.handleFlow(channelFlow.getActive());
408+
}
409+
} catch (Throwable ex) {
410+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
411+
}
412+
}
413+
414+
private void callConfirmListeners(Command command, Basic.Ack ack) {
415+
try {
416+
for (ConfirmListener l : getConfirmListeners()) {
417+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
418+
}
419+
} catch (Throwable ex) {
420+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
421+
}
422+
}
423+
424+
private void callConfirmListeners(Command command, Basic.Nack nack) {
425+
try {
426+
for (ConfirmListener l : getConfirmListeners()) {
427+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
428+
}
429+
} catch (Throwable ex) {
430+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
431+
}
432+
}
433+
382434
private void asyncShutdown(Command command) throws IOException {
383435
releaseChannelNumber();
384436
ShutdownSignalException signal = new ShutdownSignalException(false,

test/src/com/rabbitmq/client/test/ConfirmBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ConfirmBase extends BrokerTestCase {
3232
protected void setUp() throws IOException {
3333
super.setUp();
3434
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35-
channel.setConfirmListener(new ConfirmListener() {
35+
channel.addConfirmListener(new ConfirmListener() {
3636
public void handleAck(long seqNo, boolean multiple) {
3737
if (!unconfirmedSet.contains(seqNo)) {
3838
fail("got duplicate ack: " + seqNo);

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private static boolean[] expected(String key) {
5757

5858
@Override protected void setUp() throws IOException {
5959
super.setUp();
60-
channel.setReturnListener(new ReturnListener() {
60+
channel.addReturnListener(new ReturnListener() {
6161
public void handleReturn(int replyCode,
6262
String replyText,
6363
String exchange,

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.rabbitmq.client.test.functional;
1919

20-
import com.rabbitmq.client.test.BrokerTestCase;
2120
import com.rabbitmq.client.AMQP;
2221
import com.rabbitmq.client.Channel;
2322
import com.rabbitmq.client.DefaultConsumer;

test/src/com/rabbitmq/client/test/functional/TransactionsBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.rabbitmq.client.test.functional;
1919

2020
import com.rabbitmq.client.AMQP;
21-
import com.rabbitmq.client.AlreadyClosedException;
2221
import com.rabbitmq.client.test.BrokerTestCase;
2322
import java.io.IOException;
2423

0 commit comments

Comments
 (0)