Skip to content

Commit c4b9bc2

Browse files
author
Steve Powell
committed
Replace monitored LinkedLists Listener collections by CopyOnWriteArrays, and adjust interface.
1 parent 80b6fa8 commit c4b9bc2

File tree

2 files changed

+27
-83
lines changed

2 files changed

+27
-83
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,8 @@ public interface Channel extends ShutdownNotifier {
126126

127127
/**
128128
* Remove all {@link ReturnListener}s.
129-
* @return the number of listeners removed
130129
*/
131-
int clearReturnListeners();
130+
void clearReturnListeners();
132131

133132
/**
134133
* Add a {@link FlowListener}.
@@ -146,9 +145,8 @@ public interface Channel extends ShutdownNotifier {
146145

147146
/**
148147
* Remove all {@link FlowListener}s.
149-
* @return the number of listeners removed
150148
*/
151-
int clearFlowListeners();
149+
void clearFlowListeners();
152150

153151
/**
154152
* Add a {@link ConfirmListener}.
@@ -166,9 +164,8 @@ public interface Channel extends ShutdownNotifier {
166164

167165
/**
168166
* Remove all {@link ConfirmListener}s.
169-
* @return the number of listeners removed
170167
*/
171-
int clearConfirmListeners();
168+
void clearConfirmListeners();
172169

173170
/**
174171
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 24 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
2221
import java.util.Collection;
2322
import java.util.Collections;
2423
import java.util.HashMap;
25-
import java.util.LinkedList;
2624
import java.util.Map;
25+
import java.util.concurrent.CopyOnWriteArrayList;
2726
import java.util.concurrent.TimeoutException;
2827

2928
import com.rabbitmq.client.AMQP;
@@ -81,14 +80,13 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8180
public final Map<String, Consumer> _consumers =
8281
Collections.synchronizedMap(new HashMap<String, Consumer>());
8382

84-
/** Monitor protecting all listeners collections */
85-
private final Object listenersMonitor = new Object();
86-
/** The ReturnListener collection. */
87-
private final Collection<ReturnListener> returnListeners = new LinkedList<ReturnListener>();
88-
/** The FlowListener collection. */
89-
private final Collection<FlowListener> flowListeners = new LinkedList<FlowListener>();
90-
/** The ConfirmListener collection. */
91-
private final Collection<ConfirmListener> confirmListeners = new LinkedList<ConfirmListener>();
83+
/* All listeners collections are in CopyOnWriteArrayList objects */
84+
/** The ReturnListener collection. */
85+
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
86+
/** The FlowListener collection. */
87+
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
88+
/** The ConfirmListener collection. */
89+
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
9290

9391
/** Sequence number of next published message requiring confirmation. */
9492
private long nextPublishSeqNo = 0L;
@@ -120,91 +118,40 @@ public void open() throws IOException {
120118
Utility.use(openOk);
121119
}
122120

123-
/** Returns a readable copy of the current ReturnListener collection. */
124-
private Collection<ReturnListener> getReturnListeners() {
125-
synchronized (listenersMonitor) {
126-
if (returnListeners.isEmpty())
127-
return Collections.emptyList();
128-
return new ArrayList<ReturnListener>(returnListeners);
129-
}
130-
}
131-
132121
public void addReturnListener(ReturnListener listener) {
133-
synchronized (listenersMonitor) {
134-
returnListeners.add(listener);
135-
}
122+
returnListeners.add(listener);
136123
}
137124

138125
public boolean removeReturnListener(ReturnListener listener) {
139-
synchronized (listenersMonitor) {
140-
return returnListeners.remove(listener);
141-
}
126+
return returnListeners.remove(listener);
142127
}
143128

144-
public int clearReturnListeners() {
145-
synchronized(listenersMonitor) {
146-
int result = returnListeners.size();
147-
returnListeners.clear();
148-
return result;
149-
}
150-
}
151-
152-
/** Returns a readable copy of the current FlowListener collection. */
153-
private Collection<FlowListener> getFlowListeners() {
154-
synchronized (listenersMonitor) {
155-
if (flowListeners.isEmpty())
156-
return Collections.emptyList();
157-
return new ArrayList<FlowListener>(flowListeners);
158-
}
129+
public void clearReturnListeners() {
130+
returnListeners.clear();
159131
}
160132

161133
public void addFlowListener(FlowListener listener) {
162-
synchronized (listenersMonitor) {
163-
flowListeners.add(listener);
164-
}
134+
flowListeners.add(listener);
165135
}
166136

167137
public boolean removeFlowListener(FlowListener listener) {
168-
synchronized (listenersMonitor) {
169-
return flowListeners.remove(listener);
170-
}
138+
return flowListeners.remove(listener);
171139
}
172140

173-
public int clearFlowListeners() {
174-
synchronized(listenersMonitor) {
175-
int result = flowListeners.size();
176-
flowListeners.clear();
177-
return result;
178-
}
179-
}
180-
181-
/** Returns a readable copy of the current ConfirmListener collection. */
182-
private Collection<ConfirmListener> getConfirmListeners() {
183-
synchronized (listenersMonitor) {
184-
if (confirmListeners.isEmpty())
185-
return Collections.emptyList();
186-
return new ArrayList<ConfirmListener>(confirmListeners);
187-
}
141+
public void clearFlowListeners() {
142+
flowListeners.clear();
188143
}
189144

190145
public void addConfirmListener(ConfirmListener listener) {
191-
synchronized (listenersMonitor) {
192-
confirmListeners.add(listener);
193-
}
146+
confirmListeners.add(listener);
194147
}
195148

196149
public boolean removeConfirmListener(ConfirmListener listener) {
197-
synchronized (listenersMonitor) {
198-
return confirmListeners.remove(listener);
199-
}
150+
return confirmListeners.remove(listener);
200151
}
201152

202-
public int clearConfirmListeners() {
203-
synchronized(listenersMonitor) {
204-
int result = confirmListeners.size();
205-
confirmListeners.clear();
206-
return result;
207-
}
153+
public void clearConfirmListeners() {
154+
confirmListeners.clear();
208155
}
209156

210157
/** Returns the current default consumer. */
@@ -388,7 +335,7 @@ public void releaseChannelNumber() {
388335

389336
private void callReturnListeners(Command command, Basic.Return basicReturn) {
390337
try {
391-
for (ReturnListener l : getReturnListeners()) {
338+
for (ReturnListener l : this.returnListeners) {
392339
l.handleReturn(basicReturn.getReplyCode(),
393340
basicReturn.getReplyText(),
394341
basicReturn.getExchange(),
@@ -403,7 +350,7 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
403350

404351
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
405352
try {
406-
for (FlowListener l : getFlowListeners()) {
353+
for (FlowListener l : this.flowListeners) {
407354
l.handleFlow(channelFlow.getActive());
408355
}
409356
} catch (Throwable ex) {
@@ -413,7 +360,7 @@ private void callFlowListeners(Command command, Channel.Flow channelFlow) {
413360

414361
private void callConfirmListeners(Command command, Basic.Ack ack) {
415362
try {
416-
for (ConfirmListener l : getConfirmListeners()) {
363+
for (ConfirmListener l : this.confirmListeners) {
417364
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
418365
}
419366
} catch (Throwable ex) {
@@ -423,7 +370,7 @@ private void callConfirmListeners(Command command, Basic.Ack ack) {
423370

424371
private void callConfirmListeners(Command command, Basic.Nack nack) {
425372
try {
426-
for (ConfirmListener l : getConfirmListeners()) {
373+
for (ConfirmListener l : this.confirmListeners) {
427374
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
428375
}
429376
} catch (Throwable ex) {

0 commit comments

Comments
 (0)