Skip to content

Commit 8b780da

Browse files
committed
Anon consumer
1 parent 6cb38d7 commit 8b780da

File tree

2 files changed

+138
-41
lines changed

2 files changed

+138
-41
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
Адаптирована работа с `RPC_Server` и `RPC_Clients`.
1919

20+
Адаптирована работа с `Anon consumer`.
21+
2022
# Оригинальное readme.MD с некоторыми корректировками
2123

2224
## О проекте
@@ -265,7 +267,7 @@ class RandomIntServer
265267
- [ ] Multi-consumer
266268
- [ ] Dynamic consumer
267269
- [ ] Batch consumer
268-
- [ ] Anon consumer
270+
- [x] Anon consumer
269271
- [x] Rpc client
270272
- [x] Rpc server
271273
- [ ] Logged channel

lib/Integration/DI/Services.php

Lines changed: 135 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ class Services
3838
private $services;
3939

4040
/**
41-
* @var ContainerBuilder $containerBuilder Контейнер.
41+
* @var ContainerBuilder $container Контейнер.
4242
*/
43-
private $containerBuilder;
43+
private $container;
4444

4545
/**
4646
* @var boolean $booted Загружена ли уже конструкция.
@@ -56,12 +56,12 @@ public function __construct()
5656
$this->parameters = Configuration::getInstance('proklung.rabbitmq')->get('parameters') ?? [];
5757
$this->services = Configuration::getInstance('proklung.rabbitmq')->get('services') ?? [];
5858

59-
$this->containerBuilder = new ContainerBuilder();
59+
$this->container = new ContainerBuilder();
6060
$adapter = new BitrixSettingsDiAdapter();
6161

62-
$adapter->importParameters($this->containerBuilder, $this->config);
63-
$adapter->importParameters($this->containerBuilder, $this->parameters);
64-
$adapter->importServices($this->containerBuilder, $this->services);
62+
$adapter->importParameters($this->container, $this->config);
63+
$adapter->importParameters($this->container, $this->parameters);
64+
$adapter->importServices($this->container, $this->services);
6565
}
6666

6767
/**
@@ -113,12 +113,13 @@ public function load() : void
113113
$this->loadBindings();
114114
$this->loadProducers();
115115
$this->loadConsumers();
116+
$this->loadAnonConsumers();
116117
$this->loadRpcClients();
117118
$this->loadRpcServers();
118119

119120
$this->loadPartsHolder();
120121

121-
$this->containerBuilder->compile(false);
122+
$this->container->compile(false);
122123
}
123124

124125
/**
@@ -128,7 +129,7 @@ public function load() : void
128129
*/
129130
public function getContainer(): ContainerBuilder
130131
{
131-
return $this->containerBuilder;
132+
return $this->container;
132133
}
133134

134135
/**
@@ -147,26 +148,26 @@ private function loadPartsHolder() : void
147148
ksort($binding);
148149
$key = md5(json_encode($binding));
149150

150-
$part = $this->containerBuilder->get("rabbitmq.binding.{$key}");
151+
$part = $this->container->get("rabbitmq.binding.{$key}");
151152
$instance->addPart('rabbitmq.binding', $part);
152153
}
153154

154155
foreach ($this->config['producers'] as $key => $producer) {
155-
$part = $this->containerBuilder->get("rabbitmq.{$key}_producer");
156+
$part = $this->container->get("rabbitmq.{$key}_producer");
156157
$instance->addPart('rabbitmq.base_amqp', $part);
157158
$instance->addPart('rabbitmq.producer', $part);
158159
}
159160

160161
foreach ($this->config['consumers'] as $key => $consumer) {
161-
$part = $this->containerBuilder->get("rabbitmq.{$key}_consumer");
162+
$part = $this->container->get("rabbitmq.{$key}_consumer");
162163
$instance->addPart('rabbitmq.base_amqp', $part);
163164
$instance->addPart('rabbitmq.consumer', $part);
164165
}
165166

166167
return $instance;
167168
};
168169

169-
$this->containerBuilder->set('rabbitmq.parts_holder', $holder());
170+
$this->container->set('rabbitmq.parts_holder', $holder());
170171
}
171172

172173
/**
@@ -191,7 +192,7 @@ private function loadConnections() : void
191192

192193
if (isset($connection['connection_parameters_provider'])) {
193194
/** @var ConnectionParametersProviderInterface $parametersProvider */
194-
$parametersProvider = $this->containerBuilder->get($connection['connection_parameters_provider']);
195+
$parametersProvider = $this->container->get($connection['connection_parameters_provider']);
195196
}
196197

197198
/** @var AMQPConnectionFactory $instance */
@@ -205,15 +206,15 @@ private function loadConnections() : void
205206
};
206207

207208
$createConnector = function () use ($factoryName) {
208-
return $this->containerBuilder->get($factoryName)->createConnection();
209+
return $this->container->get($factoryName)->createConnection();
209210
};
210211

211-
$this->containerBuilder->set(
212+
$this->container->set(
212213
$factoryName,
213214
$constructor()
214215
);
215216

216-
$this->containerBuilder->set(
217+
$this->container->set(
217218
$connectionName,
218219
$createConnector()
219220
);
@@ -243,7 +244,7 @@ private function loadBindings() : void
243244
$connectionName = "rabbitmq.connection.{$binding['connection']}";
244245

245246
/** @var Binding $instance */
246-
$instance = new $className($this->containerBuilder->get($connectionName));
247+
$instance = new $className($this->container->get($connectionName));
247248

248249
$instance->setArguments($binding['arguments']);
249250
$instance->setDestination($binding['destination']);
@@ -255,7 +256,7 @@ private function loadBindings() : void
255256
return $instance;
256257
};
257258

258-
$this->containerBuilder->set(
259+
$this->container->set(
259260
"rabbitmq.binding.{$key}",
260261
$binding()
261262
);
@@ -291,7 +292,7 @@ private function loadProducers() : void
291292
$connectionName = "rabbitmq.connection.{$producer['connection']}";
292293

293294
/** @var Producer $instance */
294-
$instance = new $className($this->containerBuilder->get($connectionName));
295+
$instance = new $className($this->container->get($connectionName));
295296

296297
$instance->setExchangeOptions($producer['exchange_options']);
297298
$instance->setQueueOptions($producer['queue_options']);
@@ -301,20 +302,20 @@ private function loadProducers() : void
301302
}
302303

303304
if (isset($producer['enable_logger']) && $producer['enable_logger']) {
304-
$instance->setLogger($this->containerBuilder->get($producer['logger']));
305+
$instance->setLogger($this->container->get($producer['logger']));
305306
}
306307

307308
return $instance;
308309
};
309310

310-
$this->containerBuilder->set(
311+
$this->container->set(
311312
$producerServiceName,
312313
$producers()
313314
);
314315
}
315316
} else {
316317
foreach ($this->config['producers'] as $key => $producer) {
317-
$this->containerBuilder->register(
318+
$this->container->register(
318319
"rabbitmq.{$key}_producer",
319320
$this->parameters['rabbitmq.fallback.class']
320321
)->setPublic(true);
@@ -344,7 +345,7 @@ private function loadConsumers() : void
344345
$connectionName = "rabbitmq.connection.{$consumer['connection']}";
345346

346347
/** @var Consumer $instance */
347-
$instance = new $className($this->containerBuilder->get($connectionName));
348+
$instance = new $className($this->container->get($connectionName));
348349

349350
$instance->setExchangeOptions($consumer['exchange_options']);
350351
$instance->setQueueOptions($consumer['queue_options']);
@@ -385,7 +386,7 @@ private function loadConsumers() : void
385386
}
386387

387388
if (isset($consumer['enable_logger']) && $consumer['enable_logger']) {
388-
$instance->setLogger($this->containerBuilder->get($consumer['logger']));
389+
$instance->setLogger($this->container->get($consumer['logger']));
389390
}
390391

391392
if ($this->isDequeverAwareInterface(get_class($callback))) {
@@ -396,7 +397,7 @@ private function loadConsumers() : void
396397
return $instance;
397398
};
398399

399-
$this->containerBuilder->set(
400+
$this->container->set(
400401
"rabbitmq.{$key}_consumer",
401402
$consumers()
402403
);
@@ -425,7 +426,7 @@ private function loadRpcClients() : void
425426
}
426427
$definition->setPublic(true);
427428

428-
$this->containerBuilder->setDefinition(sprintf('rabbitmq.%s_rpc', $key), $definition);
429+
$this->container->setDefinition(sprintf('rabbitmq.%s_rpc', $key), $definition);
429430
}
430431
}
431432

@@ -435,10 +436,7 @@ private function loadRpcClients() : void
435436
private function loadRpcServers() : void
436437
{
437438
foreach ($this->config['rpc_servers'] as $key => $server) {
438-
// Регистрация callback как сервиса.
439-
$defCallBack = new Definition($server['callback']);
440-
$defCallBack->setPublic(true);
441-
$this->containerBuilder->setDefinition($server['callback'], $defCallBack);
439+
$this->registerCallbackAsService($server['callback']);
442440

443441
$definition = new Definition('%rabbitmq.rpc_server.class%');
444442
$definition
@@ -471,22 +469,119 @@ private function loadRpcServers() : void
471469
$definition->addMethodCall('setSerializer', array($server['serializer']));
472470
}
473471

474-
$this->containerBuilder->setDefinition(sprintf('rabbitmq.%s_server', $key), $definition);
472+
$this->container->setDefinition(sprintf('rabbitmq.%s_server', $key), $definition);
475473
}
476474
}
477475

478-
private function injectLoggedChannel(Definition $definition, $name, $connectionName)
476+
/**
477+
* @return void
478+
*/
479+
private function loadAnonConsumers() : void
480+
{
481+
foreach ($this->config['anon_consumers'] as $key => $anon) {
482+
$this->registerCallbackAsService($anon['callback']);
483+
484+
$definition = new Definition('%rabbitmq.anon_consumer.class%');
485+
$definition
486+
->setPublic(true)
487+
->addTag('rabbitmq.base_amqp')
488+
->addTag('rabbitmq.anon_consumer')
489+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
490+
->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
491+
$this->injectConnection($definition, $anon['connection']);
492+
493+
$name = sprintf('rabbitmq.%s_anon', $key);
494+
$this->container->setDefinition($name, $definition);
495+
$this->addDequeuerAwareCall($anon['callback'], $name);
496+
}
497+
}
498+
499+
/**
500+
* Регистрация класса сервисом.
501+
*
502+
* @param string $class Класс.
503+
*
504+
* @return void
505+
*/
506+
private function registerCallbackAsService(string $class)
479507
{
480-
$id = sprintf('rabbitmq.channel.%s', $name);
481-
$channel = new Definition('%rabbitmq.logged.channel.class%');
482-
$channel
483-
->setPublic(false)
484-
->addTag('rabbitmq.logged_channel');
485-
$this->injectConnection($channel, $connectionName);
508+
// Регистрация class как сервиса.
509+
$defCallBack = new Definition($class);
510+
$defCallBack->setPublic(true);
511+
$this->container->setDefinition($class, $defCallBack);
512+
}
486513

487-
$this->containerBuilder->setDefinition($id, $channel);
514+
/**
515+
* Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
516+
* parameter. So we revert the change for right configurations.
517+
*
518+
* @param array $config
519+
*
520+
* @return array
521+
*/
522+
private function normalizeArgumentKeys(array $config)
523+
{
524+
if (isset($config['arguments'])) {
525+
$arguments = $config['arguments'];
526+
// support for old configuration
527+
if (is_string($arguments)) {
528+
$arguments = $this->argumentsStringAsArray($arguments);
529+
}
488530

489-
$definition->addArgument(new Reference($id));
531+
$newArguments = array();
532+
foreach ($arguments as $key => $value) {
533+
if (strstr($key, '_')) {
534+
$key = str_replace('_', '-', $key);
535+
}
536+
$newArguments[$key] = $value;
537+
}
538+
$config['arguments'] = $newArguments;
539+
}
540+
return $config;
541+
}
542+
543+
/**
544+
* Support for arguments provided as string. Support for old configuration files.
545+
*
546+
* @deprecated
547+
* @param string $arguments
548+
* @return array
549+
*/
550+
private function argumentsStringAsArray($arguments)
551+
{
552+
$argumentsArray = array();
553+
554+
$argumentPairs = explode(',', $arguments);
555+
foreach ($argumentPairs as $argument) {
556+
$argumentPair = explode(':', $argument);
557+
$type = 'S';
558+
if (isset($argumentPair[2])) {
559+
$type = $argumentPair[2];
560+
}
561+
$argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
562+
}
563+
564+
return $argumentsArray;
565+
}
566+
567+
/**
568+
* Add proper dequeuer aware call.
569+
*
570+
* @param string $callback
571+
* @param string $name
572+
*
573+
* @return void
574+
*/
575+
private function addDequeuerAwareCall($callback, $name) : void
576+
{
577+
if (!$this->container->has($callback)) {
578+
return;
579+
}
580+
581+
$callbackDefinition = $this->container->findDefinition($callback);
582+
if ($this->isDequeverAwareInterface($callbackDefinition->getClass())) {
583+
$callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
584+
}
490585
}
491586

492587
private function injectConnection(Definition $definition, $connectionName)

0 commit comments

Comments
 (0)