Skip to content

Commit 06b1d8f

Browse files
committed
[Messenger] Fix NoAutoAckStamp handling in Worker::flush()
The line `$envelope = $envelope->withoutAll(NoAutoAckStamp::class);` was modifying a local variable that was never used after assignment. This change moves the stamp removal to the catch block where it's actually needed, ensuring envelopes stored for acknowledgment don't contain contradictory NoAutoAckStamp.
1 parent 4cb717b commit 06b1d8f

File tree

3 files changed

+51
-1
lines changed

3 files changed

+51
-1
lines changed

src/Symfony/Component/Messenger/Tests/Fixtures/DummyReceiver.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,9 @@ public function getAcknowledgedEnvelopes(): array
6363
{
6464
return $this->acknowledgedEnvelopes;
6565
}
66+
67+
public function getRejectedEnvelopes(): array
68+
{
69+
return $this->rejectedEnvelopes;
70+
}
6671
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
use Symfony\Component\Messenger\MessageBusInterface;
3939
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
4040
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
41+
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
42+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
4143
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
4244
use Symfony\Component\Messenger\Stamp\SentStamp;
4345
use Symfony\Component\Messenger\Stamp\StampInterface;
@@ -584,6 +586,49 @@ public function testFlushBatchOnStop()
584586

585587
$this->assertSame($expectedMessages, $handler->processedMessages);
586588
}
589+
590+
public function testFlushRemovesNoAutoAckStampOnException()
591+
{
592+
$envelope = new Envelope(new DummyMessage('Test'));
593+
$receiver = new DummyReceiver([[$envelope]]);
594+
595+
$bus = new class implements MessageBusInterface {
596+
public function dispatch(object $message, array $stamps = []): Envelope
597+
{
598+
$envelope = Envelope::wrap($message, $stamps);
599+
if ($envelope->last(FlushBatchHandlersStamp::class)) {
600+
throw new \RuntimeException('Flush failed');
601+
}
602+
603+
return $envelope;
604+
}
605+
};
606+
607+
$dispatcher = new EventDispatcher();
608+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
609+
static $calls = 0;
610+
if (++$calls >= 2) {
611+
$event->getWorker()->stop();
612+
}
613+
});
614+
615+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock());
616+
617+
$reflection = new \ReflectionClass($worker);
618+
$unacksProperty = $reflection->getProperty('unacks');
619+
$unacks = $unacksProperty->getValue($worker);
620+
$dummyHandler = new DummyBatchHandler();
621+
$envelopeWithNoAutoAck = $envelope->with(new NoAutoAckStamp(new HandlerDescriptor($dummyHandler)));
622+
$unacks->attach($dummyHandler, [$envelopeWithNoAutoAck, 'transport']);
623+
624+
$worker->run();
625+
626+
$this->assertSame(1, $receiver->getRejectCount());
627+
$rejectedEnvelopes = $receiver->getRejectedEnvelopes();
628+
$this->assertCount(1, $rejectedEnvelopes);
629+
$rejectedEnvelope = $rejectedEnvelopes[0];
630+
$this->assertNull($rejectedEnvelope->last(NoAutoAckStamp::class));
631+
}
587632
}
588633

589634
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface

src/Symfony/Component/Messenger/Worker.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,9 @@ private function flush(bool $force): bool
257257
[$envelope, $transportName] = $unacks[$batchHandler];
258258
try {
259259
$this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
260-
$envelope = $envelope->withoutAll(NoAutoAckStamp::class);
261260
unset($unacks[$batchHandler], $batchHandler);
262261
} catch (\Throwable $e) {
262+
$envelope = $envelope->withoutAll(NoAutoAckStamp::class);
263263
$this->acks[] = [$transportName, $envelope, $e];
264264
}
265265
}

0 commit comments

Comments
 (0)