Skip to content

Commit d4e4089

Browse files
author
Hubert Plociniczak
committed
Added abort methods to Channel. BrokerTestCase uses
now the new interface.
1 parent 2fffd7a commit d4e4089

File tree

3 files changed

+57
-24
lines changed

3 files changed

+57
-24
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,23 @@ public interface Channel extends ShutdownNotifier{
8585
* @throws java.io.IOException if an error is encountered
8686
*/
8787
void close(int closeCode, String closeMessage) throws IOException;
88+
89+
/**
90+
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
91+
* and message 'OK'.
92+
*
93+
* Forces the channel to close and waits for the close operation to complete.
94+
* Any encountered exceptions in the close operation are silently discarded.
95+
*/
96+
void abort() throws IOException;
97+
98+
/**
99+
* Abort this channel.
100+
*
101+
* Forces the channel to close and waits for the close operation to complete.
102+
* Any encountered exceptions in the close operation are silently discarded.
103+
*/
104+
void abort(int closeCode, String closeMessage) throws IOException;
88105

89106
/**
90107
* Return the current {@link ReturnListener}.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,19 @@ public void close()
273273
public void close(int closeCode, String closeMessage)
274274
throws IOException
275275
{
276-
close(closeCode, closeMessage, true, null);
276+
close(closeCode, closeMessage, true, null, false);
277+
}
278+
279+
public void abort()
280+
throws IOException
281+
{
282+
abort(AMQP.REPLY_SUCCESS, "OK");
283+
}
284+
285+
public void abort(int closeCode, String closeMessage)
286+
throws IOException
287+
{
288+
close(closeCode, closeMessage, true, null, true);
277289
}
278290

279291
/**
@@ -284,7 +296,8 @@ public void close(int closeCode, String closeMessage)
284296
public void close(int closeCode,
285297
String closeMessage,
286298
boolean initiatedByApplication,
287-
Throwable cause)
299+
Throwable cause,
300+
boolean abort)
288301
throws IOException
289302
{
290303
// First, notify all our dependents that we are shutting down.
@@ -301,29 +314,38 @@ public void close(int closeCode,
301314
}
302315

303316
BlockingRpcContinuation<AMQCommand> k = new SimpleBlockingRpcContinuation();
304-
// Synchronize the block below to avoid race conditions in case
305-
// connnection wants to send Connection-CloseOK
306-
synchronized(this) {
307-
processShutdownSignal(signal, !initiatedByApplication, true);
308-
quiescingRpc(reason, k);
309-
}
310-
317+
boolean notify = false;
311318
try {
319+
// Synchronize the block below to avoid race conditions in case
320+
// connnection wants to send Connection-CloseOK
321+
synchronized(this) {
322+
processShutdownSignal(signal, !initiatedByApplication, true);
323+
quiescingRpc(reason, k);
324+
}
325+
312326
// Now that we're in quiescing state, channel.close was sent and
313327
// we wait for the reply. We ignore the result. (It's always
314328
// close-ok.)
329+
notify = true;
315330
k.getReply(-1);
316331
} catch (TimeoutException ise) {
317332
// Will never happen since we wait infinitely
333+
} catch (ShutdownSignalException sse) {
334+
if (!abort)
335+
throw sse;
336+
} catch (IOException ioe) {
337+
if (!abort)
338+
throw ioe;
318339
} finally {
319-
320-
// Now we know everything's been cleaned up and there should
321-
// be no more surprises arriving on the wire. Release the
322-
// channel number, and dissociate this ChannelN instance from
323-
// our connection so that any further frames inbound on this
324-
// channel can be caught as the errors they are.
325-
releaseChannelNumber();
326-
notifyListeners();
340+
if (abort || notify) {
341+
// Now we know everything's been cleaned up and there should
342+
// be no more surprises arriving on the wire. Release the
343+
// channel number, and dissociate this ChannelN instance from
344+
// our connection so that any further frames inbound on this
345+
// channel can be caught as the errors they are.
346+
releaseChannelNumber();
347+
notifyListeners();
348+
}
327349
}
328350
}
329351

test/src/com/rabbitmq/client/test/functional/BrokerTestCase.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import junit.framework.TestCase;
3131

32-
import com.rabbitmq.client.AlreadyClosedException;
3332
import com.rabbitmq.client.Channel;
3433
import com.rabbitmq.client.Connection;
3534
import com.rabbitmq.client.ConnectionFactory;
@@ -112,12 +111,7 @@ public void closeChannel()
112111
throws IOException
113112
{
114113
if (channel != null) {
115-
try {
116-
channel.close();
117-
} catch (AlreadyClosedException ace) {
118-
// The API is broken so we have to catch this here.
119-
// See bug 19676.
120-
}
114+
channel.abort();
121115
channel = null;
122116
}
123117
}

0 commit comments

Comments
 (0)