diff --git a/.gitignore b/.gitignore
index 692e68f..b7e2b3a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,10 @@
target/
.idea/
*.iml
-.DS_STORE
\ No newline at end of file
+.DS_STORE
+/.classpath
+*/.classpath
+/.project
+*/.project
+*/.settings
+bin/
diff --git a/README.md b/README.md
index b520e72..ced3cc5 100644
--- a/README.md
+++ b/README.md
@@ -18,4 +18,4 @@ Happy learning!
- Word Count to learn the basic API
- Favourite Colour for a more advanced example (`Scala` version included)
- Bank Balance to demonstrate exactly once semantics
- - User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
\ No newline at end of file
+ - User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
diff --git a/bank-balance-exactly-once/pom.xml b/bank-balance-exactly-once/pom.xml
index 95e0266..19417ea 100644
--- a/bank-balance-exactly-once/pom.xml
+++ b/bank-balance-exactly-once/pom.xml
@@ -16,7 +16,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
- <version>0.11.0.1</version>
+ <version>2.0.1</version>
@@ -24,7 +24,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.11.0.1</version>
+ <version>2.0.1</version>
diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
index a71f573..1aa791d 100644
--- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
+++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
@@ -8,13 +8,14 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Instant;
import java.util.Properties;
@@ -33,18 +34,17 @@ public static void main(String[] args) {
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-
+
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
+
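+ // StreamsBuilder is the 2.0 replacement for the removed KStreamBuilder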
+ StreamsBuilder builder = new StreamsBuilder();
-
- KStreamBuilder builder = new KStreamBuilder();
-
- KStream<String, JsonNode> bankTransactions =
- builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
-
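+ // serdes for the source topic are now supplied through Consumed.with(...)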
+ KStream<String, JsonNode> bankTransactions = builder.stream("bank-transactions",
+ Consumed.with(Serdes.String(), jsonSerde));
+
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
@@ -53,22 +53,23 @@ public static void main(String[] args) {
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
KTable<String, JsonNode> bankBalance = bankTransactions
- .groupByKey(Serdes.String(), jsonSerde)
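+ // Serialized.with(...) replaces the bare serde arguments to groupByKey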
+ .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
- jsonSerde,
- "bank-balance-agg"
+ Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(jsonSerde)
);
- bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once");
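+ // KTable#to was removed; convert to a stream first and hand the serdes to Produced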
+ bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();
// print the topology
- System.out.println(streams.toString());
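+ // printing the KafkaStreams object no longer dumps the topology; print the thread metadata instead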
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/favourite-colour-java/pom.xml b/favourite-colour-java/pom.xml
index a973825..cf20955 100644
--- a/favourite-colour-java/pom.xml
+++ b/favourite-colour-java/pom.xml
@@ -15,7 +15,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
- <version>0.11.0.0</version>
+ <version>2.0.1</version>
diff --git a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
index 4828745..c5d115f 100644
--- a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
+++ b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
@@ -2,16 +2,20 @@
import java.util.Properties;
import java.util.Arrays;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
public class FavouriteColourApp {
@@ -26,8 +30,7 @@ public static void main(String[] args) {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
- KStreamBuilder builder = new KStreamBuilder();
-
+ StreamsBuilder builder = new StreamsBuilder();
// Step 1: We create the topic of users keys to colours
KStream textLines = builder.stream("favourite-colour-input");
@@ -43,6 +46,9 @@ public static void main(String[] args) {
usersAndColours.to("user-keys-and-colours");
+ Serde<String> stringSerde = Serdes.String();
+ Serde<Long> longSerde = Serdes.Long();
+
// step 2 - we read that topic as a KTable so that updates are read correctly
KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");
@@ -50,18 +56,20 @@ public static void main(String[] args) {
KTable<String, Long> favouriteColours = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
- .count("CountsByColours");
+ .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
+ .withKeySerde(stringSerde)
+ .withValueSerde(longSerde));
// 6 - we output the results to a Kafka Topic - don't forget the serializers
- favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output");
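+ // a KTable must first be converted to a stream before writing it out to a topic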
+ favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(), Serdes.Long()));
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
// only do this in dev - not in prod
streams.cleanUp();
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/favourite-colour-scala/build.sbt b/favourite-colour-scala/build.sbt
index ba01c4a..52f7701 100644
--- a/favourite-colour-scala/build.sbt
+++ b/favourite-colour-scala/build.sbt
@@ -1,16 +1,19 @@
name := "favourite-colour-scala"
organization := "com.github.simplesteph.udemy.kafka.streams"
-version := "1.0-SNAPSHOT"
+version := "2.0.1-SNAPSHOT"
scalaVersion := "2.12.3"
+// needed to resolve weird dependency
+libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts(
+ Artifact("javax.ws.rs-api", "jar", "jar"))
+
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies ++= Seq(
- "org.apache.kafka" % "kafka-streams" % "0.11.0.0",
+ "org.apache.kafka" % "kafka-streams" % "2.0.1",
"org.slf4j" % "slf4j-api" % "1.7.25",
"org.slf4j" % "slf4j-log4j12" % "1.7.25"
)
-
// leverage java 8
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
scalacOptions := Seq("-target:jvm-1.8")
diff --git a/favourite-colour-scala/project/PackagingTypePlugin.scala b/favourite-colour-scala/project/PackagingTypePlugin.scala
new file mode 100644
index 0000000..16cc60d
--- /dev/null
+++ b/favourite-colour-scala/project/PackagingTypePlugin.scala
@@ -0,0 +1,8 @@
+import sbt._
+
+object PackagingTypePlugin extends AutoPlugin {
+ override val buildSettings = {
+ sys.props += "packaging.type" -> "jar"
+ Nil
+ }
+}
diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
index d3f1073..9428b81 100644
--- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
+++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
@@ -4,9 +4,11 @@ import java.lang
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.common.serialization.{Serde, Serdes}
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.streams.kstream._
+import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}
object FavouriteColourAppScala {
def main(args: Array[String]): Unit = {
@@ -21,7 +23,7 @@ object FavouriteColourAppScala {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
- val builder: KStreamBuilder = new KStreamBuilder
+ val builder: StreamsBuilder = new StreamsBuilder
// Step 1: We create the topic of users keys to colours
val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")
@@ -32,7 +34,9 @@ object FavouriteColourAppScala {
// 2 - we select a key that will be the user id (lowercase for safety)
.selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
// 3 - we get the colour from the value (lowercase for safety)
- .mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
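+      // an explicit ValueMapper sidesteps the mapValues overload ambiguity that Scala lambdas hit in 2.0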
+ .mapValues[String](new ValueMapper[String, String] {
+ override def apply(value: String): String = { value.split(",")(1).toLowerCase }
+ })
// 4 - we filter undesired colours (could be a data sanitization step)
.filter((user: String, colour: String) => List("green", "blue", "red").contains(colour))
@@ -42,21 +46,29 @@ object FavouriteColourAppScala {
// step 2 - we read that topic as a KTable so that updates are read correctly
val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)
+ val stringSerde: Serde[String] = Serdes.String
+ val longSerde: Serde[lang.Long] = Serdes.Long
+
// step 3 - we count the occurences of colours
val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
// 5 - we group by colour within the KTable
- .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
- .count("CountsByColours")
+ .groupBy(
+ (user: String, colour: String) => new KeyValue[String, String](colour, colour),
+ Serialized.`with`(stringSerde, stringSerde)
+ )
+ .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")
+ .withKeySerde(stringSerde)
+ .withValueSerde(longSerde))
// 6 - we output the results to a Kafka Topic - don't forget the serializers
- favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")
+ favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde))
- val streams: KafkaStreams = new KafkaStreams(builder, config)
+ val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.cleanUp()
streams.start()
// print the topology
- System.out.println(streams.toString)
+ streams.localThreadsMetadata().forEach(t => System.out.print(t.toString))
// shutdown hook to correctly close the streams application
Runtime.getRuntime.addShutdownHook(new Thread {
diff --git a/streams-starter-project/pom.xml b/streams-starter-project/pom.xml
index 84f1104..436bc4e 100644
--- a/streams-starter-project/pom.xml
+++ b/streams-starter-project/pom.xml
@@ -13,7 +13,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
- <version>0.11.0.0</version>
+ <version>2.0.1</version>
diff --git a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
index 5a88d5f..3f4f842 100644
--- a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
+++ b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
@@ -3,9 +3,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
@@ -20,18 +20,18 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> kStream = builder.stream("input-topic-name");
// do stuff
kStream.to("word-count-output");
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/user-event-enricher/pom.xml b/user-event-enricher/pom.xml
index 0be5452..5f16db6 100644
--- a/user-event-enricher/pom.xml
+++ b/user-event-enricher/pom.xml
@@ -14,7 +14,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
- <version>0.11.0.0</version>
+ <version>2.0.1</version>
diff --git a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
index 180aae6..314e1fd 100644
--- a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
+++ b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
@@ -3,10 +3,10 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
@@ -20,7 +20,7 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
// we get a global table out of Kafka. This table will be replicated on each Kafka Streams application
// the key of our globalKTable is the user ID
@@ -55,12 +55,12 @@ public static void main(String[] args) {
userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join");
- KafkaStreams streams = new KafkaStreams(builder, config);
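+ // KafkaStreams now takes the Topology produced by builder.build()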
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/word-count/.gitignore b/word-count/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/word-count/.gitignore
@@ -0,0 +1 @@
+/bin/
diff --git a/word-count/pom.xml b/word-count/pom.xml
index 8e0f93f..953dc05 100644
--- a/word-count/pom.xml
+++ b/word-count/pom.xml
@@ -15,7 +15,22 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
- <version>0.11.0.0</version>
+ <version>2.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams-test-utils</artifactId>
+ <version>2.0.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
index 4b8c43a..4e8a118 100644
--- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
+++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
@@ -6,21 +6,18 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
public class WordCountApp {
- public static void main(String[] args) {
- Properties config = new Properties();
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
- config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
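+ // the topology is extracted into its own method so tests can drive it with TopologyTestDriver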
+ public Topology createTopology(){
+ StreamsBuilder builder = new StreamsBuilder();
// 1 - stream from Kafka
KStream textLines = builder.stream("word-count-input");
@@ -36,15 +33,26 @@ public static void main(String[] args) {
// 5 - group by key before aggregation
.groupByKey()
// 6 - count occurences
- .count("Counts");
+ .count(Materialized.as("Counts"));
// 7 - to in order to write the results back to kafka
- wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
+ wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
- KafkaStreams streams = new KafkaStreams(builder, config);
- streams.start();
+ return builder.build();
+ }
+ public static void main(String[] args) {
+ Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+ config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ WordCountApp wordCountApp = new WordCountApp();
+
+ KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
+ streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
@@ -52,7 +60,7 @@ public static void main(String[] args) {
// Update:
// print the topology every 10 seconds for learning purposes
while(true){
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
diff --git a/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java b/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java
new file mode 100644
index 0000000..f88d610
--- /dev/null
+++ b/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java
@@ -0,0 +1,89 @@
+package com.github.simplesteph.udemy.kafka.streams;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class WordCountAppTest {
+
+ TopologyTestDriver testDriver;
+
+ StringSerializer stringSerializer = new StringSerializer();
+
+ ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(stringSerializer, stringSerializer);
+
+
+ @Before
+ public void setUpTopologyTestDriver(){
+ Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
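+ // TopologyTestDriver exercises the topology in memory, without a running broker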
+ WordCountApp wordCountApp = new WordCountApp();
+ Topology topology = wordCountApp.createTopology();
+ testDriver = new TopologyTestDriver(topology, config);
+ }
+
+ @After
+ public void closeTestDriver(){
+ testDriver.close();
+ }
+
+ public void pushNewInputRecord(String value){
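+ // pipe a value-only record (null key) into the input topic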
+ testDriver.pipeInput(recordFactory.create("word-count-input", null, value));
+ }
+
+ @Test
+ public void dummyTest(){
+ String dummy = "Du" + "mmy";
+ assertEquals(dummy, "Dummy");
+ }
+
+ public ProducerRecord<String, Long> readOutput(){
+ return testDriver.readOutput("word-count-output", new StringDeserializer(), new LongDeserializer());
+ }
+
+ @Test
+ public void makeSureCountsAreCorrect(){
+ String firstExample = "testing Kafka Streams";
+ pushNewInputRecord(firstExample);
+ OutputVerifier.compareKeyValue(readOutput(), "testing", 1L);
+ OutputVerifier.compareKeyValue(readOutput(), "kafka", 1L);
+ OutputVerifier.compareKeyValue(readOutput(), "streams", 1L);
+ assertEquals(readOutput(), null);
+
+ String secondExample = "testing Kafka again";
+ pushNewInputRecord(secondExample);
+ OutputVerifier.compareKeyValue(readOutput(), "testing", 2L);
+ OutputVerifier.compareKeyValue(readOutput(), "kafka", 2L);
+ OutputVerifier.compareKeyValue(readOutput(), "again", 1L);
+
+ }
+
+ @Test
+ public void makeSureWordsBecomeLowercase(){
+ String upperCaseString = "KAFKA kafka Kafka";
+ pushNewInputRecord(upperCaseString);
+ OutputVerifier.compareKeyValue(readOutput(), "kafka", 1L);
+ OutputVerifier.compareKeyValue(readOutput(), "kafka", 2L);
+ OutputVerifier.compareKeyValue(readOutput(), "kafka", 3L);
+
+ }
+}