You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
feature #30958 [Messenger] Allows to register handlers on a specific transport (sroze)
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Allows to register handlers on a specific transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | #30110
| License | MIT
| Doc PR | symfony/symfony-docs#11236
With the #30008 pull-request in, we can now do the following:
```yaml
framework:
messenger:
transports:
events:
dsn: amqp://guest:guest@127.0.0.1/%2f/events
options:
queues:
3rdparty:
binding_keys: [ 3rdparty ]
projection:
binding_keys: [ projection ]
events_3rdparty: amqp://guest:guest@127.0.0.1/%2f/events?queues[3rdparty]
events_projection: amqp://guest:guest@127.0.0.1/%2f/events?queues[projection]
routing:
'App\Message\RegisterBet': events
```
This will bind two queues to the `events` exchange, fantastic:
<img width="325" alt="Screenshot 2019-04-07 at 10 26 27" src="https://user-images.githubusercontent.com/804625/55680861-af373580-591f-11e9-8f1e-2d3b6ddba2fd.png">
---
Now, in this setup, the message will be duplicated within the `3rdparty` & `projection` queues. If you just run the consumer for each transport, it will consume the message and call all the handlers. You can't do different things based on which queue you have consumed the message. **This pull-request adds the following feature:**
```php
class RegisterBetHandler implements MessageSubscriberInterface
{
public function __invoke(RegisterBet $message)
{
// Do something only when the message comes from the `events_projection` transport...
}
/**
* {@inheritdoc}
*/
public static function getHandledMessages(): iterable
{
yield RegisterBet::class => [
'from_transport' => 'events_projection',
];
}
}
```
---
In the debugger, it looks like this:
<img width="649" alt="Screenshot 2019-04-07 at 10 29 55" src="https://user-images.githubusercontent.com/804625/55680892-1d7bf800-5920-11e9-80f7-853f0201c6d8.png">
Commits
-------
f0b2acd67d Allows to register handlers on a specific transport (and get rid of this handler alias)
@@ -94,32 +95,33 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
94
95
$message = null;
95
96
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
96
97
97
-
foreach ($handlesas$message => $method) {
98
+
foreach ($handlesas$message => $options) {
98
99
$buses = $handlerBuses;
100
+
99
101
if (\is_int($message)) {
100
-
$message = $method;
101
-
$method = '__invoke';
102
+
if (\is_string($options)) {
103
+
$message = $options;
104
+
$options = [];
105
+
} else {
106
+
thrownewRuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
107
+
}
102
108
}
103
109
104
-
if (\is_array($message)) {
105
-
list($message, $priority) = $message;
106
-
} else {
107
-
$priority = $tag['priority'] ?? 0;
110
+
if (\is_string($options)) {
111
+
$options = ['method' => $options];
108
112
}
109
113
110
-
if (\is_array($method)) {
111
-
if (isset($method['bus'])) {
112
-
if (!\in_array($method['bus'], $busIds)) {
113
-
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
thrownewRuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
116
-
}
117
+
if (isset($options['bus'])) {
118
+
if (!\in_array($options['bus'], $busIds)) {
119
+
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
117
120
118
-
$buses = [$method['bus']];
121
+
thrownewRuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
119
122
}
120
123
121
-
$priority = $method['priority'] ?? $priority;
122
-
$method = $method['method'] ?? '__invoke';
124
+
$buses = [$options['bus']];
123
125
}
124
126
125
127
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -141,7 +143,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
thrownewLogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));
0 commit comments