@@ -88,6 +88,42 @@ private void basicAck()
8888 basicAck (latestTag , false );
8989 }
9090
91+ private void basicNack (long tag , boolean multiple , boolean requeue )
92+ throws IOException
93+ {
94+ channel .basicNack (tag , multiple , requeue );
95+ }
96+
97+ private void basicNack (boolean requeue )
98+ throws IOException
99+ {
100+ basicNack (latestTag , false , requeue );
101+ }
102+
103+ private void basicNack ()
104+ throws IOException
105+ {
106+ basicNack (latestTag , false , true );
107+ }
108+
109+ private void basicReject (long tag , boolean requeue )
110+ throws IOException
111+ {
112+ channel .basicReject (tag , requeue );
113+ }
114+
115+ private void basicReject (boolean requeue )
116+ throws IOException
117+ {
118+ basicReject (latestTag , requeue );
119+ }
120+
121+ private void basicReject ()
122+ throws IOException
123+ {
124+ basicReject (latestTag , true );
125+ }
126+
91127 /*
92128 publishes are embargoed until commit
93129 */
@@ -329,4 +365,75 @@ public void testShuffleAcksBeforeRollback()
329365 basicAck (tags [2 ], false );
330366 txCommit ();
331367 }
368+
369+ /*
370+ messages with nacks get requeued after the transaction commit.
371+ messages with nacks with requeue = false are not requeued.
372+ */
373+ public void testCommitNacks ()
374+ throws IOException
375+ {
376+ basicPublish ();
377+ basicPublish ();
378+ txSelect ();
379+ basicGet ();
380+ basicNack ();
381+ basicGet ();
382+ basicNack (false );
383+ assertNull (basicGet ());
384+ txCommit ();
385+ assertNotNull (basicGet ());
386+ basicGet ();
387+ assertNull (basicGet ());
388+ }
389+
390+ public void testRollbackNacks ()
391+ throws IOException
392+ {
393+ basicPublish ();
394+ basicPublish ();
395+ txSelect ();
396+ basicGet ();
397+ basicNack (false );
398+ basicGet ();
399+ basicAck ();
400+ txRollback ();
401+ assertNull (basicGet ());
402+ }
403+
404+ /*
405+ messages with rejects get requeued after the transaction commit.
406+ messages with rejects with requeue = false are not requeued.
407+ */
408+ public void testCommitRejects ()
409+ throws IOException
410+ {
411+ basicPublish ();
412+ basicPublish ();
413+ txSelect ();
414+ basicGet ();
415+ basicReject ();
416+ basicGet ();
417+ basicReject (false );
418+ assertNull (basicGet ());
419+ txCommit ();
420+ assertNotNull (basicGet ());
421+ basicGet ();
422+ assertNull (basicGet ());
423+ }
424+
425+ public void testRollbackRejects ()
426+ throws IOException
427+ {
428+ basicPublish ();
429+ basicPublish ();
430+ txSelect ();
431+ basicGet ();
432+ basicReject (false );
433+ basicGet ();
434+ basicAck ();
435+ txRollback ();
436+ assertNull (basicGet ());
437+ }
438+
332439}
0 commit comments