Skip to content

Commit b6c7782

Browse files
author
Hubert Plociniczak
committed
Rename service() on ShutdownListener to shutdownInitiated().
Catch for any exceptions in the notifyListeners() method. Move shutdown listener related methods out of the Connection and Channel to ShutdownNotifier.
1 parent a1065c2 commit b6c7782

File tree

8 files changed

+130
-241
lines changed

8 files changed

+130
-241
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
*
5757
*/
5858

59-
public interface Channel {
59+
public interface Channel extends ShutdownNotifier{
6060
/**
6161
* Retrieve this channel's channel number.
6262
* @return the channel number
@@ -433,38 +433,4 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436-
437-
/**
438-
* Add shutdown listener to the channel
439-
*
440-
* @param listener {@link ShutdownListener} for the channel
441-
*/
442-
void addShutdownListener(ShutdownListener listener);
443-
444-
/**
445-
* Remove shutdown listener for the channel.
446-
*
447-
* @param listener {@link ShutdownListener} to be removed
448-
*/
449-
void removeShutdownListener(ShutdownListener listener);
450-
451-
/**
452-
* Get connection channel shutdown reason.
453-
* Return null if channel is still open.
454-
* @see com.rabbitmq.client.ShutdownCause
455-
* @return shutdown reason if channel is closed
456-
*/
457-
ShutdownSignalException getCloseReason();
458-
459-
/**
460-
* Determine if channel is currently open.
461-
* Will return false if we are currently closing or closed.
462-
* Checking this method should be only for information,
463-
* because of the race conditions - state can change after the call.
464-
* Instead just execute and and try to catch AlreadyClosedException
465-
*
466-
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467-
* @return true when channel is open, false otherwise
468-
*/
469-
boolean isOpen();
470436
}

src/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* Current implementations are thread-safe for code at the client API level,
5050
* and in fact thread-safe internally except for code within RPC calls.
5151
*/
52-
public interface Connection { // rename to AMQPConnection later, this is a temporary name
52+
public interface Connection extends ShutdownNotifier { // rename to AMQPConnection later, this is a temporary name
5353
/**
5454
* Retrieve the host.
5555
* @return the hostname of the peer we're connected to.
@@ -127,40 +127,4 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
127127
* @throws IOException if an I/O problem is encountered
128128
*/
129129
void close(int closeCode, String closeMessage) throws IOException;
130-
131-
/**
132-
* Add connection shutdown listener.
133-
* If the connection is already closed handler is fired immediately
134-
*
135-
* @param listener {@link ShutdownListener} to the connection
136-
*/
137-
void addShutdownListener(ShutdownListener listener);
138-
139-
/**
140-
* Remove shutdown listener for the connection.
141-
*
142-
* @param listener {@link ShutdownListener} to be removed
143-
*/
144-
void removeShutdownListener(ShutdownListener listener);
145-
146-
/**
147-
* Retrieve connection close reason.
148-
*
149-
* @see com.rabbitmq.client.ShutdownCause
150-
* @return information about the cause of closing the connection, or null if connection is still open
151-
*/
152-
ShutdownSignalException getCloseReason();
153-
154-
/**
155-
* Determine whether the connection is currently open.
156-
* Will return false if we are currently closing.
157-
* Checking this method should be only for information,
158-
* because of the race conditions - state can change after the call.
159-
* Instead just execute and try to catch ShutdownSignalException
160-
* and IOException
161-
*
162-
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
163-
* @return true when connection is open, false otherwise
164-
*/
165-
boolean isOpen();
166130
}

src/com/rabbitmq/client/ShutdownListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
public interface ShutdownListener extends EventListener {
66

7-
public void service(ShutdownSignalException cause);
7+
public void shutdownInitiated(ShutdownSignalException cause);
88

99
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Interface for components that are shutdown capable and
5+
* that allow listeners to be added for shutdown signals
6+
* @see ShutdownListener
7+
* @see ShutdownSignalException
8+
*/
9+
public interface ShutdownNotifier {
10+
/**
11+
* Add shutdown listener.
12+
* If the component is already closed, handler is fired immediately
13+
*
14+
* @param listener {@link ShutdownListener} to the component
15+
*/
16+
public void addShutdownListener(ShutdownListener listener);
17+
18+
/**
19+
* Remove shutdown listener for the component.
20+
*
21+
* @param listener {@link ShutdownListener} to be removed
22+
*/
23+
public void removeShutdownListener(ShutdownListener listener);
24+
25+
/**
26+
* Get the shutdown reason object
27+
* @return ShutdownSignalException if component is closed, null otherwise
28+
*/
29+
public ShutdownSignalException getCloseReason();
30+
31+
/**
32+
* Protected API - notify the listeners attached to the component
33+
* @see com.rabbitmq.client.ShutdownListener
34+
*/
35+
public void notifyListeners();
36+
37+
/**
38+
* Determine whether the component is currently open.
39+
* Will return false if we are currently closing.
40+
* Checking this method should be only for information,
41+
* because of the race conditions - state can change after the call.
42+
* Instead just execute and try to catch ShutdownSignalException
43+
* and IOException
44+
*
45+
* @return true when component is open, false otherwise
46+
*/
47+
boolean isOpen();
48+
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.rabbitmq.client.AlreadyClosedException;
3131
import com.rabbitmq.client.Command;
3232
import com.rabbitmq.client.Connection;
33-
import com.rabbitmq.client.ShutdownListener;
3433
import com.rabbitmq.client.ShutdownSignalException;
3534
import com.rabbitmq.utility.BlockingValueOrException;
3635
import com.rabbitmq.utility.SingleShotLinearTimer;
@@ -43,7 +42,7 @@
4342
* @see ChannelN
4443
* @see Connection
4544
*/
46-
public abstract class AMQChannel {
45+
public abstract class AMQChannel extends ShutdownNotifierComponent {
4746
/** The connection this channel is associated with. */
4847
public final AMQConnection _connection;
4948

@@ -55,9 +54,6 @@ public abstract class AMQChannel {
5554

5655
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5756
public RpcContinuation _activeRpc = null;
58-
59-
/** Reason for closing the channel, null if still open */
60-
public volatile ShutdownSignalException _cause;
6157

6258
/**
6359
* Construct a channel on the given connection, with the given channel number.
@@ -173,24 +169,6 @@ public synchronized RpcContinuation nextOutstandingRpc()
173169
return result;
174170
}
175171

176-
/**
177-
* Public API - Indicates whether this channel is in an open state
178-
* @return true if channel is open, false otherwise
179-
*/
180-
public boolean isOpen()
181-
{
182-
return _cause == null;
183-
}
184-
185-
/**
186-
* Public API - Get the reason for closing the channel
187-
* @return object having information about the shutdown, or null if still open
188-
*/
189-
public ShutdownSignalException getCloseReason()
190-
{
191-
return _cause;
192-
}
193-
194172
public void ensureIsOpen()
195173
throws AlreadyClosedException
196174
{
@@ -282,7 +260,7 @@ public void run() {
282260
public void processShutdownSignal(ShutdownSignalException signal) {
283261
synchronized (this) {
284262
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
285-
_cause = signal;
263+
_shutdownCause = signal;
286264
}
287265
RpcContinuation k = nextOutstandingRpc();
288266
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import java.io.EOFException;
2929
import java.io.IOException;
3030
import java.net.SocketException;
31-
import java.util.ArrayList;
32-
import java.util.List;
3331
import java.util.Map;
3432

3533
import com.rabbitmq.client.AMQP;
@@ -41,7 +39,6 @@
4139
import com.rabbitmq.client.ConnectionParameters;
4240
import com.rabbitmq.client.MissedHeartbeatException;
4341
import com.rabbitmq.client.RedirectException;
44-
import com.rabbitmq.client.ShutdownListener;
4542
import com.rabbitmq.client.ShutdownSignalException;
4643
import com.rabbitmq.utility.Utility;
4744

@@ -63,7 +60,7 @@
6360
* int ticket = ch1.accessRequest(realmName);
6461
* </pre>
6562
*/
66-
public class AMQConnection implements Connection {
63+
public class AMQConnection extends ShutdownNotifierComponent implements Connection{
6764
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6865
public static final int HANDSHAKE_TIMEOUT = 10000;
6966

@@ -92,23 +89,12 @@ public class AMQConnection implements Connection {
9289
/** Flag controlling the main driver loop's termination */
9390
public volatile boolean _running = false;
9491

95-
/**
96-
* When this value is null, the connection is in an "open"
97-
* state. When non-null, the connection is in "closed" state, and
98-
* this value indicates the circumstances of the shutdown.
99-
*/
100-
public volatile ShutdownSignalException _shutdownCause = null;
101-
10292
/** Maximum frame length, or zero if no limit is set */
10393
public int _frameMax;
10494

10595
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
10696
public final ExceptionHandler _exceptionHandler;
10797

108-
/** List of all shutdown listeners associated with the connection */
109-
public List<ShutdownListener> listeners
110-
= new ArrayList<ShutdownListener>();
111-
11298
/**
11399
* Protected API - respond, in the driver thread, to a ShutdownSignal.
114100
* @param channelNumber the number of the channel to disconnect
@@ -117,14 +103,6 @@ public final void disconnectChannel(int channelNumber) {
117103
_channelManager.disconnectChannel(channelNumber);
118104
}
119105

120-
/**
121-
* Public API - Determine whether the connection is open
122-
* @return true if haven't yet received shutdown signal, false otherwise
123-
*/
124-
public boolean isOpen() {
125-
return _shutdownCause == null;
126-
}
127-
128106
public void ensureIsOpen()
129107
throws AlreadyClosedException
130108
{
@@ -629,55 +607,6 @@ public void close(int closeCode,
629607
}
630608
notifyListeners();
631609
}
632-
633-
/**
634-
* Private API - notify the listeners attached to this connection
635-
* @see com.rabbitmq.client.ShutdownListener
636-
*/
637-
public void notifyListeners()
638-
{
639-
synchronized(listeners) {
640-
for (ShutdownListener l: listeners)
641-
l.service(getCloseReason());
642-
}
643-
}
644-
645-
/**
646-
* Public API - Add shutdown listener fired when closing the connection
647-
* @see com.rabbitmq.client.Connection#addShutdownListener()
648-
*/
649-
public void addShutdownListener(ShutdownListener listener)
650-
{
651-
652-
boolean closed = false;
653-
synchronized(listeners) {
654-
closed = !isOpen();
655-
listeners.add(listener);
656-
}
657-
if (closed)
658-
listener.service(_shutdownCause);
659-
}
660-
661-
/**
662-
* Public API - Remove shutdown listener for this connection
663-
* Removing only the first found object
664-
* @see com.rabbitmq.client.Connection#removeShutdownListener()
665-
*/
666-
public void removeShutdownListener(ShutdownListener listener)
667-
{
668-
synchronized(listeners) {
669-
listeners.remove(listener);
670-
}
671-
}
672-
673-
/**
674-
* Public API - Get reason for shutdown, or null if open
675-
* @see com.rabbitmq.client.Connection#getShutdownReason()
676-
*/
677-
public ShutdownSignalException getCloseReason()
678-
{
679-
return _shutdownCause;
680-
}
681610

682611
@Override public String toString() {
683612
return "amqp://" + _params.getUserName() + "@" + getHost() + ":" + getPort() + _params.getVirtualHost();

0 commit comments

Comments
 (0)