Skip to content

Commit 87ed5df

Browse files
author
Steve Powell
committed
Added channel.close response processing even if !isOpen()
o copied handling code in ChannelN.processAsync() to !isOpen() case o improved atomicity of operations in shutdown processing o spelling and comment adjustments
1 parent 24d0764 commit 87ed5df

File tree

6 files changed

+73
-57
lines changed

6 files changed

+73
-57
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,10 @@ public void processShutdownSignal(ShutdownSignalException signal,
259259
boolean notifyRpc) {
260260
try {
261261
synchronized (_channelMutex) {
262-
if (!ignoreClosed)
263-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
264-
if (isOpen())
265-
_shutdownCause = signal;
262+
if (!setShutdownCauseIfOpen(signal)) {
263+
if (!ignoreClosed)
264+
throw new AlreadyClosedException("Attempt to use closed channel", this);
265+
}
266266

267267
_channelMutex.notifyAll();
268268
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,9 @@ public void start()
333333
heartbeat));
334334
// 0.9.1: insist [on not being redirected] is deprecated, but
335335
// still in generated code; just pass a dummy value here
336-
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
337-
"",
338-
false)).getMethod();
336+
_channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
337+
"",
338+
false)).getMethod();
339339
return;
340340
}
341341

@@ -610,11 +610,9 @@ public ShutdownSignalException shutdown(Object reason,
610610
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
611611
reason, this);
612612
sse.initCause(cause);
613-
synchronized (this) {
614-
if (initiatedByApplication)
615-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
616-
if (isOpen())
617-
_shutdownCause = sse;
613+
if (!setShutdownCauseIfOpen(sse)) {
614+
if (initiatedByApplication)
615+
throw new AlreadyClosedException("Attempt to use closed connection", this);
618616
}
619617

620618
// stop any heartbeating

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.util.Collections;
5757
import java.util.HashMap;
5858
import java.util.Map;
59-
import java.util.concurrent.atomic.AtomicLong;
6059
import java.util.concurrent.TimeoutException;
6160

6261

@@ -256,12 +255,8 @@ public void releaseChannelNumber() {
256255
command,
257256
this);
258257
synchronized (_channelMutex) {
259-
try {
260-
processShutdownSignal(signal, true, false);
261-
quiescingTransmit(new Channel.CloseOk());
262-
} finally {
263-
notifyOutstandingRpc(signal);
264-
}
258+
processShutdownSignal(signal, true, true);
259+
quiescingTransmit(new Channel.CloseOk());
265260
}
266261
notifyListeners();
267262
return true;
@@ -358,13 +353,20 @@ public void releaseChannelNumber() {
358353
return false;
359354
}
360355
} else {
361-
// We're in quiescing mode.
356+
// We're in quiescing mode == !isOpen()
362357

363358
if (method instanceof Channel.Close) {
364-
// We're already shutting down, so just send back an ok.
359+
// We are already shutting down, but we cannot assume no Rpc is waiting.
360+
releaseChannelNumber();
361+
ShutdownSignalException signal = new ShutdownSignalException(false,
362+
false,
363+
command,
364+
this);
365365
synchronized (_channelMutex) {
366+
processShutdownSignal(signal, true, true);
366367
quiescingTransmit(new Channel.CloseOk());
367368
}
369+
notifyListeners();
368370
return true;
369371
} else if (method instanceof Channel.CloseOk) {
370372
// We're quiescing, and we see a channel.close-ok:
@@ -422,7 +424,7 @@ public void close(int closeCode,
422424
throws IOException
423425
{
424426
// First, notify all our dependents that we are shutting down.
425-
// This clears _isOpen, so no further work from the
427+
// This clears isOpen(), so no further work from the
426428
// application side will be accepted, and any inbound commands
427429
// will be discarded (unless they're channel.close-oks).
428430
Channel.Close reason = new Channel.Close(closeCode, closeMessage, 0, 0);
@@ -445,8 +447,8 @@ public void close(int closeCode,
445447
}
446448

447449
// Now that we're in quiescing state, channel.close was sent and
448-
// we wait for the reply. We ignore the result. (It's always
449-
// close-ok.)
450+
// we wait for the reply. We ignore the result.
451+
// (It's NOT always close-ok.)
450452
notify = true;
451453
k.getReply(-1);
452454
} catch (TimeoutException ise) {

src/com/rabbitmq/client/impl/Frame.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,12 @@ public DataOutputStream getOutputStream() {
255255

256256
@Override public String toString() {
257257
StringBuffer sb = new StringBuffer();
258-
sb.append("Frame(" + type + ", " + channel + ", ");
258+
sb.append("Frame(type=").append(type).append(", channel=").append(channel).append(", ");
259259
if (accumulator == null) {
260-
sb.append(payload.length + " bytes of payload");
260+
sb.append(payload.length).append(" bytes of payload)");
261261
} else {
262-
sb.append(accumulator.size() + " bytes of accumulator");
262+
sb.append(accumulator.size()).append(" bytes of accumulator)");
263263
}
264-
sb.append(")");
265264
return sb.toString();
266265
}
267266

@@ -339,7 +338,7 @@ else if(value instanceof byte[]) {
339338
acc += 4 + ((byte[])value).length;
340339
}
341340
else if(value instanceof List) {
342-
acc += 4 + arraySize((List)value);
341+
acc += 4 + arraySize((List<?>)value);
343342
}
344343
else if(value == null) {
345344
}
@@ -350,7 +349,7 @@ else if(value == null) {
350349
}
351350

352351
/** Computes the AMQP wire-protocol length of an encoded field-array */
353-
public static long arraySize(List values)
352+
public static long arraySize(List<?> values)
354353
throws UnsupportedEncodingException
355354
{
356355
long acc = 0;

src/com/rabbitmq/client/impl/ShutdownNotifierComponent.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,53 +40,71 @@
4040

4141
public class ShutdownNotifierComponent implements ShutdownNotifier {
4242

43+
/** Monitor for listeners and shutdownCause */
44+
private final Object monitor = new Object();
45+
4346
/** List of all shutdown listeners associated with the component */
44-
public List<ShutdownListener> listeners
45-
= new ArrayList<ShutdownListener>();
47+
private final List<ShutdownListener> shutdownListeners
48+
= new ArrayList<ShutdownListener>();
4649

4750
/**
4851
* When this value is null, the component is in an "open"
4952
* state. When non-null, the component is in "closed" state, and
5053
* this value indicates the circumstances of the shutdown.
5154
*/
52-
public volatile ShutdownSignalException _shutdownCause = null;
55+
private volatile ShutdownSignalException shutdownCause = null;
5356

5457
public void addShutdownListener(ShutdownListener listener)
5558
{
56-
boolean closed = false;
57-
synchronized(listeners) {
58-
closed = !isOpen();
59-
listeners.add(listener);
59+
ShutdownSignalException sse = null;
60+
synchronized(this.monitor) {
61+
sse = this.shutdownCause;
62+
this.shutdownListeners.add(listener);
6063
}
61-
if (closed)
62-
listener.shutdownCompleted(getCloseReason());
64+
if (sse != null) // closed
65+
listener.shutdownCompleted(sse);
6366
}
6467

6568
public ShutdownSignalException getCloseReason() {
66-
return _shutdownCause;
69+
return this.shutdownCause;
6770
}
6871

6972
public void notifyListeners()
7073
{
71-
synchronized(listeners) {
72-
for (ShutdownListener l: listeners)
73-
try {
74-
l.shutdownCompleted(getCloseReason());
75-
} catch (Exception e) {
76-
// FIXME: proper logging
77-
}
74+
ShutdownSignalException sse = null;
75+
ShutdownListener[] sdls = null;
76+
synchronized(this.monitor) {
77+
sdls = this.shutdownListeners
78+
.toArray(new ShutdownListener[this.shutdownListeners.size()]);
79+
sse = this.shutdownCause;
80+
}
81+
for (ShutdownListener l: sdls) {
82+
try {
83+
l.shutdownCompleted(sse);
84+
} catch (Exception e) {
85+
// FIXME: proper logging
86+
}
7887
}
7988
}
8089

8190
public void removeShutdownListener(ShutdownListener listener)
8291
{
83-
synchronized(listeners) {
84-
listeners.remove(listener);
92+
synchronized(this.monitor) {
93+
this.shutdownListeners.remove(listener);
8594
}
8695
}
8796

8897
public boolean isOpen() {
89-
return _shutdownCause == null;
98+
return this.shutdownCause == null;
9099
}
91100

101+
public boolean setShutdownCauseIfOpen(ShutdownSignalException sse) {
102+
synchronized (this.monitor) {
103+
if (isOpen()) {
104+
this.shutdownCause = sse;
105+
return true;
106+
}
107+
return false;
108+
}
109+
}
92110
}

src/com/rabbitmq/utility/BlockingCell.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public synchronized T get() throws InterruptedException {
6565
}
6666
return _value;
6767
}
68-
68+
6969
/**
7070
* Wait for a value, and when one arrives, return it (without clearing it). If there's
7171
* already a value present, there's no need to wait - the existing value is returned.
72-
* If timeout is reached and value hasn't arrived, TimeoutException is thrown
72+
* If timeout is reached and value hasn't arrived, TimeoutException is thrown.
7373
*
74-
* @param timeout timeout in miliseconds. -1 effectively means infinity
74+
* @param timeout timeout in milliseconds. -1 effectively means infinity
7575
* @return the waited-for value
7676
* @throws InterruptedException if this thread is interrupted
7777
*/
@@ -112,8 +112,8 @@ public synchronized T uninterruptibleGet() {
112112
* a value appears or until specified timeout is reached. If timeout is reached,
113113
* TimeoutException it thrown.
114114
* We also use System.nanoTime() to behave correctly when system clock jumps around.
115-
*
116-
* @param timeout timeout in miliseconds. -1 effectively means infinity
115+
*
116+
* @param timeout timeout in milliseconds. -1 effectively means infinity
117117
* @return the waited-for value
118118
*/
119119
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
@@ -141,7 +141,7 @@ public synchronized void set(T newValue) {
141141
}
142142
_value = newValue;
143143
_filled = true;
144-
notify();
144+
notifyAll();
145145
}
146146

147147
/**
@@ -154,7 +154,6 @@ public synchronized boolean setIfUnset(T newValue) {
154154
return false;
155155
}
156156
set(newValue);
157-
_filled = true;
158157
return true;
159158
}
160159
}

0 commit comments

Comments
 (0)