Skip to content

Commit 4492aba

Browse files
author
Steve Powell
committed
Merge default in.
2 parents 156e708 + 3cf29ae commit 4492aba

File tree

6 files changed

+55
-100
lines changed

6 files changed

+55
-100
lines changed

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ private void scheduleShutdownProcessing() {
9494
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
9595
final ConsumerWorkService ssWorkService = workService;
9696
Thread shutdownThread = new Thread( new Runnable() {
97-
@Override
9897
public void run() {
9998
for (CountDownLatch latch : sdSet) {
10099
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { }

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.rabbitmq.client.impl;
22

33
import java.util.Collection;
4-
import java.util.Deque;
54
import java.util.HashMap;
65
import java.util.HashSet;
76
import java.util.LinkedList;
@@ -74,7 +73,7 @@ public class WorkPool<K, W> {
7473
/** The set of clients which have work <i>in progress</i>. */
7574
private final Set<K> inProgress = new HashSet<K>();
7675
/** The pool of registered clients, with their work queues. */
77-
private final Map<K, Deque<W>> pool = new HashMap<K, Deque<W>>();
76+
private final Map<K, LinkedList<W>> pool = new HashMap<K, LinkedList<W>>();
7877

7978
/**
8079
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -128,25 +127,25 @@ public K nextWorkBlock(Collection<W> to, int size) {
128127
synchronized (this.monitor) {
129128
K nextKey = readyToInProgress();
130129
if (nextKey != null) {
131-
Deque<W> queue = this.pool.get(nextKey);
130+
LinkedList<W> queue = this.pool.get(nextKey);
132131
drainTo(queue, to, size);
133132
}
134133
return nextKey;
135134
}
136135
}
137136

138137
/**
139-
* Private implementation of <code><b>drainTo</b></code> (not implemented for <code><b>Deque&lt;W&gt;</b></code>s).
138+
* Private implementation of <code><b>drainTo</b></code> (not implemented for <code><b>LinkedList&lt;W&gt;</b></code>s).
140139
* @param <W> element type
141-
* @param deque to take (poll) elements from
140+
* @param deList to take (poll) elements from
142141
* @param c to add elements to
143-
* @param maxElements to take from deque
142+
* @param maxElements to take from deList
144143
* @return number of elements actually taken
145144
*/
146-
private static <W> int drainTo(Deque<W> deque, Collection<W> c, int maxElements) {
145+
private static <W> int drainTo(LinkedList<W> deList, Collection<W> c, int maxElements) {
147146
int n = 0;
148147
while (n < maxElements) {
149-
W first = deque.poll();
148+
W first = deList.poll();
150149
if (first == null)
151150
break;
152151
c.add(first);
@@ -206,8 +205,8 @@ public boolean finishWorkBlock(K key) {
206205
}
207206

208207
private boolean moreWorkItems(K key) {
209-
Deque<W> deque = this.pool.get(key);
210-
return (deque==null ? false : !deque.isEmpty());
208+
LinkedList<W> leList = this.pool.get(key);
209+
return (leList==null ? false : !leList.isEmpty());
211210
}
212211

213212
/* State identification functions */

test/src/com/rabbitmq/client/test/Bug20004Test.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
* an independent thread.
2929
*/
3030
public class Bug20004Test extends BrokerTestCase {
31-
public volatile Exception caughtException = null;
32-
public volatile boolean completed = false;
33-
public volatile boolean created = false;
31+
private volatile Exception caughtException = null;
32+
private volatile boolean completed = false;
33+
private volatile boolean created = false;
3434

3535
protected void releaseResources()
3636
throws IOException

test/src/com/rabbitmq/client/test/ConfirmBase.java

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,60 +11,51 @@
1111
// The Original Code is RabbitMQ.
1212
//
1313
// The Initial Developer of the Original Code is VMware, Inc.
14-
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
14+
// Copyright (c) 2011 VMware, Inc. All rights reserved.
1515
//
1616

1717
package com.rabbitmq.client.test;
1818

19-
import com.rabbitmq.client.ConfirmListener;
20-
import com.rabbitmq.client.MessageProperties;
19+
import com.rabbitmq.client.ShutdownSignalException;
2120

2221
import java.io.IOException;
23-
import java.util.Collections;
24-
import java.util.SortedSet;
25-
import java.util.TreeSet;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.FutureTask;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.concurrent.TimeUnit;
2627

27-
public class ConfirmBase extends BrokerTestCase {
28-
29-
protected SortedSet<Long> unconfirmedSet;
30-
31-
@Override
32-
protected void setUp() throws IOException {
33-
super.setUp();
34-
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35-
channel.addConfirmListener(new ConfirmListener() {
36-
public void handleAck(long seqNo, boolean multiple) {
37-
if (!unconfirmedSet.contains(seqNo)) {
38-
fail("got duplicate ack: " + seqNo);
39-
}
40-
if (multiple) {
41-
unconfirmedSet.headSet(seqNo + 1).clear();
42-
} else {
43-
unconfirmedSet.remove(seqNo);
44-
}
45-
}
28+
import junit.framework.AssertionFailedError;
4629

47-
public void handleNack(long seqNo, boolean multiple) {
48-
fail("got a nack");
49-
}
50-
});
51-
channel.confirmSelect();
52-
}
53-
54-
protected void waitAcks() throws InterruptedException {
55-
while (unconfirmedSet.size() > 0)
56-
Thread.sleep(10);
30+
public class ConfirmBase extends BrokerTestCase {
31+
protected void waitForConfirms() throws Exception
32+
{
33+
waitForConfirms("ConfirmBase.waitForConfirms");
5734
}
5835

59-
protected void publish(String exchangeName, String queueName,
60-
boolean persistent, boolean mandatory,
61-
boolean immediate)
62-
throws IOException {
63-
unconfirmedSet.add(channel.getNextPublishSeqNo());
64-
channel.basicPublish(exchangeName, queueName, mandatory, immediate,
65-
persistent ? MessageProperties.PERSISTENT_BASIC
66-
: MessageProperties.BASIC,
67-
"nop".getBytes());
36+
protected void waitForConfirms(final String testTitle) throws Exception
37+
{
38+
try {
39+
FutureTask<?> waiter = new FutureTask<Object>(new Runnable() {
40+
public void run() {
41+
try {
42+
channel.waitForConfirmsOrDie();
43+
} catch (IOException e) {
44+
throw (ShutdownSignalException)e.getCause();
45+
} catch (InterruptedException _) {
46+
fail(testTitle + ": interrupted");
47+
}
48+
}
49+
}, null);
50+
(Executors.newSingleThreadExecutor()).execute(waiter);
51+
waiter.get(10, TimeUnit.SECONDS);
52+
} catch (ExecutionException ee) {
53+
Throwable t = ee.getCause();
54+
if (t instanceof ShutdownSignalException) throw (ShutdownSignalException) t;
55+
if (t instanceof AssertionFailedError) throw (AssertionFailedError) t;
56+
throw (Exception)t;
57+
} catch (TimeoutException _) {
58+
fail(testTitle + ": timeout");
59+
}
6860
}
69-
7061
}

test/src/com/rabbitmq/client/test/server/MessageRecovery.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616
package com.rabbitmq.client.test.server;
1717

1818
import com.rabbitmq.client.MessageProperties;
19-
import com.rabbitmq.client.test.BrokerTestCase;
19+
import com.rabbitmq.client.test.ConfirmBase;
2020

21-
import java.io.IOException;
22-
23-
public class MessageRecovery extends BrokerTestCase
21+
public class MessageRecovery extends ConfirmBase
2422
{
2523

2624
private final static String Q = "recovery-test";
2725

28-
public void testMessageRecovery() throws IOException, InterruptedException {
26+
public void testMessageRecovery()
27+
throws Exception
28+
{
2929
channel.confirmSelect();
3030
channel.queueDeclare(Q, true, false, false, null);
3131
channel.basicPublish("", Q, false, false,
3232
MessageProperties.PERSISTENT_BASIC,
3333
"nop".getBytes());
34-
channel.waitForConfirmsOrDie();
34+
waitForConfirms();
3535

3636
restart();
3737
assertDelivered(Q, 1);

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,8 @@
1818
package com.rabbitmq.examples;
1919

2020
import java.io.IOException;
21-
import java.util.Collections;
22-
import java.util.SortedSet;
23-
import java.util.TreeSet;
2421

2522
import com.rabbitmq.client.Channel;
26-
import com.rabbitmq.client.ConfirmListener;
2723
import com.rabbitmq.client.Connection;
2824
import com.rabbitmq.client.ConnectionFactory;
2925
import com.rabbitmq.client.MessageProperties;
@@ -50,9 +46,6 @@ public static void main(String[] args)
5046
}
5147

5248
static class Publisher implements Runnable {
53-
private volatile SortedSet<Long> unconfirmedSet =
54-
Collections.synchronizedSortedSet(new TreeSet<Long>());
55-
5649
public void run() {
5750
try {
5851
long startTime = System.currentTimeMillis();
@@ -61,43 +54,16 @@ public void run() {
6154
Connection conn = connectionFactory.newConnection();
6255
Channel ch = conn.createChannel();
6356
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
64-
ch.addConfirmListener(new ConfirmListener() {
65-
public void handleAck(long seqNo, boolean multiple) {
66-
if (multiple) {
67-
unconfirmedSet.headSet(seqNo+1).clear();
68-
} else {
69-
unconfirmedSet.remove(seqNo);
70-
}
71-
}
72-
73-
public void handleNack(long seqNo, boolean multiple) {
74-
int lost = 0;
75-
if (multiple) {
76-
SortedSet<Long> nackd =
77-
unconfirmedSet.headSet(seqNo+1);
78-
lost = nackd.size();
79-
nackd.clear();
80-
} else {
81-
lost = 1;
82-
unconfirmedSet.remove(seqNo);
83-
}
84-
System.out.printf("Probably lost %d messages.\n",
85-
lost);
86-
}
87-
});
8857
ch.confirmSelect();
8958

9059
// Publish
9160
for (long i = 0; i < msgCount; ++i) {
92-
unconfirmedSet.add(ch.getNextPublishSeqNo());
9361
ch.basicPublish("", QUEUE_NAME,
9462
MessageProperties.PERSISTENT_BASIC,
9563
"nop".getBytes());
9664
}
9765

98-
// Wait
99-
while (unconfirmedSet.size() > 0)
100-
Thread.sleep(10);
66+
ch.waitForConfirmsOrDie();
10167

10268
// Cleanup
10369
ch.queueDelete(QUEUE_NAME);

0 commit comments

Comments
 (0)