Skip to content

Commit 83fed59

Browse files
committed
Add size_in_bytes to ABCRecordBatch and implement for Legacy and Default
1 parent 44eb48d commit 83fed59

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

kafka/record/abc.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99
class ABCRecord(object):
1010
__slots__ = ()
1111

12+
@abc.abstractproperty
13+
def size_in_bytes(self):
14+
""" Number of total bytes in record
15+
"""
16+
1217
@abc.abstractproperty
1318
def offset(self):
1419
""" Absolute offset of record

kafka/record/default_records.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,10 @@ def _read_msg(
275275

276276
if self.is_control_batch:
277277
return ControlRecord(
278-
offset, timestamp, self.timestamp_type, key, value, headers)
278+
length, offset, timestamp, self.timestamp_type, key, value, headers)
279279
else:
280280
return DefaultRecord(
281-
offset, timestamp, self.timestamp_type, key, value, headers)
281+
length, offset, timestamp, self.timestamp_type, key, value, headers)
282282

283283
def __iter__(self):
284284
self._maybe_uncompress()
@@ -314,17 +314,22 @@ def validate_crc(self):
314314

315315
class DefaultRecord(ABCRecord):
316316

317-
__slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
317+
__slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
318318
"_headers")
319319

320-
def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
320+
def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers):
321+
self._size_in_bytes = size_in_bytes
321322
self._offset = offset
322323
self._timestamp = timestamp
323324
self._timestamp_type = timestamp_type
324325
self._key = key
325326
self._value = value
326327
self._headers = headers
327328

329+
@property
330+
def size_in_bytes(self):
331+
return self._size_in_bytes
332+
328333
@property
329334
def offset(self):
330335
return self._offset
@@ -371,16 +376,16 @@ def __repr__(self):
371376

372377

373378
class ControlRecord(DefaultRecord):
374-
__slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
379+
__slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
375380
"_headers", "_version", "_type")
376381

377382
KEY_STRUCT = struct.Struct(
378383
">h" # Current Version => Int16
379384
"h" # Type => Int16 (0 indicates an abort marker, 1 indicates a commit)
380385
)
381386

382-
def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
383-
super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers)
387+
def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers):
388+
super(ControlRecord, self).__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers)
384389
(self._version, self._type) = self.KEY_STRUCT.unpack(self._key)
385390

386391
# see https://kafka.apache.org/documentation/#controlbatch

kafka/record/legacy_records.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,10 @@ def headers(self):
339339
def checksum(self):
340340
return self._crc
341341

342+
@property
343+
def size_in_bytes(self):
344+
return LegacyRecordBatchBuilder.estimate_size_in_bytes(self._magic, None, self._key, self._value)
345+
342346
def __repr__(self):
343347
return (
344348
"LegacyRecord(magic={!r} offset={!r}, timestamp={!r}, timestamp_type={!r},"

0 commit comments

Comments
 (0)