@@ -33,9 +33,9 @@ class Connection
3333 'x-message-ttl ' ,
3434 ];
3535
36- private $ connectionConfiguration ;
37- private $ exchangeConfiguration ;
38- private $ queuesConfiguration ;
36+ private $ connectionOptions ;
37+ private $ exchangeOptions ;
38+ private $ queuesOptions ;
3939 private $ amqpFactory ;
4040
4141 /**
@@ -86,17 +86,17 @@ class Connection
8686 * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
8787 * * prefetch_count: set channel prefetch count
8888 */
89- public function __construct (array $ connectionConfiguration , array $ exchangeConfiguration , array $ queuesConfiguration , AmqpFactory $ amqpFactory = null )
89+ public function __construct (array $ connectionOptions , array $ exchangeOptions , array $ queuesOptions , AmqpFactory $ amqpFactory = null )
9090 {
91- $ this ->connectionConfiguration = array_replace_recursive ([
91+ $ this ->connectionOptions = array_replace_recursive ([
9292 'delay ' => [
9393 'routing_key_pattern ' => 'delay_%delay% ' ,
9494 'exchange_name ' => 'delay ' ,
9595 'queue_name_pattern ' => 'delay_queue_%delay% ' ,
9696 ],
97- ], $ connectionConfiguration );
98- $ this ->exchangeConfiguration = $ exchangeConfiguration ;
99- $ this ->queuesConfiguration = $ queuesConfiguration ;
97+ ], $ connectionOptions );
98+ $ this ->exchangeOptions = $ exchangeOptions ;
99+ $ this ->queuesOptions = $ queuesOptions ;
100100 $ this ->amqpFactory = $ amqpFactory ?: new AmqpFactory ();
101101 }
102102
@@ -183,8 +183,6 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
183183 $ this ->setup ();
184184 }
185185
186- // TODO - allow flag & attributes to be configured on the message
187-
188186 $ this ->exchange ()->publish (
189187 $ body ,
190188 $ routingKey ?? $ this ->getDefaultPublishRoutingKey (),
@@ -214,8 +212,6 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
214212 $ this ->setupDelay ($ delay , $ exchangeRoutingKey );
215213 }
216214
217- // TODO - allow flag & attributes to be configured on the message
218-
219215 $ this ->getDelayExchange ()->publish (
220216 $ body ,
221217 $ this ->getRoutingKeyForDelay ($ delay ),
@@ -244,7 +240,7 @@ private function getDelayExchange(): \AMQPExchange
244240 {
245241 if (null === $ this ->amqpDelayExchange ) {
246242 $ this ->amqpDelayExchange = $ this ->amqpFactory ->createExchange ($ this ->channel ());
247- $ this ->amqpDelayExchange ->setName ($ this ->connectionConfiguration ['delay ' ]['exchange_name ' ]);
243+ $ this ->amqpDelayExchange ->setName ($ this ->connectionOptions ['delay ' ]['exchange_name ' ]);
248244 $ this ->amqpDelayExchange ->setType (AMQP_EX_TYPE_DIRECT );
249245 }
250246
@@ -262,10 +258,8 @@ private function getDelayExchange(): \AMQPExchange
262258 */
263259 private function createDelayQueue (int $ delay , ?string $ routingKey )
264260 {
265- $ delayConfiguration = $ this ->connectionConfiguration ['delay ' ];
266-
267261 $ queue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
268- $ queue ->setName (str_replace ('%delay% ' , $ delay , $ delayConfiguration ['queue_name_pattern ' ]));
262+ $ queue ->setName (str_replace ('%delay% ' , $ delay , $ this -> connectionOptions [ ' delay ' ] ['queue_name_pattern ' ]));
269263 $ queue ->setArguments ([
270264 'x-message-ttl ' => $ delay ,
271265 'x-dead-letter-exchange ' => $ this ->exchange ()->getName (),
@@ -282,7 +276,7 @@ private function createDelayQueue(int $delay, ?string $routingKey)
282276
283277 private function getRoutingKeyForDelay (int $ delay ): string
284278 {
285- return str_replace ('%delay% ' , $ delay , $ this ->connectionConfiguration ['delay ' ]['routing_key_pattern ' ]);
279+ return str_replace ('%delay% ' , $ delay , $ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]);
286280 }
287281
288282 /**
@@ -332,7 +326,7 @@ public function setup(): void
332326
333327 $ this ->exchange ()->declareExchange ();
334328
335- foreach ($ this ->queuesConfiguration as $ queueName => $ queueConfig ) {
329+ foreach ($ this ->queuesOptions as $ queueName => $ queueConfig ) {
336330 $ this ->queue ($ queueName )->declareQueue ();
337331 foreach ($ queueConfig ['binding_keys ' ] ?? [null ] as $ bindingKey ) {
338332 $ this ->queue ($ queueName )->bind ($ this ->exchange ()->getName (), $ bindingKey );
@@ -345,40 +339,37 @@ public function setup(): void
345339 */
346340 public function getQueueNames (): array
347341 {
348- return array_keys ($ this ->queuesConfiguration );
342+ return array_keys ($ this ->queuesOptions );
349343 }
350344
351- /**
352- * @internal
353- */
354345 public function channel (): \AMQPChannel
355346 {
356347 if (null === $ this ->amqpChannel ) {
357- $ connection = $ this ->amqpFactory ->createConnection ($ this ->connectionConfiguration );
358- $ connectMethod = 'true ' === ($ this ->connectionConfiguration ['persistent ' ] ?? 'false ' ) ? 'pconnect ' : 'connect ' ;
348+ $ connection = $ this ->amqpFactory ->createConnection ($ this ->connectionOptions );
349+ $ connectMethod = 'true ' === ($ this ->connectionOptions ['persistent ' ] ?? 'false ' ) ? 'pconnect ' : 'connect ' ;
359350
360351 try {
361352 $ connection ->{$ connectMethod }();
362353 } catch (\AMQPConnectionException $ e ) {
363- $ credentials = $ this ->connectionConfiguration ;
354+ $ credentials = $ this ->connectionOptions ;
364355 $ credentials ['password ' ] = '******** ' ;
365356
366357 throw new \AMQPException (sprintf ('Could not connect to the AMQP server. Please verify the provided DSN. (%s) ' , json_encode ($ credentials )), 0 , $ e );
367358 }
368359 $ this ->amqpChannel = $ this ->amqpFactory ->createChannel ($ connection );
369360
370- if (isset ($ this ->connectionConfiguration ['prefetch_count ' ])) {
371- $ this ->amqpChannel ->setPrefetchCount ($ this ->connectionConfiguration ['prefetch_count ' ]);
361+ if (isset ($ this ->connectionOptions ['prefetch_count ' ])) {
362+ $ this ->amqpChannel ->setPrefetchCount ($ this ->connectionOptions ['prefetch_count ' ]);
372363 }
373364 }
374365
375366 return $ this ->amqpChannel ;
376367 }
377368
378- private function queue (string $ queueName ): \AMQPQueue
369+ public function queue (string $ queueName ): \AMQPQueue
379370 {
380371 if (!isset ($ this ->amqpQueues [$ queueName ])) {
381- $ queueConfig = $ this ->queuesConfiguration [$ queueName ];
372+ $ queueConfig = $ this ->queuesOptions [$ queueName ];
382373
383374 $ amqpQueue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
384375 $ amqpQueue ->setName ($ queueName );
@@ -394,27 +385,22 @@ private function queue(string $queueName): \AMQPQueue
394385 return $ this ->amqpQueues [$ queueName ];
395386 }
396387
397- private function exchange (): \AMQPExchange
388+ public function exchange (): \AMQPExchange
398389 {
399390 if (null === $ this ->amqpExchange ) {
400391 $ this ->amqpExchange = $ this ->amqpFactory ->createExchange ($ this ->channel ());
401- $ this ->amqpExchange ->setName ($ this ->exchangeConfiguration ['name ' ]);
402- $ this ->amqpExchange ->setType ($ this ->exchangeConfiguration ['type ' ] ?? AMQP_EX_TYPE_FANOUT );
403- $ this ->amqpExchange ->setFlags ($ this ->exchangeConfiguration ['flags ' ] ?? AMQP_DURABLE );
392+ $ this ->amqpExchange ->setName ($ this ->exchangeOptions ['name ' ]);
393+ $ this ->amqpExchange ->setType ($ this ->exchangeOptions ['type ' ] ?? AMQP_EX_TYPE_FANOUT );
394+ $ this ->amqpExchange ->setFlags ($ this ->exchangeOptions ['flags ' ] ?? AMQP_DURABLE );
404395
405- if (isset ($ this ->exchangeConfiguration ['arguments ' ])) {
406- $ this ->amqpExchange ->setArguments ($ this ->exchangeConfiguration ['arguments ' ]);
396+ if (isset ($ this ->exchangeOptions ['arguments ' ])) {
397+ $ this ->amqpExchange ->setArguments ($ this ->exchangeOptions ['arguments ' ]);
407398 }
408399 }
409400
410401 return $ this ->amqpExchange ;
411402 }
412403
413- public function getConnectionConfiguration (): array
414- {
415- return $ this ->connectionConfiguration ;
416- }
417-
418404 private function clear (): void
419405 {
420406 $ this ->amqpChannel = null ;
@@ -424,11 +410,11 @@ private function clear(): void
424410
425411 private function shouldSetup (): bool
426412 {
427- if (!\array_key_exists ('auto_setup ' , $ this ->connectionConfiguration )) {
413+ if (!\array_key_exists ('auto_setup ' , $ this ->connectionOptions )) {
428414 return true ;
429415 }
430416
431- if (\in_array ($ this ->connectionConfiguration ['auto_setup ' ], [false , 'false ' ], true )) {
417+ if (\in_array ($ this ->connectionOptions ['auto_setup ' ], [false , 'false ' ], true )) {
432418 return false ;
433419 }
434420
@@ -437,7 +423,7 @@ private function shouldSetup(): bool
437423
438424 private function getDefaultPublishRoutingKey (): ?string
439425 {
440- return $ this ->exchangeConfiguration ['default_publish_routing_key ' ] ?? null ;
426+ return $ this ->exchangeOptions ['default_publish_routing_key ' ] ?? null ;
441427 }
442428
443429 public function purgeQueues ()
0 commit comments