Skip to content

Commit fc4874a

Browse files
author
Christian Perez-Llamas
committed
Added partitions and replicationFactor options to createCustomTopic
1 parent 8fd4506 commit fc4874a

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,13 @@ sealed trait EmbeddedKafkaSupport {
246246
* @param topicConfig per topic configuration [[Map]]
247247
* @param config an implicit [[EmbeddedKafkaConfig]]
248248
*/
249-
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty)(implicit config: EmbeddedKafkaConfig): Unit = {
249+
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty,
250+
partitions: Int = 1, replicationFactor: Int = 1)(implicit config: EmbeddedKafkaConfig): Unit = {
251+
250252
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
251253
val topicProperties = topicConfig.foldLeft(new Properties){case (props, (k,v)) => props.put(k,v); props}
252254

253-
try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicProperties) finally zkUtils.close()
255+
try AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties) finally zkUtils.close()
254256
}
255257

256258
}

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,24 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
121121

122122
}
123123
}
124+
125+
"create a topic with custom number of partitions" in {
126+
implicit val config = EmbeddedKafkaConfig()
127+
val topic = "test_custom_topic"
128+
129+
withRunningKafka {
130+
131+
createCustomTopic(topic, Map("cleanup.policy"->"compact"), partitions = 2)
132+
133+
val zkSessionTimeoutMs = 10000
134+
val zkConnectionTimeoutMs = 10000
135+
val zkSecurityEnabled = false
136+
137+
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
138+
try { AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata.size shouldBe 2 } finally zkUtils.close()
139+
140+
}
141+
}
124142
}
125143

126144
"the consumeFirstStringMessageFrom method" should {

0 commit comments

Comments
 (0)