Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Bundle\FrameworkBundle\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\Message\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Message\MessageBusInterface;
use Symfony\Component\Message\Transport\ReceiverInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessageConsumeCommand extends Command
{
protected static $defaultName = 'message:consume';

/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setDefinition(array(
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to dispatch the messages to', 'message_bus'),
))
->setDescription('Consume a message')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consume a message and dispatch it to the message bus.

%command.full_name% <consumer-service-name>

EOF
)
;
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();

if (!$container->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" does not exist', $receiverName));
} elseif (!($receiver = $container->get($receiverName)) instanceof ReceiverInterface) {
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the interface "%s"', $receiverName, ReceiverInterface::class));
}

if (!$container->has($busName = $input->getOption('bus'))) {
throw new \RuntimeException(sprintf('Bus "%s" does not exist', $busName));
} elseif (!($messageBus = $container->get($busName)) instanceof MessageBusInterface) {
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the interface "%s"', $busName, MessageBusInterface::class));
}

foreach ($receiver->receive() as $message) {
if (!$message instanceof ReceivedMessage) {
$message = new ReceivedMessage($message);
}

$messageBus->handle($message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\HttpFoundation\RequestStack;
use Symfony\Component\HttpFoundation\Session\SessionInterface;
use Symfony\Component\HttpKernel\HttpKernelInterface;
use Symfony\Component\Message\MessageBusInterface;
use Symfony\Component\Routing\RouterInterface;
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
Expand Down Expand Up @@ -84,6 +85,7 @@ public static function getSubscribedServices()
'security.token_storage' => '?'.TokenStorageInterface::class,
'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class,
'parameter_bag' => '?'.ContainerInterface::class,
'message_bus' => '?'.MessageBusInterface::class,
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Bundle\FrameworkBundle\DataCollector;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\Message\MiddlewareInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessagesDataCollector extends DataCollector implements MiddlewareInterface
{
/**
* {@inheritdoc}
*/
public function collect(Request $request, Response $response, \Exception $exception = null)
{
return $this->data;
}

/**
* {@inheritdoc}
*/
public function getName()
{
return 'messages';
}

/**
* {@inheritdoc}
*/
public function reset()
{
$this->data = array();
}

/**
* {@inheritdoc}
*/
public function handle($message, callable $next)
{
$debugRepresentation = array(
'message' => array(
'type' => get_class($message),
),
);

try {
$result = $next($message);

if (is_object($result)) {
$debugRepresentation['result'] = array(
'type' => get_class($result),
);
} else {
$debugRepresentation['result'] = array(
'type' => gettype($result),
'value' => $result,
);
}
} catch (\Throwable $exception) {
$debugRepresentation['exception'] = array(
'type' => get_class($exception),
'message' => $exception->getMessage(),
);
}

$this->data[] = $debugRepresentation;

if (isset($exception)) {
throw $exception;
}

return $result;
}

/**
* @return array
*/
public function getMessages()
{
return $this->data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Symfony\Component\Form\Form;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\Store\SemaphoreStore;
use Symfony\Component\Message\MessageBusInterface;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Translation\Translator;
use Symfony\Component\Validator\Validation;
Expand Down Expand Up @@ -100,6 +101,7 @@ public function getConfigTreeBuilder()
$this->addPhpErrorsSection($rootNode);
$this->addWebLinkSection($rootNode);
$this->addLockSection($rootNode);
$this->addMessageSection($rootNode);

return $treeBuilder;
}
Expand Down Expand Up @@ -894,4 +896,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode)
->end()
;
}

private function addMessageSection(ArrayNodeDefinition $rootNode)
{
$rootNode
->children()
->arrayNode('message')
->info('Message configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->prototype('array')
->beforeNormalization()
->ifString()
->then(function ($v) {
return array('senders' => array($v));
})
->end()
->children()
->arrayNode('senders')
->requiresAtLeastOneElement()
->prototype('scalar')->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ public function load(array $configs, ContainerBuilder $container)
$this->registerLockConfiguration($config['lock'], $container, $loader);
}

if ($this->isConfigEnabled($container, $config['message'])) {
$this->registerMessageConfiguration($config['message'], $container, $loader);
}

if ($this->isConfigEnabled($container, $config['web_link'])) {
if (!class_exists(HttpHeaderSerializer::class)) {
throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.');
Expand Down Expand Up @@ -1359,6 +1363,20 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
}
}

private function registerMessageConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
{
$loader->load('message.xml');

$messageToSenderMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
$messageToSenderMapping[$message] = array_map(function (string $serviceName) {
return new Reference($serviceName);
}, $messageConfiguration['senders']);
}

$container->getDefinition('message.asynchronous.routing.sender_locator')->setArgument(0, $messageToSenderMapping);
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
{
$version = substr(str_replace('/', '-', base64_encode(hash('sha256', uniqid(mt_rand(), true), true))), 0, 22);
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass;
use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass;
use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass;
use Symfony\Component\Message\DependencyInjection\EventPass;
use Symfony\Component\Message\DependencyInjection\MessagePass;
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
Expand Down Expand Up @@ -114,6 +116,8 @@ public function build(ContainerBuilder $container)
$this->addCompilerPassIfExists($container, FormPass::class);
$container->addCompilerPass(new WorkflowGuardListenerPass());
$container->addCompilerPass(new ResettableServicePass());
$this->addCompilerPassIfExists($container, MessagePass::class);
$this->addCompilerPassIfExists($container, EventPass::class);

if ($container->getParameter('kernel.debug')) {
$container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
<tag name="console.command" command="debug:event-dispatcher" />
</service>

<service id="console.command.message_consume" class="Symfony\Bundle\FrameworkBundle\Command\MessageConsumeCommand">
<tag name="console.command" command="message:consume" />
</service>

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<tag name="console.command" command="debug:router" />
Expand Down
88 changes: 88 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/message.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">

<services>
<defaults public="false" />

<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
<argument type="collection" /> <!-- Middlewares -->
</service>

<service id="Symfony\Component\Message\MessageBusInterface" alias="message_bus" />

<!-- Handlers -->
<service id="message.handler_resolver" class="Symfony\Component\Message\ContainerHandlerLocator">
<argument type="service" id="service_container"/>
</service>

<service id="message.middleware.call_message_handler" class="Symfony\Component\Message\Middleware\HandleMessageMiddleware">
<argument type="service" id="message.handler_resolver" />

<tag name="message_middleware" priority="-10" />
</service>

<!-- Asynchronous -->
<service id="message.asynchronous.routing.sender_locator" class="Symfony\Component\Message\Asynchronous\Routing\SenderLocator">
<argument type="collection" /> <!-- Message to sender mapping -->
</service>
<service id="message.asynchronous.middleware.send_message_to_producer" class="Symfony\Component\Message\Asynchronous\Middleware\SendMessageMiddleware">
<argument type="service" id="message.asynchronous.routing.sender_locator" />

<tag name="message_middleware" priority="-5" />
</service>

<!-- Message encoding/decoding -->
<service id="message.transport.serialize_message_with_type_in_headers" class="Symfony\Component\Message\Transport\SymfonySerialization">
<argument type="service" id="serializer" />
</service>

<service id="message.transport.default_encoder" alias="message.transport.serialize_message_with_type_in_headers" public="true" />
<service id="message.transport.default_decoder" alias="message.transport.serialize_message_with_type_in_headers" public="true" />

<!-- Logging & Debug -->
<service id="message.middleware.debug.logging" class="Symfony\Component\Message\Debug\LoggingMiddleware">
<argument type="service" id="logger" />

<tag name="message_middleware" priority="10" />
<tag name="monolog.logger" channel="message" />
</service>

<service id="data_collector.messages" class="Symfony\Bundle\FrameworkBundle\DataCollector\MessagesDataCollector">
<tag name="data_collector" template="@WebProfiler/Collector/messages.html.twig" id="messages" priority="100" />
<tag name="message_middleware" />
</service>

<!-- Event Bus -->
<service id="event_bus" class="Symfony\Component\Message\EventBus" public="true">
<argument type="collection" /> <!-- Middlewares -->
</service>

<!-- Handlers -->
<service id="message.event_handler_resolver" class="Symfony\Component\Message\ContainerHandlerLocator">
<argument type="service" id="service_container"/>
</service>

<service id="message.middleware.call_event_handler" class="Symfony\Component\Message\Middleware\HandleMessageMiddleware">
<argument type="service" id="message.event_handler_resolver" />

<tag name="event_middleware" priority="-10" />
</service>

<service id="message.middleware.recorded_messages_handler" class="Symfony\Component\Message\Recorder\HandlesRecordedMessagesMiddleware">
<argument type="service" id="message.event_recorder" />
<argument type="service" id="event_bus" />

<tag name="message_middleware" priority="-20" />
</service>

<!-- Event Recorder -->
<service id="message.event_recorder" class="Symfony\Component\Message\Recorder\PublicMessageRecorder" />
<service id="event_recorder" alias="message.event_recorder" />
<service id="Symfony\Component\Message\Recorder\RecordsMessages" alias="event_recorder" />

</services>
</container>
Loading