Skip to content

Commit eb04187

Browse files
committed
4.10.0
1 parent 2f9209b commit eb04187

15 files changed

+152
-21
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# kafka-streams CHANGELOG
22

3+
## 2019-05-13, Version 4.10.0
4+
5+
* fixed typo that broke window functionality
6+
* correctly handling rejection of .to() promise call
7+
* correctly handling produce errors (make sure to sub `kafkaStreams.on("error", (error) => {...});`)
8+
* `KStream.branch([...])` now returns stream clones that consist of deeply cloned events
9+
* `KStream.clone(cloneObjects = false, cloneDeep = false)` now offers optional parameters to clone the stream events,
10+
otherwise mutating the origin stream will alter the cloned stream's objects
11+
* hardened the StreamDSL JSON convenience methods
12+
* other small refactorings
13+
314
## 2019-05-07, Version 4.9.0
415

516
* upgraded dependencies

examples/consumeFromTopic.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ const { nativeConfig: config } = require("./../test/test-config.js");
66
const kafkaStreams = new KafkaStreams(config);
77
const stream = kafkaStreams.getKStream("my-input-topic");
88

9+
kafkaStreams.on("error", (error) => {
10+
console.log("Error occured:", error.message);
11+
});
12+
913
//adding a side effect call to the stream via tap
1014
stream.forEach((message) => {
1115
console.log("key", message.key ? message.key.toString("utf8") : null);

examples/consumeOneProduceTwo.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ const { nativeConfig: config } = require("./../test/test-config.js");
66
const kafkaStreams = new KafkaStreams(config);
77
const stream = kafkaStreams.getKStream();
88

9+
kafkaStreams.on("error", (error) => {
10+
console.log("Error occured:", error.message);
11+
});
12+
913
stream
1014
.from("input_topic")
1115
.mapJSONConvenience()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"use strict";
2+
3+
const { KafkaStreams } = require("./../index.js");
4+
const { nativeConfig: config } = require("./../test/test-config.js");
5+
6+
const kafkaStreams = new KafkaStreams(config);
7+
const stream$ = kafkaStreams.getKStream("input-topic");
8+
9+
kafkaStreams.on("error", (error) => {
10+
console.log("Error occured:", error.message);
11+
});
12+
13+
const [one$, two$] = stream$
14+
.branch([() => true, () => true]);
15+
16+
const producerPromiseOne = one$
17+
.mapJSONConvenience()
18+
.mapWrapKafkaValue()
19+
.tap((msg) => console.log("one", msg))
20+
.wrapAsKafkaValue()
21+
.to("output-topic-1", 1, "buffer");
22+
23+
const producerPromiseTwo = two$
24+
.mapJSONConvenience()
25+
.mapWrapKafkaValue()
26+
.tap((msg) => console.log("two", msg))
27+
.wrapAsKafkaValue()
28+
.to("output-topic-2", 1, "buffer");
29+
30+
Promise.all([
31+
producerPromiseOne,
32+
producerPromiseTwo,
33+
stream$.start(),
34+
]).then(() => {
35+
console.log("Stream started, as kafka consumer and producers are ready.");
36+
}, (error) => {
37+
console.log("Streaming operation failed to start: ", error);
38+
});

examples/fieldSum.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ const { nativeConfig: config } = require("./../test/test-config.js");
66
const kafkaStreams = new KafkaStreams(config);
77
const stream = kafkaStreams.getKStream("my-input-topic");
88

9+
kafkaStreams.on("error", (error) => {
10+
console.log("Error occured:", error.message);
11+
});
12+
913
stream
1014
.mapStringToKV(" ", 0, 1) //string to key-value object; args: delimiter, key-index, value-index
1115
.sumByKey("key", "value", "sum")

examples/mergeTopics.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ const { nativeConfig: config } = require("./../test/test-config.js");
55

66
const kafkaStreams = new KafkaStreams(config);
77

8+
kafkaStreams.on("error", (error) => {
9+
console.log("Error occured:", error.message);
10+
});
11+
812
const stream1 = kafkaStreams.getKStream("my-input-topic-1");
913
const stream2 = kafkaStreams.getKStream("my-input-topic-2");
1014
const stream3 = kafkaStreams.getKStream("my-input-topic-3");

examples/produceToTopic.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ const { KafkaStreams } = require("./../index.js");
44
const { nativeConfig: config } = require("./../test/test-config.js");
55

66
const kafkaStreams = new KafkaStreams(config);
7+
8+
kafkaStreams.on("error", (error) => {
9+
console.log("Error occured:", error.message);
10+
});
11+
712
const stream = kafkaStreams.getKStream(null);
813
//creating a stream without topic is possible
914
//no consumer will be created during stream.start()

examples/topicAsTable.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ const { nativeConfig: config } = require("./../test/test-config.js");
2424

2525
const kafkaStreams = new KafkaStreams(config);
2626

27+
kafkaStreams.on("error", (error) => {
28+
console.log("Error occured:", error.message);
29+
});
30+
2731
//creating a ktable requires a function that can be
2832
//used to turn the kafka messages into key-value objects
2933
//as tables can only be built on key-value pairs

examples/window.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ const { KafkaStreams } = require("./../index.js");
44
const { nativeConfig: config } = require("./../test/test-config.js");
55

66
const kafkaStreams = new KafkaStreams(config);
7+
8+
kafkaStreams.on("error", (error) => {
9+
console.log("Error occured:", error.message);
10+
});
11+
712
const consumeStream = kafkaStreams.getKStream("my-input-topic");
813

914
const windowPeriod = 30 * 1000; // 30 seconds

examples/wordCount.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ const keyMapperEtl = (kafkaMessage) => {
3232
};
3333

3434
const kafkaStreams = new KafkaStreams(config);
35+
36+
kafkaStreams.on("error", (error) => {
37+
console.log("Error occured:", error.message);
38+
});
39+
3540
const stream = kafkaStreams.getKStream();
3641

3742
stream

0 commit comments

Comments
 (0)