diff --git a/src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php b/src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php new file mode 100644 index 0000000000000..8d5dd9c0082c4 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php @@ -0,0 +1,80 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bundle\FrameworkBundle\Command; + +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\DependencyInjection\ContainerInterface; +use Symfony\Component\Message\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Message\MessageBusInterface; +use Symfony\Component\Message\Transport\ReceiverInterface; + +/** + * @author Samuel Roze + */ +class MessageConsumeCommand extends Command +{ + protected static $defaultName = 'message:consume'; + + /** + * {@inheritdoc} + */ + protected function configure() + { + $this + ->setDefinition(array( + new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'), + new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to dispatch the messages to', 'message_bus'), + )) + ->setDescription('Consume a message') + ->setHelp(<<<'EOF' +The %command.name% command consume a message and dispatch it to the message bus. + + %command.full_name% + +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + /** @var ContainerInterface $container */ + $container = $this->getApplication()->getKernel()->getContainer(); + + if (!$container->has($receiverName = $input->getArgument('receiver'))) { + throw new \RuntimeException(sprintf('Receiver "%s" does not exist', $receiverName)); + } elseif (!($receiver = $container->get($receiverName)) instanceof ReceiverInterface) { + throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the interface "%s"', $receiverName, ReceiverInterface::class)); + } + + if (!$container->has($busName = $input->getOption('bus'))) { + throw new \RuntimeException(sprintf('Bus "%s" does not exist', $busName)); + } elseif (!($messageBus = $container->get($busName)) instanceof MessageBusInterface) { + throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the interface "%s"', $busName, MessageBusInterface::class)); + } + + foreach ($receiver->receive() as $message) { + if (!$message instanceof ReceivedMessage) { + $message = new ReceivedMessage($message); + } + + $messageBus->handle($message); + } + } +} diff --git a/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php b/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php index 2f1b2a9352410..e71195a438cf6 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php +++ b/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php @@ -18,6 +18,7 @@ use Symfony\Component\HttpFoundation\RequestStack; use Symfony\Component\HttpFoundation\Session\SessionInterface; use Symfony\Component\HttpKernel\HttpKernelInterface; +use Symfony\Component\Message\MessageBusInterface; use Symfony\Component\Routing\RouterInterface; use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface; @@ -84,6 +85,7 @@ public static function getSubscribedServices() 'security.token_storage' => '?'.TokenStorageInterface::class, 'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class, 'parameter_bag' => '?'.ContainerInterface::class, + 'message_bus' => '?'.MessageBusInterface::class, ); } } diff --git a/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessagesDataCollector.php b/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessagesDataCollector.php new file mode 100644 index 0000000000000..6bcbfdf3cfa55 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessagesDataCollector.php @@ -0,0 +1,95 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bundle\FrameworkBundle\DataCollector; + +use Symfony\Component\HttpFoundation\Request; +use Symfony\Component\HttpFoundation\Response; +use Symfony\Component\HttpKernel\DataCollector\DataCollector; +use Symfony\Component\Message\MiddlewareInterface; + +/** + * @author Samuel Roze + */ +class MessagesDataCollector extends DataCollector implements MiddlewareInterface +{ + /** + * {@inheritdoc} + */ + public function collect(Request $request, Response $response, \Exception $exception = null) + { + return $this->data; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return 'messages'; + } + + /** + * {@inheritdoc} + */ + public function reset() + { + $this->data = array(); + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $debugRepresentation = array( + 'message' => array( + 'type' => get_class($message), + ), + ); + + try { + $result = $next($message); + + if (is_object($result)) { + $debugRepresentation['result'] = array( + 'type' => get_class($result), + ); + } else { + $debugRepresentation['result'] = array( + 'type' => gettype($result), + 'value' => $result, + ); + } + } catch (\Throwable $exception) { + $debugRepresentation['exception'] = array( + 'type' => get_class($exception), + 'message' => $exception->getMessage(), + ); + } + + $this->data[] = $debugRepresentation; + + if (isset($exception)) { + throw $exception; + } + + return $result; + } + + /** + * @return array + */ + public function getMessages() + { + return $this->data; + } +} diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 8a2b10e8e4d92..f62b3fb02aa90 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -21,6 +21,7 @@ use Symfony\Component\Form\Form; use Symfony\Component\Lock\Lock; use Symfony\Component\Lock\Store\SemaphoreStore; +use Symfony\Component\Message\MessageBusInterface; use Symfony\Component\Serializer\Serializer; use Symfony\Component\Translation\Translator; use Symfony\Component\Validator\Validation; @@ -100,6 +101,7 @@ public function getConfigTreeBuilder() $this->addPhpErrorsSection($rootNode); $this->addWebLinkSection($rootNode); $this->addLockSection($rootNode); + $this->addMessageSection($rootNode); return $treeBuilder; } @@ -894,4 +896,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode) ->end() ; } + + private function addMessageSection(ArrayNodeDefinition $rootNode) + { + $rootNode + ->children() + ->arrayNode('message') + ->info('Message configuration') + ->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}() + ->children() + ->arrayNode('routing') + ->useAttributeAsKey('message_class') + ->prototype('array') + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return array('senders' => array($v)); + }) + ->end() + ->children() + ->arrayNode('senders') + ->requiresAtLeastOneElement() + ->prototype('scalar')->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ; + } } diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index f3b68fbc87f69..0ebd571255d8f 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -258,6 +258,10 @@ public function load(array $configs, ContainerBuilder $container) $this->registerLockConfiguration($config['lock'], $container, $loader); } + if ($this->isConfigEnabled($container, $config['message'])) { + $this->registerMessageConfiguration($config['message'], $container, $loader); + } + if ($this->isConfigEnabled($container, $config['web_link'])) { if (!class_exists(HttpHeaderSerializer::class)) { throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.'); @@ -1359,6 +1363,20 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont } } + private function registerMessageConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader) + { + $loader->load('message.xml'); + + $messageToSenderMapping = array(); + foreach ($config['routing'] as $message => $messageConfiguration) { + $messageToSenderMapping[$message] = array_map(function (string $serviceName) { + return new Reference($serviceName); + }, $messageConfiguration['senders']); + } + + $container->getDefinition('message.asynchronous.routing.sender_locator')->setArgument(0, $messageToSenderMapping); + } + private function registerCacheConfiguration(array $config, ContainerBuilder $container) { $version = substr(str_replace('/', '-', base64_encode(hash('sha256', uniqid(mt_rand(), true), true))), 0, 22); diff --git a/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php b/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php index 0cebbc3f49ae0..9a77531c1ce4b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php +++ b/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php @@ -32,6 +32,8 @@ use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass; use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass; use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass; +use Symfony\Component\Message\DependencyInjection\EventPass; +use Symfony\Component\Message\DependencyInjection\MessagePass; use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass; use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass; use Symfony\Component\Serializer\DependencyInjection\SerializerPass; @@ -114,6 +116,8 @@ public function build(ContainerBuilder $container) $this->addCompilerPassIfExists($container, FormPass::class); $container->addCompilerPass(new WorkflowGuardListenerPass()); $container->addCompilerPass(new ResettableServicePass()); + $this->addCompilerPassIfExists($container, MessagePass::class); + $this->addCompilerPassIfExists($container, EventPass::class); if ($container->getParameter('kernel.debug')) { $container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 34f47a0599b31..703ba3d3c239a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -64,6 +64,10 @@ + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/message.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/message.xml new file mode 100644 index 0000000000000..fe252ab21944c --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/message.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php index 84921d9737d60..a0f1747383717 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; use Symfony\Component\Config\Definition\Processor; use Symfony\Component\Lock\Store\SemaphoreStore; +use Symfony\Component\Message\MessageBusInterface; class ConfigurationTest extends TestCase { @@ -249,6 +250,10 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor ), ), ), + 'message' => array( + 'enabled' => !class_exists(FullStack::class) && class_exists(MessageBusInterface::class), + 'routing' => array(), + ), ); } } diff --git a/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig new file mode 100644 index 0000000000000..15382f42c6bfe --- /dev/null +++ b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig @@ -0,0 +1,65 @@ +{% extends '@WebProfiler/Profiler/layout.html.twig' %} + +{% import _self as helper %} + +{% block menu %} + + {{ include('@WebProfiler/Icon/messages.svg') }} + Messages + + {% if collector.messages|length > 0 %} + + {{ collector.messages|length }} + + {% endif %} + +{% endblock %} + +{% block panel %} +

Messages

+ + {% if collector.messages is empty %} +

No messages

+ {% else %} + + + + + + + + + {% for message in collector.messages %} + + + + + {% endfor %} + +
MessageResult
{{ message.message.type }} + {% if message.result.type is defined %} + {{ message.result.type }} + {% endif %} + + {% if message.exception.type is defined %} + {{ message.exception.type }} + {% endif %} +
+ {% endif %} +{% endblock %} + +{% block toolbar %} + {% set color_code = 'normal' %} + {% set message_count = 0 %} + {% set icon %} + {% if profiler_markup_version == 1 %} + {{ include('@WebProfiler/Icon/messages.svg', { height: 28, color: '#3F3F3F' }) }} + {{ message_count }} + {% else %} + {{ include('@WebProfiler/Icon/messages.svg') }} + {{ message_count }} + {% endif %} + {% endset %} + + {{ include('@WebProfiler/Profiler/toolbar_item.html.twig', { link: 'messages', status: color_code }) }} +{% endblock %} diff --git a/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg new file mode 100644 index 0000000000000..2fd49b55fe6d5 --- /dev/null +++ b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/src/Symfony/Component/Message/Asynchronous/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Message/Asynchronous/Middleware/SendMessageMiddleware.php new file mode 100644 index 0000000000000..5d656121bf3af --- /dev/null +++ b/src/Symfony/Component/Message/Asynchronous/Middleware/SendMessageMiddleware.php @@ -0,0 +1,49 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Asynchronous\Middleware; + +use Symfony\Component\Message\Asynchronous\Routing\SenderLocatorInterface; +use Symfony\Component\Message\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Message\MiddlewareInterface; + +/** + * @author Samuel Roze + */ +class SendMessageMiddleware implements MiddlewareInterface +{ + private $senderLocator; + + public function __construct(SenderLocatorInterface $senderLocator) + { + $this->senderLocator = $senderLocator; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + if ($message instanceof ReceivedMessage) { + $message = $message->getMessage(); + } elseif (!empty($senders = $this->senderLocator->getSendersForMessage($message))) { + foreach ($senders as $sender) { + $sender->send($message); + } + + if (!in_array(null, $senders)) { + return; + } + } + + return $next($message); + } +} diff --git a/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocator.php b/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocator.php new file mode 100644 index 0000000000000..4d58f72858591 --- /dev/null +++ b/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocator.php @@ -0,0 +1,38 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Asynchronous\Routing; + +/** + * @author Samuel Roze + */ +class SenderLocator implements SenderLocatorInterface +{ + /** + * Mapping describing which sender should be used for which message. + * + * @var array + */ + private $messageToSenderMapping; + + public function __construct(array $messageToSenderMapping) + { + $this->messageToSenderMapping = $messageToSenderMapping; + } + + /** + * {@inheritdoc} + */ + public function getSendersForMessage($message): array + { + return $this->messageToSenderMapping[get_class($message)] ?? $this->messageToSenderMapping['*'] ?? array(); + } +} diff --git a/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocatorInterface.php b/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocatorInterface.php new file mode 100644 index 0000000000000..e0f7af8d811b2 --- /dev/null +++ b/src/Symfony/Component/Message/Asynchronous/Routing/SenderLocatorInterface.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Asynchronous\Routing; + +use Symfony\Component\Message\Transport\SenderInterface; + +/** + * @author Samuel Roze + */ +interface SenderLocatorInterface +{ + /** + * Get the producer (if applicable) for the given message object. + * + * @param object $message + * + * @return SenderInterface[] + */ + public function getSendersForMessage($message): array; +} diff --git a/src/Symfony/Component/Message/Asynchronous/Transport/ReceivedMessage.php b/src/Symfony/Component/Message/Asynchronous/Transport/ReceivedMessage.php new file mode 100644 index 0000000000000..54ac0e888ae71 --- /dev/null +++ b/src/Symfony/Component/Message/Asynchronous/Transport/ReceivedMessage.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Asynchronous\Transport; + +/** + * Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify + * a message should not be sent if it was just received. + * + * @see \Symfony\Component\Message\Asynchronous\Middleware\SendMessageMiddleware + * + * @author Samuel Roze + */ +final class ReceivedMessage +{ + private $message; + + public function __construct($message) + { + $this->message = $message; + } + + public function getMessage() + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Message/Asynchronous/Transport/WrapIntoReceivedMessage.php b/src/Symfony/Component/Message/Asynchronous/Transport/WrapIntoReceivedMessage.php new file mode 100644 index 0000000000000..8057d70b574fb --- /dev/null +++ b/src/Symfony/Component/Message/Asynchronous/Transport/WrapIntoReceivedMessage.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Asynchronous\Transport; + +use Symfony\Component\Message\Transport\ReceiverInterface; + +/** + * @author Samuel Roze + */ +class WrapIntoReceivedMessage implements ReceiverInterface +{ + /** + * @var ReceiverInterface + */ + private $decoratedReceiver; + + public function __construct(ReceiverInterface $decoratedConsumer) + { + $this->decoratedReceiver = $decoratedConsumer; + } + + public function receive(): \iterable + { + foreach ($this->decoratedReceiver->receive() as $message) { + yield new ReceivedMessage($message); + } + } +} diff --git a/src/Symfony/Component/Message/ContainerHandlerLocator.php b/src/Symfony/Component/Message/ContainerHandlerLocator.php new file mode 100644 index 0000000000000..18a9120b2166c --- /dev/null +++ b/src/Symfony/Component/Message/ContainerHandlerLocator.php @@ -0,0 +1,43 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +use Psr\Container\ContainerInterface; +use Symfony\Component\Message\Exception\NoHandlerForMessageException; + +/** + * @author Miha Vrhovnik + * @author Samuel Roze + */ +class ContainerHandlerLocator implements HandlerLocatorInterface +{ + /** + * @var ContainerInterface + */ + private $container; + + public function __construct(ContainerInterface $container) + { + $this->container = $container; + } + + public function resolve($message): callable + { + $messageKey = get_class($message); + + if (!$this->container->has($messageKey)) { + throw new NoHandlerForMessageException(sprintf('No handler for message "%s"', $messageKey)); + } + + return $this->container->get($messageKey); + } +} diff --git a/src/Symfony/Component/Message/Debug/LoggingMiddleware.php b/src/Symfony/Component/Message/Debug/LoggingMiddleware.php new file mode 100644 index 0000000000000..c0816311571a0 --- /dev/null +++ b/src/Symfony/Component/Message/Debug/LoggingMiddleware.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Debug; + +use Symfony\Component\Message\MiddlewareInterface; +use Psr\Log\LoggerInterface; + +/** + * @author Samuel Roze + */ +class LoggingMiddleware implements MiddlewareInterface +{ + /** + * @var LoggerInterface + */ + private $logger; + + public function __construct(LoggerInterface $logger) + { + $this->logger = $logger; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $this->logger->debug('Starting processing message {class}', array( + 'message' => $message, + 'class' => get_class($message), + )); + + try { + $result = $next($message); + } catch (\Throwable $e) { + $this->logger->warning('An exception occurred while processing message {class}', array( + 'message' => $message, + 'exception' => $e, + 'class' => get_class($message), + )); + + throw $e; + } + + $this->logger->debug('Finished processing message {class}', array( + 'message' => $message, + 'class' => get_class($message), + )); + + return $result; + } +} diff --git a/src/Symfony/Component/Message/DependencyInjection/EventPass.php b/src/Symfony/Component/Message/DependencyInjection/EventPass.php new file mode 100644 index 0000000000000..9a901c30a5076 --- /dev/null +++ b/src/Symfony/Component/Message/DependencyInjection/EventPass.php @@ -0,0 +1,20 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\DependencyInjection; + +class EventPass extends MessagePass +{ + public function __construct(string $eventBusService = 'event_bus', string $middlewareTag = 'event_middleware', string $eventHandlerResolverService = 'message.event_handler_resolver', string $handlerTag = 'event_handler') + { + parent::__construct($eventBusService, $middlewareTag, $eventHandlerResolverService, $handlerTag); + } +} diff --git a/src/Symfony/Component/Message/DependencyInjection/MessagePass.php b/src/Symfony/Component/Message/DependencyInjection/MessagePass.php new file mode 100644 index 0000000000000..827be308a3832 --- /dev/null +++ b/src/Symfony/Component/Message/DependencyInjection/MessagePass.php @@ -0,0 +1,123 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\DependencyInjection; + +use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; +use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait; +use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass; +use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\Definition; +use Symfony\Component\DependencyInjection\Exception\RuntimeException; +use Symfony\Component\DependencyInjection\Reference; +use Symfony\Component\Message\Handler\ChainHandler; + +/** + * @author Samuel Roze + */ +class MessagePass implements CompilerPassInterface +{ + use PriorityTaggedServiceTrait; + + private $messageBusService; + private $middlewareTag; + private $messageHandlerResolverService; + private $handlerTag; + + public function __construct(string $messageBusService = 'message_bus', string $middlewareTag = 'message_middleware', string $messageHandlerResolverService = 'message.handler_resolver', string $handlerTag = 'message_handler') + { + $this->messageBusService = $messageBusService; + $this->middlewareTag = $middlewareTag; + $this->messageHandlerResolverService = $messageHandlerResolverService; + $this->handlerTag = $handlerTag; + } + + /** + * {@inheritdoc} + */ + public function process(ContainerBuilder $container) + { + if (!$container->hasDefinition($this->messageBusService)) { + return; + } + + if (!$middlewares = $this->findAndSortTaggedServices($this->middlewareTag, $container)) { + throw new RuntimeException(sprintf('You must tag at least one service as "%s" to use the "%s" service.', $this->middlewareTag, $this->messageBusService)); + } + + $busDefinition = $container->getDefinition($this->messageBusService); + $busDefinition->replaceArgument(0, $middlewares); + + $this->registerHandlers($container); + } + + private function registerHandlers(ContainerBuilder $container) + { + $handlersByMessage = array(); + + foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) { + foreach ($tags as $tag) { + $handles = isset($tag['handles']) ? $tag['handles'] : $this->guessHandledClass($container, $serviceId); + + if (!class_exists($handles)) { + throw new RuntimeException(sprintf('The message class "%s" declared in `__invoke` function of service "%s" does not exist', $handles, $serviceId)); + } + + $priority = isset($tag['priority']) ? $tag['priority'] : 0; + $handlersByMessage[$handles][$priority][] = new Reference($serviceId); + } + } + + foreach ($handlersByMessage as $message => $handlers) { + krsort($handlersByMessage[$message]); + $handlersByMessage[$message] = call_user_func_array('array_merge', $handlersByMessage[$message]); + } + + $definitions = array(); + foreach ($handlersByMessage as $message => $handlers) { + if (1 === count($handlers)) { + $handlersByMessage[$message] = current($handlers); + } else { + $d = new Definition(ChainHandler::class, array($handlers)); + $d->setPrivate(true); + $serviceId = hash('sha1', $message); + $definitions[$serviceId] = $d; + $handlersByMessage[$message] = new Reference($serviceId); + } + } + $container->addDefinitions($definitions); + + $handlerResolver = $container->getDefinition($this->messageHandlerResolverService); + $handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersByMessage)); + } + + private function guessHandledClass(ContainerBuilder $container, string $serviceId): string + { + $reflection = new \ReflectionClass($container->getDefinition($serviceId)->getClass()); + try { + $method = $reflection->getMethod('__invoke'); + } catch (\ReflectionException $e) { + throw new RuntimeException(sprintf('Service "%s" should have an `__invoke` function', $serviceId)); + } + + $parameters = $method->getParameters(); + if (1 !== count($parameters)) { + throw new RuntimeException(sprintf('`__invoke` function of service "%s" must have exactly one parameter', $serviceId)); + } + + $parameter = $parameters[0]; + if (null === $parameter->getClass()) { + throw new RuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the Message class it handles', $serviceId)); + } + + return $parameter->getClass()->getName(); + } +} diff --git a/src/Symfony/Component/Message/EventBus.php b/src/Symfony/Component/Message/EventBus.php new file mode 100644 index 0000000000000..a2333d3bf11e6 --- /dev/null +++ b/src/Symfony/Component/Message/EventBus.php @@ -0,0 +1,20 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +/** + * @author Samuel Roze + * @author Matthias Noback + */ +class EventBus extends MessageBus +{ +} diff --git a/src/Symfony/Component/Message/Exception/ExceptionInterface.php b/src/Symfony/Component/Message/Exception/ExceptionInterface.php new file mode 100644 index 0000000000000..f816c0e54155f --- /dev/null +++ b/src/Symfony/Component/Message/Exception/ExceptionInterface.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Exception; + +/** + * Base Message component's exception. + * + * @author Samuel Roze + */ +interface ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Message/Exception/NoHandlerForMessageException.php b/src/Symfony/Component/Message/Exception/NoHandlerForMessageException.php new file mode 100644 index 0000000000000..f1fd281eb3f15 --- /dev/null +++ b/src/Symfony/Component/Message/Exception/NoHandlerForMessageException.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Exception; + +/** + * @author Samuel Roze + */ +class NoHandlerForMessageException extends \RuntimeException implements ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Message/Handler/ChainHandler.php b/src/Symfony/Component/Message/Handler/ChainHandler.php new file mode 100644 index 0000000000000..1ca2dc3470dae --- /dev/null +++ b/src/Symfony/Component/Message/Handler/ChainHandler.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Handler; + +/** + * Represents a collection of message handlers. + * + * @author Samuel Roze + */ +class ChainHandler +{ + /** + * @var callable[] + */ + private $handlers; + + /** + * @param callable[] $handlers + */ + public function __construct(array $handlers) + { + if (empty($handlers)) { + throw new \InvalidArgumentException('A collection of message handlers requires at least one handler'); + } + + $this->handlers = $handlers; + } + + public function __invoke($message) + { + return array_map(function ($handler) use ($message) { + return $handler($message); + }, $this->handlers); + } +} diff --git a/src/Symfony/Component/Message/HandlerLocator.php b/src/Symfony/Component/Message/HandlerLocator.php new file mode 100644 index 0000000000000..c10450bcb3383 --- /dev/null +++ b/src/Symfony/Component/Message/HandlerLocator.php @@ -0,0 +1,56 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +use Symfony\Component\Message\Exception\NoHandlerForMessageException; +use Symfony\Component\Message\Handler\ChainHandler; + +/** + * @author Samuel Roze + */ +class HandlerLocator implements HandlerLocatorInterface +{ + /** + * Maps a message (its class) to a given handler. + * + * @var array + */ + private $messageToHandlerMapping; + + public function __construct(array $messageToHandlerMapping = array()) + { + $this->messageToHandlerMapping = $messageToHandlerMapping; + } + + public function resolve($message): callable + { + $messageKey = get_class($message); + + if (!array_key_exists($messageKey, $this->messageToHandlerMapping)) { + throw new NoHandlerForMessageException(sprintf('No handler for message "%s"', $messageKey)); + } + + $handler = $this->messageToHandlerMapping[$messageKey]; + if ($this->isCollectionOfHandlers($handler)) { + $handler = new ChainHandler($handler); + } + + return $handler; + } + + private function isCollectionOfHandlers($handler): bool + { + return is_array($handler) && array_reduce($handler, function (bool $allHandlers, $handler) { + return $allHandlers && is_callable($handler); + }, true); + } +} diff --git a/src/Symfony/Component/Message/HandlerLocatorInterface.php b/src/Symfony/Component/Message/HandlerLocatorInterface.php new file mode 100644 index 0000000000000..ac27078ad5c33 --- /dev/null +++ b/src/Symfony/Component/Message/HandlerLocatorInterface.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +use Symfony\Component\Message\Exception\NoHandlerForMessageException; + +/** + * @author Samuel Roze + */ +interface HandlerLocatorInterface +{ + /** + * Return the handler for the given message. + * + * @param object $message + * + * @throws NoHandlerForMessageException + * + * @return callable + */ + public function resolve($message): callable; +} diff --git a/src/Symfony/Component/Message/MessageBus.php b/src/Symfony/Component/Message/MessageBus.php new file mode 100644 index 0000000000000..0b8241ff26bf7 --- /dev/null +++ b/src/Symfony/Component/Message/MessageBus.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +/** + * @author Samuel Roze + * @author Matthias Noback + */ +class MessageBus implements MessageBusInterface +{ + /** + * @var MiddlewareInterface[] + */ + private $middlewares; + + /** + * @param MiddlewareInterface[] $middlewares + */ + public function __construct(array $middlewares = array()) + { + $this->middlewares = $middlewares; + } + + /** + * {@inheritdoc} + */ + public function dispatch($message) + { + return call_user_func($this->callableForNextMiddleware(0), $message); + } + + private function callableForNextMiddleware($index): callable + { + if (!isset($this->middlewares[$index])) { + return function () {}; + } + + $middleware = $this->middlewares[$index]; + + return function ($message) use ($middleware, $index) { + $middleware->handle($message, $this->callableForNextMiddleware($index + 1)); + }; + } +} diff --git a/src/Symfony/Component/Message/MessageBusInterface.php b/src/Symfony/Component/Message/MessageBusInterface.php new file mode 100644 index 0000000000000..b66f070623a03 --- /dev/null +++ b/src/Symfony/Component/Message/MessageBusInterface.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +/** + * @author Samuel Roze + */ +interface MessageBusInterface +{ + /** + * Dispatch the given message. + * + * The bus can return a value coming from handlers, but is not required to do so. + * + * @param object $message + * + * @return mixed + */ + public function dispatch($message); +} diff --git a/src/Symfony/Component/Message/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Message/Middleware/HandleMessageMiddleware.php new file mode 100644 index 0000000000000..94d331d6100f8 --- /dev/null +++ b/src/Symfony/Component/Message/Middleware/HandleMessageMiddleware.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Middleware; + +use Symfony\Component\Message\MiddlewareInterface; +use Symfony\Component\Message\HandlerLocatorInterface; + +/** + * @author Samuel Roze + */ +class HandleMessageMiddleware implements MiddlewareInterface +{ + /** + * @var HandlerLocatorInterface + */ + private $messageHandlerResolver; + + public function __construct(HandlerLocatorInterface $messageHandlerResolver) + { + $this->messageHandlerResolver = $messageHandlerResolver; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $handler = $this->messageHandlerResolver->resolve($message); + $result = $handler($message); + + $next($message); + + return $result; + } +} diff --git a/src/Symfony/Component/Message/MiddlewareInterface.php b/src/Symfony/Component/Message/MiddlewareInterface.php new file mode 100644 index 0000000000000..db953f2097025 --- /dev/null +++ b/src/Symfony/Component/Message/MiddlewareInterface.php @@ -0,0 +1,26 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message; + +/** + * @author Samuel Roze + */ +interface MiddlewareInterface +{ + /** + * @param object $message + * @param callable $next + * + * @return mixed + */ + public function handle($message, callable $next); +} diff --git a/src/Symfony/Component/Message/README.md b/src/Symfony/Component/Message/README.md new file mode 100644 index 0000000000000..7791e77372921 --- /dev/null +++ b/src/Symfony/Component/Message/README.md @@ -0,0 +1,263 @@ +Message Component +================= + +The Message component helps application to send and receive messages to/from other applications or via +message queues. + +Resources +--------- + + * [Contributing](https://symfony.com/doc/current/contributing/index.html) + * [Report issues](https://github.com/symfony/symfony/issues) and + [send Pull Requests](https://github.com/symfony/symfony/pulls) + in the [main Symfony repository](https://github.com/symfony/symfony) + + +Documentation +------------- + +**Note:** this documentation is to be moved to symfony.com when merging the Component. + +### Concepts + +![Component overview](Resources/doc/component-overview.png) + +1. **Sender** + Responsible for serializing and sending the message to _something_. This something can be a message broker or a 3rd + party API for example. + +2. **Receiver** + Responsible for deserializing and forwarding the messages to handler(s). This can be a message queue puller or an API + endpoint for example. + +3. **Handler** + Given a received message, contains the user business logic related to the message. In practice, that is just a PHP + callable. + + +### Bus + +The bus is used to dispatch messages. MessageBus' behaviour is in its ordered middleware stack. When using +the message bus with Symfony's FrameworkBundle, the following middlewares are configured for you: + +1. `LoggingMiddleware` (log the processing of your messages) +2. `SendMessageMiddleware` (enable asynchronous processing) +3. `HandleMessageMiddleware` (call the registered handle) + +```php +use App\Message\MyMessage; + +$result = $this->get('message_bus')->handle(new MyMessage(/* ... */)); +``` + +### Handlers + +Once dispatched to the bus, messages will be handled by a "message handler". A message handler is a PHP callable +(i.e. a function or an instance of a class) that will do the required processing for your message. It _might_ return a +result. + +```php +namespace App\MessageHandler; + +use App\Message\MyMessage; + +class MyMessageHandler +{ + public function __invoke(MyMessage $message) + { + // Message processing... + } +} +``` + +```xml + + + +``` + +**Note:** If the message cannot be guessed from the handler's type-hint, use the `handles` attribute on the tag. + +### Asynchronous messages + +Using the Message Component is useful to decouple your application but it also very useful when you want to do some +asychronous processing. This means that your application will produce a message to a queuing system and consume this +message later in the background, using a _worker_. + +#### Adapters + +The communication with queuing system or 3rd parties is for delegated to libraries for now. You can use one of the +following adapters: + +- [PHP Enqueue bridge](https://github.com/sroze/enqueue-bridge) to use one of their 10+ compatible queues such as + RabbitMq, Amazon SQS or Google Pub/Sub. +- [Swarrot adapter](https://github.com/sroze/swarrot-bridge) to use Swarrot, a library specialised in consuming + messages from AMQP brokers such as RabbitMq. +- [HTTP adapter](https://github.com/sroze/message-http-adapter) to receive and send messages through HTTP APIs. + +#### Routing + +When doing asynchronous processing, the key is to route the message to the right sender. As the routing is +application-specific and not message-specific, the configuration can be made within the `framework.yaml` +configuration file as well: + +```yaml +framework: + message: + routing: + 'My\Message\MessageAboutDoingOperationalWork': my_operations_queue_sender +``` + +Such configuration would only route the `MessageAboutDoingOperationalWork` message to be asynchronous, the rest of the +messages would still be directly handled. + +If you want to do route all the messages to a queue by default, you can use such configuration: +```yaml +framework: + message: + routing: + 'My\Message\MessageAboutDoingOperationalWork': my_operations_queue_sender + '*': my_default_sender +``` + +Note that you can also route a message to multiple senders at the same time: +```yaml +framework: + message: + routing: + 'My\Message\AnImportantMessage': [my_default_sender, my_audit_semder] +``` + +#### Same bus received and sender + +To allow us to receive and send messages on the same bus and prevent a loop, the message bus is equipped with the +`WrapIntoReceivedMessage` received. It will wrap the received messages into `ReceivedMessage` objects and the +`SendMessageMiddleware` middleware will know it should not send these messages. + +### Your own sender + +Using the `SenderInterface`, you can easily create your own message sender. Let's say you already have an +`ImportantAction` message going through the message bus and handled by a handler. Now, you also want to send this +message as an email. + +1. Create your sender + +```php +namespace App\MessageSender; + +use Symfony\Component\Message\SenderInterface; +use App\Message\ImportantAction; + +class ImportantActionToEmailSender implements SenderInterface +{ + private $toEmail; + private $mailer; + + public function __construct(\Swift_Mailer $mailer, string $toEmail) + { + $this->mailer = $mailer; + $this->toEmail = $toEmail; + } + + public function send($message) + { + if (!$message instanceof ImportantAction) { + throw new \InvalidArgumentException(sprintf('Producer only supports "%s" messages', ImportantAction::class)); + } + + $this->mailer->send( + (new \Swift_Message('Important action made')) + ->setTo($this->toEmail) + ->setBody( + '

Important action

Made by '.$message->getUsername().'

', + 'text/html' + ) + ); + } +} +``` + +2. Register your sender service + +```yaml +services: + App\MessageSender\ImportantActionToEmailSender: + arguments: + - "@mailer" + - "%to_email%" + + tags: + - message.sender +``` + +3. Route your important message to the sender + +```yaml +framework: + message: + routing: + 'App\Message\ImportantAction': [App\MessageSender\ImportantActionToEmailSender, ~] +``` + +**Note:** this example shows you how you can at the same time send your message and directly handle it using a `null` +(`~`) sender. + +### Your own receiver + +A consumer is responsible of receiving messages from a source and dispatching them to the application. + +Let's say you already proceed some "orders" on your application using a `NewOrder` message. Now you want to integrate with +a 3rd party or a legacy application but you can't use an API and need to use a shared CSV file with new orders. + +You will read this CSV file and dispatch a `NewOrder` message. All you need to do is your custom CSV consumer and Symfony will do the rest. + +1. Create your receiver + +```php +namespace App\MessageReceiver; + +use Symfony\Component\Message\ReceiverInterface; +use Symfony\Component\Serializer\SerializerInterface; + +use App\Message\NewOrder; + +class NewOrdersFromCsvFile implements ReceiverInterface +{ + private $serializer; + private $filePath; + + public function __construct(SerializerInteface $serializer, string $filePath) + { + $this->serializer = $serializer; + $this->filePath = $filePath; + } + + public function receive() : \Generator + { + $ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv'); + + foreach ($ordersFromCsv as $orderFromCsv) { + yield new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']); + } + } +} +``` + +2. Register your receiver service + +```yaml +services: + App\MessageReceiver\NewOrdersFromCsvFile: + arguments: + - "@serializer" + - "%new_orders_csv_file_path%" + + tags: + - message.receiver +``` + +3. Use your consumer + +```bash +$ bin/console message:consume App\MessageReceived\NewOrdersFromCsvFile +``` diff --git a/src/Symfony/Component/Message/Recorder/AggregatesRecordedMessages.php b/src/Symfony/Component/Message/Recorder/AggregatesRecordedMessages.php new file mode 100644 index 0000000000000..f7e8e1d1482b1 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/AggregatesRecordedMessages.php @@ -0,0 +1,63 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +/** + * @author Matthias Noback + */ +class AggregatesRecordedMessages implements ContainsRecordedMessages +{ + /** + * @var ContainsRecordedMessages[] + */ + private $messageRecorders; + + public function __construct(array $messageRecorders) + { + foreach ($messageRecorders as $messageRecorder) { + $this->addMessageRecorder($messageRecorder); + } + } + + /** + * Get messages recorded by all known message recorders. + * + * {@inheritdoc} + */ + public function recordedMessages(): array + { + $allRecordedMessages = []; + + foreach ($this->messageRecorders as $messageRecorder) { + $allRecordedMessages = array_merge($allRecordedMessages, $messageRecorder->recordedMessages()); + } + + return $allRecordedMessages; + } + + /** + * Erase messages recorded by all known message recorders. + * + * {@inheritdoc} + */ + public function eraseMessages(): void + { + foreach ($this->messageRecorders as $messageRecorder) { + $messageRecorder->eraseMessages(); + } + } + + private function addMessageRecorder(ContainsRecordedMessages $messageRecorder) + { + $this->messageRecorders[] = $messageRecorder; + } +} diff --git a/src/Symfony/Component/Message/Recorder/ContainsRecordedMessages.php b/src/Symfony/Component/Message/Recorder/ContainsRecordedMessages.php new file mode 100644 index 0000000000000..9b480a775caf5 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/ContainsRecordedMessages.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +/** + * @author Matthias Noback + */ +interface ContainsRecordedMessages +{ + /** + * Fetch recorded messages. + */ + public function recordedMessages(): array; + + /** + * Erase messages that were recorded since the last call to eraseMessages(). + */ + public function eraseMessages(): void; +} diff --git a/src/Symfony/Component/Message/Recorder/HandlesRecordedMessagesMiddleware.php b/src/Symfony/Component/Message/Recorder/HandlesRecordedMessagesMiddleware.php new file mode 100644 index 0000000000000..872a8506c6642 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/HandlesRecordedMessagesMiddleware.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +use Exception; +use Symfony\Component\Message\MessageBusInterface; +use Symfony\Component\Message\MiddlewareInterface; + +/** + * @author Matthias Noback + */ +class HandlesRecordedMessagesMiddleware implements MiddlewareInterface +{ + /** + * @var ContainsRecordedMessages + */ + private $messageRecorder; + + /** + * @var MessageBusInterface + */ + private $messageBus; + + public function __construct(ContainsRecordedMessages $messageRecorder, MessageBusInterface $messageBus) + { + $this->messageRecorder = $messageRecorder; + $this->messageBus = $messageBus; + } + + public function handle($message, callable $next) + { + try { + $next($message); + } catch (Exception $exception) { + $this->messageRecorder->eraseMessages(); + + throw $exception; + } + + $recordedMessages = $this->messageRecorder->recordedMessages(); + + $this->messageRecorder->eraseMessages(); + + foreach ($recordedMessages as $recordedMessage) { + $this->messageBus->dispatch($recordedMessage); + } + } +} diff --git a/src/Symfony/Component/Message/Recorder/PrivateMessageRecorderCapabilities.php b/src/Symfony/Component/Message/Recorder/PrivateMessageRecorderCapabilities.php new file mode 100644 index 0000000000000..428c15006f061 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/PrivateMessageRecorderCapabilities.php @@ -0,0 +1,49 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +/** + * @author Matthias Noback + * + * Use this trait in classes which implement ContainsRecordedMessages to privately record and later release Message + * instances, like events. + */ +trait PrivateMessageRecorderCapabilities +{ + private $messages = []; + + /** + * {@inheritdoc} + */ + public function recordedMessages(): array + { + return $this->messages; + } + + /** + * {@inheritdoc} + */ + public function eraseMessages(): void + { + $this->messages = []; + } + + /** + * Record a message. + * + * @param object $message + */ + protected function record($message): void + { + $this->messages[] = $message; + } +} diff --git a/src/Symfony/Component/Message/Recorder/PublicMessageRecorder.php b/src/Symfony/Component/Message/Recorder/PublicMessageRecorder.php new file mode 100644 index 0000000000000..6497ce92202f0 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/PublicMessageRecorder.php @@ -0,0 +1,20 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +/** + * @author Matthias Noback + */ +class PublicMessageRecorder implements RecordsMessages +{ + use PrivateMessageRecorderCapabilities { record as public; } +} diff --git a/src/Symfony/Component/Message/Recorder/RecordsMessages.php b/src/Symfony/Component/Message/Recorder/RecordsMessages.php new file mode 100644 index 0000000000000..a5384e00de7e9 --- /dev/null +++ b/src/Symfony/Component/Message/Recorder/RecordsMessages.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Recorder; + +/** + * @author Matthias Noback + */ +interface RecordsMessages extends ContainsRecordedMessages +{ + /** + * Record a message. + * + * @param object $message + */ + public function record($message); +} diff --git a/src/Symfony/Component/Message/Resources/doc/component-overview.png b/src/Symfony/Component/Message/Resources/doc/component-overview.png new file mode 100644 index 0000000000000..074255b4667f5 Binary files /dev/null and b/src/Symfony/Component/Message/Resources/doc/component-overview.png differ diff --git a/src/Symfony/Component/Message/Transport/ReceiverInterface.php b/src/Symfony/Component/Message/Transport/ReceiverInterface.php new file mode 100644 index 0000000000000..92cd8d83eb5f4 --- /dev/null +++ b/src/Symfony/Component/Message/Transport/ReceiverInterface.php @@ -0,0 +1,20 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Transport; + +/** + * @author Samuel Roze + */ +interface ReceiverInterface +{ + public function receive(): \iterable; +} diff --git a/src/Symfony/Component/Message/Transport/SenderInterface.php b/src/Symfony/Component/Message/Transport/SenderInterface.php new file mode 100644 index 0000000000000..d5e2a6b5d9bcb --- /dev/null +++ b/src/Symfony/Component/Message/Transport/SenderInterface.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Transport; + +/** + * @author Samuel Roze + */ +interface SenderInterface +{ + /** + * Send the given message. + * + * @param object $message + */ + public function send($message); +} diff --git a/src/Symfony/Component/Message/Transport/Serialization/DecoderInterface.php b/src/Symfony/Component/Message/Transport/Serialization/DecoderInterface.php new file mode 100644 index 0000000000000..57e1504a101f1 --- /dev/null +++ b/src/Symfony/Component/Message/Transport/Serialization/DecoderInterface.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Transport\Serialization; + +/** + * @author Samuel Roze + */ +interface DecoderInterface +{ + /** + * Decode the message from an encoded-form. The `$encodedMessage` parameter is a key-value array that + * describes the message, that will be used by the different adapters. + * + * The most common keys are: + * - `body` (string) - the message body + * - `headers` (string) - a key/value pair of headers + * + * @param array $encodedMessage + * + * @return object + */ + public function decode(array $encodedMessage); +} diff --git a/src/Symfony/Component/Message/Transport/Serialization/EncoderInterface.php b/src/Symfony/Component/Message/Transport/Serialization/EncoderInterface.php new file mode 100644 index 0000000000000..d6ae1906c518b --- /dev/null +++ b/src/Symfony/Component/Message/Transport/Serialization/EncoderInterface.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Transport\Serialization; + +/** + * @author Samuel Roze + */ +interface EncoderInterface +{ + /** + * Encode a message to a common format understandable by adapters. The encoded array should only + * contain scalar and arrays. + * + * The most common keys of the encoded array are: + * - `body` (string) - the message body + * - `headers` (string) - a key/value pair of headers + * + * @param object $message + * + * @return array + */ + public function encode($message): array; +} diff --git a/src/Symfony/Component/Message/Transport/Serialization/SymfonySerialization.php b/src/Symfony/Component/Message/Transport/Serialization/SymfonySerialization.php new file mode 100644 index 0000000000000..5f1821088728f --- /dev/null +++ b/src/Symfony/Component/Message/Transport/Serialization/SymfonySerialization.php @@ -0,0 +1,63 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Message\Transport\Serialization; + +use Symfony\Component\Serializer\SerializerInterface; + +/** + * @author Samuel Roze + */ +class SymfonySerialization implements DecoderInterface, EncoderInterface +{ + /** + * @var SerializerInterface + */ + private $serializer; + + /** + * @var string + */ + private $format; + + public function __construct(SerializerInterface $serializer, string $format = 'json') + { + $this->serializer = $serializer; + $this->format = $format; + } + + /** + * {@inheritdoc} + */ + public function decode(array $encodedMessage) + { + if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) { + throw new \InvalidArgumentException('Encoded message should have at least a `body` some `headers`'); + } elseif (empty($encodedMessage['headers']['type'])) { + throw new \InvalidArgumentException('Encoded message does not have a `type` header'); + } + + return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format); + } + + /** + * {@inheritdoc} + */ + public function encode($message): array + { + return array( + 'body' => $this->serializer->serialize($message, $this->format), + 'headers' => array( + 'type' => get_class($message), + ), + ); + } +} diff --git a/src/Symfony/Component/Message/composer.json b/src/Symfony/Component/Message/composer.json new file mode 100644 index 0000000000000..de7ef6ac0d88a --- /dev/null +++ b/src/Symfony/Component/Message/composer.json @@ -0,0 +1,39 @@ +{ + "name": "symfony/message", + "type": "library", + "description": "Symfony Message Component", + "keywords": [], + "homepage": "https://symfony.com", + "license": "MIT", + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "require": { + "php": "^7.1.3" + }, + "require-dev": { + "symfony/serializer": "~3.4|~4.0" + }, + "suggest": { + "sroze/enqueue-bridge": "For using the php-enqueue library as an adapater." + }, + "autoload": { + "psr-4": { "Symfony\\Component\\Message\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "4.0-dev" + } + } +} diff --git a/src/Symfony/Component/Message/phpunit.xml.dist b/src/Symfony/Component/Message/phpunit.xml.dist new file mode 100644 index 0000000000000..9bc89a7ef9a21 --- /dev/null +++ b/src/Symfony/Component/Message/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + + + + ./Tests/ + + + + + + ./ + + ./Tests + ./vendor + + + +