@@ -2,21 +2,22 @@ package net.manub.embeddedkafka
22
33import java .net .InetSocketAddress
44import java .util .Properties
5- import java .util .concurrent .Executors
5+ import java .util .concurrent .{ TimeUnit , Executors }
66
77import kafka .consumer .{Consumer , ConsumerConfig , Whitelist }
88import kafka .serializer .StringDecoder
99import kafka .server .{KafkaConfig , KafkaServer }
10- import org .apache .kafka .clients .producer .{KafkaProducer , ProducerRecord }
10+ import org .apache .kafka .clients .producer .{KafkaProducer , ProducerConfig , ProducerRecord }
1111import org .apache .kafka .common .serialization .StringSerializer
1212import org .apache .zookeeper .server .{ServerCnxnFactory , ZooKeeperServer }
1313import org .scalatest .Suite
1414
1515import scala .collection .JavaConversions .mapAsJavaMap
1616import scala .concurrent .duration ._
17- import scala .concurrent .{ Await , ExecutionContext , Future }
17+ import scala .concurrent ._
1818import scala .language .postfixOps
1919import scala .reflect .io .Directory
20+ import scala .util .Try
2021
2122trait EmbeddedKafka {
2223
@@ -50,37 +51,49 @@ trait EmbeddedKafka {
5051 * @param topic the topic to which publish the message (it will be auto-created)
5152 * @param message the message to publish
5253 * @param config an implicit [[EmbeddedKafkaConfig ]]
54+ * @throws KafkaUnavailableException if unable to connect to Kafka
5355 */
56+ @ throws(classOf [KafkaUnavailableException ])
5457 def publishToKafka (topic : String , message : String )(implicit config : EmbeddedKafkaConfig ): Unit = {
5558
56- val producerProps = Map (
57- " bootstrap.servers" -> s " localhost: ${config.kafkaPort}" ,
58- " key.serializer" -> classOf [StringSerializer ].getName,
59- " value.serializer" -> classOf [StringSerializer ].getName
60- )
59+ val kafkaProducer = new KafkaProducer [String , String ](Map [String , String ](
60+ ProducerConfig .BOOTSTRAP_SERVERS_CONFIG -> s " localhost: ${config.kafkaPort}" ,
61+ ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG -> classOf [StringSerializer ].getName,
62+ ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG -> classOf [StringSerializer ].getName,
63+ ProducerConfig .METADATA_FETCH_TIMEOUT_CONFIG -> 3000 .toString,
64+ ProducerConfig .RETRY_BACKOFF_MS_CONFIG -> 1000 .toString
65+ ))
6166
62- val kafkaProducer = new KafkaProducer [String , String ](producerProps)
67+ val sendFuture = kafkaProducer.send(new ProducerRecord [String , String ](topic, message))
68+ val sendResult = Try { sendFuture.get(3 , SECONDS ) }
6369
64- kafkaProducer.send(new ProducerRecord [String , String ](topic, message))
6570 kafkaProducer.close()
71+
72+ if (sendResult.isFailure) throw new KafkaUnavailableException
6673 }
6774
6875
6976 /**
7077 * Consumes the first message available in a given topic, deserializing it as a String.
71- * Throws a [[java.util.concurrent.TimeoutException ]] if a message is not available in 3 seconds.
7278 *
7379 * @param topic the topic to consume a message from
7480 * @param config an implicit [[EmbeddedKafkaConfig ]]
7581 * @return the first message consumed from the given topic
82+ * @throws TimeoutException if unable to consume a message within 3 seconds
83+ * @throws KafkaUnavailableException if unable to connect to Kafka
7684 */
85+ @ throws(classOf [TimeoutException ])
86+ @ throws(classOf [KafkaUnavailableException ])
7787 def consumeFirstMessageFrom (topic : String )(implicit config : EmbeddedKafkaConfig ): String = {
7888 val props = new Properties ()
7989 props.put(" group.id" , " scalatest-embedded-kafka-spec" )
8090 props.put(" zookeeper.connect" , s " localhost: ${config.zooKeeperPort}" )
8191 props.put(" auto.offset.reset" , " smallest" )
92+ props.put(" zookeeper.connection.timeout.ms" , " 6000" )
8293
83- val consumer = Consumer .create(new ConsumerConfig (props))
94+ val consumer = Try {
95+ Consumer .create(new ConsumerConfig (props))
96+ }.getOrElse(throw new KafkaUnavailableException )
8497
8598 val filter = Whitelist (topic)
8699 val messageStreams =
0 commit comments