     runLagMonitoring: runLagMonitoringCommon,
     genericProduceToTopic,
     getAutoCommit,
+    PerformanceLogger,
 } = require('./performance-primitives-common');
 
 module.exports = {
@@ -22,6 +23,9 @@ module.exports = {
 
 const CONSUMER_MAX_BATCH_SIZE = process.env.CONSUMER_MAX_BATCH_SIZE ? +process.env.CONSUMER_MAX_BATCH_SIZE : null;
 const IS_HIGHER_LATENCY_CLUSTER = process.env.IS_HIGHER_LATENCY_CLUSTER === 'true';
+const DEBUG = process.env.DEBUG;
+const STATISTICS_INTERVAL_MS = process.env.STATISTICS_INTERVAL_MS ? +process.env.STATISTICS_INTERVAL_MS : null;
+const ENABLE_LOGGING = DEBUG !== undefined || STATISTICS_INTERVAL_MS !== null;
 
 function baseConfiguration(parameters) {
     let ret = {
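The three new constants are plain environment knobs: DEBUG is forwarded verbatim to librdkafka's debug contexts, STATISTICS_INTERVAL_MS is coerced to a number and wired to statistics.interval.ms, and ENABLE_LOGGING turns file logging on whenever either one is present. A minimal launch sketch, assuming a small wrapper script and hypothetical values:

// Hypothetical wrapper: set both knobs before the benchmark module is loaded.
process.env.DEBUG = 'broker,topic,msg';       // any librdkafka debug context list, e.g. 'all'
process.env.STATISTICS_INTERVAL_MS = '5000';  // stats every 5 s; coerced above with unary +
const perf = require('./performance-primitives');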
@@ -39,6 +43,20 @@ function baseConfiguration(parameters) {
             'sasl.password': parameters.saslPassword,
         };
     }
+    if (DEBUG) {
+        ret['debug'] = DEBUG;
+    }
+    if (parameters.logToFile) {
+        ret.kafkaJS = {
+            'logger': new PerformanceLogger(parameters.logToFile),
+        };
+    }
+    if (STATISTICS_INTERVAL_MS !== null) {
+        ret['statistics.interval.ms'] = STATISTICS_INTERVAL_MS;
+        ret['stats_cb'] = function (event) {
+            this.logger().info(event.message);
+        };
+    }
     return ret;
 }
 
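PerformanceLogger itself comes from ./performance-primitives-common and is not part of this diff; the kafkaJS.logger slot only needs an object exposing the usual log methods, and the stats_cb wired above forwards each statistics event's message (the periodic librdkafka statistics payload) through the client's logger. A minimal file-backed sketch under that assumption; the class name and line format below are illustrative stand-ins, not the real implementation:

const fs = require('fs');

// Illustrative stand-in for PerformanceLogger: appends one line per log call.
class FileLogger {
    constructor(path) {
        this.path = path;
    }
    write(level, message) {
        fs.appendFileSync(this.path, new Date().toISOString() + ' ' + level + ' ' + message + '\n');
    }
    debug(message) { this.write('DEBUG', message); }
    info(message)  { this.write('INFO', message); }
    warn(message)  { this.write('WARN', message); }
    error(message) { this.write('ERROR', message); }
}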
@@ -116,6 +134,9 @@ function newCompatibleProducer(parameters, compression) {
 }
 
 async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
+    if (ENABLE_LOGGING && !parameters.logToFile) {
+        parameters.logToFile = './confluent-producer.log';
+    }
     return runProducerCommon(newCompatibleProducer(parameters, compression), topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS);
 }
 
@@ -151,7 +172,8 @@ class CompatibleConsumer {
     }
 }
 
-function newCompatibleConsumer(parameters, eachBatch) {
+function newCompatibleConsumer(parameters, eachBatch, messageSize, limitRPS) {
+    const minFetchBytes = messageSize * limitRPS;
     const kafka = new Kafka(baseConfiguration(parameters));
     const autoCommit = getAutoCommit();
     const autoCommitOpts = autoCommit > 0 ?
@@ -173,6 +195,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
         'group.id': groupId,
         'auto.offset.reset': 'earliest',
         'fetch.queue.backoff.ms': '100',
+        'fetch.min.bytes': minFetchBytes.toString(),
         ...autoCommitOpts,
         ...jsOpts,
         ...higherLatencyClusterOpts,
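fetch.min.bytes is sized from the two new parameters so that a single fetch can carry roughly one second's worth of traffic at the requested rate: messageSize * limitRPS bytes. The broker then delays the fetch response until that much data is buffered or the fetch wait timeout (librdkafka's fetch.wait.max.ms, 500 ms by default) expires, trading a little latency for far fewer round trips under a fixed rate limit. A worked example with assumed numbers, not taken from this change:

const messageSize = 256;    // bytes per message (assumed)
const limitRPS = 10000;     // target messages per second (assumed)
const minFetchBytes = messageSize * limitRPS;
// 2560000 bytes, i.e. about one second of payload per fetch request

Note that the computation assumes callers pass both values; if either is undefined, the product is NaN and the option would be set to the string 'NaN'.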
@@ -181,16 +204,30 @@ function newCompatibleConsumer(parameters, eachBatch) {
 }
 
 
-async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, produceToTopic, produceCompression, useCKJSProducerEverywhere) {
+async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, produceToTopic, produceCompression, useCKJSProducerEverywhere,
+    messageSize, limitRPS) {
     let actionOnMessages = null;
     let producer;
     if (produceToTopic) {
-        producer = newCompatibleProducer(parameters, produceCompression);
+        const producerParameters = {
+            ...parameters,
+        };
+        if (ENABLE_LOGGING)
+            producerParameters.logToFile = './confluent-consumer-producer.log';
+        producer = newCompatibleProducer(producerParameters, produceCompression);
         await producer.connect();
         actionOnMessages = (messages) =>
             genericProduceToTopic(producer, produceToTopic, messages);
     }
-    const ret = await runConsumerCommon(newCompatibleConsumer(parameters, eachBatch), topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages);
+    const consumerParameters = {
+        ...parameters,
+    };
+    if (ENABLE_LOGGING)
+        consumerParameters.logToFile = eachBatch ? './confluent-consumer-batch.log' :
+            './confluent-consumer-message.log';
+    const ret = await runConsumerCommon(
+        newCompatibleConsumer(consumerParameters, eachBatch, messageSize, limitRPS),
+        topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages);
     if (producer) {
         await producer.disconnect();
     }