Skip to content

Commit a9138fd

Browse files
author
Steve Powell
committed
Rename onlyAcksReceived to noNacksReceived;
Protect vars with unconfirmedSet monitor; Correct boolean on handleAckNack call for nack; Restructure Confirm tests to avoid repeatedly creating queues.
1 parent 4ba5975 commit a9138fd

File tree

6 files changed

+101
-55
lines changed

6 files changed

+101
-55
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,20 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8282
/** The ConfirmListener collection. */
8383
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
8484

85-
/** Sequence number of next published message requiring confirmation. */
86-
private long nextPublishSeqNo = 0L;
87-
8885
/** The current default consumer, or null if there is none. */
8986
private volatile Consumer defaultConsumer = null;
9087

9188
/** Set of currently unconfirmed messages (i.e. messages that have
92-
* not been ack'd or nack'd by the server yet. */
89+
* not been ack'd or nack'd by the server yet.
90+
* Used as monitor and protects nextPublishSeqNo and onlyAcksReceived. */
9391
private volatile SortedSet<Long> unconfirmedSet =
9492
Collections.synchronizedSortedSet(new TreeSet<Long>());
95-
93+
/** Sequence number of next published message requiring confirmation.
94+
* 0 means no confirmations. */
95+
private volatile long nextPublishSeqNo = 0L;
9696
/** Whether any nacks have been received since the last
9797
* waitForConfirms(). */
98-
private volatile boolean onlyAcksReceived = true;
98+
private volatile boolean noNacksReceived = true;
9999

100100
/**
101101
* Construct a new channel on the given connection with the given
@@ -167,8 +167,8 @@ public boolean waitForConfirms()
167167
throw Utility.fixStackTrace(getCloseReason());
168168
}
169169
if (unconfirmedSet.isEmpty()) {
170-
boolean aux = onlyAcksReceived;
171-
onlyAcksReceived = true;
170+
boolean aux = noNacksReceived;
171+
noNacksReceived = true;
172172
return aux;
173173
}
174174
unconfirmedSet.wait();
@@ -321,7 +321,7 @@ public void releaseChannelNumber() {
321321
} else if (method instanceof Basic.Nack) {
322322
Basic.Nack nack = (Basic.Nack) method;
323323
callConfirmListeners(command, nack);
324-
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), false);
324+
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
325325
return true;
326326
} else if (method instanceof Basic.RecoverOk) {
327327
for (Consumer callback: _consumers.values()) {
@@ -551,9 +551,11 @@ public void basicPublish(String exchange, String routingKey,
551551
BasicProperties props, byte[] body)
552552
throws IOException
553553
{
554-
if (nextPublishSeqNo > 0) {
555-
unconfirmedSet.add(getNextPublishSeqNo());
556-
nextPublishSeqNo++;
554+
synchronized(unconfirmedSet) {
555+
if (nextPublishSeqNo > 0) {
556+
unconfirmedSet.add(nextPublishSeqNo);
557+
nextPublishSeqNo++;
558+
}
557559
}
558560
BasicProperties useProps = props;
559561
if (props == null) {
@@ -994,7 +996,9 @@ public Tx.RollbackOk txRollback()
994996
public Confirm.SelectOk confirmSelect()
995997
throws IOException
996998
{
997-
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
999+
synchronized(unconfirmedSet) {
1000+
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
1001+
}
9981002
return (Confirm.SelectOk)
9991003
exnWrappingRpc(new Confirm.Select(false)).getMethod();
10001004

@@ -1012,7 +1016,9 @@ public Channel.FlowOk getFlow() {
10121016

10131017
/** Public API - {@inheritDoc} */
10141018
public long getNextPublishSeqNo() {
1015-
return nextPublishSeqNo;
1019+
synchronized(unconfirmedSet) {
1020+
return nextPublishSeqNo;
1021+
}
10161022
}
10171023

10181024
public void asyncRpc(Method method) throws IOException {
@@ -1024,13 +1030,13 @@ public AMQCommand rpc(Method method) throws IOException {
10241030
}
10251031

10261032
protected void handleAckNack(long seqNo, boolean multiple, boolean nack) {
1027-
if (multiple) {
1028-
unconfirmedSet.headSet(seqNo + 1).clear();
1029-
} else {
1030-
unconfirmedSet.remove(seqNo);
1031-
}
10321033
synchronized (unconfirmedSet) {
1033-
onlyAcksReceived = onlyAcksReceived && !nack;
1034+
if (multiple) {
1035+
unconfirmedSet.headSet(seqNo + 1).clear();
1036+
} else {
1037+
unconfirmedSet.remove(seqNo);
1038+
}
1039+
noNacksReceived = noNacksReceived && !nack;
10341040
if (unconfirmedSet.isEmpty())
10351041
unconfirmedSet.notifyAll();
10361042
}

test/src/com/rabbitmq/client/test/Bug20004Test.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
* tests.
2626
*/
2727
public class Bug20004Test extends BrokerTestCase {
28-
public Exception caughtException = null;
29-
public boolean completed = false;
30-
public boolean created = false;
28+
private volatile Exception caughtException = null;
29+
private volatile boolean completed = false;
30+
private volatile boolean created = false;
3131

3232
protected void releaseResources()
3333
throws IOException
@@ -37,6 +37,7 @@ protected void releaseResources()
3737
}
3838
}
3939

40+
@SuppressWarnings("deprecation")
4041
public void testBug20004()
4142
throws IOException
4243
{

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public void testQueuePurge() throws IOException {
5656
* longer purged, even if the channel they were sent down is not
5757
* (Tx-)transacted."
5858
*/
59+
@SuppressWarnings("deprecation")
5960
public void testUnackedPurge() throws IOException {
6061
Binding binding = setupExchangeBindings(false);
6162
channel.basicPublish(binding.x, binding.k, null, payload);

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,64 +38,81 @@ public class Confirm extends ConfirmBase
3838

3939
private static final String TTL_ARG = "x-message-ttl";
4040

41+
private DefaultConsumer defaultConsumer = null;
42+
4143
@Override
42-
protected void setUp() throws IOException {
43-
super.setUp();
44+
protected void createResources() throws IOException {
4445
channel.confirmSelect();
45-
channel.queueDeclare("confirm-test", true, true, false, null);
46-
channel.basicConsume("confirm-test", true,
47-
new DefaultConsumer(channel));
48-
channel.queueDeclare("confirm-test-nondurable", false, true,
49-
false, null);
50-
channel.basicConsume("confirm-test-nondurable", true,
51-
new DefaultConsumer(channel));
52-
channel.queueDeclare("confirm-test-noconsumer", true,
53-
true, false, null);
54-
channel.queueDeclare("confirm-test-2", true, true, false, null);
55-
channel.basicConsume("confirm-test-2", true,
56-
new DefaultConsumer(channel));
57-
Map<String, Object> argMap =
58-
Collections.singletonMap(TTL_ARG, (Object)1);
59-
channel.queueDeclare("confirm-ttl", true, true, false, argMap);
60-
channel.queueBind("confirm-test", "amq.direct",
61-
"confirm-multiple-queues");
62-
channel.queueBind("confirm-test-2", "amq.direct",
63-
"confirm-multiple-queues");
46+
defaultConsumer = new DefaultConsumer(channel);
47+
}
48+
49+
@Override
50+
protected void releaseResources() throws IOException {
51+
defaultConsumer = null;
52+
}
53+
54+
private void declareQueue(String queueName, boolean durable)
55+
throws IOException {
56+
declareQueue(queueName, durable, null);
57+
}
58+
59+
private void declareQueue(String queueName, boolean durable,
60+
Map<String, Object> args)
61+
throws IOException {
62+
channel.queueDeclare(queueName, durable, true, false, args);
63+
}
64+
65+
private void declareConsumeQueue(String queueName, boolean durable)
66+
throws IOException {
67+
declareQueue(queueName, durable);
68+
channel.basicConsume(queueName, true, defaultConsumer);
69+
}
70+
71+
private void declareBindQueue(String queueName, boolean durable)
72+
throws IOException {
73+
declareConsumeQueue(queueName, durable);
74+
channel.queueBind(queueName, "amq.direct", "confirm-multiple-queues");
6475
}
6576

6677
public void testTransient()
6778
throws Exception
6879
{
80+
declareConsumeQueue("confirm-test", true);
6981
confirmTest("", "confirm-test", false, false, false);
7082
}
7183

7284
public void testPersistentSimple()
7385
throws Exception
7486
{
87+
declareConsumeQueue("confirm-test", true);
7588
confirmTest("", "confirm-test", true, false, false);
7689
}
7790

7891
public void testNonDurable()
7992
throws Exception
8093
{
94+
declareConsumeQueue("confirm-test-nondurable", false);
8195
confirmTest("", "confirm-test-nondurable", true, false, false);
8296
}
8397

8498
public void testPersistentImmediate()
8599
throws Exception
86100
{
101+
declareConsumeQueue("confirm-test", true);
87102
confirmTest("", "confirm-test", true, false, true);
88103
}
89104

90105
public void testPersistentImmediateNoConsumer()
91106
throws Exception
92107
{
108+
declareQueue("confirm-test-noconsumer", true);
93109
confirmTest("", "confirm-test-noconsumer", true, false, true);
94110
}
95111

96112
public void testPersistentMandatory()
97113
throws Exception
98114
{
115+
declareConsumeQueue("confirm-test", true);
99116
confirmTest("", "confirm-test", true, true, false);
100117
}
101118

@@ -108,6 +125,8 @@ public void testPersistentMandatoryReturn()
108125
public void testMultipleQueues()
109126
throws Exception
110127
{
128+
declareBindQueue("confirm-test", true);
129+
declareBindQueue("confirm-test-2", true);
111130
confirmTest("amq.direct", "confirm-multiple-queues",
112131
true, false, false);
113132
}
@@ -120,6 +139,8 @@ public void testMultipleQueues()
120139
public void testQueueDelete()
121140
throws Exception
122141
{
142+
declareQueue("confirm-test-noconsumer", true);
143+
123144
publishN("","confirm-test-noconsumer", true, false, false);
124145

125146
channel.queueDelete("confirm-test-noconsumer");
@@ -130,6 +151,8 @@ public void testQueueDelete()
130151
public void testQueuePurge()
131152
throws Exception
132153
{
154+
declareQueue("confirm-test-noconsumer", true);
155+
133156
publishN("", "confirm-test-noconsumer", true, false, false);
134157

135158
channel.queuePurge("confirm-test-noconsumer");
@@ -140,14 +163,19 @@ public void testQueuePurge()
140163
public void testBasicReject()
141164
throws Exception
142165
{
143-
basicRejectCommon(false);
166+
declareQueue("confirm-test-noconsumer", true);
167+
168+
basicRejectCommon("confirm-test-noconsumer", false);
144169

145170
waitForConfirms();
146171
}
147172

148173
public void testQueueTTL()
149174
throws Exception
150175
{
176+
declareQueue("confirm-ttl", true,
177+
Collections.singletonMap(TTL_ARG, (Object)Long.valueOf(1L)));
178+
151179
publishN("", "confirm-ttl", true, false, false);
152180

153181
waitForConfirms();
@@ -156,20 +184,24 @@ public void testQueueTTL()
156184
public void testBasicRejectRequeue()
157185
throws Exception
158186
{
159-
basicRejectCommon(true);
187+
declareQueue("confirm-test-noconsumer", true);
188+
189+
basicRejectCommon("confirm-test-noconsumer", true);
160190

161191
/* wait confirms to go through the broker */
162-
Thread.sleep(1000);
192+
//Thread.sleep(1000);
163193

164194
channel.basicConsume("confirm-test-noconsumer", true,
165-
new DefaultConsumer(channel));
195+
defaultConsumer);
166196

167197
waitForConfirms();
168198
}
169199

170200
public void testBasicRecover()
171201
throws Exception
172202
{
203+
declareQueue("confirm-test-noconsumer", true);
204+
173205
publishN("", "confirm-test-noconsumer", true, false, false);
174206

175207
for (long i = 0; i < NUM_MESSAGES; i++) {
@@ -181,10 +213,10 @@ public void testBasicRecover()
181213

182214
channel.basicRecover(true);
183215

184-
Thread.sleep(1000);
216+
//Thread.sleep(1000);
185217

186218
channel.basicConsume("confirm-test-noconsumer", true,
187-
new DefaultConsumer(channel));
219+
defaultConsumer);
188220

189221
waitForConfirms();
190222
}
@@ -214,6 +246,8 @@ public void testSelect()
214246
public void testWaitForConfirms()
215247
throws Exception
216248
{
249+
declareConsumeQueue("confirm-test", true);
250+
217251
final SortedSet<Long> unconfirmedSet =
218252
Collections.synchronizedSortedSet(new TreeSet<Long>());
219253
channel.addConfirmListener(new ConfirmListener() {
@@ -247,6 +281,8 @@ public void handleNack(long seqNo, boolean multiple) {
247281
public void testWaitForConfirmsNoOp()
248282
throws Exception
249283
{
284+
declareConsumeQueue("confirm-test", true);
285+
250286
channel = connection.createChannel();
251287
// Don't enable Confirm mode
252288
publish("", "confirm-test", true, false, false);
@@ -256,6 +292,8 @@ public void testWaitForConfirmsNoOp()
256292
public void testWaitForConfirmsException()
257293
throws Exception
258294
{
295+
declareConsumeQueue("confirm-test", true);
296+
259297
publishN("", "confirm-test", true, false, false);
260298
channel.close();
261299
try {
@@ -291,14 +329,14 @@ private void publishN(String exchangeName, String queueName,
291329
}
292330
}
293331

294-
private void basicRejectCommon(boolean requeue)
332+
private void basicRejectCommon(String queueName, boolean requeue)
295333
throws Exception
296334
{
297-
publishN("", "confirm-test-noconsumer", true, false, false);
335+
publishN("", queueName, true, false, false);
298336

299337
for (long i = 0; i < NUM_MESSAGES; i++) {
300338
GetResponse resp =
301-
channel.basicGet("confirm-test-noconsumer", false);
339+
channel.basicGet(queueName, false);
302340
long dtag = resp.getEnvelope().getDeliveryTag();
303341
channel.basicReject(dtag, requeue);
304342
}

test/src/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
7777
}
7878

7979
RecoverCallback recoverAsync = new RecoverCallback() {
80+
@SuppressWarnings("deprecation")
8081
public void recover(Channel channel) throws IOException {
8182
channel.basicRecoverAsync(true);
8283
}

test/src/com/rabbitmq/examples/FileProducer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public static void main(String[] args) {
8282
Map<String, Object> headers = new HashMap<String, Object>();
8383
headers.put("filename", filename);
8484
headers.put("length", (int) f.length());
85-
BasicProperties props = new BasicProperties();
86-
props.setHeaders(headers);
85+
BasicProperties props = new BasicProperties.Builder().headers(headers).build();
8786
ch.basicPublish(exchange, routingKey, props, body);
8887
System.out.println(" done.");
8988
}

0 commit comments

Comments
 (0)