Skip to content

Commit 43760ae

Browse files
committed
Migrate branch bug18743
1 parent 393c5fa commit 43760ae

22 files changed

+506
-96
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
/*
4+
* Thrown when application tries to perform an action on connection/channel
5+
* which was already closed
6+
*/
7+
public class AlreadyClosedException extends ShutdownSignalException {
8+
public AlreadyClosedException(String s)
9+
{
10+
super(true, true, s);
11+
}
12+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,4 +433,38 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436+
437+
/**
438+
* Add shutdown listener to the channel
439+
*
440+
* @param listener {@link ShutdownListener} for the channel
441+
*/
442+
void addShutdownListener(ShutdownListener listener);
443+
444+
/**
445+
* Remove shutdown listener for the channel.
446+
*
447+
* @param listener {@link ShutdownListener} to be removed
448+
*/
449+
void removeShutdownListener(ShutdownListener listener);
450+
451+
/**
452+
* Get connection channel shutdown reason.
453+
* Return null if channel is still open.
454+
* @see com.rabbitmq.client.ShutdownCause
455+
* @return shutdown reason if channel is closed
456+
*/
457+
ShutdownSignalException getCloseReason();
458+
459+
/**
460+
* Determine if channel is currently open.
461+
* Will return false if we are currently closing or closed.
462+
* Checking this method should be only for information,
463+
* because of the race conditions - state can change after the call.
464+
* Instead just execute and and try to catch AlreadyClosedException
465+
*
466+
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467+
* @return true when channel is open, false otherwise
468+
*/
469+
boolean isOpen();
436470
}

src/com/rabbitmq/client/Connection.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,75 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
121121
Channel createChannel(int channelNumber) throws IOException;
122122

123123
/**
124-
* Close this connection with the given code and message.
125-
* @param closeCode code indicating the reason for closing the connection - see AMQP spec for a list of codes
126-
* @param closeMessage optional message describing the reason for closing the connection
124+
* Close this connection and all its channels.
125+
*
126+
* This method will wait infinitely for all the close operations to
127+
* complete.
128+
*
127129
* @throws IOException if an I/O problem is encountered
128130
*/
129-
void close(int closeCode, String closeMessage) throws IOException;
131+
void close() throws IOException;
132+
133+
/**
134+
* Close this connection and all its channels
135+
*
136+
* This method will wait with the given timeout for all the close
137+
* operations to complete. If timeout is reached then socket is forced
138+
* to close
139+
* @param timeout timeout (in milioseconds) for completing all the close-related operations, use 0 for infinity
140+
* @throws IOException if an I/O problem is encountered
141+
*/
142+
void close(int timeout) throws IOException;
143+
144+
/**
145+
* Abort this connection and all its channels.
146+
*
147+
* This method will force the connection to close. It will silently discard
148+
* any exceptions enountered in close operations
149+
*/
150+
void abort();
151+
152+
/**
153+
* Abort this connection and all its channels.
154+
*
155+
* This method behaves in a similar way as abort(), with the only difference
156+
* that it will wait with a provided timeout for all the close operations to
157+
* complete. If timeout is reached socket is forced to close.
158+
*/
159+
void abort(int timeout);
160+
161+
/**
162+
* Add connection shutdown listener.
163+
* If the connection is already closed handler is fired immediately
164+
*
165+
* @param listener {@link ShutdownListener} to the connection
166+
*/
167+
void addShutdownListener(ShutdownListener listener);
168+
169+
/**
170+
* Remove shutdown listener for the connection.
171+
*
172+
* @param listener {@link ShutdownListener} to be removed
173+
*/
174+
void removeShutdownListener(ShutdownListener listener);
175+
176+
/**
177+
* Retrieve connection close reason.
178+
*
179+
* @see com.rabbitmq.client.ShutdownCause
180+
* @return information about the cause of closing the connection, or null if connection is still open
181+
*/
182+
ShutdownSignalException getCloseReason();
183+
184+
/**
185+
* Determine whether the connection is currently open.
186+
* Will return false if we are currently closing.
187+
* Checking this method should be only for information,
188+
* because of the race conditions - state can change after the call.
189+
* Instead just execute and and try to catch AlreadyClosedException
190+
*
191+
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
192+
* @return true when connection is open, false otherwise
193+
*/
194+
boolean isOpen();
130195
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.EventListener;
4+
5+
public interface ShutdownListener extends EventListener {
6+
7+
public void service(ShutdownSignalException cause);
8+
9+
}

src/com/rabbitmq/client/ShutdownSignalException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Encapsulates a shutdown condition for a connection to an AMQP broker.
2929
*/
3030

31-
public class ShutdownSignalException extends Exception {
31+
public class ShutdownSignalException extends RuntimeException {
3232
/** True if the connection is shut down, or false if this signal refers to a channel */
3333
private final boolean _hardError;
3434

@@ -59,11 +59,13 @@ public ShutdownSignalException(boolean hardError,
5959

6060
/** @return true if this signals a connection error, or false if a channel error */
6161
public boolean isHardError() { return _hardError; }
62+
6263
/** @return true if this exception was caused by explicit application
6364
* action; false if it originated with the broker or as a result
6465
* of detectable non-deliberate application failure
6566
*/
6667
public boolean isInitiatedByApplication() { return _initiatedByApplication; }
68+
6769
/** @return the reason object, if any */
6870
public Object getReason() { return _reason; }
6971

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@
2626
package com.rabbitmq.client.impl;
2727

2828
import java.io.IOException;
29+
import java.util.concurrent.TimeoutException;
2930

31+
import com.rabbitmq.client.AlreadyClosedException;
3032
import com.rabbitmq.client.Command;
3133
import com.rabbitmq.client.Connection;
3234
import com.rabbitmq.client.ShutdownSignalException;
3335
import com.rabbitmq.utility.BlockingValueOrException;
34-
import com.rabbitmq.utility.SingleShotLinearTimer;
3536

3637
/**
3738
* Base class modelling an AMQ channel. Subclasses implement close()
@@ -53,9 +54,9 @@ public abstract class AMQChannel {
5354

5455
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5556
public RpcContinuation _activeRpc = null;
56-
57-
/** Indicates whether this channel is in a state to handle further activity. */
58-
public volatile boolean _isOpen = true;
57+
58+
/** Reason for closing the channel, null if still open */
59+
public volatile ShutdownSignalException _cause;
5960

6061
/**
6162
* Construct a channel on the given connection, with the given channel number.
@@ -117,6 +118,10 @@ public AMQCommand exnWrappingRpc(Method m)
117118
{
118119
try {
119120
return rpc(m);
121+
} catch (AlreadyClosedException ace) {
122+
// Do not wrap it since it means that connection/channel
123+
// was closed in some action in the past
124+
throw ace;
120125
} catch (ShutdownSignalException ex) {
121126
throw wrap(ex);
122127
}
@@ -160,21 +165,36 @@ public synchronized void transmitAndEnqueue(Method m, RpcContinuation k)
160165
transmit(m);
161166
}
162167

163-
public synchronized RpcContinuation nextOutstandingRpc() {
168+
public synchronized RpcContinuation nextOutstandingRpc()
169+
{
164170
RpcContinuation result = _activeRpc;
165171
_activeRpc = null;
166172
return result;
167173
}
168174

169-
public boolean isOpen() {
170-
return _isOpen;
175+
/**
176+
* Public API - Indicates whether this channel is in an open state
177+
* @return true if channel is open, false otherwise
178+
*/
179+
public boolean isOpen()
180+
{
181+
return _cause == null;
182+
}
183+
184+
/**
185+
* Public API - Get the reason for closing the channel
186+
* @return object having information about the shutdown, or null if still open
187+
*/
188+
public ShutdownSignalException getCloseReason()
189+
{
190+
return _cause;
171191
}
172192

173193
public void ensureIsOpen()
174-
throws IllegalStateException
194+
throws AlreadyClosedException
175195
{
176196
if (!isOpen()) {
177-
throw new IllegalStateException("Attempt to use closed channel");
197+
throw new AlreadyClosedException("Attempt to use closed channel");
178198
}
179199
}
180200

@@ -211,34 +231,11 @@ public synchronized void rpc(Method m, RpcContinuation k)
211231
public AMQCommand quiescingRpc(Method m,
212232
int timeoutMillisec,
213233
final AMQCommand timeoutReply)
214-
throws IOException, ShutdownSignalException
234+
throws IOException, ShutdownSignalException, TimeoutException
215235
{
216-
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
236+
TimeoutBlockingRpcContinuation k = new TimeoutBlockingRpcContinuation();
217237
transmitAndEnqueue(m, k);
218-
if (timeoutMillisec != 0) {
219-
SingleShotLinearTimer timer = new SingleShotLinearTimer();
220-
221-
Runnable task = new Runnable() {
222-
public void run() {
223-
// Timed out waiting for reply.
224-
// Simulate a reply.
225-
// TODO: Warn the user somehow??
226-
try {
227-
handleCompleteInboundCommand(timeoutReply);
228-
} catch (IOException ioe) {
229-
// Ignore.
230-
}
231-
}
232-
};
233-
timer.schedule(task, timeoutMillisec);
234-
try {
235-
return k.getReply();
236-
} finally {
237-
timer.cancel();
238-
}
239-
} else {
240-
return k.getReply();
241-
}
238+
return k.getReply(timeoutMillisec);
242239
}
243240

244241
/**
@@ -253,15 +250,15 @@ public void run() {
253250
@Override public String toString() {
254251
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
255252
}
256-
253+
257254
/**
258255
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
259256
* @param signal the signal to handle
260257
*/
261258
public void processShutdownSignal(ShutdownSignalException signal) {
262259
synchronized (this) {
263260
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
264-
_isOpen = false;
261+
_cause = signal;
265262
}
266263
RpcContinuation k = nextOutstandingRpc();
267264
if (k != null) {
@@ -309,4 +306,14 @@ public AMQCommand transformReply(AMQCommand command) {
309306
return command;
310307
}
311308
}
309+
310+
public static class TimeoutBlockingRpcContinuation
311+
extends SimpleBlockingRpcContinuation
312+
{
313+
public AMQCommand getReply(int timeout)
314+
throws ShutdownSignalException, TimeoutException
315+
{
316+
return _blocker.uninterruptibleGetValue(timeout);
317+
}
318+
}
312319
}

0 commit comments

Comments
 (0)