Skip to content

Commit b02e491

Browse files
committed
fixes
1 parent e20d71a commit b02e491

File tree

1 file changed

+47
-60
lines changed

1 file changed

+47
-60
lines changed

lib/Integration/DI/Services.php

Lines changed: 47 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -336,77 +336,64 @@ private function loadProducers() : void
336336
private function loadConsumers() : void
337337
{
338338
foreach ($this->config['consumers'] as $key => $consumer) {
339-
// this consumer doesn't define an exchange -> using AMQP Default
339+
$this->registerCallbackAsService($consumer['callback']);
340+
341+
$definition = new Definition('%rabbitmq.consumer.class%');
342+
$definition->setPublic(true);
343+
$definition->addTag('rabbitmq.base_amqp');
344+
$definition->addTag('rabbitmq.consumer');
345+
//this consumer doesn't define an exchange -> using AMQP Default
340346
if (!isset($consumer['exchange_options'])) {
341347
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
342348
}
343-
344-
// this consumer doesn't define a queue -> using AMQP Default
349+
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
350+
//this consumer doesn't define a queue -> using AMQP Default
345351
if (!isset($consumer['queue_options'])) {
346352
$consumer['queue_options'] = $this->getDefaultQueueOptions();
347353
}
354+
$definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
355+
$definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
348356

349-
$consumers = function () use ($consumer) {
350-
$className = $this->parameters['rabbitmq.consumer.class'];
351-
$connectionName = "rabbitmq.connection.{$consumer['connection']}";
352-
353-
/** @var Consumer $instance */
354-
$instance = new $className($this->container->get($connectionName));
355-
356-
$instance->setExchangeOptions($consumer['exchange_options']);
357-
$instance->setQueueOptions($consumer['queue_options']);
358-
359-
/** @var object $callback */
360-
// $callback = $this->container->get($consumer['callback']);
361-
$callback = new $consumer['callback'];
362-
363-
$instance->setCallback([$callback, 'execute']);
364-
365-
if (array_key_exists('qos_options', $consumer)) {
366-
$instance->setQosOptions(
367-
$consumer['qos_options']['prefetch_size'],
368-
$consumer['qos_options']['prefetch_count'],
369-
$consumer['qos_options']['global']
370-
);
371-
}
372-
373-
if (isset($consumer['idle_timeout'])) {
374-
$instance->setIdleTimeout($consumer['idle_timeout']);
375-
}
376-
377-
if (isset($consumer['idle_timeout_exit_code'])) {
378-
$instance->setIdleTimeoutExitCode($consumer['idle_timeout_exit_code']);
379-
}
380-
381-
if (isset($consumer['graceful_max_execution'])) {
382-
$instance->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(
383-
$consumer['graceful_max_execution']['timeout']
384-
);
385-
$instance->setGracefulMaxExecutionTimeoutExitCode(
386-
$consumer['graceful_max_execution']['exit_code']
387-
);
388-
}
389-
390-
if (isset($consumer['auto_setup_fabric']) && !$consumer['auto_setup_fabric']) {
391-
$instance->disableAutoSetupFabric();
392-
}
357+
if (array_key_exists('qos_options', $consumer)) {
358+
$definition->addMethodCall('setQosOptions', array(
359+
$consumer['qos_options']['prefetch_size'],
360+
$consumer['qos_options']['prefetch_count'],
361+
$consumer['qos_options']['global']
362+
));
363+
}
393364

394-
if (isset($consumer['enable_logger']) && $consumer['enable_logger']) {
395-
$instance->setLogger($this->container->get($consumer['logger']));
396-
}
365+
if (isset($consumer['idle_timeout'])) {
366+
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
367+
}
368+
if (isset($consumer['idle_timeout_exit_code'])) {
369+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
370+
}
371+
if (isset($consumer['timeout_wait'])) {
372+
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
373+
}
374+
if (isset($consumer['graceful_max_execution'])) {
375+
$definition->addMethodCall(
376+
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
377+
array($consumer['graceful_max_execution']['timeout'])
378+
);
379+
$definition->addMethodCall(
380+
'setGracefulMaxExecutionTimeoutExitCode',
381+
array($consumer['graceful_max_execution']['exit_code'])
382+
);
383+
}
384+
if (!$consumer['auto_setup_fabric']) {
385+
$definition->addMethodCall('disableAutoSetupFabric');
386+
}
397387

398-
if ($this->isDequeverAwareInterface(get_class($callback))) {
399-
/** @var DequeuerAwareInterface $callback */
400-
$callback->setDequeuer($instance);
401-
}
388+
$this->injectConnection($definition, $consumer['connection']);
402389

403-
return $instance;
404-
};
390+
if ($consumer['enable_logger']) {
391+
$this->injectLogger($definition);
392+
}
405393

406-
$this->container->set(
407-
"rabbitmq.{$key}_consumer",
408-
$consumers()
409-
);
394+
$name = sprintf('rabbitmq.%s_consumer', $key);
395+
$this->container->setDefinition($name, $definition);
396+
$this->addDequeuerAwareCall($consumer['callback'], $name);
410397
}
411398
}
412399

0 commit comments

Comments
 (0)