Commit 8a5b928

gigaSproule authored and manub committed
Updated to allow passing in maximum number of attempts and poll timeout (#72)
* Updated to allow passing in maximum number of attempts and poll timeout
* Updated after code review comments (upgraded Mockito and minor refactoring)
* Changed mapAsJavaMap to asJava, so it can compile in 2.11 without using a deprecated object
1 parent 7c91709 commit 8a5b928

File tree

3 files changed: +83 -19 lines

build.sbt

Lines changed: 9 additions & 8 deletions

@@ -20,13 +20,13 @@ lazy val commonSettings = Seq(
 )
 
 lazy val commonLibrarySettings = libraryDependencies ++= Seq(
-  "org.scalatest" %% "scalatest" % "3.0.1",
-  "org.apache.kafka" %% "kafka" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
-  "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
-  "org.apache.avro" % "avro" % "1.8.1" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
-  "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
-  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
-)
+  "org.scalatest" %% "scalatest" % "3.0.1",
+  "org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.avro" % "avro" % "1.8.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
+  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
+)
 
 lazy val publishSettings = Seq(
   licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
@@ -69,6 +69,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka"))
   .settings(publishSettings: _*)
   .settings(commonSettings: _*)
   .settings(commonLibrarySettings)
+  .settings(libraryDependencies += "org.mockito" % "mockito-core" % "2.7.14" % Test)
   .settings(releaseSettings: _*)
 
 lazy val kafkaStreams = (project in file("kafka-streams"))
@@ -78,6 +79,6 @@ lazy val kafkaStreams = (project in file("kafka-streams"))
   .settings(commonLibrarySettings)
   .settings(releaseSettings: _*)
   .settings(libraryDependencies ++= Seq(
-    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact)
+    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact)
   ))
   .dependsOn(embeddedKafka)

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 15 additions & 11 deletions

@@ -6,9 +6,9 @@ import org.apache.log4j.Logger
 
 import scala.util.Try
 
-/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing.*/
+/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
 object ConsumerExtensions {
-  val MaximumAttempts = 3
+
   implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
 
     private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])
@@ -18,14 +18,16 @@ object ConsumerExtensions {
      * to consume batches from the given topic, until it reaches the number of desired messages or
      * return otherwise.
      *
-     * @param topic the topic from which to consume messages
+     * @param topic the topic from which to consume messages
+     * @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
+     * @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
      * @return the stream of consumed messages that you can do `.take(n: Int).toList`
      * to evaluate the requested number of messages.
      */
-    def consumeLazily(topic: String): Stream[(K, V)] = {
-      val attempts = 1 to MaximumAttempts
+    def consumeLazily(topic: String, maximumAttempts: Int = 3, poll: Long = 2000): Stream[(K, V)] = {
+      val attempts = 1 to maximumAttempts
       attempts.toStream.flatMap { attempt =>
-        val batch: Seq[(K, V)] = getNextBatch(topic)
+        val batch: Seq[(K, V)] = getNextBatch(topic, poll)
         logger.debug(s"----> Batch $attempt ($topic) | ${batch.mkString("|")}")
         batch
       }
@@ -34,18 +36,20 @@
     /** Get the next batch of messages from Kafka.
      *
      * @param topic the topic to consume
+     * @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
      * @return the next batch of messages
      */
-    def getNextBatch(topic: String): Seq[(K, V)] =
+    private def getNextBatch(topic: String, poll: Long): Seq[(K, V)] =
       Try {
-        import scala.collection.JavaConversions._
-        consumer.subscribe(List(topic))
+        import scala.collection.JavaConverters._
+        consumer.subscribe(List(topic).asJava)
        consumer.partitionsFor(topic)
-        val records = consumer.poll(2000)
+        val records = consumer.poll(poll)
         // use toList to force eager evaluation. toSeq is lazy
-        records.iterator().toList.map(r => r.key -> r.value)
+        records.iterator().asScala.toList.map(r => r.key -> r.value)
       }.recover {
         case ex: KafkaException => throw new KafkaUnavailableException(ex)
       }.get
   }
+
 }
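
With this change both knobs are configurable per call. A minimal usage sketch under assumptions not taken from the diff: the KafkaConsumer[String, String] named consumer and the topic "my-topic" are hypothetical.

import net.manub.embeddedkafka.ConsumerExtensions._

// Retry up to 5 poll attempts, waiting 5000 ms per poll, and materialise the first 3 messages.
val firstThree: List[(String, String)] =
  consumer.consumeLazily("my-topic", maximumAttempts = 5, poll = 5000).take(3).toList

// Both parameters are optional; the defaults (maximumAttempts = 3, poll = 2000 ms)
// keep the behaviour the method had before this commit.
val lazyStream: Stream[(String, String)] = consumer.consumeLazily("my-topic")
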
Lines changed: 59 additions & 0 deletions (new file)

@@ -0,0 +1,59 @@
+package net.manub.embeddedkafka
+
+import net.manub.embeddedkafka.ConsumerExtensions._
+import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.mockito.MockitoSugar
+
+import scala.collection.JavaConverters._
+
+class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
+
+  "ConsumeLazily " should {
+    "retry to get messages with the configured maximum number of attempts when poll fails" in {
+      val consumer = mock[KafkaConsumer[String, String]]
+      val consumerRecords =
+        new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
+
+      val pollTimeout = 1
+      when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
+
+      val maximumAttempts = 2
+      consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
+
+      verify(consumer, times(maximumAttempts)).poll(pollTimeout)
+    }
+
+    "not retry to get messages with the configured maximum number of attempts when poll succeeds" in {
+      val consumer = mock[KafkaConsumer[String, String]]
+      val consumerRecord = mock[ConsumerRecord[String, String]]
+      val consumerRecords = new ConsumerRecords[String, String](
+        Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
+      )
+
+      val pollTimeout = 1
+      when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
+
+      val maximumAttempts = 2
+      consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
+
+      verify(consumer).poll(pollTimeout)
+    }
+
+    "poll to get messages with the configured poll timeout" in {
+      val consumer = mock[KafkaConsumer[String, String]]
+      val consumerRecords =
+        new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
+
+      val pollTimeout = 10
+      when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
+
+      val maximumAttempts = 1
+      consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
+
+      verify(consumer).poll(pollTimeout)
+    }
+  }
+}
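
The spec builds its Java collections with the explicit scala.collection.JavaConverters syntax mentioned in the commit message (mapAsJavaMap replaced by asJava). A minimal illustration of the idiom, separate from this repository's code:

import scala.collection.JavaConverters._

// Scala -> Java: explicit .asJava instead of the deprecated implicit conversions in scala.collection.JavaConversions
val javaList: java.util.List[String] = List("a", "b").asJava

// Java -> Scala: explicit .asScala, the same call applied to records.iterator() in ConsumerExtensions
val roundTripped: List[String] = javaList.asScala.toList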
