Skip to content

Commit 2f0409a

Browse files
committed
upgrade
1 parent 8cc5b77 commit 2f0409a

File tree

9 files changed

+293
-259
lines changed

9 files changed

+293
-259
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# kafka-streams CHANGELOG
22

3+
## 2019-07-04, Version 4.12.0
4+
5+
* upgraded dependencies
6+
* fixed issue #91
7+
* added topic extractor example
8+
39
## 2019-05-16, Version 4.11.0
410

511
* added continueWith dsl operation

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
[![npm version](https://badge.fury.io/js/kafka-streams.svg)](https://badge.fury.io/js/kafka-streams)
66

77
```
8+
// suggested Node.js version: v11.15.0
89
npm install --save kafka-streams
910
```
1011

examples/consumeFromTopic.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ stream.forEach((message) => {
2525
//(wait for the kafka consumer to be ready)
2626
stream.start().then(_ => {
2727
//wait a few ms and close all connections
28-
setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 1000);
28+
setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 5000);
2929
});

examples/produceToTopic.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ stream.start().then(_ => {
2828
]);
2929

3030
//wait a few ms and close all connections
31-
setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 1000);
31+
setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 5000);
3232
});

examples/topicExtractor.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"use strict";
2+
3+
const { KafkaStreams } = require("./../index.js");
4+
const { nativeConfig: config } = require("./../test/test-config.js");
5+
6+
const kafkaStreams = new KafkaStreams(config);
7+
8+
kafkaStreams.on("error", (error) => {
9+
console.log("Error occured:", error.message);
10+
});
11+
12+
const stream = kafkaStreams.getKStream(null);
13+
//creating a stream without topic is possible
14+
//no consumer will be created during stream.start()
15+
16+
// 2nd arg is the output partition count, if you do not provide a partition the messages are automatically split across based on key or randomly
17+
// 3nd arg is the produce type of the sinek library, suggestion: stick to "send" this leaves all design options for you
18+
stream
19+
.tap(console.log) // lets log some stream events
20+
.to("default-target-topic", 1, "send");
21+
//define a topic to stream messages to, if nothing is actually defined in the kv structure
22+
23+
//start the stream
24+
//(wait for the kafka producer to be ready, same as await .to())
25+
//and write a few messages to the topic stream.start()
26+
stream.start().then((_) => {
27+
28+
stream.writeToStream(getKafkaStyledMessage({ ping: "pong" })); // will be produced to default-target-topic
29+
stream.writeToStream(getKafkaStyledMessage({ ping: "pong" }, "other-topic")); // will be produced to other-topic
30+
stream.writeToStream([
31+
getKafkaStyledMessage({ ping: "pong" }, "other-topic-2"),
32+
getKafkaStyledMessage({ ping: "pong" }, "other-topic-3")
33+
]);
34+
35+
//wait a few ms and close all connections
36+
setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 5000);
37+
});
38+
39+
// its very important the events on your stream are shipped in a certain format
40+
// if they are not in the required KV format they will be treated as message values and not as full kafka messages
41+
function getKafkaStyledMessage(payload, topic = undefined, partition = undefined) {
42+
return {
43+
key: null, // (required) just to not be undefined, keys will otherwise receive random uuids
44+
value: JSON.stringify(payload), // required, ensure this is a string or a buffer!
45+
// optional:
46+
topic,
47+
partition,
48+
};
49+
}

examples/window.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ stream
3232
//start the stream
3333
consumeStream.start();
3434

35-
//setTimeout(abort, 5000); // -> abort the window collection after 5 seconds
35+
//setTimeout(() => { abort(); }, 5000); // -> abort the window collection after 5 seconds

lib/messageProduceHandle.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,14 @@ const messageProduceHandle = (kafka, message, outputTopicName, produceType, comp
142142
_version
143143

144144
).then((produceMessageValue) => {
145+
146+
debug("Produce successfull", kafkaMessage.topic, produceMessageValue);
145147
if (produceHandler) {
146148
produceHandler.emit("delivered", kafkaMessage, produceMessageValue);
147149
}
148150
}).catch((error) => {
151+
152+
debug("Produce failed", kafkaMessage, error.message);
149153
if (producerErrorCallback) {
150154
error.message = "During message produce: " + error.message;
151155
producerErrorCallback(error);

package.json

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "kafka-streams",
3-
"version": "4.11.0",
3+
"version": "4.12.0",
44
"description": "kafka-streams for Node.js",
55
"typings": "index.d.ts",
66
"main": "index.js",
@@ -15,6 +15,7 @@
1515
"window": "node ./examples/window.js",
1616
"wordCount": "node ./examples/wordCount.js",
1717
"word-count": "yarn wordCount",
18+
"topicExtractor": "DEBUG=kafka-streams:mph node ./examples/topicExtractor.js",
1819
"obs-test": "_mocha test/unit/Observable.test.js",
1920
"test-unit": "_mocha --recursive --exit -R spec test/unit",
2021
"test-int": "_mocha --recursive --timeout 22500 --exit -R spec test/int",
@@ -59,20 +60,20 @@
5960
},
6061
"homepage": "https://github.com/nodefluent/kafka-streams#readme",
6162
"dependencies": {
62-
"bluebird": "~3.5.4",
63+
"bluebird": "~3.5.5",
6364
"debug": "~4.1.1",
64-
"global": "^4.3.2",
65-
"lodash.clone": "^4.5.0",
66-
"lodash.clonedeep": "^4.5.0",
67-
"most": "~1.7.3",
68-
"most-subject": "~5.3.0",
69-
"sinek": "~7.29.3"
65+
"global": "~4.4.0",
66+
"lodash.clone": "~4.5.0",
67+
"lodash.clonedeep": "~4.5.0",
68+
"most": "1.7.3",
69+
"most-subject": "5.3.0",
70+
"sinek": "~7.30.0"
7071
},
7172
"devDependencies": {
72-
"async": "~2.6.2",
73-
"jsdoc": "~3.6.1",
74-
"jsdoc-to-markdown": "~4.0.1",
75-
"log4bro": "~3.9.0",
73+
"async": "~3.1.0",
74+
"jsdoc": "~3.6.2",
75+
"jsdoc-to-markdown": "~5.0.0",
76+
"log4bro": "~3.10.0",
7677
"mocha": "~6.1.4",
7778
"proxyquire": "~2.1.0",
7879
"uuid": "~3.3.2"

0 commit comments

Comments
 (0)