File tree Expand file tree Collapse file tree 2 files changed +51
-1
lines changed Expand file tree Collapse file tree 2 files changed +51
-1
lines changed Original file line number Diff line number Diff line change 1616use Interop \Queue \Queue ;
1717use Interop \Queue \SubscriptionConsumer ;
1818use Interop \Queue \Topic ;
19+ use InvalidArgumentException ;
1920use RdKafka \Conf ;
2021use RdKafka \KafkaConsumer ;
2122use RdKafka \Producer as VendorProducer ;
@@ -54,8 +55,35 @@ public function __construct(array $config)
5455 $ this ->config = $ config ;
5556 $ this ->kafkaConsumers = [];
5657 $ this ->rdKafkaConsumers = [];
58+ $ this ->configureSerializer ($ config );
59+ }
60+
61+ /**
62+ * @param array $config
63+ * @return void
64+ */
65+ private function configureSerializer (array $ config ): void
66+ {
67+ if (!isset ($ config ['serializer ' ])) {
68+ $ this ->setSerializer (new JsonSerializer ());
69+ return ;
70+ }
5771
58- $ this ->setSerializer (new JsonSerializer ());
72+ if (is_string ($ config ['serializer ' ])) {
73+ $ this ->setSerializer (new $ config ['serializer ' ]());
74+ } elseif (is_array ($ config ['serializer ' ]) && isset ($ config ['serializer ' ]['class ' ])) {
75+ $ serializerClass = $ config ['serializer ' ]['class ' ];
76+ $ serializerOptions = $ config ['serializer ' ]['options ' ] ?? [];
77+ if (!empty ($ serializerOptions )) {
78+ $ this ->setSerializer (new $ serializerClass ($ serializerOptions ));
79+ } else {
80+ $ this ->setSerializer (new $ serializerClass ());
81+ }
82+ } elseif ($ config ['serializer ' ] instanceof Serializer) {
83+ $ this ->setSerializer ($ config ['serializer ' ]);
84+ } else {
85+ throw new InvalidArgumentException ('Invalid serializer configuration ' );
86+ }
5987 }
6088
6189 /**
Original file line number Diff line number Diff line change 88use Enqueue \RdKafka \Serializer ;
99use Interop \Queue \Exception \InvalidDestinationException ;
1010use Interop \Queue \Exception \TemporaryQueueNotSupportedException ;
11+ use InvalidArgumentException ;
1112use PHPUnit \Framework \TestCase ;
1213
1314class RdKafkaContextTest extends TestCase
@@ -36,6 +37,27 @@ public function testShouldSetJsonSerializerInConstructor()
3637 $ this ->assertInstanceOf (JsonSerializer::class, $ context ->getSerializer ());
3738 }
3839
40+ public function testShouldUseStringSerializerClassFromConfig ()
41+ {
42+ $ mockSerializerClass = get_class ($ this ->createMock (Serializer::class));
43+
44+ $ context = new RdKafkaContext ([
45+ 'serializer ' => $ mockSerializerClass
46+ ]);
47+
48+ $ this ->assertInstanceOf ($ mockSerializerClass , $ context ->getSerializer ());
49+ }
50+
51+ public function testShouldThrowExceptionOnInvalidSerializerConfig ()
52+ {
53+ $ this ->expectException (InvalidArgumentException::class);
54+ $ this ->expectExceptionMessage ('Invalid serializer configuration ' );
55+
56+ new RdKafkaContext ([
57+ 'serializer ' => 123
58+ ]);
59+ }
60+
3961 public function testShouldAllowGetPreviouslySetSerializer ()
4062 {
4163 $ context = new RdKafkaContext ([]);
You can’t perform that action at this time.
0 commit comments