Skip to content

Commit 9cddfdb

Browse files
committed
Merge pull request #23 from lucapertile/refactor-create-custom-topic
refactored createCustomTopic signature to use a scala Map
2 parents f4aa0fc + 8667734 commit 9cddfdb

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ The `EmbeddedKafka` trait provides also some utility methods to interact with th
8181
8282
def consumeFirstMessageFrom(topic: String): String
8383

84-
def createCustomTopic(topic: String, topicConfig: Properties): Unit
84+
def createCustomTopic(topic: String, topicConfig: Map[String,String]): Unit
8585
8686
## Custom producers
8787

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,14 @@ sealed trait EmbeddedKafkaSupport {
243243
* Creates a topic with a custom configuration
244244
*
245245
* @param topic the topic name
246-
* @param topicConfig per topic configuration [[java.util.Properties]]
246+
* @param topicConfig per topic configuration [[Map]]
247247
* @param config an implicit [[EmbeddedKafkaConfig]]
248248
*/
249-
def createCustomTopic(topic: String, topicConfig: Properties)(implicit config: EmbeddedKafkaConfig): Unit = {
249+
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty)(implicit config: EmbeddedKafkaConfig): Unit = {
250250
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
251-
try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicConfig) finally zkUtils.close()
251+
val topicProperties = topicConfig.foldLeft(new Properties){case (props, (k,v)) => props.put(k,v); props}
252+
253+
try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicProperties) finally zkUtils.close()
252254
}
255+
253256
}

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,8 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
109109
val topic = "test_custom_topic"
110110

111111
withRunningKafka {
112-
val properties: Properties = new Properties
113-
properties.put("cleanup.policy", "compact")
114112

115-
createCustomTopic(topic, properties)
113+
createCustomTopic(topic, Map("cleanup.policy"->"compact"))
116114

117115
val zkSessionTimeoutMs = 10000
118116
val zkConnectionTimeoutMs = 10000

0 commit comments

Comments
 (0)