55use Illuminate \Contracts \Queue \Queue as QueueContract ;
66use Illuminate \Queue \Queue as BaseQueue ;
77use Interop \Queue \Context ;
8+ use Interop \Amqp \Impl \AmqpMessage ;
89
910class Queue extends BaseQueue implements QueueContract
1011{
@@ -56,9 +57,15 @@ public function push($job, $data = '', $queue = null)
5657 */
5758 public function pushRaw ($ payload , $ queue = null , array $ options = [])
5859 {
59- $ this ->context ->createProducer ()->send (
60+ $ message = $ this ->context ->createMessage ($ payload );
61+
62+ if ($ message instanceof AmqpMessage) {
63+ $ message ->setDeliveryMode (\Interop \Amqp \AmqpMessage::DELIVERY_MODE_PERSISTENT );
64+ }
65+
66+ return $ this ->context ->createProducer ()->send (
6067 $ this ->getQueue ($ queue ),
61- $ this -> context -> createMessage ( $ payload )
68+ $ message
6269 );
6370 }
6471
@@ -69,11 +76,13 @@ public function later($delay, $job, $data = '', $queue = null)
6976 {
7077 $ message = $ this ->context ->createMessage ($ this ->createPayload ($ job , $ data ));
7178
72- $ this ->context ->createProducer ()
73- ->setDeliveryDelay ($ this ->secondsUntil ($ delay ) * 1000 )
79+ if ($ message instanceof AmqpMessage) {
80+ $ message ->setDeliveryMode (\Interop \Amqp \AmqpMessage::DELIVERY_MODE_PERSISTENT );
81+ }
7482
75- ->send ($ this ->getQueue ($ queue ), $ message )
76- ;
83+ return $ this ->context ->createProducer ()
84+ ->setDeliveryDelay ($ this ->secondsUntil ($ delay ) * 1000 )
85+ ->send ($ this ->getQueue ($ queue ), $ message );
7786 }
7887
7988 public function pop ($ queue = null )
0 commit comments