Skip to content

Commit 0db2e74

Browse files
author
Hubert Plociniczak
committed
Merge default into bug18743
2 parents 43760ae + c60e2e9 commit 0db2e74

File tree

9 files changed

+139
-248
lines changed

9 files changed

+139
-248
lines changed

.hgignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
syntax: glob
2+
*~
3+
4+
syntax: regexp
5+
^build/

src/com/rabbitmq/client/Channel.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
*
5757
*/
5858

59-
public interface Channel {
59+
public interface Channel extends ShutdownNotifier{
6060
/**
6161
* Retrieve this channel's channel number.
6262
* @return the channel number
@@ -433,38 +433,4 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436-
437-
/**
438-
* Add shutdown listener to the channel
439-
*
440-
* @param listener {@link ShutdownListener} for the channel
441-
*/
442-
void addShutdownListener(ShutdownListener listener);
443-
444-
/**
445-
* Remove shutdown listener for the channel.
446-
*
447-
* @param listener {@link ShutdownListener} to be removed
448-
*/
449-
void removeShutdownListener(ShutdownListener listener);
450-
451-
/**
452-
* Get connection channel shutdown reason.
453-
* Return null if channel is still open.
454-
* @see com.rabbitmq.client.ShutdownCause
455-
* @return shutdown reason if channel is closed
456-
*/
457-
ShutdownSignalException getCloseReason();
458-
459-
/**
460-
* Determine if channel is currently open.
461-
* Will return false if we are currently closing or closed.
462-
* Checking this method should be only for information,
463-
* because of the race conditions - state can change after the call.
464-
* Instead just execute and and try to catch AlreadyClosedException
465-
*
466-
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467-
* @return true when channel is open, false otherwise
468-
*/
469-
boolean isOpen();
470436
}

src/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* Current implementations are thread-safe for code at the client API level,
5050
* and in fact thread-safe internally except for code within RPC calls.
5151
*/
52-
public interface Connection { // rename to AMQPConnection later, this is a temporary name
52+
public interface Connection extends ShutdownNotifier { // rename to AMQPConnection later, this is a temporary name
5353
/**
5454
* Retrieve the host.
5555
* @return the hostname of the peer we're connected to.
@@ -157,39 +157,4 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
157157
* complete. If timeout is reached socket is forced to close.
158158
*/
159159
void abort(int timeout);
160-
161-
/**
162-
* Add connection shutdown listener.
163-
* If the connection is already closed handler is fired immediately
164-
*
165-
* @param listener {@link ShutdownListener} to the connection
166-
*/
167-
void addShutdownListener(ShutdownListener listener);
168-
169-
/**
170-
* Remove shutdown listener for the connection.
171-
*
172-
* @param listener {@link ShutdownListener} to be removed
173-
*/
174-
void removeShutdownListener(ShutdownListener listener);
175-
176-
/**
177-
* Retrieve connection close reason.
178-
*
179-
* @see com.rabbitmq.client.ShutdownCause
180-
* @return information about the cause of closing the connection, or null if connection is still open
181-
*/
182-
ShutdownSignalException getCloseReason();
183-
184-
/**
185-
* Determine whether the connection is currently open.
186-
* Will return false if we are currently closing.
187-
* Checking this method should be only for information,
188-
* because of the race conditions - state can change after the call.
189-
* Instead just execute and and try to catch AlreadyClosedException
190-
*
191-
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
192-
* @return true when connection is open, false otherwise
193-
*/
194-
boolean isOpen();
195160
}

src/com/rabbitmq/client/ShutdownListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
public interface ShutdownListener extends EventListener {
66

7-
public void service(ShutdownSignalException cause);
7+
public void shutdownCompleted(ShutdownSignalException cause);
88

99
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Interface for components that are shutdown capable and
5+
* that allow listeners to be added for shutdown signals
6+
*
7+
* @see ShutdownListener
8+
* @see ShutdownSignalException
9+
*/
10+
public interface ShutdownNotifier {
11+
/**
12+
* Add shutdown listener.
13+
* If the component is already closed, handler is fired immediately
14+
*
15+
* @param listener {@link ShutdownListener} to the component
16+
*/
17+
public void addShutdownListener(ShutdownListener listener);
18+
19+
/**
20+
* Remove shutdown listener for the component.
21+
*
22+
* @param listener {@link ShutdownListener} to be removed
23+
*/
24+
public void removeShutdownListener(ShutdownListener listener);
25+
26+
/**
27+
* Get the shutdown reason object
28+
* @return ShutdownSignalException if component is closed, null otherwise
29+
*/
30+
public ShutdownSignalException getCloseReason();
31+
32+
/**
33+
* Protected API - notify the listeners attached to the component
34+
* @see com.rabbitmq.client.ShutdownListener
35+
*/
36+
public void notifyListeners();
37+
38+
/**
39+
* Determine whether the component is currently open.
40+
* Will return false if we are currently closing.
41+
* Checking this method should be only for information,
42+
* because of the race conditions - state can change after the call.
43+
* Instead just execute and try to catch ShutdownSignalException
44+
* and IOException
45+
*
46+
* @return true when component is open, false otherwise
47+
*/
48+
boolean isOpen();
49+
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* @see ChannelN
4343
* @see Connection
4444
*/
45-
public abstract class AMQChannel {
45+
public abstract class AMQChannel extends ShutdownNotifierComponent {
4646
/** The connection this channel is associated with. */
4747
public final AMQConnection _connection;
4848

@@ -54,9 +54,6 @@ public abstract class AMQChannel {
5454

5555
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5656
public RpcContinuation _activeRpc = null;
57-
58-
/** Reason for closing the channel, null if still open */
59-
public volatile ShutdownSignalException _cause;
6057

6158
/**
6259
* Construct a channel on the given connection, with the given channel number.
@@ -172,24 +169,6 @@ public synchronized RpcContinuation nextOutstandingRpc()
172169
return result;
173170
}
174171

175-
/**
176-
* Public API - Indicates whether this channel is in an open state
177-
* @return true if channel is open, false otherwise
178-
*/
179-
public boolean isOpen()
180-
{
181-
return _cause == null;
182-
}
183-
184-
/**
185-
* Public API - Get the reason for closing the channel
186-
* @return object having information about the shutdown, or null if still open
187-
*/
188-
public ShutdownSignalException getCloseReason()
189-
{
190-
return _cause;
191-
}
192-
193172
public void ensureIsOpen()
194173
throws AlreadyClosedException
195174
{
@@ -250,15 +229,15 @@ public AMQCommand quiescingRpc(Method m,
250229
@Override public String toString() {
251230
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
252231
}
253-
232+
254233
/**
255234
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
256235
* @param signal the signal to handle
257236
*/
258237
public void processShutdownSignal(ShutdownSignalException signal) {
259238
synchronized (this) {
260239
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
261-
_cause = signal;
240+
_shutdownCause = signal;
262241
}
263242
RpcContinuation k = nextOutstandingRpc();
264243
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,9 @@
2727
import java.io.EOFException;
2828
import java.io.IOException;
2929
import java.net.SocketException;
30-
import java.util.Collections;
31-
import java.util.LinkedList;
32-
import java.util.List;
3330
import java.util.Map;
3431
import java.util.concurrent.TimeoutException;
3532

36-
import java.io.EOFException;
37-
import java.io.IOException;
38-
import java.net.SocketException;
39-
import java.util.Map;
40-
4133
import com.rabbitmq.client.AMQP;
4234
import com.rabbitmq.client.Address;
4335
import com.rabbitmq.client.AlreadyClosedException;
@@ -47,7 +39,6 @@
4739
import com.rabbitmq.client.ConnectionParameters;
4840
import com.rabbitmq.client.MissedHeartbeatException;
4941
import com.rabbitmq.client.RedirectException;
50-
import com.rabbitmq.client.ShutdownListener;
5142
import com.rabbitmq.client.ShutdownSignalException;
5243
import com.rabbitmq.utility.BlockingCell;
5344
import com.rabbitmq.utility.Utility;
@@ -70,13 +61,12 @@
7061
* int ticket = ch1.accessRequest(realmName);
7162
* </pre>
7263
*/
73-
public class AMQConnection implements Connection {
64+
public class AMQConnection extends ShutdownNotifierComponent implements Connection{
7465
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
7566
public static final int HANDSHAKE_TIMEOUT = 10000;
76-
67+
7768
/** Timeout used while waiting for a connection.close-ok (milliseconds) */
7869
public static final int CONNECTION_CLOSING_TIMEOUT = 10000;
79-
8070

8171
private static final Version clientVersion =
8272
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);
@@ -100,23 +90,12 @@ public class AMQConnection implements Connection {
10090
/** Flag controlling the main driver loop's termination */
10191
public volatile boolean _running = false;
10292

103-
/**
104-
* When this value is null, the connection is in an "open"
105-
* state. When non-null, the connection is in "closed" state, and
106-
* this value indicates the circumstances of the shutdown.
107-
*/
108-
public volatile ShutdownSignalException _shutdownCause = null;
109-
11093
/** Maximum frame length, or zero if no limit is set */
11194
public int _frameMax;
11295

11396
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
11497
public final ExceptionHandler _exceptionHandler;
11598

116-
/** List of all shutdown listeners associated with the connection */
117-
public List<ShutdownListener> listeners
118-
= Collections.synchronizedList(new LinkedList<ShutdownListener>());
119-
12099
public BlockingCell<Object> appContinuation = new BlockingCell<Object>();
121100

122101
/**
@@ -127,14 +106,6 @@ public final void disconnectChannel(int channelNumber) {
127106
_channelManager.disconnectChannel(channelNumber);
128107
}
129108

130-
/**
131-
* Public API - Determine whether the connection is open
132-
* @return true if haven't yet received shutdown signal, false otherwise
133-
*/
134-
public boolean isOpen() {
135-
return _shutdownCause == null;
136-
}
137-
138109
public void ensureIsOpen()
139110
throws AlreadyClosedException
140111
{
@@ -492,7 +463,7 @@ public MainLoop() {
492463
shutdown(ex, false, ex);
493464
}
494465
}
495-
466+
496467
// Finally, shut down our underlying data connection.
497468
_frameHandler.close();
498469

@@ -724,55 +695,6 @@ public void close(int closeCode,
724695
}
725696
notifyListeners();
726697
}
727-
728-
/**
729-
* Private API - notify the listeners attached to this connection
730-
* @see com.rabbitmq.client.ShutdownListener
731-
*/
732-
public void notifyListeners()
733-
{
734-
synchronized(listeners) {
735-
for (ShutdownListener l: listeners)
736-
l.service(getCloseReason());
737-
}
738-
}
739-
740-
/**
741-
* Public API - Add shutdown listener fired when closing the connection
742-
* @see com.rabbitmq.client.Connection#addShutdownListener()
743-
*/
744-
public void addShutdownListener(ShutdownListener listener)
745-
{
746-
747-
boolean closed = false;
748-
synchronized(listeners) {
749-
closed = !isOpen();
750-
listeners.add(listener);
751-
}
752-
if (closed)
753-
listener.service(_shutdownCause);
754-
}
755-
756-
/**
757-
* Public API - Remove shutdown listener for this connection
758-
* Removing only the first found object
759-
* @see com.rabbitmq.client.Connection#removeShutdownListener()
760-
*/
761-
public void removeShutdownListener(ShutdownListener listener)
762-
{
763-
synchronized(listeners) {
764-
listeners.remove(listener);
765-
}
766-
}
767-
768-
/**
769-
* Public API - Get reason for shutdown, or null if open
770-
* @see com.rabbitmq.client.Connection#getShutdownReason()
771-
*/
772-
public ShutdownSignalException getCloseReason()
773-
{
774-
return _shutdownCause;
775-
}
776698

777699
@Override public String toString() {
778700
return "amqp://" + _params.getUserName() + "@" + getHost() + ":" + getPort() + _params.getVirtualHost();

0 commit comments

Comments
 (0)