Skip to content

Commit c763939

Browse files
committed
Add record.validate_crc() for v0/v1 crc checks
1 parent df12982 commit c763939

File tree

3 files changed: 23 additions (+23) and 5 deletions (-5).

kafka/record/abc.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,11 @@ def checksum(self):
4545
be the checksum for v0 and v1 and None for v2 and above.
4646
"""
4747

48+
@abc.abstractmethod
49+
def validate_crc(self):
50+
""" Return True if v0/v1 record matches checksum. noop/True for v2 records
51+
"""
52+
4853
@abc.abstractproperty
4954
def headers(self):
5055
""" If supported by version list of key-value tuples, or empty list if

kafka/record/default_records.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -366,6 +366,9 @@ def headers(self):
366366
def checksum(self):
367367
return None
368368

369+
def validate_crc(self):
370+
return True
371+
369372
def __repr__(self):
370373
return (
371374
"DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"

kafka/record/legacy_records.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -236,6 +236,9 @@ def _read_key_value(self, pos):
236236
value = self._buffer[pos:pos + value_size].tobytes()
237237
return key, value
238238

239+
def _crc_bytes(self, msg_pos, length):
240+
return self._buffer[msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]
241+
239242
def __iter__(self):
240243
if self._magic == 1:
241244
key_offset = self.KEY_OFFSET_V1
@@ -259,7 +262,7 @@ def __iter__(self):
259262
absolute_base_offset = -1
260263

261264
for header, msg_pos in headers:
262-
offset, _, crc, _, attrs, timestamp = header
265+
offset, length, crc, _, attrs, timestamp = header
263266
# There should only ever be a single layer of compression
264267
assert not attrs & self.CODEC_MASK, (
265268
'MessageSet at offset %d appears double-compressed. This '
@@ -275,29 +278,32 @@ def __iter__(self):
275278
offset += absolute_base_offset
276279

277280
key, value = self._read_key_value(msg_pos + key_offset)
281+
crc_bytes = self._crc_bytes(msg_pos, length)
278282
yield LegacyRecord(
279283
self._magic, offset, timestamp, timestamp_type,
280-
key, value, crc)
284+
key, value, crc, crc_bytes)
281285
else:
282286
key, value = self._read_key_value(key_offset)
287+
crc_bytes = self._crc_bytes(0, len(self._buffer) - self.LOG_OVERHEAD)
283288
yield LegacyRecord(
284289
self._magic, self._offset, self._timestamp, timestamp_type,
285-
key, value, self._crc)
290+
key, value, self._crc, crc_bytes)
286291

287292

288293
class LegacyRecord(ABCRecord):
289294

290295
__slots__ = ("_magic", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
291-
"_crc")
296+
"_crc", "_crc_bytes")
292297

293-
def __init__(self, magic, offset, timestamp, timestamp_type, key, value, crc):
298+
def __init__(self, magic, offset, timestamp, timestamp_type, key, value, crc, crc_bytes):
294299
self._magic = magic
295300
self._offset = offset
296301
self._timestamp = timestamp
297302
self._timestamp_type = timestamp_type
298303
self._key = key
299304
self._value = value
300305
self._crc = crc
306+
self._crc_bytes = crc_bytes
301307

302308
@property
303309
def magic(self):
@@ -339,6 +345,10 @@ def headers(self):
339345
def checksum(self):
340346
return self._crc
341347

348+
def validate_crc(self):
349+
crc = calc_crc32(self._crc_bytes)
350+
return self._crc == crc
351+
342352
@property
343353
def size_in_bytes(self):
344354
return LegacyRecordBatchBuilder.estimate_size_in_bytes(self._magic, None, self._key, self._value)

0 commit comments

Comments
 (0)