@@ -53,21 +53,21 @@ AMQP.BasicProperties setupDlx(boolean persistent) throws IOException {
5353
5454 public void testQueueSize () throws IOException , InterruptedException {
5555 declareQueue (false , false );
56- fill (false , false );
56+ fill (false , false , false );
5757 syncPublish (null , "msg" + MAXLENGTH1 );
5858 assertHead (MAXLENGTH , "msg2" , q );
5959 }
6060
6161 public void testQueueUnacked () throws IOException , InterruptedException {
6262 declareQueue (false , false );
63- fill (false , true );
63+ fill (false , true , false );
6464 syncPublish (null , "msg" + MAXLENGTH1 );
6565 assertHead (1 , "msg" + MAXLENGTH1 , q );
6666 }
6767
6868 public void testPersistent () throws IOException , InterruptedException {
6969 declareQueue (true , false );
70- fill (true , true );
70+ fill (true , true , false );
7171 syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH1 );
7272 assertHead (1 , "msg" + MAXLENGTH1 , q );
7373 }
@@ -90,31 +90,31 @@ public void testDlxTailDurable() throws IOException, InterruptedException {
9090
9191 public void dlxHead (boolean persistent ) throws IOException , InterruptedException {
9292 AMQP .BasicProperties props = setupDlx (persistent );
93- fill (persistent , false );
93+ fill (persistent , false , true );
9494 syncPublish (props , "msg" + MAXLENGTH1 );
9595 assertEquals (MAXLENGTH , declareQueue (persistent , true ));
9696 assertHead (1 , "msg1" , "DLQ" );
9797 }
9898
9999 public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
100100 AMQP .BasicProperties props = setupDlx (persistent );
101- fill (persistent , true );
101+ fill (persistent , true , true );
102102 syncPublish (props , "msg" + MAXLENGTH1 );
103103 assertNull (null , channel .basicGet ("DLQ" , true ));
104104 assertHead (1 , "msg" + MAXLENGTH1 , q );
105105 }
106106
107- private void fill (boolean persistent , boolean unAcked ) throws IOException , InterruptedException {
107+ private void fill (boolean persistent , boolean unAcked , boolean dlx ) throws IOException , InterruptedException {
108108 for (int i =1 ; i <= MAXLENGTH ; i ++){
109109 syncPublish (null , "msg" + i );
110110 if (unAcked ) {
111111 assertNotNull (channel .basicGet (q , false ));
112112 }
113113 }
114114 if (unAcked ) {
115- assertEquals (0 , declareQueue (persistent , false ));
115+ assertEquals (0 , declareQueue (persistent , dlx ));
116116 } else {
117- assertEquals (MAXLENGTH , declareQueue (persistent , false ));
117+ assertEquals (MAXLENGTH , declareQueue (persistent , dlx ));
118118 }
119119 }
120120
0 commit comments