@@ -225,6 +225,104 @@ public function testShouldAllowSerializersToSerializeKeys()
225225 $ producer ->send (new RdKafkaTopic ('theQueueName ' ), $ message );
226226 }
227227
228+ public function testShouldGetPartitionFromMessage (): void
229+ {
230+ $ partition = 1 ;
231+
232+ $ kafkaTopic = $ this ->createKafkaTopicMock ();
233+ $ kafkaTopic
234+ ->expects ($ this ->once ())
235+ ->method ('producev ' )
236+ ->with (
237+ $ partition ,
238+ 0 ,
239+ 'theSerializedMessage ' ,
240+ 'theSerializedKey '
241+ )
242+ ;
243+
244+ $ kafkaProducer = $ this ->createKafkaProducerMock ();
245+ $ kafkaProducer
246+ ->expects ($ this ->once ())
247+ ->method ('newTopic ' )
248+ ->willReturn ($ kafkaTopic )
249+ ;
250+ $ kafkaProducer
251+ ->expects ($ this ->once ())
252+ ->method ('poll ' )
253+ ->with (0 )
254+ ;
255+ $ messageHeaders = ['bar ' => 'barVal ' ];
256+ $ message = new RdKafkaMessage ('theBody ' , ['foo ' => 'fooVal ' ], $ messageHeaders );
257+ $ message ->setKey ('key ' );
258+ $ message ->setPartition ($ partition );
259+
260+ $ serializer = $ this ->createSerializerMock ();
261+ $ serializer
262+ ->expects ($ this ->once ())
263+ ->method ('toString ' )
264+ ->willReturnCallback (function () use ($ message ) {
265+ $ message ->setKey ('theSerializedKey ' );
266+
267+ return 'theSerializedMessage ' ;
268+ })
269+ ;
270+
271+ $ destination = new RdKafkaTopic ('theQueueName ' );
272+
273+ $ producer = new RdKafkaProducer ($ kafkaProducer , $ serializer );
274+ $ producer ->send ($ destination , $ message );
275+ }
276+
277+ public function testShouldGetPartitionFromDestination (): void
278+ {
279+ $ partition = 2 ;
280+
281+ $ kafkaTopic = $ this ->createKafkaTopicMock ();
282+ $ kafkaTopic
283+ ->expects ($ this ->once ())
284+ ->method ('producev ' )
285+ ->with (
286+ $ partition ,
287+ 0 ,
288+ 'theSerializedMessage ' ,
289+ 'theSerializedKey '
290+ )
291+ ;
292+
293+ $ kafkaProducer = $ this ->createKafkaProducerMock ();
294+ $ kafkaProducer
295+ ->expects ($ this ->once ())
296+ ->method ('newTopic ' )
297+ ->willReturn ($ kafkaTopic )
298+ ;
299+ $ kafkaProducer
300+ ->expects ($ this ->once ())
301+ ->method ('poll ' )
302+ ->with (0 )
303+ ;
304+ $ messageHeaders = ['bar ' => 'barVal ' ];
305+ $ message = new RdKafkaMessage ('theBody ' , ['foo ' => 'fooVal ' ], $ messageHeaders );
306+ $ message ->setKey ('key ' );
307+
308+ $ serializer = $ this ->createSerializerMock ();
309+ $ serializer
310+ ->expects ($ this ->once ())
311+ ->method ('toString ' )
312+ ->willReturnCallback (function () use ($ message ) {
313+ $ message ->setKey ('theSerializedKey ' );
314+
315+ return 'theSerializedMessage ' ;
316+ })
317+ ;
318+
319+ $ destination = new RdKafkaTopic ('theQueueName ' );
320+ $ destination ->setPartition ($ partition );
321+
322+ $ producer = new RdKafkaProducer ($ kafkaProducer , $ serializer );
323+ $ producer ->send ($ destination , $ message );
324+ }
325+
228326 /**
229327 * @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic
230328 */
0 commit comments