 import time
 
 import kafka.errors as Errors
-from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.structs import TopicPartition
@@ -36,7 +35,7 @@ def get(self):
 
 
 class ProducerBatch(object):
-    def __init__(self, tp, records, buffer):
+    def __init__(self, tp, records):
         self.max_record_size = 0
         now = time.time()
         self.created = now
@@ -48,7 +47,6 @@ def __init__(self, tp, records, buffer):
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False
-        self._buffer = buffer  # We only save it, we don't write to it
 
     @property
     def record_count(self):
@@ -123,9 +121,6 @@ def in_retry(self):
     def set_retry(self):
         self._retry = True
 
-    def buffer(self):
-        return self._buffer
-
     def __str__(self):
         return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
             self.topic_partition, self.records.next_offset())
@@ -145,12 +140,6 @@ class RecordAccumulator(object):
             A small batch size will make batching less common and may reduce
             throughput (a batch size of zero will disable batching entirely).
             Default: 16384
-        buffer_memory (int): The total bytes of memory the producer should use
-            to buffer records waiting to be sent to the server. If records are
-            sent faster than they can be delivered to the server the producer
-            will block up to max_block_ms, raising an exception on timeout.
-            In the current implementation, this setting is an approximation.
-            Default: 33554432 (32MB)
         compression_attrs (int): The compression type for all data generated by
             the producer. Valid values are gzip(1), snappy(2), lz4(3), or
             none(0).
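With buffer_memory dropped from the documented options, the accumulator is tuned only through settings such as batch_size, compression_attrs and linger_ms. A minimal sketch of constructing it under the new surface (this is an internal class; the remaining keyword arguments are assumed from this diff, not a stable public API):

    from kafka.producer.record_accumulator import RecordAccumulator

    # 'buffer_memory' is no longer a recognized option after this change.
    accumulator = RecordAccumulator(
        batch_size=16384,     # max bytes per per-partition batch
        compression_attrs=0,  # 0 = none, 1 = gzip, 2 = snappy, 3 = lz4
        linger_ms=5,          # wait up to 5 ms for a batch to fill
    )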
@@ -168,7 +157,6 @@ class RecordAccumulator(object):
             all retries in a short period of time. Default: 100
     """
     DEFAULT_CONFIG = {
-        'buffer_memory': 33554432,
         'batch_size': 16384,
         'compression_attrs': 0,
         'linger_ms': 0,
@@ -189,18 +177,13 @@ def __init__(self, **configs):
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque)  # TopicPartition: [ProducerBatch]
         self._tp_locks = {None: threading.Lock()}  # TopicPartition: Lock, plus a lock to add entries
-        self._free = SimpleBufferPool(self.config['buffer_memory'],
-                                      self.config['batch_size'],
-                                      metrics=self.config['metrics'],
-                                      metric_group_prefix=self.config['metric_group_prefix'])
         self._incomplete = IncompleteProducerBatches()
         # The following variables should only be accessed by the sender thread,
         # so we don't need to protect them w/ locking.
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
-               estimated_size=0):
+    def append(self, tp, timestamp_ms, key, value, headers):
         """Add a record to the accumulator, return the append result.
 
         The append result will contain the future metadata, and flag for
@@ -213,8 +196,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
             key (bytes): The key for the record
             value (bytes): The value for the record
             headers (List[Tuple[str, bytes]]): The header fields for the record
-            max_time_to_block_ms (int): The maximum time in milliseconds to
-                block for buffer memory to be available
 
         Returns:
             tuple: (future, batch_is_full, new_batch_created)
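Since append() no longer takes max_time_to_block_ms, a caller never blocks here waiting for buffer memory; it simply unpacks the three-element result. A hedged usage sketch, continuing the accumulator constructed above (values are illustrative):

    from kafka.structs import TopicPartition

    tp = TopicPartition('my-topic', 0)
    future, batch_is_full, new_batch_created = accumulator.append(
        tp, timestamp_ms=1700000000000, key=b'key', value=b'value', headers=[])
    if batch_is_full or new_batch_created:
        pass  # a full or freshly created batch is the cue to wake the sender thread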
@@ -240,9 +221,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                         batch_is_full = len(dq) > 1 or last.records.is_full()
                         return future, batch_is_full, False
 
-            size = max(self.config['batch_size'], estimated_size)
-            log.debug("Allocating a new %d byte message buffer for %s", size, tp)  # trace
-            buf = self._free.allocate(size, max_time_to_block_ms)
             with self._tp_locks[tp]:
                 # Need to check if producer is closed again after grabbing the
                 # dequeue lock.
@@ -254,7 +232,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                     if future is not None:
                         # Somebody else found us a batch, return the one we
                         # waited for! Hopefully this doesn't happen often...
-                        self._free.deallocate(buf)
                         batch_is_full = len(dq) > 1 or last.records.is_full()
                         return future, batch_is_full, False
 
@@ -264,7 +241,7 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                     self.config['batch_size']
                 )
 
-                batch = ProducerBatch(tp, records, buf)
+                batch = ProducerBatch(tp, records)
                 future = batch.try_append(timestamp_ms, key, value, headers)
                 if not future:
                     raise Exception()
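A new batch is now built directly from a MemoryRecordsBuilder, with no pre-allocated buffer threaded through. A standalone sketch of that construction path (the magic version and compression values are illustrative):

    from kafka.producer.record_accumulator import ProducerBatch
    from kafka.record.memory_records import MemoryRecordsBuilder
    from kafka.structs import TopicPartition

    # args: message format version, compression attrs, batch size in bytes
    records = MemoryRecordsBuilder(2, 0, 16384)
    batch = ProducerBatch(TopicPartition('my-topic', 0), records)
    future = batch.try_append(1700000000000, b'key', b'value', [])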
@@ -384,7 +361,6 @@ def ready(self, cluster):
         unknown_leaders_exist = False
         now = time.time()
 
-        exhausted = bool(self._free.queued() > 0)
         # several threads are accessing self._batches -- to simplify
         # concurrent access, we iterate over a snapshot of partitions
         # and lock each partition separately as needed
@@ -414,7 +390,7 @@ def ready(self, cluster):
                 full = bool(len(dq) > 1 or batch.records.is_full())
                 expired = bool(waited_time >= time_to_wait)
 
-                sendable = (full or expired or exhausted or self._closed or
+                sendable = (full or expired or self._closed or
                             self._flush_in_progress())
 
                 if sendable and not backing_off:
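With the buffer pool gone, pool exhaustion can no longer force an early drain; readiness now depends only on the batch itself and the accumulator's state. An illustrative restatement of the new predicate from this hunk:

    def is_sendable(full, expired, closed, flush_in_progress):
        # 'exhausted' (buffer-pool pressure) no longer participates
        return full or expired or closed or flush_in_progress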
@@ -506,7 +482,6 @@ def drain(self, cluster, nodes, max_size):
     def deallocate(self, batch):
         """Deallocate the record batch."""
         self._incomplete.remove(batch)
-        self._free.deallocate(batch.buffer())
 
     def _flush_in_progress(self):
         """Are there any threads currently waiting on a flush?"""
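After this hunk, deallocate() is pure bookkeeping: it only drops the batch from the incomplete set, and the batch's memory is reclaimed by normal garbage collection once the last reference goes away. The resulting method, as implied by the context above:

    def deallocate(self, batch):
        """Deallocate the record batch."""
        self._incomplete.remove(batch)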