Skip to content

Commit 57cf998

Browse files
committed
Polish
References #380
1 parent 3952ded commit 57cf998

File tree

3 files changed

+65
-31
lines changed

3 files changed

+65
-31
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,12 @@ public void run() {
299299
}
300300

301301
protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
302-
Runnable shutdown = () -> state.getConnection().doFinalShutdown();
302+
Runnable shutdown = new Runnable() {
303+
@Override
304+
public void run() {
305+
state.getConnection().doFinalShutdown();
306+
}
307+
};
303308
if (this.connectionShutdownExecutor != null) {
304309
connectionShutdownExecutor.execute(shutdown);
305310
} else if (executorService() != null) {

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,38 +26,66 @@
2626

2727
/**
2828
* Parameters used to configure the NIO mode of a {@link com.rabbitmq.client.ConnectionFactory}.
29+
*
2930
* @since 4.0.0
3031
*/
3132
public class NioParams {
3233

33-
/** size of the byte buffer used for inbound data */
34+
/**
35+
* size of the byte buffer used for inbound data
36+
*/
3437
private int readByteBufferSize = 32768;
3538

36-
/** size of the byte buffer used for outbound data */
39+
/**
40+
* size of the byte buffer used for outbound data
41+
*/
3742
private int writeByteBufferSize = 32768;
3843

39-
/** the max number of IO threads */
44+
/**
45+
* the max number of IO threads
46+
*/
4047
private int nbIoThreads = 1;
4148

42-
/** the timeout to enqueue outbound frames */
49+
/**
50+
* the timeout to enqueue outbound frames
51+
*/
4352
private int writeEnqueuingTimeoutInMs = 10 * 1000;
4453

45-
/** the capacity of the queue used for outbound frames */
54+
/**
55+
* the capacity of the queue used for outbound frames
56+
*/
4657
private int writeQueueCapacity = 10000;
4758

48-
/** the executor service used for IO threads and connections shutdown */
59+
/**
60+
* the executor service used for IO threads and connections shutdown
61+
*/
4962
private ExecutorService nioExecutor;
5063

51-
/** the thread factory used for IO threads and connections shutdown */
64+
/**
65+
* the thread factory used for IO threads and connections shutdown
66+
*/
5267
private ThreadFactory threadFactory;
5368

54-
/** the hook to configure the socket channel before it's open */
69+
/**
70+
* the hook to configure the socket channel before it's open
71+
*/
5572
private SocketChannelConfigurator socketChannelConfigurator = new DefaultSocketChannelConfigurator();
5673

57-
/** the hook to configure the SSL engine before the connection is open */
58-
private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { };
74+
/**
75+
* the hook to configure the SSL engine before the connection is open
76+
*/
77+
private SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator() {
78+
79+
@Override
80+
public void configure(SSLEngine sslEngine) throws IOException {
81+
}
82+
};
5983

60-
/** the executor service used for connection shutdown */
84+
/**
85+
* the executor service used for connection shutdown
86+
*
87+
* @since 4.8.0
88+
*/
6189
private ExecutorService connectionShutdownExecutor;
6290

6391
public NioParams() {
@@ -82,7 +110,7 @@ public int getReadByteBufferSize() {
82110
/**
83111
* Sets the size in byte of the read {@link java.nio.ByteBuffer} used in the NIO loop.
84112
* Default is 32768.
85-
*
113+
* <p>
86114
* This parameter isn't used when using SSL/TLS, where {@link java.nio.ByteBuffer}
87115
* size is set up according to the {@link javax.net.ssl.SSLSession} packet size.
88116
*
@@ -104,7 +132,7 @@ public int getWriteByteBufferSize() {
104132
/**
105133
* Sets the size in byte of the write {@link java.nio.ByteBuffer} used in the NIO loop.
106134
* Default is 32768.
107-
*
135+
* <p>
108136
* This parameter isn't used when using SSL/TLS, where {@link java.nio.ByteBuffer}
109137
* size is set up according to the {@link javax.net.ssl.SSLSession} packet size.
110138
*
@@ -131,7 +159,7 @@ public int getNbIoThreads() {
131159
* 10 connections have been created).
132160
* Once a connection is created, it's assigned to a thread/task and
133161
* all its IO activity is handled by this thread/task.
134-
*
162+
* <p>
135163
* When idle for a few seconds (i.e. without any connection to perform IO for),
136164
* a thread/task stops and is recreated if necessary.
137165
*
@@ -155,14 +183,14 @@ public int getWriteEnqueuingTimeoutInMs() {
155183
* Every requests to the server is divided into frames
156184
* that are then queued in a {@link java.util.concurrent.BlockingQueue} before
157185
* being sent on the network by a IO thread.
158-
*
186+
* <p>
159187
* If the IO thread cannot cope with the frames dispatch, the
160188
* {@link java.util.concurrent.BlockingQueue} gets filled up and blocks
161189
* (blocking the calling thread by the same occasion). This timeout is the
162190
* time the {@link java.util.concurrent.BlockingQueue} will wait before
163191
* rejecting the outbound frame. The calling thread will then received
164192
* an exception.
165-
*
193+
* <p>
166194
* The appropriate value depends on the application scenarios:
167195
* rate of outbound data (published messages, acknowledgment, etc), network speed...
168196
*
@@ -182,17 +210,17 @@ public ExecutorService getNioExecutor() {
182210
/**
183211
* Sets the {@link ExecutorService} to use for NIO threads/tasks.
184212
* Default is to use the thread factory.
185-
*
213+
* <p>
186214
* The {@link ExecutorService} should be able to run the
187215
* number of requested IO threads, plus a few more, as it's also
188216
* used to dispatch the shutdown of connections.
189-
*
217+
* <p>
190218
* Connection shutdown can also be handled by a dedicated {@link ExecutorService},
191219
* see {@link #setConnectionShutdownExecutor(ExecutorService)}.
192-
*
220+
* <p>
193221
* It's developer's responsibility to shut down the executor
194222
* when it is no longer needed.
195-
*
223+
* <p>
196224
* The thread factory isn't used if an executor service is set up.
197225
*
198226
* @param nioExecutor {@link ExecutorService} used for IO threads and connection shutdown
@@ -214,7 +242,7 @@ public ThreadFactory getThreadFactory() {
214242
* Sets the {@link ThreadFactory} to use for NIO threads/tasks.
215243
* Default is to use the {@link com.rabbitmq.client.ConnectionFactory}'s
216244
* {@link ThreadFactory}.
217-
*
245+
* <p>
218246
* The {@link ThreadFactory} is used to spawn the IO threads
219247
* and dispatch the shutdown of connections.
220248
*
@@ -248,6 +276,10 @@ public NioParams setWriteQueueCapacity(int writeQueueCapacity) {
248276
return this;
249277
}
250278

279+
public SocketChannelConfigurator getSocketChannelConfigurator() {
280+
return socketChannelConfigurator;
281+
}
282+
251283
/**
252284
* Set the {@link java.nio.channels.SocketChannel} configurator.
253285
* This gets a chance to "configure" a socket channel
@@ -260,8 +292,8 @@ public void setSocketChannelConfigurator(SocketChannelConfigurator configurator)
260292
this.socketChannelConfigurator = configurator;
261293
}
262294

263-
public SocketChannelConfigurator getSocketChannelConfigurator() {
264-
return socketChannelConfigurator;
295+
public SslEngineConfigurator getSslEngineConfigurator() {
296+
return sslEngineConfigurator;
265297
}
266298

267299
/**
@@ -277,8 +309,8 @@ public void setSslEngineConfigurator(SslEngineConfigurator configurator) {
277309
this.sslEngineConfigurator = configurator;
278310
}
279311

280-
public SslEngineConfigurator getSslEngineConfigurator() {
281-
return sslEngineConfigurator;
312+
public ExecutorService getConnectionShutdownExecutor() {
313+
return connectionShutdownExecutor;
282314
}
283315

284316
/**
@@ -303,13 +335,10 @@ public SslEngineConfigurator getSslEngineConfigurator() {
303335
* @param connectionShutdownExecutor the executor service to use
304336
* @return this {@link NioParams} instance
305337
* @see NioParams#setNioExecutor(ExecutorService)
338+
* @since 4.8.0
306339
*/
307340
public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) {
308341
this.connectionShutdownExecutor = connectionShutdownExecutor;
309342
return this;
310343
}
311-
312-
public ExecutorService getConnectionShutdownExecutor() {
313-
return connectionShutdownExecutor;
314-
}
315344
}

src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void setUp() {
5757
.setConnectionShutdownExecutor(connectionShutdownExecutorService)
5858
.setNbIoThreads(2);
5959
cf.setNioParams(params);
60-
connections = new ArrayList<>();
60+
connections = new ArrayList<Connection>();
6161
}
6262

6363
@After

0 commit comments

Comments
 (0)