Skip to content

Commit eb3a963

Browse files
committed
Polish previous code with Java 8 features
[#154263515] References #341
1 parent cdee068 commit eb3a963

File tree

6 files changed

+67
-70
lines changed

6 files changed

+67
-70
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
220220
this._virtualHost = params.getVirtualHost();
221221
this._exceptionHandler = params.getExceptionHandler();
222222

223-
this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
223+
this._clientProperties = new HashMap<>(params.getClientProperties());
224224
this.requestedFrameMax = params.getRequestedFrameMax();
225225
this.requestedChannelMax = params.getRequestedChannelMax();
226226
this.requestedHeartbeat = params.getRequestedHeartbeat();
@@ -252,10 +252,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
252252
this.metricsCollector = metricsCollector;
253253

254254
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
255-
new ErrorOnWriteListener() {
256-
@Override
257-
public void handle(Connection connection, IOException exception) { }
258-
};
255+
(connection, exception) -> { };
259256
this.workPoolTimeout = params.getWorkPoolTimeout();
260257
}
261258

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -453,10 +453,10 @@ protected void processDelivery(Command command, Basic.Deliver method) {
453453
// in case a manual ack in the callback, the stats will be able to record the ack
454454
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
455455
this.dispatcher.handleDelivery(callback,
456-
m.getConsumerTag(),
457-
envelope,
458-
(BasicProperties) command.getContentHeader(),
459-
command.getContentBody());
456+
m.getConsumerTag(),
457+
envelope,
458+
(BasicProperties) command.getContentHeader(),
459+
command.getContentBody());
460460
} catch (WorkPoolFullException e) {
461461
// couldn't enqueue in work pool, propagating
462462
throw e;

src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ final public class ConsumerWorkService {
3434
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
3535
this.privateExecutor = (executor == null);
3636
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
37-
: executor;
38-
this.workPool = new WorkPool<Channel, Runnable>(queueingTimeout);
37+
: executor;
38+
this.workPool = new WorkPool<>(queueingTimeout);
3939
this.shutdownTimeout = shutdownTimeout;
4040
}
4141

src/main/java/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import java.util.Iterator;
2222
import java.util.Map;
2323
import java.util.Set;
24-
import java.util.concurrent.BlockingQueue;
25-
import java.util.concurrent.Callable;
2624
import java.util.concurrent.TimeUnit;
25+
import java.util.function.BiConsumer;
2726

2827
/**
2928
* <p>This is a generic implementation of the channels specification
@@ -64,33 +63,26 @@ public class WorkPool<K, W> {
6463
private final Map<K, VariableLinkedBlockingQueue<W>> pool = new HashMap<K, VariableLinkedBlockingQueue<W>>();
6564
/** Those keys which want limits to be removed. We do not limit queue size if this is non-empty. */
6665
private final Set<K> unlimited = new HashSet<K>();
67-
private final EnqueueingCallback<W> enqueueingCallback;
66+
private final BiConsumer<VariableLinkedBlockingQueue<W>, W> enqueueingCallback;
6867

6968
public WorkPool(final int queueingTimeout) {
7069
if (queueingTimeout > 0) {
71-
this.enqueueingCallback = new EnqueueingCallback<W>() {
72-
@Override
73-
public void enqueue(BlockingQueue<W> queue, W item) {
74-
try {
75-
boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
76-
if (!offered) {
77-
throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
78-
}
79-
} catch (InterruptedException e) {
80-
Thread.currentThread();
70+
this.enqueueingCallback = (queue, item) -> {
71+
try {
72+
boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
73+
if (!offered) {
74+
throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
8175
}
76+
} catch (InterruptedException e) {
77+
Thread.currentThread();
8278
}
8379
};
8480
} else {
85-
this.enqueueingCallback = new EnqueueingCallback<W>() {
86-
87-
@Override
88-
public void enqueue(BlockingQueue<W> queue, W item) {
89-
try {
90-
queue.put(item);
91-
} catch (InterruptedException e) {
92-
Thread.currentThread().interrupt();
93-
}
81+
this.enqueueingCallback = (queue, item) -> {
82+
try {
83+
queue.put(item);
84+
} catch (InterruptedException e) {
85+
Thread.currentThread().interrupt();
9486
}
9587
};
9688
}
@@ -212,7 +204,7 @@ public boolean addWorkItem(K key, W item) {
212204
}
213205
// The put operation may block. We need to make sure we are not holding the lock while that happens.
214206
if (queue != null) {
215-
enqueueingCallback.enqueue(queue, item);
207+
enqueueingCallback.accept(queue, item);
216208

217209
synchronized (this) {
218210
if (isDormant(key)) {
@@ -274,9 +266,4 @@ private K readyToInProgress() {
274266
return key;
275267
}
276268

277-
private interface EnqueueingCallback<W> {
278-
279-
void enqueue(BlockingQueue<W> queue, W item);
280-
281-
}
282269
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.rabbitmq.client.*;
1919
import com.rabbitmq.client.impl.AMQConnection;
2020
import com.rabbitmq.client.impl.ConnectionParams;
21-
import com.rabbitmq.client.impl.ErrorOnWriteListener;
2221
import com.rabbitmq.client.impl.FrameHandlerFactory;
2322
import com.rabbitmq.client.impl.NetworkConnection;
2423
import com.rabbitmq.utility.Utility;
@@ -104,28 +103,25 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
104103
private void setupErrorOnWriteListenerForPotentialRecovery() {
105104
final ThreadFactory threadFactory = this.params.getThreadFactory();
106105
final Lock errorOnWriteLock = new ReentrantLock();
107-
this.params.setErrorOnWriteListener(new ErrorOnWriteListener() {
108-
@Override
109-
public void handle(final Connection connection, final IOException exception) throws IOException {
110-
// this is called for any write error
111-
// we should trigger the error handling and the recovery only once
112-
if (errorOnWriteLock.tryLock()) {
113-
try {
114-
Thread recoveryThread = threadFactory.newThread(new Runnable() {
115-
@Override
116-
public void run() {
117-
AMQConnection c = (AMQConnection) connection;
118-
c.handleIoError(exception);
119-
}
120-
});
121-
recoveryThread.setName("RabbitMQ Error On Write Thread");
122-
recoveryThread.start();
123-
} finally {
124-
errorOnWriteLock.unlock();
125-
}
106+
this.params.setErrorOnWriteListener((connection, exception) -> {
107+
// this is called for any write error
108+
// we should trigger the error handling and the recovery only once
109+
if (errorOnWriteLock.tryLock()) {
110+
try {
111+
Thread recoveryThread = threadFactory.newThread(new Runnable() {
112+
@Override
113+
public void run() {
114+
AMQConnection c = (AMQConnection) connection;
115+
c.handleIoError(exception);
116+
}
117+
});
118+
recoveryThread.setName("RabbitMQ Error On Write Thread");
119+
recoveryThread.start();
120+
} finally {
121+
errorOnWriteLock.unlock();
126122
}
127-
throw exception;
128123
}
124+
throw exception;
129125
});
130126
}
131127

src/test/java/com/rabbitmq/client/test/SharedThreadPoolTest.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.Executors;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import com.rabbitmq.client.Connection;
2627
import org.junit.Test;
2728

2829
import com.rabbitmq.client.ConnectionFactory;
@@ -32,34 +33,50 @@ public class SharedThreadPoolTest extends BrokerTestCase {
3233
@Test public void willShutDownExecutor() throws IOException, TimeoutException {
3334
ExecutorService executor1 = null;
3435
ExecutorService executor2 = null;
36+
AMQConnection conn1 = null;
37+
AMQConnection conn2 = null;
38+
AMQConnection conn3 = null;
39+
AMQConnection conn4 = null;
3540
try {
3641
ConnectionFactory cf = TestUtils.connectionFactory();
3742
cf.setAutomaticRecoveryEnabled(false);
3843
executor1 = Executors.newFixedThreadPool(8);
3944
cf.setSharedExecutor(executor1);
4045

41-
AMQConnection conn1 = (AMQConnection)cf.newConnection();
46+
conn1 = (AMQConnection)cf.newConnection();
4247
assertFalse(conn1.willShutDownConsumerExecutor());
4348

4449
executor2 = Executors.newSingleThreadExecutor();
45-
AMQConnection conn2 = (AMQConnection)cf.newConnection(executor2);
50+
conn2 = (AMQConnection)cf.newConnection(executor2);
4651
assertFalse(conn2.willShutDownConsumerExecutor());
4752

48-
AMQConnection conn3 = (AMQConnection)cf.newConnection((ExecutorService)null);
53+
conn3 = (AMQConnection)cf.newConnection((ExecutorService)null);
4954
assertTrue(conn3.willShutDownConsumerExecutor());
5055

5156
cf.setSharedExecutor(null);
5257

53-
AMQConnection conn4 = (AMQConnection)cf.newConnection();
58+
conn4 = (AMQConnection)cf.newConnection();
5459
assertTrue(conn4.willShutDownConsumerExecutor());
5560
} finally {
56-
if (executor1 != null) {
57-
executor1.shutdownNow();
58-
}
59-
if (executor2 != null) {
60-
executor2.shutdownNow();
61-
}
61+
close(conn1);
62+
close(conn2);
63+
close(conn3);
64+
close(conn4);
65+
close(executor1);
66+
close(executor2);
6267
}
6368

6469
}
70+
71+
void close(ExecutorService executor) {
72+
if (executor != null) {
73+
executor.shutdownNow();
74+
}
75+
}
76+
77+
void close(Connection connection) throws IOException {
78+
if (connection != null) {
79+
connection.close();
80+
}
81+
}
6582
}

0 commit comments

Comments
 (0)