Skip to content

Commit bee8083

Browse files
committed
Merged default into 18776
2 parents a60b6e9 + a456b7d commit bee8083

File tree

9 files changed

+173
-88
lines changed

9 files changed

+173
-88
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -198,19 +198,6 @@ void basicPublish(int ticket, String exchange, String routingKey, boolean mandat
198198
*/
199199
Exchange.DeclareOk exchangeDeclare(int ticket, String exchange, String type, boolean durable) throws IOException;
200200

201-
/**
202-
* Actively declare a non-exclusive, non-autodelete queue
203-
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
204-
* @see com.rabbitmq.client.AMQP.Queue.Declare
205-
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
206-
* @param ticket an access ticket for the appropriate realm
207-
* @param queue the name of the queue
208-
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
209-
* @return a declaration-confirm method to indicate the exchange was successfully declared
210-
* @throws java.io.IOException if an error is encountered
211-
*/
212-
Queue.DeclareOk queueDeclare(int ticket, String queue, boolean durable) throws IOException;
213-
214201
/**
215202
* Declare an exchange, via an interface that allows the complete set of arguments
216203
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
@@ -250,6 +237,19 @@ Exchange.DeclareOk exchangeDeclare(int ticket, String exchange, String type, boo
250237
* @throws java.io.IOException if an error is encountered
251238
*/
252239
Queue.DeclareOk queueDeclare(int ticket, String queue) throws IOException;
240+
241+
/**
242+
* Actively declare a non-exclusive, non-autodelete queue
243+
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
244+
* @see com.rabbitmq.client.AMQP.Queue.Declare
245+
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
246+
* @param ticket an access ticket for the appropriate realm
247+
* @param queue the name of the queue
248+
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
249+
* @return a declaration-confirm method to indicate the exchange was successfully declared
250+
* @throws java.io.IOException if an error is encountered
251+
*/
252+
Queue.DeclareOk queueDeclare(int ticket, String queue, boolean durable) throws IOException;
253253

254254
/**
255255
* Declare a queue
@@ -318,6 +318,33 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
318318
* @throws java.io.IOException if an error is encountered
319319
*/
320320
Queue.BindOk queueBind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
321+
322+
/**
323+
* Uninds a queue from an exchange, with no extra arguments.
324+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
325+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
326+
* @param ticket an access ticket for the appropriate realm
327+
* @param queue the name of the queue
328+
* @param exchange the name of the exchange
329+
* @param routingKey the routine key to use for the binding
330+
* @return an unbinding-confirm method if the binding was successfully deleted
331+
* @throws java.io.IOException if an error is encountered
332+
*/
333+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey) throws IOException;
334+
335+
/**
336+
* Unbind a queue from an exchange.
337+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
338+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
339+
* @param ticket an access ticket for the appropriate realm
340+
* @param queue the name of the queue
341+
* @param exchange the name of the exchange
342+
* @param routingKey the routine key to use for the binding
343+
* @param arguments other properties (binding parameters)
344+
* @return an unbinding-confirm method if the binding was successfully deleted
345+
* @throws java.io.IOException if an error is encountered
346+
*/
347+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
321348

322349
/**
323350
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,17 +215,29 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
215215
@Override public String toString() {
216216
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
217217
}
218-
218+
219219
/**
220220
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
221221
* @param signal the signal to handle
222+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
223+
* thrown when the channel is already closed
224+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
225+
* notified with the given signal
222226
*/
223-
public void processShutdownSignal(ShutdownSignalException signal) {
224-
synchronized (this) {
225-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
226-
_shutdownCause = signal;
227+
public void processShutdownSignal(ShutdownSignalException signal,
228+
boolean ignoreClosed,
229+
boolean notifyRpc) {
230+
try {
231+
synchronized (this) {
232+
if (!ignoreClosed)
233+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234+
if (isOpen())
235+
_shutdownCause = signal;
236+
}
237+
} finally {
238+
if (notifyRpc)
239+
notifyOutstandingRpc(signal);
227240
}
228-
notifyOutstandingRpc(signal);
229241
}
230242

231243
public void notifyOutstandingRpc(ShutdownSignalException signal) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
103103
*/
104104
public BlockingCell<Object> _appContinuation = new BlockingCell<Object>();
105105

106+
/** Flag indicating whether the client received Connection.Close message from the broker */
107+
public boolean _brokerInitiatedShutdown = false;
108+
106109
/**
107110
* Protected API - respond, in the driver thread, to a ShutdownSignal.
108111
* @param channelNumber the number of the channel to disconnect
@@ -191,6 +194,7 @@ public AMQConnection(ConnectionParameters params,
191194
_missedHeartbeats = 0;
192195
_heartbeat = 0;
193196
_exceptionHandler = exceptionHandler;
197+
_brokerInitiatedShutdown = false;
194198

195199
new MainLoop(); // start the main loop going
196200

@@ -457,26 +461,15 @@ public MainLoop() {
457461
}
458462
}
459463
} catch (EOFException ex) {
460-
if (isOpen()) {
461-
System.err.println("AMQConnection.mainLoop: connection close");
462-
shutdown(ex, false, ex);
463-
}
464+
if (!_brokerInitiatedShutdown)
465+
shutdown(ex, false, ex, true);
464466
} catch (Throwable ex) {
465467
_exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this,
466468
ex);
467-
if (isOpen()) {
468-
shutdown(ex, false, ex);
469-
}
469+
shutdown(ex, false, ex, true);
470470
} finally {
471471
// Finally, shut down our underlying data connection.
472472
_frameHandler.close();
473-
474-
// Set shutdown exception for any outstanding rpc,
475-
// so that it does not wait infinitely for Connection.CloseOk.
476-
// This can only happen when the broker closed the socket
477-
// unexpectedly.
478-
_channel0.notifyOutstandingRpc(_shutdownCause);
479-
480473
_appContinuation.set(null);
481474
notifyListeners();
482475
}
@@ -562,18 +555,22 @@ public boolean processControlCommand(Command c)
562555
}
563556

564557
public void handleConnectionClose(Command closeCommand) {
565-
shutdown(closeCommand, false, null);
558+
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
566559
try {
567560
_channel0.quiescingTransmit(new AMQImpl.Connection.CloseOk());
568561
} catch (IOException ioe) {
569562
Utility.emptyStatement();
570563
}
571-
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
572-
new SocketCloseWait();
564+
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
565+
_brokerInitiatedShutdown = true;
566+
new SocketCloseWait(sse);
573567
}
574568

575569
private class SocketCloseWait extends Thread {
576-
public SocketCloseWait() {
570+
private ShutdownSignalException cause;
571+
572+
public SocketCloseWait(ShutdownSignalException sse) {
573+
cause = sse;
577574
start();
578575
}
579576

@@ -586,6 +583,7 @@ public SocketCloseWait() {
586583
_frameHandler.close();
587584
} finally {
588585
_running = false;
586+
_channel0.notifyOutstandingRpc(cause);
589587
}
590588
}
591589
}
@@ -594,27 +592,26 @@ public SocketCloseWait() {
594592
* Protected API - causes all attached channels to terminate with
595593
* a ShutdownSignal built from the argument, and stops this
596594
* connection from accepting further work from the application.
595+
*
596+
* @return a shutdown signal built using the given arguments
597597
*/
598-
public void shutdown(Object reason,
598+
public ShutdownSignalException shutdown(Object reason,
599599
boolean initiatedByApplication,
600-
Throwable cause)
600+
Throwable cause,
601+
boolean notifyRpc)
601602
{
602-
try {
603-
synchronized (this) {
603+
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
604+
reason, this);
605+
sse.initCause(cause);
606+
synchronized (this) {
607+
if (initiatedByApplication)
604608
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
605-
ShutdownSignalException sse = new ShutdownSignalException(true,
606-
initiatedByApplication,
607-
reason, this);
608-
sse.initCause(cause);
609+
if (isOpen())
609610
_shutdownCause = sse;
610-
}
611-
612-
_channel0.processShutdownSignal(_shutdownCause);
613-
} catch (AlreadyClosedException ace) {
614-
if (initiatedByApplication)
615-
throw ace;
616611
}
617-
_channelManager.handleSignal(_shutdownCause);
612+
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
613+
_channelManager.handleSignal(sse);
614+
return sse;
618615
}
619616

620617
public void close()
@@ -690,7 +687,7 @@ public void close(int closeCode,
690687
try {
691688
AMQImpl.Connection.Close reason =
692689
new AMQImpl.Connection.Close(closeCode, closeMessage, 0, 0);
693-
shutdown(reason, initiatedByApplication, cause);
690+
shutdown(reason, initiatedByApplication, cause, true);
694691
AMQChannel.SimpleBlockingRpcContinuation k =
695692
new AMQChannel.SimpleBlockingRpcContinuation();
696693
_channel0.quiescingRpc(reason, k);

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,7 @@ public void handleSignal(ShutdownSignalException signal) {
6868
}
6969
for (AMQChannel channel : channels) {
7070
disconnectChannel(channel.getChannelNumber());
71-
try {
72-
channel.processShutdownSignal(signal);
73-
} catch (ShutdownSignalException sse) {
74-
// Notify channels that are waiting for rpc
75-
channel.notifyOutstandingRpc(signal);
76-
}
71+
channel.processShutdownSignal(signal, true, true);
7772
}
7873
}
7974

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,11 @@ public void broadcastShutdownSignal(ShutdownSignalException signal) {
146146
* Protected API - overridden to broadcast the signal to all
147147
* consumers before calling the superclass's method.
148148
*/
149-
@Override public void processShutdownSignal(ShutdownSignalException signal) {
150-
super.processShutdownSignal(signal);
149+
@Override public void processShutdownSignal(ShutdownSignalException signal,
150+
boolean ignoreClosed,
151+
boolean notifyRpc)
152+
{
153+
super.processShutdownSignal(signal, ignoreClosed, notifyRpc);
151154
broadcastShutdownSignal(signal);
152155
}
153156

@@ -169,6 +172,22 @@ public void releaseChannelNumber() {
169172
// incoming commands except for a close-ok.
170173

171174
Method method = command.getMethod();
175+
176+
if (method instanceof Channel.Close) {
177+
// Channel should always respond to Channel.Close
178+
// from the server
179+
releaseChannelNumber();
180+
ShutdownSignalException signal = new ShutdownSignalException(false,
181+
false,
182+
command,
183+
this);
184+
synchronized(this) {
185+
processShutdownSignal(signal, true, true);
186+
quiescingTransmit(new Channel.CloseOk());
187+
}
188+
notifyListeners();
189+
return true;
190+
}
172191
if (isOpen()) {
173192
// We're in normal running mode.
174193

@@ -216,18 +235,6 @@ public void releaseChannelNumber() {
216235
}
217236
}
218237
return true;
219-
} else if (method instanceof Channel.Close) {
220-
releaseChannelNumber();
221-
ShutdownSignalException signal = new ShutdownSignalException(false,
222-
false,
223-
command,
224-
this);
225-
synchronized(this) {
226-
processShutdownSignal(signal);
227-
quiescingTransmit(new Channel.CloseOk());
228-
}
229-
notifyListeners();
230-
return true;
231238
} else {
232239
return false;
233240
}
@@ -289,7 +296,7 @@ public void close(int closeCode,
289296
// Synchronize the block below to avoid race conditions in case
290297
// connnection wants to send Connection-CloseOK
291298
synchronized(this) {
292-
processShutdownSignal(signal);
299+
processShutdownSignal(signal, !initiatedByApplication, true);
293300
quiescingRpc(reason, k);
294301
}
295302

@@ -532,6 +539,20 @@ public Queue.BindOk queueBind(int ticket, String queue, String exchange,
532539
false, arguments)).getMethod();
533540
}
534541

542+
/**
543+
* Public API - Unbind a queue from an exchange.
544+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
545+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
546+
*/
547+
public Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange,
548+
String routingKey, Map<String, Object> arguments)
549+
throws IOException
550+
{
551+
return (Queue.UnbindOk)
552+
exnWrappingRpc(new Queue.Unbind(ticket, queue, exchange, routingKey,
553+
arguments)).getMethod();
554+
}
555+
535556
/**
536557
* Public API - Bind a queue to an exchange, with no extra arguments.
537558
* @see com.rabbitmq.client.AMQP.Queue.Bind
@@ -543,6 +564,17 @@ public Queue.BindOk queueBind(int ticket, String queue, String exchange, String
543564
return queueBind(ticket, queue, exchange, routingKey, null);
544565
}
545566

567+
/**
568+
* Public API - Unbind a queue from an exchange, with no extra arguments.
569+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
570+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
571+
*/
572+
public Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey)
573+
throws IOException
574+
{
575+
return queueUnbind(ticket, queue, exchange, routingKey, null);
576+
}
577+
546578
/**
547579
* Public API - Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
548580
* @see com.rabbitmq.client.AMQP.Basic.Get

0 commit comments

Comments
 (0)