4040import com .rabbitmq .client .Connection ;
4141import com .rabbitmq .client .ConnectionFactory ;
4242import com .rabbitmq .client .MessageProperties ;
43+ import com .rabbitmq .client .AMQP .BasicProperties ;
4344
4445public class ProducerMain implements Runnable {
4546 public static final int SUMMARY_EVERY_MS = 1000 ;
@@ -150,15 +151,19 @@ private void runIt() throws IOException {
150151 _channel = _connection .createChannel ();
151152
152153 String queueName = "test queue" ;
154+ String exchangeName = "test exchange" ;
155+
153156 _channel .queueDeclare (queueName , true , false , false , null );
157+ _channel .exchangeDeclare (exchangeName , "fanout" );
158+ _channel .queueBind (queueName , exchangeName , "" );
154159
155160 if (shouldCommit ()) {
156161 _channel .txSelect ();
157162 }
158- sendBatch (queueName );
163+ sendBatch (exchangeName , queueName );
159164
160165 if (_sendCompletion ) {
161- String exchangeName = "test completion" ;
166+ exchangeName = "test completion" ;
162167 _channel .exchangeDeclarePassive (exchangeName );
163168 _channel .basicPublish (exchangeName , "" , MessageProperties .BASIC , new byte [0 ]);
164169 if (shouldCommit ())
@@ -181,7 +186,7 @@ public void primeServer(String queueName) throws IOException {
181186 System .out .println ("...starting." );
182187 }
183188
184- public void sendBatch (String queueName ) throws IOException {
189+ public void sendBatch (String exchangeName , String queueName ) throws IOException {
185190 //primeServer(queueName);
186191
187192 long startTime = System .currentTimeMillis ();
@@ -191,6 +196,9 @@ public void sendBatch(String queueName) throws IOException {
191196
192197 long nextSummaryTime = startTime + SUMMARY_EVERY_MS ;
193198 byte [] message = new byte [256 ];
199+ BasicProperties props = shouldPersist () ?
200+ MessageProperties .MINIMAL_PERSISTENT_BASIC :
201+ MessageProperties .MINIMAL_BASIC ;
194202 for (int i = 0 ; i < _messageCount ; i ++) {
195203 ByteArrayOutputStream acc = new ByteArrayOutputStream ();
196204 DataOutputStream d = new DataOutputStream (acc );
@@ -205,7 +213,7 @@ public void sendBatch(String queueName) throws IOException {
205213 acc .flush ();
206214 byte [] message0 = acc .toByteArray ();
207215 System .arraycopy (message0 , 0 , message , 0 , message0 .length );
208- _channel .basicPublish ("" , queueName , shouldPersist () ? MessageProperties . MINIMAL_PERSISTENT_BASIC : MessageProperties . MINIMAL_BASIC ,
216+ _channel .basicPublish (exchangeName , queueName , props ,
209217 message );
210218 sent ++;
211219 if (shouldCommit ()) {
0 commit comments