Skip to content

Commit 4b06249

Browse files
committed
Added possibility to set serialize/unserialize function for rpc servers/clients
1 parent a15a4eb commit 4b06249

File tree

4 files changed

+25
-2
lines changed

4 files changed

+25
-2
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2015-02-07
2+
* Added possibility to set serialize/unserialize function for rpc servers/clients
3+
14
- 2014-11-27
25
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
36
* Added `queues_provider` configuration for multiple consumer

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ protected function loadRpcClients()
312312
if ($this->collectorEnabled) {
313313
$this->injectLoggedChannel($definition, $key, $client['connection']);
314314
}
315+
if (array_key_exists('unserializer', $client)) {
316+
$definition->addMethodCall('setUnserializer', array($client['unserializer']));
317+
}
315318

316319
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
317320
}
@@ -340,6 +343,9 @@ protected function loadRpcServers()
340343
if (array_key_exists('queue_options', $server)) {
341344
$definition->addMethodCall('setQueueOptions', array($server['queue_options']));
342345
}
346+
if (array_key_exists('serializer', $server)) {
347+
$definition->addMethodCall('setSerializer', array($server['serializer']));
348+
}
343349
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
344350
}
345351
}

RabbitMq/RpcClient.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class RpcClient extends BaseAmqp
1313
protected $timeout = 0;
1414

1515
private $queueName;
16+
private $unserializer = 'unserialize';
1617

1718
public function initClient($expectSerializedResponse = true)
1819
{
@@ -60,7 +61,7 @@ public function processMessage(AMQPMessage $msg)
6061
{
6162
$messageBody = $msg->body;
6263
if ($this->expectSerializedResponse) {
63-
$messageBody = unserialize($messageBody);
64+
$messageBody = call_user_func($this->unserializer, $messageBody);
6465
}
6566

6667
$this->replies[$msg->get('correlation_id')] = $messageBody;
@@ -74,4 +75,9 @@ protected function getQueueName()
7475

7576
return $this->queueName;
7677
}
78+
79+
public function setUnserializer($unserializer)
80+
{
81+
$this->unserializer = $unserializer;
82+
}
7783
}

RabbitMq/RpcServer.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
class RpcServer extends BaseConsumer
88
{
9+
private $serializer = 'serialize';
10+
911
public function initServer($name)
1012
{
1113
$this->setExchangeOptions(array('name' => $name, 'type' => 'direct'));
@@ -17,7 +19,8 @@ public function processMessage(AMQPMessage $msg)
1719
try {
1820
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
1921
$result = call_user_func($this->callback, $msg);
20-
$this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id'));
22+
$result = call_user_func($this->serializer, $result);
23+
$this->sendReply($result, $msg->get('reply_to'), $msg->get('correlation_id'));
2124
$this->consumed++;
2225
$this->maybeStopConsumer();
2326
} catch (\Exception $e) {
@@ -30,4 +33,9 @@ protected function sendReply($result, $client, $correlationId)
3033
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
3134
$this->getChannel()->basic_publish($reply, '', $client);
3235
}
36+
37+
public function setSerializer($serializer)
38+
{
39+
$this->serializer = $serializer;
40+
}
3341
}

0 commit comments

Comments
 (0)