44from __future__ import absolute_import , print_function
55
66import argparse
7- import logging
87import pprint
98import sys
109import threading
10+ import time
1111import traceback
1212
13- from kafka .vendor .six .moves import range
14-
15- from kafka import KafkaConsumer , KafkaProducer
16- from test .fixtures import KafkaFixture , ZookeeperFixture
17-
18- logging .basicConfig (level = logging .ERROR )
19-
20-
21- def start_brokers (n ):
22- print ('Starting {0} {1}-node cluster...' .format (KafkaFixture .kafka_version , n ))
23- print ('-> 1 Zookeeper' )
24- zk = ZookeeperFixture .instance ()
25- print ('---> {0}:{1}' .format (zk .host , zk .port ))
26- print ()
27-
28- partitions = min (n , 3 )
29- replicas = min (n , 3 )
30- print ('-> {0} Brokers [{1} partitions / {2} replicas]' .format (n , partitions , replicas ))
31- brokers = [
32- KafkaFixture .instance (i , zk , zk_chroot = '' ,
33- partitions = partitions , replicas = replicas )
34- for i in range (n )
35- ]
36- for broker in brokers :
37- print ('---> {0}:{1}' .format (broker .host , broker .port ))
38- print ()
39- return brokers
13+ from kafka import KafkaConsumer
4014
4115
4216class ConsumerPerformance (object ):
43-
4417 @staticmethod
4518 def run (args ):
4619 try :
@@ -53,28 +26,17 @@ def run(args):
5326 pass
5427 if v == 'None' :
5528 v = None
29+ elif v == 'False' :
30+ v = False
31+ elif v == 'True' :
32+ v = True
5633 props [k ] = v
5734
58- if args .brokers :
59- brokers = start_brokers (args .brokers )
60- props ['bootstrap_servers' ] = ['{0}:{1}' .format (broker .host , broker .port )
61- for broker in brokers ]
62- print ('---> bootstrap_servers={0}' .format (props ['bootstrap_servers' ]))
63- print ()
64-
65- print ('-> Producing records' )
66- record = bytes (bytearray (args .record_size ))
67- producer = KafkaProducer (compression_type = args .fixture_compression ,
68- ** props )
69- for i in range (args .num_records ):
70- producer .send (topic = args .topic , value = record )
71- producer .flush ()
72- producer .close ()
73- print ('-> OK!' )
74- print ()
75-
7635 print ('Initializing Consumer...' )
36+ props ['bootstrap_servers' ] = args .bootstrap_servers
7737 props ['auto_offset_reset' ] = 'earliest'
38+ if 'group_id' not in props :
39+ props ['group_id' ] = 'kafka-consumer-benchmark'
7840 if 'consumer_timeout_ms' not in props :
7941 props ['consumer_timeout_ms' ] = 10000
8042 props ['metrics_sample_window_ms' ] = args .stats_interval * 1000
@@ -92,14 +54,18 @@ def run(args):
9254 print ('-> OK!' )
9355 print ()
9456
57+ start_time = time .time ()
9558 records = 0
9659 for msg in consumer :
9760 records += 1
9861 if records >= args .num_records :
9962 break
100- print ('Consumed {0} records' .format (records ))
10163
64+ end_time = time .time ()
10265 timer_stop .set ()
66+ timer .join ()
67+ print ('Consumed {0} records' .format (records ))
68+ print ('Execution time:' , end_time - start_time , 'secs' )
10369
10470 except Exception :
10571 exc_info = sys .exc_info ()
@@ -143,32 +109,27 @@ def get_args_parser():
143109 parser = argparse .ArgumentParser (
144110 description = 'This tool is used to verify the consumer performance.' )
145111
112+ parser .add_argument (
113+ '--bootstrap-servers' , type = str , nargs = '+' , default = (),
114+ help = 'host:port for cluster bootstrap servers' )
146115 parser .add_argument (
147116 '--topic' , type = str ,
148- help = 'Topic for consumer test' ,
117+ help = 'Topic for consumer test (default: kafka-python-benchmark-test) ' ,
149118 default = 'kafka-python-benchmark-test' )
150119 parser .add_argument (
151120 '--num-records' , type = int ,
152- help = 'number of messages to consume' ,
121+ help = 'number of messages to consume (default: 1000000) ' ,
153122 default = 1000000 )
154- parser .add_argument (
155- '--record-size' , type = int ,
156- help = 'message size in bytes' ,
157- default = 100 )
158123 parser .add_argument (
159124 '--consumer-config' , type = str , nargs = '+' , default = (),
160125 help = 'kafka consumer related configuration properties like '
161126 'bootstrap_servers,client_id etc..' )
162127 parser .add_argument (
163128 '--fixture-compression' , type = str ,
164129 help = 'specify a compression type for use with broker fixtures / producer' )
165- parser .add_argument (
166- '--brokers' , type = int ,
167- help = 'Number of kafka brokers to start' ,
168- default = 0 )
169130 parser .add_argument (
170131 '--stats-interval' , type = int ,
171- help = 'Interval in seconds for stats reporting to console' ,
132+ help = 'Interval in seconds for stats reporting to console (default: 5) ' ,
172133 default = 5 )
173134 parser .add_argument (
174135 '--raw-metrics' , action = 'store_true' ,
0 commit comments