@@ -89,7 +89,7 @@ in a stream
8989 * [ .getProduceHandler()] ( #JSKafkaClient+getProduceHandler ) ⇒ <code >null</code > \| <code >EventEmitter</code >
9090 * [ .overwriteTopics(topics)] ( #JSKafkaClient+overwriteTopics )
9191 * [ .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)] ( #JSKafkaClient+start )
92- * [ .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback)] ( #JSKafkaClient+setupProducer )
92+ * [ .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig )] ( #JSKafkaClient+setupProducer )
9393 * [ .send(topic, message)] ( #JSKafkaClient+send ) ⇒ <code >\* </code >
9494 * [ .buffer(topic, identifier, payload, compressionType)] ( #JSKafkaClient+buffer ) ⇒ <code >\* </code >
9595 * [ .bufferFormat(topic, identifier, payload, version, compressionType)] ( #JSKafkaClient+bufferFormat ) ⇒ <code >\* </code >
@@ -153,7 +153,7 @@ will await a kafka-producer-ready-event if started withProducer=true
153153
154154<a name =" JSKafkaClient+setupProducer " ></a >
155155
156- # jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback)
156+ # jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig )
157157starts a new kafka-producer using sinek's publisher
158158will fire kafka-producer-ready-event
159159requires a topic's partition count during initialisation
@@ -166,6 +166,7 @@ requires a topic's partition count during initialisation
166166| partitions | <code >1</code > |
167167| readyCallback | <code ></code > |
168168| kafkaErrorCallback | <code ></code > |
169+ | outputKafkaConfig | <code ></code > |
169170
170171<a name =" JSKafkaClient+send " ></a >
171172
@@ -225,7 +226,7 @@ an uuid.4() will be generated
225226 * [ .getProduceHandler()] ( #NativeKafkaClient+getProduceHandler ) ⇒ <code >null</code > \| <code >EventEmitter</code >
226227 * [ .overwriteTopics(topics)] ( #NativeKafkaClient+overwriteTopics )
227228 * [ .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)] ( #NativeKafkaClient+start )
228- * [ .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback)] ( #NativeKafkaClient+setupProducer )
229+ * [ .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig )] ( #NativeKafkaClient+setupProducer )
229230 * [ .send(topicName, message, partition, key, partitionKey, opaqueKey)] ( #NativeKafkaClient+send ) ⇒ <code >Promise.< ; void> ; </code >
230231 * [ .buffer(topic, identifier, payload, _ , partition, version, partitionKey)] ( #NativeKafkaClient+buffer ) ⇒ <code >Promise.< ; void> ; </code >
231232 * [ .bufferFormat(topic, identifier, payload, version, _ , partitionKey, partition)] ( #NativeKafkaClient+bufferFormat ) ⇒ <code >Promise.< ; void> ; </code >
@@ -290,7 +291,7 @@ will await a kafka-producer-ready-event if started withProducer=true
290291
291292<a name =" NativeKafkaClient+setupProducer " ></a >
292293
293- # nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback)
294+ # nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig )
294295starts a new kafka-producer
295296will fire kafka-producer-ready-event
296297requires a topic's partition count during initialisation
@@ -303,6 +304,7 @@ requires a topic's partition count during initialisation
303304| partitions | <code >1</code > |
304305| readyCallback | <code ></code > |
305306| kafkaErrorCallback | <code ></code > |
307+ | outputKafkaConfig | <code ></code > |
306308
307309<a name =" NativeKafkaClient+send " ></a >
308310
@@ -369,14 +371,15 @@ change-log representation of a stream
369371
370372* [ KStream] ( #KStream )
371373 * [ new KStream(topicName, storage, kafka, isClone)] ( #new_KStream_new )
372- * [ .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure)] ( #KStream+start )
374+ * [ .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig )] ( #KStream+start )
373375 * [ .innerJoin(stream, key, windowed, combine)] ( #KStream+innerJoin ) ⇒ [ <code >KStream</code >] ( #KStream )
374376 * [ .outerJoin(stream)] ( #KStream+outerJoin )
375377 * [ .leftJoin(stream)] ( #KStream+leftJoin )
376378 * [ .merge(stream)] ( #KStream+merge ) ⇒ [ <code >KStream</code >] ( #KStream )
377379 * [ .fromMost()] ( #KStream+fromMost ) ⇒ [ <code >KStream</code >] ( #KStream )
378- * [ .clone()] ( #KStream+clone ) ⇒ [ <code >KStream</code >] ( #KStream )
379- * [ .window(from, to, etl, encapsulated)] ( #KStream+window ) ⇒ <code >Object</code >
380+ * [ .clone(cloneEvents, cloneDeep)] ( #KStream+clone ) ⇒ [ <code >KStream</code >] ( #KStream )
381+ * [ .branch(preds)] ( #KStream+branch ) ⇒ [ <code >Array.< ; KStream> ; </code >] ( #KStream )
382+ * [ .window(from, to, etl, encapsulated, collect)] ( #KStream+window ) ⇒ <code >Object</code >
380383 * [ .close()] ( #KStream+close ) ⇒ <code >Promise.< ; boolean> ; </code >
381384
382385<a name =" new_KStream_new " ></a >
@@ -396,19 +399,20 @@ and return new instances immediately
396399
397400<a name =" KStream+start " ></a >
398401
399- # kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure)
402+ # kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig )
400403start kafka consumption
401404prepare production of messages if necessary
402405when called with zero or just a single callback argument
403406this function will return a promise and use the callback for errors
404407
405408** Kind** : instance method of [ <code >KStream</code >] ( #KStream )
406409
407- | Param | Type | Default |
408- | --- | --- | --- |
409- | kafkaReadyCallback | <code >function</code > | <code ></code > |
410- | kafkaErrorCallback | <code >function</code > | <code ></code > |
411- | withBackPressure | <code >boolean</code > | <code >false</code > |
410+ | Param | Type | Default | Description |
411+ | --- | --- | --- | --- |
412+ | kafkaReadyCallback | <code >function</code > \| <code >Object</code > | <code ></code > | can also be an object (config) |
413+ | kafkaErrorCallback | <code >function</code > | <code ></code > | |
414+ | withBackPressure | <code >boolean</code > | <code >false</code > | |
415+ | outputKafkaConfig | <code >Object</code > | <code ></code > | |
412416
413417<a name =" KStream+innerJoin " ></a >
414418
@@ -478,15 +482,38 @@ no consumer will be build
478482
479483<a name =" KStream+clone " ></a >
480484
481- # kStream.clone() ⇒ [ <code >KStream</code >] ( #KStream )
485+ # kStream.clone(cloneEvents, cloneDeep ) ⇒ [ <code >KStream</code >] ( #KStream )
482486as only joins and window operations return new stream instances
483487you might need a clone sometimes, which can be accomplished
484488using this function
485489
486490** Kind** : instance method of [ <code >KStream</code >] ( #KStream )
491+
492+ | Param | Type | Default | Description |
493+ | --- | --- | --- | --- |
494+ | cloneEvents | <code >boolean</code > | <code >false</code > | if events in the stream should be cloned |
495+ | cloneDeep | <code >boolean</code > | <code >false</code > | if events in the stream should be cloned deeply |
496+
497+ <a name =" KStream+branch " ></a >
498+
499+ # kStream.branch(preds) ⇒ [ <code >Array.< ; KStream> ; </code >] ( #KStream )
500+ Splits a stream into multiple branches based on cloning
501+ and filtering it depending on the passed predicates.
502+ [ (message) => message.key.startsWith("A"),
503+ (message) => message.key.startsWith("B"),
504+ (message) => true ]
505+ ---
506+ [ streamA, streamB, streamTrue ]
507+
508+ ** Kind** : instance method of [ <code >KStream</code >] ( #KStream )
509+
510+ | Param | Type |
511+ | --- | --- |
512+ | preds | <code >Array.< ; function()> ; </code > |
513+
487514<a name =" KStream+window " ></a >
488515
489- # kStream.window(from, to, etl, encapsulated) ⇒ <code >Object</code >
516+ # kStream.window(from, to, etl, encapsulated, collect ) ⇒ <code >Object</code >
490517builds a window'ed stream across all events of the current kstream
491518when the first event with an exceeding "to" is received (or the abort()
492519callback is called) the window closes and emits its "collected" values to the
@@ -501,12 +528,13 @@ encapsulated in an object: {time, value}
501528
502529** Kind** : instance method of [ <code >KStream</code >] ( #KStream )
503530
504- | Param | Type | Default |
505- | --- | --- | --- |
506- | from | <code >number</code > | |
507- | to | <code >number</code > | |
508- | etl | <code >function</code > | <code ></code > |
509- | encapsulated | <code >boolean</code > | <code >true</code > |
531+ | Param | Type | Default | Description |
532+ | --- | --- | --- | --- |
533+ | from | <code >number</code > | | |
534+ | to | <code >number</code > | | |
535+ | etl | <code >function</code > | <code ></code > | |
536+ | encapsulated | <code >boolean</code > | <code >true</code > | if event should stay encapsulated {time, value} |
537+ | collect | <code >boolean</code > | <code >true</code > | if events should be collected first before publishing to result stream |
510538
511539<a name =" KStream+close " ></a >
512540
@@ -525,7 +553,7 @@ table representation of a stream
525553
526554* [ KTable] ( #KTable )
527555 * [ new KTable(topicName, keyMapETL, storage, kafka, isClone)] ( #new_KTable_new )
528- * [ .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure)] ( #KTable+start )
556+ * [ .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig )] ( #KTable+start )
529557 * [ .innerJoin(stream, key)] ( #KTable+innerJoin )
530558 * [ .outerJoin(stream)] ( #KTable+outerJoin )
531559 * [ .leftJoin(stream)] ( #KTable+leftJoin )
@@ -558,19 +586,20 @@ keyMapETL = v -> {key, value} (sync)
558586
559587<a name =" KTable+start " ></a >
560588
561- # kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure)
589+ # kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig )
562590start kafka consumption
563591prepare production of messages if necessary
564592when called with zero or just a single callback argument
565593this function will return a promise and use the callback for errors
566594
567595** Kind** : instance method of [ <code >KTable</code >] ( #KTable )
568596
569- | Param | Type | Default |
570- | --- | --- | --- |
571- | kafkaReadyCallback | <code >function</code > | <code ></code > |
572- | kafkaErrorCallback | <code >function</code > | <code ></code > |
573- | withBackPressure | <code >boolean</code > | <code >false</code > |
597+ | Param | Type | Default | Description |
598+ | --- | --- | --- | --- |
599+ | kafkaReadyCallback | <code >function</code > \| <code >Object</code > | <code ></code > | can also be an object (config) |
600+ | kafkaErrorCallback | <code >function</code > | <code ></code > | |
601+ | withBackPressure | <code >boolean</code > | <code >false</code > | |
602+ | outputKafkaConfig | <code >Object</code > | <code ></code > | |
574603
575604<a name =" KTable+innerJoin " ></a >
576605
@@ -712,13 +741,16 @@ Stream base class
712741 * [ .getStorage()] ( #StreamDSL+getStorage ) ⇒ <code >KStorage</code >
713742 * [ .writeToStream(message)] ( #StreamDSL+writeToStream )
714743 * [ .getMost()] ( #StreamDSL+getMost ) ⇒ <code >Object</code >
744+ * [ .getNewMostFrom(array)] ( #StreamDSL+getNewMostFrom ) ⇒ <code >Stream.< ; any> ; </code >
715745 * [ .replaceInternalObservable(newStream$)] ( #StreamDSL+replaceInternalObservable )
716746 * [ .setProduceHandler(handler)] ( #StreamDSL+setProduceHandler )
717747 * [ .createAndSetProduceHandler()] ( #StreamDSL+createAndSetProduceHandler ) ⇒ <code >module: events .internal</code >
718748 * [ .setKafkaStreamsReference(reference)] ( #StreamDSL+setKafkaStreamsReference )
719749 * [ .from(topicName)] ( #StreamDSL+from ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
750+ * [ .awaitPromises(etl)] ( #StreamDSL+awaitPromises ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
720751 * [ .map(etl)] ( #StreamDSL+map ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
721752 * [ .asyncMap(etl)] ( #StreamDSL+asyncMap ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
753+ * [ .concatMap(etl)] ( #StreamDSL+concatMap ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
722754 * [ .forEach(eff)] ( #StreamDSL+forEach ) ⇒ <code >\* </code >
723755 * [ .chainForEach(eff, callback)] ( #StreamDSL+chainForEach ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
724756 * [ .tap(eff)] ( #StreamDSL+tap )
@@ -761,7 +793,7 @@ Stream base class
761793 * [ .max(fieldName, maxField)] ( #StreamDSL+max ) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
762794 * [ ._ merge(otherStream$)] ( #StreamDSL+_merge )
763795 * [ ._ zip(otherStream$, combine)] ( #StreamDSL+_zip )
764- * [ .to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback)] ( #StreamDSL+to ) ⇒ <code >Promise.< ; boolean> ; </code >
796+ * [ .to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig )] ( #StreamDSL+to ) ⇒ <code >Promise.< ; boolean> ; </code >
765797
766798<a name =" new_StreamDSL_new " ></a >
767799
@@ -815,6 +847,18 @@ returns the internal most.js stream
815847
816848** Kind** : instance method of [ <code >StreamDSL</code >] ( #StreamDSL )
817849** Returns** : <code >Object</code > - most.js stream
850+ <a name =" StreamDSL+getNewMostFrom " ></a >
851+
852+ # streamDSL.getNewMostFrom(array) ⇒ <code >Stream.< ; any> ; </code >
853+ returns a new most stream from the
854+ given array
855+
856+ ** Kind** : instance method of [ <code >StreamDSL</code >] ( #StreamDSL )
857+
858+ | Param |
859+ | --- |
860+ | array |
861+
818862<a name =" StreamDSL+replaceInternalObservable " ></a >
819863
820864# streamDSL.replaceInternalObservable(newStream$)
@@ -870,6 +914,18 @@ add more topic/s to the consumer
870914| --- | --- |
871915| topicName | <code >string</code > \| <code >Array.< ; string> ; </code > |
872916
917+ <a name =" StreamDSL+awaitPromises " ></a >
918+
919+ # streamDSL.awaitPromises(etl) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
920+ given a stream of promises, returns stream containing the fulfillment values
921+ etl = Promise -> v
922+
923+ ** Kind** : instance method of [ <code >StreamDSL</code >] ( #StreamDSL )
924+
925+ | Param |
926+ | --- |
927+ | etl |
928+
873929<a name =" StreamDSL+map " ></a >
874930
875931# streamDSL.map(etl) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
@@ -895,6 +951,19 @@ etl = v -> Promise
895951| --- |
896952| etl |
897953
954+ <a name =" StreamDSL+concatMap " ></a >
955+
956+ # streamDSL.concatMap(etl) ⇒ [ <code >StreamDSL</code >] ( #StreamDSL )
957+ transform each etl in stream into a stream,
958+ and then concatenate it onto the end of the resulting stream.
959+ etl = v -> stream(v2)
960+
961+ ** Kind** : instance method of [ <code >StreamDSL</code >] ( #StreamDSL )
962+
963+ | Param |
964+ | --- |
965+ | etl |
966+
898967<a name =" StreamDSL+forEach " ></a >
899968
900969# streamDSL.forEach(eff) ⇒ <code >\* </code >
@@ -1366,7 +1435,7 @@ latest value which is stored
13661435
13671436<a name =" StreamDSL+_merge " ></a >
13681437
1369- # streamDSL._ merge(otherStream$)
1438+ # streamDSL.\ _ merge(otherStream$)
13701439merge this stream with another, resulting a
13711440stream with all elements from both streams
13721441
@@ -1378,7 +1447,7 @@ stream with all elements from both streams
13781447
13791448<a name =" StreamDSL+_zip " ></a >
13801449
1381- # streamDSL._ zip(otherStream$, combine)
1450+ # streamDSL.\ _ zip(otherStream$, combine)
13821451merge this stream with another stream
13831452by combining (zipping) every event from each stream
13841453to a single new event on the new stream
@@ -1393,7 +1462,7 @@ combine = (e1, e2) -> e1 + e2
13931462
13941463<a name =" StreamDSL+to " ></a >
13951464
1396- # streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback) ⇒ <code >Promise.< ; boolean> ; </code >
1465+ # streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig ) ⇒ <code >Promise.< ; boolean> ; </code >
13971466define an output topic
13981467when passed to KafkaStreams this will trigger
13991468the stream$ result to be produced to the given topic name
@@ -1404,10 +1473,11 @@ returns a promise
14041473
14051474| Param | Type | Default | Description |
14061475| --- | --- | --- | --- |
1407- | topic | <code >string</code > | | optional |
1476+ | topic | <code >string</code > \| < code >Object</ code > | | optional (can also be an object, containing the same parameters as fields) |
14081477| outputPartitionsCount | <code >number</code > | <code >1</code > | optional |
14091478| produceType | <code >string</code > | <code >" ; send" ; </code > | optional |
14101479| version | <code >number</code > | <code >1</code > | optional |
14111480| compressionType | <code >number</code > | <code >0</code > | optional |
14121481| producerErrorCallback | <code >function</code > | <code ></code > | optional |
1482+ | outputKafkaConfig | <code >Object</code > | <code ></code > | optional |
14131483
0 commit comments