@@ -105,30 +105,31 @@ protected function checkQueueLength(): void
105105
106106 /**
107107 * @param mixed $data
108- * @param MessageInterface $message
108+ * @param MessageInterface|mixed $message
109109 */
110110 protected function getCallback ($ data , $ message ): callable
111111 {
112112 return function () use ($ data , $ message ) {
113113 try {
114- if ($ message instanceof MessageInterface) {
115- $ this ->event ?->dispatch(new BeforeHandle ($ message ));
114+ // If the message is invalid, just ack it.
115+ if (! $ message instanceof MessageInterface) {
116+ $ this ->ack ($ data );
117+ return ;
118+ }
116119
117- $ result = $ message -> job ()-> handle ( );
120+ $ this -> event ?->dispatch( new BeforeHandle ( $ message ) );
118121
119- match ($ result ) {
120- Result::REQUEUE => $ this ->remove ($ data ) && $ this ->retry ($ data ),
121- Result::RETRY => $ this ->remove ($ data ) && $ message ->attempts () && $ this ->retry ($ message ),
122- Result::DROP => $ this ->remove ($ data ),
123- Result::ACK => $ this ->ack ($ data ),
124- default => $ this ->ack ($ data ),
125- };
122+ $ result = $ message ->job ()->handle ();
126123
127- $ this ->event ?->dispatch(new AfterHandle ($ message ));
128- } else {
129- // If the message is invalid, just ack it.
130- $ this ->ack ($ data );
131- }
124+ match ($ result ) {
125+ Result::REQUEUE => $ this ->remove ($ data ) && $ this ->retry ($ data ),
126+ Result::RETRY => $ this ->remove ($ data ) && $ message ->attempts () && $ this ->retry ($ message ),
127+ Result::DROP => $ this ->remove ($ data ),
128+ Result::ACK => $ this ->ack ($ data ),
129+ default => $ this ->ack ($ data ),
130+ };
131+
132+ $ this ->event ?->dispatch(new AfterHandle ($ message , $ result instanceof Result ? $ result : null ));
132133 } catch (Throwable $ ex ) {
133134 if (isset ($ message , $ data )) {
134135 if ($ message ->attempts () && $ this ->remove ($ data )) {
0 commit comments