1717package com .rabbitmq .examples .perf ;
1818
1919import com .rabbitmq .client .Channel ;
20+ import com .rabbitmq .client .Connection ;
21+ import com .rabbitmq .client .ShutdownSignalException ;
2022
2123import java .io .IOException ;
2224import java .util .ArrayList ;
@@ -140,10 +142,12 @@ public int getMinMsgSize() {
140142 return minMsgSize ;
141143 }
142144
143- public Producer createProducer (Channel channel , Stats stats , String id ) throws IOException {
145+ public Producer createProducer (Connection connection , Channel channel , Stats stats , String id ) throws IOException {
144146 if (producerTxSize > 0 ) channel .txSelect ();
145147 if (confirm >= 0 ) channel .confirmSelect ();
146- channel .exchangeDeclare (exchangeName , exchangeType );
148+ if (!exchangeExists (connection , exchangeName )) {
149+ channel .exchangeDeclare (exchangeName , exchangeType );
150+ }
147151 final Producer producer = new Producer (channel , exchangeName , id ,
148152 flags , producerTxSize ,
149153 rateLimit , producerMsgCount ,
@@ -154,9 +158,9 @@ public Producer createProducer(Channel channel, Stats stats, String id) throws I
154158 return producer ;
155159 }
156160
157- public Consumer createConsumer (Channel channel , Stats stats , String id ) throws IOException {
161+ public Consumer createConsumer (Connection connection , Channel channel , Stats stats , String id ) throws IOException {
158162 if (consumerTxSize > 0 ) channel .txSelect ();
159- String qName = configureQueue (channel , id );
163+ String qName = configureQueue (connection , channel , id );
160164 if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
161165 return new Consumer (channel , id , qName ,
162166 consumerTxSize , autoAck , multiAckEvery ,
@@ -167,13 +171,54 @@ public boolean shouldConfigureQueue() {
167171 return consumerCount == 0 && !queueName .equals ("" );
168172 }
169173
170- public String configureQueue (Channel channel , String id ) throws IOException {
171- channel .exchangeDeclare (exchangeName , exchangeType );
172- String qName = channel .queueDeclare (queueName ,
173- flags .contains ("persistent" ),
174- exclusive , autoDelete ,
175- null ).getQueue ();
174+ public String configureQueue (Connection connection , Channel channel , String id ) throws IOException {
175+ if (!exchangeExists (connection , exchangeName )) {
176+ channel .exchangeDeclare (exchangeName , exchangeType );
177+ }
178+ String qName = queueName ;
179+ if (!queueExists (connection , queueName )) {
180+ qName = channel .queueDeclare (queueName ,
181+ flags .contains ("persistent" ),
182+ exclusive , autoDelete ,
183+ null ).getQueue ();
184+ }
176185 channel .queueBind (qName , exchangeName , id );
177186 return qName ;
178187 }
188+
189+ private static boolean exchangeExists (Connection connection , final String exchangeName ) throws IOException {
190+ return exists (connection , new Checker () {
191+ public void check (Channel ch ) throws IOException {
192+ ch .exchangeDeclarePassive (exchangeName );
193+ }
194+ });
195+ }
196+
197+ private static boolean queueExists (Connection connection , final String queueName ) throws IOException {
198+ return queueName != null && exists (connection , new Checker () {
199+ public void check (Channel ch ) throws IOException {
200+ ch .queueDeclarePassive (queueName );
201+ }
202+ });
203+ }
204+
205+ private static interface Checker {
206+ public void check (Channel ch ) throws IOException ;
207+ }
208+
209+ private static boolean exists (Connection connection , Checker checker ) throws IOException {
210+ try {
211+ Channel ch = connection .createChannel ();
212+ checker .check (ch );
213+ ch .close ();
214+ return true ;
215+ }
216+ catch (IOException e ) {
217+ if (e .getCause () instanceof ShutdownSignalException ) {
218+ return false ;
219+ } else {
220+ throw e ;
221+ }
222+ }
223+ }
179224}
0 commit comments