11#!/usr/bin/env python
2- import threading , logging , time , collections
2+ import threading , logging , time
33
4- from kafka .client import KafkaClient
5- from kafka .consumer import SimpleConsumer
6- from kafka .producer import SimpleProducer
4+ from kafka import KafkaConsumer , KafkaProducer
75
86msg_size = 524288
97
8+ producer_stop = threading .Event ()
9+ consumer_stop = threading .Event ()
10+
1011class Producer (threading .Thread ):
11- daemon = True
12- big_msg = "1" * msg_size
12+ big_msg = b'1' * msg_size
1313
1414 def run (self ):
15- client = KafkaClient ("localhost:9092" )
16- producer = SimpleProducer (client )
15+ producer = KafkaProducer (bootstrap_servers = 'localhost:9092' )
1716 self .sent = 0
1817
19- while True :
20- producer .send_messages ('my-topic' , self .big_msg )
18+ while not producer_stop . is_set () :
19+ producer .send ('my-topic' , self .big_msg )
2120 self .sent += 1
21+ producer .flush ()
2222
2323
2424class Consumer (threading .Thread ):
25- daemon = True
2625
2726 def run (self ):
28- client = KafkaClient ("localhost:9092" )
29- consumer = SimpleConsumer (client , "test-group" , "my-topic" ,
30- max_buffer_size = None ,
31- )
27+ consumer = KafkaConsumer (bootstrap_servers = 'localhost:9092' ,
28+ auto_offset_reset = 'earliest' )
29+ consumer .subscribe (['my-topic' ])
3230 self .valid = 0
3331 self .invalid = 0
3432
3533 for message in consumer :
36- if len (message .message . value ) == msg_size :
34+ if len (message .value ) == msg_size :
3735 self .valid += 1
3836 else :
3937 self .invalid += 1
4038
39+ if consumer_stop .is_set ():
40+ break
41+
42+ consumer .close ()
43+
4144def main ():
4245 threads = [
4346 Producer (),
@@ -48,13 +51,15 @@ def main():
4851 t .start ()
4952
5053 time .sleep (10 )
54+ producer_stop .set ()
55+ consumer_stop .set ()
5156 print 'Messages sent: %d' % threads [0 ].sent
5257 print 'Messages recvd: %d' % threads [1 ].valid
5358 print 'Messages invalid: %d' % threads [1 ].invalid
5459
5560if __name__ == "__main__" :
5661 logging .basicConfig (
5762 format = '%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s' ,
58- level = logging .DEBUG
63+ level = logging .INFO
5964 )
6065 main ()
0 commit comments