Skip to content

Commit 7c91709

Browse files
kalondarmanub
authored and committed
Custom configuration for consumer and producer (#70)
* Unit test for publishing message bigger than default 1MB * Add custom producer and consumer properties * Update readme file with information about custom config properties * Clean code fixes
1 parent f769a0f commit 7c91709

File tree

4 files changed

+63
-3
lines changed

4 files changed

+63
-3
lines changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,30 @@ class MySpec extends WordSpec with EmbeddedKafka {
7777

7878
}
7979
```
80+
81+
The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties
82+
83+
```scala
84+
class MySpec extends WordSpec with EmbeddedKafka {
85+
86+
"runs with custom producer and consumer properties" should {
87+
val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000",
88+
"message.max.bytes" -> "2000000")
89+
90+
val customProducerConfig = Map("max.request.size" -> "2000000")
91+
val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000")
92+
93+
implicit val customKafkaConfig = EmbeddedKafkaConfig(
94+
customBrokerProperties = customBrokerConfig,
95+
customProducerProperties = customProducerConfig,
96+
customConsumerProperties = customConsumerConfig)
97+
98+
withRunningKafka {
99+
// now a kafka broker is listening on port 6001 (the default, since no kafkaPort is set)
100+
}
101+
102+
}
103+
```
80104

81105
This works for both `withRunningKafka` and `EmbeddedKafka.start()`
82106

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -219,7 +219,7 @@ sealed trait EmbeddedKafkaSupport {
219219
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
220220
ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString,
221221
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
222-
)
222+
) ++ config.customProducerProperties
223223

224224
private def baseConsumerConfig(
225225
implicit config: EmbeddedKafkaConfig): Properties = {
@@ -228,6 +228,7 @@ sealed trait EmbeddedKafkaSupport {
228228
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
229229
props.put("auto.offset.reset", "earliest")
230230
props.put("enable.auto.commit", "false")
231+
props.putAll(config.customConsumerProperties)
231232
props
232233
}
233234

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,8 +2,9 @@ package net.manub.embeddedkafka
22

33
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
44
zooKeeperPort: Int = 6000,
5-
customBrokerProperties: Map[String, String] =
6-
Map.empty)
5+
customBrokerProperties: Map[String, String] = Map.empty,
6+
customProducerProperties: Map[String, String] = Map.empty,
7+
customConsumerProperties: Map[String, String] = Map.empty)
78

89
object EmbeddedKafkaConfig {
910
implicit val defaultConfig = EmbeddedKafkaConfig()
Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,34 @@
1+
package net.manub.embeddedkafka
2+
3+
import scala.language.postfixOps
4+
5+
class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
6+
val TwoMegabytes = 2097152
7+
val ThreeMegabytes = 3145728
8+
9+
"the custom config" should {
10+
"allow pass additional producer parameters" in {
11+
val customBrokerConfig = Map(
12+
"replica.fetch.max.bytes" -> s"$ThreeMegabytes",
13+
"message.max.bytes" -> s"$ThreeMegabytes")
14+
15+
val customProducerConfig = Map("max.request.size" -> s"$ThreeMegabytes")
16+
val customConsumerConfig = Map("max.partition.fetch.bytes" -> s"$ThreeMegabytes")
17+
18+
implicit val customKafkaConfig = EmbeddedKafkaConfig(
19+
customBrokerProperties = customBrokerConfig,
20+
customProducerProperties = customProducerConfig,
21+
customConsumerProperties = customConsumerConfig)
22+
23+
val bigMessage = generateMessageOfLength(TwoMegabytes)
24+
val topic = "big-message-topic"
25+
26+
withRunningKafka {
27+
publishStringMessageToKafka(topic, bigMessage)
28+
consumeFirstStringMessageFrom(topic) shouldBe bigMessage
29+
}
30+
}
31+
}
32+
33+
def generateMessageOfLength(length: Int): String = Stream.continually(util.Random.nextPrintableChar) take length mkString
34+
}

0 commit comments

Comments (0)