11package com .rabbitmq .client .test ;
22
33import com .rabbitmq .client .*;
4- import com .rabbitmq .client .impl .nio .BlockingQueueNioQueue ;
5- import com .rabbitmq .client .impl .nio .DefaultByteBufferFactory ;
6- import com .rabbitmq .client .impl .nio .NioContext ;
7- import com .rabbitmq .client .impl .nio .NioParams ;
8- import com .rabbitmq .client .impl .nio .NioQueue ;
9- import com .rabbitmq .client .impl .nio .NioQueueFactory ;
10- import com .rabbitmq .client .impl .nio .WriteRequest ;
4+ import com .rabbitmq .client .impl .nio .*;
5+ import org .assertj .core .api .Condition ;
116import org .junit .After ;
127import org .junit .Before ;
138import org .junit .Test ;
1813import java .util .concurrent .*;
1914import java .util .concurrent .atomic .AtomicInteger ;
2015
21- import static org .hamcrest .Matchers .hasSize ;
22- import static org .hamcrest .Matchers .isOneOf ;
16+ import static org .assertj .core .api .Assertions .assertThat ;
2317import static org .junit .Assert .assertEquals ;
24- import static org .junit .Assert .assertThat ;
2518import static org .junit .Assert .assertTrue ;
2619
2720/**
@@ -129,19 +122,21 @@ public void shutdownCompleted(ShutdownSignalException cause) {
129122 public void nioLoopCleaning () throws Exception {
130123 ConnectionFactory connectionFactory = new ConnectionFactory ();
131124 connectionFactory .useNio ();
132- for (int i = 0 ; i < 10 ; i ++) {
125+ for (int i = 0 ; i < 10 ; i ++) {
133126 Connection connection = connectionFactory .newConnection ();
134127 connection .abort ();
135128 }
136129 }
137130
138- @ Test public void messageSize () throws Exception {
131+ @ Test
132+ public void messageSize () throws Exception {
139133 for (int i = 0 ; i < 50 ; i ++) {
140134 sendAndVerifyMessage (testConnection , 76390 );
141135 }
142136 }
143137
144- @ Test public void byteBufferFactory () throws Exception {
138+ @ Test
139+ public void byteBufferFactory () throws Exception {
145140 ConnectionFactory cf = new ConnectionFactory ();
146141 cf .useNio ();
147142 int baseCapacity = 32768 ;
@@ -159,19 +154,19 @@ public ByteBuffer allocate(int capacity) {
159154 }
160155 })));
161156
162- Connection c = cf .newConnection ();
163- try {
157+ try (Connection c = cf .newConnection ()) {
164158 sendAndVerifyMessage (c , 100 );
165- } finally {
166- TestUtils .close (c );
167159 }
168160
169- assertThat (byteBuffers , hasSize (2 ));
170- assertThat (byteBuffers .get (0 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
171- assertThat (byteBuffers .get (1 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
161+ assertThat (byteBuffers ).hasSize (2 );
162+ Condition <Integer > condition = new Condition <>(c -> c == nioParams .getReadByteBufferSize () ||
163+ c == nioParams .getWriteByteBufferSize (), "capacity set by factory" );
164+ assertThat (byteBuffers .get (0 ).capacity ()).is (condition );
165+ assertThat (byteBuffers .get (1 ).capacity ()).is (condition );
172166 }
173167
174- @ Test public void directByteBuffers () throws Exception {
168+ @ Test
169+ public void directByteBuffers () throws Exception {
175170 ConnectionFactory cf = new ConnectionFactory ();
176171 cf .useNio ();
177172 cf .setNioParams (new NioParams ().setByteBufferFactory (new DefaultByteBufferFactory (new DefaultByteBufferFactory .ByteBufferAllocator () {
@@ -189,7 +184,8 @@ public ByteBuffer allocate(int capacity) {
189184 }
190185 }
191186
192- @ Test public void customWriteQueue () throws Exception {
187+ @ Test
188+ public void customWriteQueue () throws Exception {
193189 ConnectionFactory cf = new ConnectionFactory ();
194190 cf .useNio ();
195191 final AtomicInteger count = new AtomicInteger (0 );
@@ -199,8 +195,8 @@ public ByteBuffer allocate(int capacity) {
199195 public NioQueue create (NioContext ctx ) {
200196 count .incrementAndGet ();
201197 return new BlockingQueueNioQueue (
202- new LinkedBlockingQueue <WriteRequest >(ctx .getNioParams ().getWriteQueueCapacity ()),
203- ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
198+ new LinkedBlockingQueue <WriteRequest >(ctx .getNioParams ().getWriteQueueCapacity ()),
199+ ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
204200 );
205201 }
206202 }));
@@ -220,7 +216,7 @@ private void sendAndVerifyMessage(Connection connection, int size) throws Except
220216 }
221217
222218 private Connection basicGetBasicConsume (ConnectionFactory connectionFactory , String queue , final CountDownLatch latch )
223- throws IOException , TimeoutException {
219+ throws IOException , TimeoutException {
224220 Connection connection = connectionFactory .newConnection ();
225221 Channel channel = connection .createChannel ();
226222 channel .queueDeclare (queue , false , false , false , null );
@@ -240,7 +236,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
240236 }
241237
242238 private boolean basicGetBasicConsume (Connection connection , String queue , final CountDownLatch latch , int msgSize )
243- throws Exception {
239+ throws Exception {
244240 Channel channel = connection .createChannel ();
245241 channel .queueDeclare (queue , false , false , false , null );
246242 channel .queuePurge (queue );
0 commit comments