@@ -20,6 +20,7 @@ var TopicPartition = require('./topic-partition');
2020var shallowCopy = require ( './util' ) . shallowCopy ;
2121var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500 ;
2222var DEFAULT_CONSUME_TIME_OUT = 1000 ;
23+ const DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE = false ;
2324util . inherits ( KafkaConsumer , Client ) ;
2425
2526/**
@@ -142,6 +143,7 @@ function KafkaConsumer(conf, topicConf) {
142143
143144 this . _consumeTimeout = DEFAULT_CONSUME_TIME_OUT ;
144145 this . _consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY ;
146+ this . _consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE ;
145147}
146148
147149/**
@@ -160,6 +162,20 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs)
160162 this . _consumeLoopTimeoutDelay = intervalMs ;
161163} ;
162164
165+ /**
166+ * If true:
167+ * In consume(number, cb), we will wait for `timeoutMs` for the first message to be fetched.
168+ * Subsequent messages will not be waited for and will be fetched (upto `number`) if already ready.
169+ *
170+ * If false:
171+ * In consume(number, cb), we will wait for upto `timeoutMs` for each message to be fetched.
172+ *
173+ * @param {boolean } isTimeoutOnlyForFirstMessage
174+ */
175+ KafkaConsumer . prototype . setDefaultIsTimeoutOnlyForFirstMessage = function ( isTimeoutOnlyForFirstMessage ) {
176+ this . _consumeIsTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessage ;
177+ } ;
178+
163179/**
164180 * Get a stream representation of this KafkaConsumer
165181 *
@@ -512,7 +528,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
512528KafkaConsumer . prototype . _consumeNum = function ( timeoutMs , numMessages , cb ) {
513529 var self = this ;
514530
515- this . _client . consume ( timeoutMs , numMessages , function ( err , messages , eofEvents ) {
531+ this . _client . consume ( timeoutMs , numMessages , this . _consumeIsTimeoutOnlyForFirstMessage , function ( err , messages , eofEvents ) {
516532 if ( err ) {
517533 err = LibrdKafkaError . create ( err ) ;
518534 if ( cb ) {
0 commit comments