@@ -79,8 +79,8 @@ class Connection
7979 * * flags: Exchange flags (Default: AMQP_DURABLE)
8080 * * arguments: Extra arguments
8181 * * delay:
82- * * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
83- * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
82+ * * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_% delay%")
83+ * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_% delay%")
8484 * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
8585 * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
8686 * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
@@ -90,9 +90,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
9090 {
9191 $ this ->connectionOptions = array_replace_recursive ([
9292 'delay ' => [
93- 'routing_key_pattern ' => 'delay_%delay% ' ,
93+ 'routing_key_pattern ' => 'delay_%routing_key%_% delay% ' ,
9494 'exchange_name ' => 'delay ' ,
95- 'queue_name_pattern ' => 'delay_queue_%delay% ' ,
95+ 'queue_name_pattern ' => 'delay_queue_%routing_key%_% delay% ' ,
9696 ],
9797 ], $ connectionOptions );
9898 $ this ->exchangeOptions = $ exchangeOptions ;
@@ -186,7 +186,7 @@ public function publish(string $body, array $headers = [], int $delay = 0, AmqpS
186186 $ this ->publishOnExchange (
187187 $ this ->exchange (),
188188 $ body ,
189- ( null !== $ amqpStamp ? $ amqpStamp -> getRoutingKey () : null ) ?? $ this ->getDefaultPublishRoutingKey ( ),
189+ $ this ->getRoutingKeyForMessage ( $ amqpStamp ),
190190 [
191191 'headers ' => $ headers ,
192192 ],
@@ -209,14 +209,16 @@ public function countMessagesInQueues(): int
209209 */
210210 private function publishWithDelay (string $ body , array $ headers , int $ delay , AmqpStamp $ amqpStamp = null )
211211 {
212+ $ routingKey = $ this ->getRoutingKeyForMessage ($ amqpStamp );
213+
212214 if ($ this ->shouldSetup ()) {
213- $ this ->setupDelay ($ delay , null !== $ amqpStamp ? $ amqpStamp -> getRoutingKey () : null );
215+ $ this ->setupDelay ($ delay , $ routingKey );
214216 }
215217
216218 $ this ->publishOnExchange (
217219 $ this ->getDelayExchange (),
218220 $ body ,
219- $ this ->getRoutingKeyForDelay ($ delay ),
221+ $ this ->getRoutingKeyForDelay ($ delay, $ routingKey ),
220222 [
221223 'headers ' => $ headers ,
222224 ],
@@ -245,7 +247,7 @@ private function setupDelay(int $delay, ?string $routingKey)
245247
246248 $ queue = $ this ->createDelayQueue ($ delay , $ routingKey );
247249 $ queue ->declareQueue ();
248- $ queue ->bind ($ exchange ->getName (), $ this ->getRoutingKeyForDelay ($ delay ));
250+ $ queue ->bind ($ exchange ->getName (), $ this ->getRoutingKeyForDelay ($ delay, $ routingKey ));
249251 }
250252
251253 private function getDelayExchange (): \AMQPExchange
@@ -271,13 +273,16 @@ private function getDelayExchange(): \AMQPExchange
271273 private function createDelayQueue (int $ delay , ?string $ routingKey )
272274 {
273275 $ queue = $ this ->amqpFactory ->createQueue ($ this ->channel ());
274- $ queue ->setName (str_replace ('%delay% ' , $ delay , $ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]));
276+ $ queue ->setName (str_replace (
277+ ['%delay% ' , '%routing_key% ' ],
278+ [$ delay , $ routingKey ?: '' ],
279+ $ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
280+ ));
275281 $ queue ->setArguments ([
276282 'x-message-ttl ' => $ delay ,
277283 'x-dead-letter-exchange ' => $ this ->exchange ()->getName (),
278284 ]);
279285
280- $ routingKey = $ routingKey ?? $ this ->getDefaultPublishRoutingKey ();
281286 if (null !== $ routingKey ) {
282287 // after being released from to DLX, this routing key will be used
283288 $ queue ->setArgument ('x-dead-letter-routing-key ' , $ routingKey );
@@ -286,9 +291,13 @@ private function createDelayQueue(int $delay, ?string $routingKey)
286291 return $ queue ;
287292 }
288293
289- private function getRoutingKeyForDelay (int $ delay ): string
294+ private function getRoutingKeyForDelay (int $ delay, ? string $ finalRoutingKey ): string
290295 {
291- return str_replace ('%delay% ' , $ delay , $ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]);
296+ return str_replace (
297+ ['%delay% ' , '%routing_key% ' ],
298+ [$ delay , $ finalRoutingKey ?: '' ],
299+ $ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]
300+ );
292301 }
293302
294303 /**
@@ -444,4 +453,9 @@ public function purgeQueues()
444453 $ this ->queue ($ queueName )->purge ();
445454 }
446455 }
456+
457+ private function getRoutingKeyForMessage (?AmqpStamp $ amqpStamp ): ?string
458+ {
459+ return (null !== $ amqpStamp ? $ amqpStamp ->getRoutingKey () : null ) ?? $ this ->getDefaultPublishRoutingKey ();
460+ }
447461}
0 commit comments