1010
1111from kafka .common import ProduceRequest , TopicAndPartition
1212from kafka .partitioner import HashedPartitioner
13- from kafka .protocol import create_message
13+ from kafka .protocol import (
14+ CODEC_NONE , CODEC_GZIP , CODEC_SNAPPY , ALL_CODECS ,
15+ create_message , create_gzip_message , create_snappy_message ,
16+ )
1417
1518log = logging .getLogger ("kafka" )
1619
2023STOP_ASYNC_PRODUCER = - 1
2124
2225
23- def _send_upstream (queue , client , batch_time , batch_size ,
26+ def _send_upstream (queue , client , codec , batch_time , batch_size ,
2427 req_acks , ack_timeout ):
2528 """
2629 Listen on the queue for a specified number of messages or till
@@ -61,7 +64,14 @@ def _send_upstream(queue, client, batch_time, batch_size,
6164
6265 # Send collected requests upstream
6366 reqs = []
64- for topic_partition , messages in msgset .items ():
67+ for topic_partition , msg in msgset .items ():
68+ if codec == CODEC_GZIP :
69+ messages = [create_gzip_message (msg )]
70+ elif codec == CODEC_SNAPPY :
71+ messages = [create_snappy_message (msg )]
72+ else :
73+ messages = [create_message (m ) for m in msg ]
74+
6575 req = ProduceRequest (topic_partition .topic ,
6676 topic_partition .partition ,
6777 messages )
@@ -101,6 +111,7 @@ class Producer(object):
101111 def __init__ (self , client , async = False ,
102112 req_acks = ACK_AFTER_LOCAL_WRITE ,
103113 ack_timeout = DEFAULT_ACK_TIMEOUT ,
114+ codec = None ,
104115 batch_send = False ,
105116 batch_send_every_n = BATCH_SEND_MSG_COUNT ,
106117 batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
@@ -118,11 +129,17 @@ def __init__(self, client, async=False,
118129 self .req_acks = req_acks
119130 self .ack_timeout = ack_timeout
120131
132+ if codec is None :
133+ codec = CODEC_NONE
134+ assert codec in ALL_CODECS
135+ self .codec = codec
136+
121137 if self .async :
122138 self .queue = Queue () # Messages are sent through this queue
123139 self .proc = Process (target = _send_upstream ,
124140 args = (self .queue ,
125141 self .client .copy (),
142+ self .codec ,
126143 batch_send_every_t ,
127144 batch_send_every_n ,
128145 self .req_acks ,
@@ -138,11 +155,16 @@ def send_messages(self, topic, partition, *msg):
138155 """
139156 if self .async :
140157 for m in msg :
141- self .queue .put ((TopicAndPartition (topic , partition ),
142- create_message (m )))
158+ self .queue .put ((TopicAndPartition (topic , partition ), m ))
143159 resp = []
144160 else :
145- messages = [create_message (m ) for m in msg ]
161+ if self .codec == CODEC_GZIP :
162+ messages = [create_gzip_message (msg )]
163+ elif self .codec == CODEC_SNAPPY :
164+ messages = [create_snappy_message (msg )]
165+ else :
166+ messages = [create_message (m ) for m in msg ]
167+
146168 req = ProduceRequest (topic , partition , messages )
147169 try :
148170 resp = self .client .send_produce_request ([req ], acks = self .req_acks ,
@@ -167,7 +189,7 @@ def stop(self, timeout=1):
167189
168190class SimpleProducer (Producer ):
169191 """
170- A simple, round-robbin producer. Each message goes to exactly one partition
192+ A simple, round-robin producer. Each message goes to exactly one partition
171193
172194 Params:
173195 client - The Kafka client instance to use
@@ -184,12 +206,13 @@ class SimpleProducer(Producer):
184206 def __init__ (self , client , async = False ,
185207 req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
186208 ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
209+ codec = None ,
187210 batch_send = False ,
188211 batch_send_every_n = BATCH_SEND_MSG_COUNT ,
189212 batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
190213 self .partition_cycles = {}
191214 super (SimpleProducer , self ).__init__ (client , async , req_acks ,
192- ack_timeout , batch_send ,
215+ ack_timeout , codec , batch_send ,
193216 batch_send_every_n ,
194217 batch_send_every_t )
195218
@@ -227,6 +250,7 @@ class KeyedProducer(Producer):
227250 def __init__ (self , client , partitioner = None , async = False ,
228251 req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
229252 ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
253+ codec = None ,
230254 batch_send = False ,
231255 batch_send_every_n = BATCH_SEND_MSG_COUNT ,
232256 batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
@@ -236,7 +260,7 @@ def __init__(self, client, partitioner=None, async=False,
236260 self .partitioners = {}
237261
238262 super (KeyedProducer , self ).__init__ (client , async , req_acks ,
239- ack_timeout , batch_send ,
263+ ack_timeout , codec , batch_send ,
240264 batch_send_every_n ,
241265 batch_send_every_t )
242266
0 commit comments