Skip to content

Commit ca9d2fa

Browse files
committed
Fix bug causing KafkaProducer to double-compress message batches on retry
1 parent 594f707 commit ca9d2fa

File tree

1 file changed

+23
-16
lines changed

1 file changed

+23
-16
lines changed

kafka/producer/buffer.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,29 @@ def is_full(self):
8989
return self._buffer.tell() >= self._batch_size
9090

9191
def close(self):
92-
if self._compressor:
93-
# TODO: avoid copies with bytearray / memoryview
94-
self._buffer.seek(4)
95-
msg = Message(self._compressor(self._buffer.read()),
96-
attributes=self._compression_attributes,
97-
magic=self._message_version)
98-
encoded = msg.encode()
99-
self._buffer.seek(4)
100-
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
101-
self._buffer.write(Int32.encode(len(encoded)))
102-
self._buffer.write(encoded)
103-
104-
# Update the message set size, and return ready for full read()
105-
size = self._buffer.tell() - 4
106-
self._buffer.seek(0)
107-
self._buffer.write(Int32.encode(size))
92+
# This method may be called multiple times on the same batch
93+
# i.e., on retries
94+
# we need to make sure we only close it out once
95+
# otherwise compressed messages may be double-compressed
96+
# see Issue 718
97+
if not self._closed:
98+
if self._compressor:
99+
# TODO: avoid copies with bytearray / memoryview
100+
self._buffer.seek(4)
101+
msg = Message(self._compressor(self._buffer.read()),
102+
attributes=self._compression_attributes,
103+
magic=self._message_version)
104+
encoded = msg.encode()
105+
self._buffer.seek(4)
106+
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
107+
self._buffer.write(Int32.encode(len(encoded)))
108+
self._buffer.write(encoded)
109+
110+
# Update the message set size, and return ready for full read()
111+
size = self._buffer.tell() - 4
112+
self._buffer.seek(0)
113+
self._buffer.write(Int32.encode(size))
114+
108115
self._buffer.seek(0)
109116
self._closed = True
110117

0 commit comments

Comments
 (0)