Skip to content

Commit a1065c2

Browse files
committed
Migrate branch bug18061
1 parent 393c5fa commit a1065c2

File tree

8 files changed

+259
-23
lines changed

8 files changed

+259
-23
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
/*
4+
* Thrown when application tries to perform an action on connection/channel
5+
* which was already closed
6+
*/
7+
public class AlreadyClosedException extends ShutdownSignalException {
8+
public AlreadyClosedException(String s)
9+
{
10+
super(true, true, s);
11+
}
12+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,4 +433,38 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436+
437+
/**
438+
* Add shutdown listener to the channel
439+
*
440+
* @param listener {@link ShutdownListener} for the channel
441+
*/
442+
void addShutdownListener(ShutdownListener listener);
443+
444+
/**
445+
* Remove shutdown listener for the channel.
446+
*
447+
* @param listener {@link ShutdownListener} to be removed
448+
*/
449+
void removeShutdownListener(ShutdownListener listener);
450+
451+
/**
452+
* Get connection channel shutdown reason.
453+
* Return null if channel is still open.
454+
* @see com.rabbitmq.client.ShutdownCause
455+
* @return shutdown reason if channel is closed
456+
*/
457+
ShutdownSignalException getCloseReason();
458+
459+
/**
460+
* Determine if channel is currently open.
461+
* Will return false if we are currently closing or closed.
462+
* Checking this method should be only for information,
463+
* because of the race conditions - state can change after the call.
464+
* Instead just execute and and try to catch AlreadyClosedException
465+
*
466+
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467+
* @return true when channel is open, false otherwise
468+
*/
469+
boolean isOpen();
436470
}

src/com/rabbitmq/client/Connection.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,40 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
127127
* @throws IOException if an I/O problem is encountered
128128
*/
129129
void close(int closeCode, String closeMessage) throws IOException;
130+
131+
/**
132+
* Add connection shutdown listener.
133+
* If the connection is already closed handler is fired immediately
134+
*
135+
* @param listener {@link ShutdownListener} to the connection
136+
*/
137+
void addShutdownListener(ShutdownListener listener);
138+
139+
/**
140+
* Remove shutdown listener for the connection.
141+
*
142+
* @param listener {@link ShutdownListener} to be removed
143+
*/
144+
void removeShutdownListener(ShutdownListener listener);
145+
146+
/**
147+
* Retrieve connection close reason.
148+
*
149+
* @see com.rabbitmq.client.ShutdownCause
150+
* @return information about the cause of closing the connection, or null if connection is still open
151+
*/
152+
ShutdownSignalException getCloseReason();
153+
154+
/**
155+
* Determine whether the connection is currently open.
156+
* Will return false if we are currently closing.
157+
* Checking this method should be only for information,
158+
* because of the race conditions - state can change after the call.
159+
* Instead just execute and try to catch ShutdownSignalException
160+
* and IOException
161+
*
162+
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
163+
* @return true when connection is open, false otherwise
164+
*/
165+
boolean isOpen();
130166
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.EventListener;
4+
5+
public interface ShutdownListener extends EventListener {
6+
7+
public void service(ShutdownSignalException cause);
8+
9+
}

src/com/rabbitmq/client/ShutdownSignalException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Encapsulates a shutdown condition for a connection to an AMQP broker.
2929
*/
3030

31-
public class ShutdownSignalException extends Exception {
31+
public class ShutdownSignalException extends RuntimeException {
3232
/** True if the connection is shut down, or false if this signal refers to a channel */
3333
private final boolean _hardError;
3434

@@ -59,11 +59,13 @@ public ShutdownSignalException(boolean hardError,
5959

6060
/** @return true if this signals a connection error, or false if a channel error */
6161
public boolean isHardError() { return _hardError; }
62+
6263
/** @return true if this exception was caused by explicit application
6364
* action; false if it originated with the broker or as a result
6465
* of detectable non-deliberate application failure
6566
*/
6667
public boolean isInitiatedByApplication() { return _initiatedByApplication; }
68+
6769
/** @return the reason object, if any */
6870
public Object getReason() { return _reason; }
6971

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727

2828
import java.io.IOException;
2929

30+
import com.rabbitmq.client.AlreadyClosedException;
3031
import com.rabbitmq.client.Command;
3132
import com.rabbitmq.client.Connection;
33+
import com.rabbitmq.client.ShutdownListener;
3234
import com.rabbitmq.client.ShutdownSignalException;
3335
import com.rabbitmq.utility.BlockingValueOrException;
3436
import com.rabbitmq.utility.SingleShotLinearTimer;
@@ -53,9 +55,9 @@ public abstract class AMQChannel {
5355

5456
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5557
public RpcContinuation _activeRpc = null;
56-
57-
/** Indicates whether this channel is in a state to handle further activity. */
58-
public volatile boolean _isOpen = true;
58+
59+
/** Reason for closing the channel, null if still open */
60+
public volatile ShutdownSignalException _cause;
5961

6062
/**
6163
* Construct a channel on the given connection, with the given channel number.
@@ -117,6 +119,10 @@ public AMQCommand exnWrappingRpc(Method m)
117119
{
118120
try {
119121
return rpc(m);
122+
} catch (AlreadyClosedException ace) {
123+
// Do not wrap it since it means that connection/channel
124+
// was closed in some action in the past
125+
throw ace;
120126
} catch (ShutdownSignalException ex) {
121127
throw wrap(ex);
122128
}
@@ -160,21 +166,36 @@ public synchronized void transmitAndEnqueue(Method m, RpcContinuation k)
160166
transmit(m);
161167
}
162168

163-
public synchronized RpcContinuation nextOutstandingRpc() {
169+
public synchronized RpcContinuation nextOutstandingRpc()
170+
{
164171
RpcContinuation result = _activeRpc;
165172
_activeRpc = null;
166173
return result;
167174
}
168175

169-
public boolean isOpen() {
170-
return _isOpen;
176+
/**
177+
* Public API - Indicates whether this channel is in an open state
178+
* @return true if channel is open, false otherwise
179+
*/
180+
public boolean isOpen()
181+
{
182+
return _cause == null;
183+
}
184+
185+
/**
186+
* Public API - Get the reason for closing the channel
187+
* @return object having information about the shutdown, or null if still open
188+
*/
189+
public ShutdownSignalException getCloseReason()
190+
{
191+
return _cause;
171192
}
172193

173194
public void ensureIsOpen()
174-
throws IllegalStateException
195+
throws AlreadyClosedException
175196
{
176197
if (!isOpen()) {
177-
throw new IllegalStateException("Attempt to use closed channel");
198+
throw new AlreadyClosedException("Attempt to use closed channel");
178199
}
179200
}
180201

@@ -261,7 +282,7 @@ public void run() {
261282
public void processShutdownSignal(ShutdownSignalException signal) {
262283
synchronized (this) {
263284
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
264-
_isOpen = false;
285+
_cause = signal;
265286
}
266287
RpcContinuation k = nextOutstandingRpc();
267288
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,20 @@
2828
import java.io.EOFException;
2929
import java.io.IOException;
3030
import java.net.SocketException;
31+
import java.util.ArrayList;
32+
import java.util.List;
3133
import java.util.Map;
3234

3335
import com.rabbitmq.client.AMQP;
3436
import com.rabbitmq.client.Address;
37+
import com.rabbitmq.client.AlreadyClosedException;
3538
import com.rabbitmq.client.Channel;
3639
import com.rabbitmq.client.Command;
3740
import com.rabbitmq.client.Connection;
3841
import com.rabbitmq.client.ConnectionParameters;
3942
import com.rabbitmq.client.MissedHeartbeatException;
4043
import com.rabbitmq.client.RedirectException;
44+
import com.rabbitmq.client.ShutdownListener;
4145
import com.rabbitmq.client.ShutdownSignalException;
4246
import com.rabbitmq.utility.Utility;
4347

@@ -100,7 +104,11 @@ public class AMQConnection implements Connection {
100104

101105
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
102106
public final ExceptionHandler _exceptionHandler;
103-
107+
108+
/** List of all shutdown listeners associated with the connection */
109+
public List<ShutdownListener> listeners
110+
= new ArrayList<ShutdownListener>();
111+
104112
/**
105113
* Protected API - respond, in the driver thread, to a ShutdownSignal.
106114
* @param channelNumber the number of the channel to disconnect
@@ -109,15 +117,19 @@ public final void disconnectChannel(int channelNumber) {
109117
_channelManager.disconnectChannel(channelNumber);
110118
}
111119

120+
/**
121+
* Public API - Determine whether the connection is open
122+
* @return true if haven't yet received shutdown signal, false otherwise
123+
*/
112124
public boolean isOpen() {
113125
return _shutdownCause == null;
114126
}
115127

116128
public void ensureIsOpen()
117-
throws IllegalStateException
129+
throws AlreadyClosedException
118130
{
119131
if (!isOpen()) {
120-
throw new IllegalStateException("Attempt to use closed connection");
132+
throw new AlreadyClosedException("Attempt to use closed connection");
121133
}
122134
}
123135

@@ -340,15 +352,13 @@ public Address[] open(final ConnectionParameters params, boolean insist)
340352
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
341353
_frameHandler.sendHeader();
342354

343-
if (!isOpen()) {
344-
// See bug 17389. The MainLoop could have shut down already in
345-
// which case we don't want to wait forever for a reply.
346-
347-
// There is no race if the MainLoop shuts down after enqueuing
348-
// the RPC because if that happens the channel will correctly
349-
// pass the exception into RPC, waking it up.
350-
throw _shutdownCause;
351-
}
355+
// See bug 17389. The MainLoop could have shut down already in
356+
// which case we don't want to wait forever for a reply.
357+
358+
// There is no race if the MainLoop shuts down after enqueuing
359+
// the RPC because if that happens the channel will correctly
360+
// pass the exception into RPC, waking it up.
361+
ensureIsOpen();
352362

353363
AMQP.Connection.Start connStart =
354364
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
@@ -561,6 +571,7 @@ public void handleConnectionClose(Command closeCommand) {
561571
Utility.emptyStatement();
562572
}
563573
shutdown(closeCommand, false, null);
574+
notifyListeners();
564575
}
565576

566577
/**
@@ -616,6 +627,56 @@ public void close(int closeCode,
616627
} finally {
617628
_running = false;
618629
}
630+
notifyListeners();
631+
}
632+
633+
/**
634+
* Private API - notify the listeners attached to this connection
635+
* @see com.rabbitmq.client.ShutdownListener
636+
*/
637+
public void notifyListeners()
638+
{
639+
synchronized(listeners) {
640+
for (ShutdownListener l: listeners)
641+
l.service(getCloseReason());
642+
}
643+
}
644+
645+
/**
646+
* Public API - Add shutdown listener fired when closing the connection
647+
* @see com.rabbitmq.client.Connection#addShutdownListener()
648+
*/
649+
public void addShutdownListener(ShutdownListener listener)
650+
{
651+
652+
boolean closed = false;
653+
synchronized(listeners) {
654+
closed = !isOpen();
655+
listeners.add(listener);
656+
}
657+
if (closed)
658+
listener.service(_shutdownCause);
659+
}
660+
661+
/**
662+
* Public API - Remove shutdown listener for this connection
663+
* Removing only the first found object
664+
* @see com.rabbitmq.client.Connection#removeShutdownListener()
665+
*/
666+
public void removeShutdownListener(ShutdownListener listener)
667+
{
668+
synchronized(listeners) {
669+
listeners.remove(listener);
670+
}
671+
}
672+
673+
/**
674+
* Public API - Get reason for shutdown, or null if open
675+
* @see com.rabbitmq.client.Connection#getShutdownReason()
676+
*/
677+
public ShutdownSignalException getCloseReason()
678+
{
679+
return _shutdownCause;
619680
}
620681

621682
@Override public String toString() {

0 commit comments

Comments
 (0)