77 runLagMonitoring : runLagMonitoringCommon ,
88 genericProduceToTopic,
99 getAutoCommit,
10+ PerformanceLogger,
1011 } = require ( './performance-primitives-common' ) ;
1112
1213module . exports = {
@@ -22,6 +23,9 @@ module.exports = {
2223
// Tunables read once from the environment at module load time.
// Reads NAME from the environment as an integer, or null when unset/empty.
const intFromEnv = (name) => (process.env[name] ? +process.env[name] : null);

const CONSUMER_MAX_BATCH_SIZE = intFromEnv('CONSUMER_MAX_BATCH_SIZE');
const IS_HIGHER_LATENCY_CLUSTER = process.env.IS_HIGHER_LATENCY_CLUSTER === 'true';
// Passed through to librdkafka's `debug` property when set (see baseConfiguration).
const DEBUG = process.env.DEBUG;
const STATISTICS_INTERVAL_MS = intFromEnv('STATISTICS_INTERVAL_MS');
// File logging kicks in when either debug contexts or a statistics
// interval have been requested via the environment.
const ENABLE_LOGGING = DEBUG !== undefined || STATISTICS_INTERVAL_MS !== null;
2529
2630function baseConfiguration ( parameters ) {
2731 let ret = {
@@ -39,11 +43,29 @@ function baseConfiguration(parameters) {
3943 'sasl.password' : parameters . saslPassword ,
4044 } ;
4145 }
46+ if ( DEBUG && ! parameters . disableLogging ) {
47+ ret [ 'debug' ] = DEBUG ;
48+ }
49+ if ( parameters . logToFile ) {
50+ ret . kafkaJS = {
51+ 'logger' : new PerformanceLogger ( parameters . logToFile ) ,
52+ } ;
53+ }
54+ if ( STATISTICS_INTERVAL_MS !== null ) {
55+ ret [ 'statistics.interval.ms' ] = STATISTICS_INTERVAL_MS ;
56+ ret [ 'stats_cb' ] = function ( event ) {
57+ this . logger ( ) . info ( event . message ) ;
58+ } ;
59+ }
4260 return ret ;
4361}
4462
4563async function runCreateTopics ( parameters , topic , topic2 , numPartitions ) {
46- const kafka = new Kafka ( baseConfiguration ( parameters ) ) ;
64+ const adminParameters = {
65+ ...parameters ,
66+ disableLogging : true ,
67+ } ;
68+ const kafka = new Kafka ( baseConfiguration ( adminParameters ) ) ;
4769
4870 const admin = kafka . admin ( ) ;
4971 await admin . connect ( ) ;
@@ -73,7 +95,11 @@ async function runCreateTopics(parameters, topic, topic2, numPartitions) {
7395}
7496
7597function runLagMonitoring ( parameters , topic ) {
76- const kafka = new Kafka ( baseConfiguration ( parameters ) ) ;
98+ const monitoringParameters = {
99+ ...parameters ,
100+ disableLogging : true ,
101+ } ;
102+ const kafka = new Kafka ( baseConfiguration ( monitoringParameters ) ) ;
77103 const admin = kafka . admin ( ) ;
78104
79105 return runLagMonitoringCommon ( admin , topic ) ;
@@ -116,6 +142,9 @@ function newCompatibleProducer(parameters, compression) {
116142}
117143
/**
 * Runs the producer benchmark, defaulting log output to a producer-specific
 * file when environment-driven logging is enabled.
 *
 * Fix: the original assigned `parameters.logToFile` directly, mutating the
 * caller-owned object; we copy first, matching how `runConsumer` builds its
 * per-client parameter objects.
 */
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
  const producerParameters = { ...parameters };
  if (ENABLE_LOGGING && !producerParameters.logToFile) {
    producerParameters.logToFile = './confluent-producer.log';
  }
  return runProducerCommon(newCompatibleProducer(producerParameters, compression), topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS);
}
121150
@@ -151,7 +180,8 @@ class CompatibleConsumer {
151180 }
152181}
153182
154- function newCompatibleConsumer ( parameters , eachBatch ) {
183+ function newCompatibleConsumer ( parameters , eachBatch , messageSize , limitRPS ) {
184+ const minFetchBytes = messageSize * limitRPS ;
155185 const kafka = new Kafka ( baseConfiguration ( parameters ) ) ;
156186 const autoCommit = getAutoCommit ( ) ;
157187 const autoCommitOpts = autoCommit > 0 ?
@@ -173,6 +203,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
173203 'group.id' : groupId ,
174204 'auto.offset.reset' : 'earliest' ,
175205 'fetch.queue.backoff.ms' : '100' ,
206+ 'fetch.min.bytes' : minFetchBytes . toString ( ) ,
176207 ...autoCommitOpts ,
177208 ...jsOpts ,
178209 ...higherLatencyClusterOpts ,
@@ -181,16 +212,30 @@ function newCompatibleConsumer(parameters, eachBatch) {
181212}
182213
183214
184- async function runConsumer ( parameters , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , produceToTopic , produceCompression , useCKJSProducerEverywhere ) {
215+ async function runConsumer ( parameters , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , produceToTopic , produceCompression , useCKJSProducerEverywhere ,
216+ messageSize , limitRPS ) {
185217 let actionOnMessages = null ;
186218 let producer ;
187219 if ( produceToTopic ) {
188- producer = newCompatibleProducer ( parameters , produceCompression ) ;
220+ const producerParameters = {
221+ ...parameters ,
222+ } ;
223+ if ( ENABLE_LOGGING )
224+ producerParameters . logToFile = './confluent-consumer-producer.log' ;
225+ producer = newCompatibleProducer ( producerParameters , produceCompression ) ;
189226 await producer . connect ( ) ;
190227 actionOnMessages = ( messages ) =>
191228 genericProduceToTopic ( producer , produceToTopic , messages ) ;
192229 }
193- const ret = await runConsumerCommon ( newCompatibleConsumer ( parameters , eachBatch ) , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) ;
230+ const consumerParameters = {
231+ ...parameters ,
232+ } ;
233+ if ( ENABLE_LOGGING )
234+ consumerParameters . logToFile = eachBatch ? './confluent-consumer-batch.log' :
235+ './confluent-consumer-message.log' ;
236+ const ret = await runConsumerCommon (
237+ newCompatibleConsumer ( consumerParameters , eachBatch , messageSize , limitRPS ) ,
238+ topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) ;
194239 if ( producer ) {
195240 await producer . disconnect ( ) ;
196241 }
0 commit comments