From d095bbc572c8ff70892baa9ba41e1b743275eedc Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 20:01:26 +0000 Subject: [PATCH 01/18] stub out Record class inheriting from Message --- pykafka/protocol/message.py | 46 +++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index 8e28824b5..b9ad6b42c 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -170,6 +170,52 @@ def set_timestamp(self, ts): raise RuntimeError() +class Record(Message): + """ + Specification:: + + Record => + Length => varint + Attributes => int8 + TimestampDelta => varint + OffsetDelta => varint + KeyLen => varint + Key => data + ValueLen => varint + Value => data + Headers => [Header] + """ + def __init__(self, + value, + partition_key=None, + compression_type=CompressionType.NONE, + offset=-1, + partition_id=-1, + produce_attempt=0, + protocol_version=0, + timestamp=None, + delivery_report_q=None, + headers=None): + super(Record, self).__init__(value, partition_key=partition_key, + compression_type=compression_type, + offset=offset, partition_id=partition_id, + produce_attempt=produce_attempt, + protocol_version=protocol_version, + timestamp=timestamp, + delivery_report_q=delivery_report_q) + self.headers = headers + + def __len__(self): + size = 4 + 1 + 1 + 4 + 4 + if self.value is not None: + size += len(self.value) + if self.partition_key is not None: + size += len(self.partition_key) + if self.protocol_version > 0 and self.timestamp: + size += 8 + return size + + class MessageSet(Serializable): """Representation of a set of messages in Kafka From 37b6afbdd9cfce3f2421a09051e1c8f03b6a62a6 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 20:01:47 +0000 Subject: [PATCH 02/18] add basic support for varint format string to struct_helpers --- pykafka/utils/struct_helpers.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index bc7d63d3b..6599001e8 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -18,6 +18,7 @@ """ __all__ = ["unpack_from"] import itertools +import re import struct from .compat import range @@ -84,6 +85,10 @@ def _unpack(fmt, buff, offset, count=1): array_fmt += ch elif ch == '[': array_fmt = '' # starts building string for array unpack + elif ch == 'V': + len_, unpacked = unpack_varint_from(buff, offset) + items.append(unpacked) + offset += len_ else: if ch in 'SY': len_fmt = '!h' if ch == 'S' else '!i' @@ -117,3 +122,28 @@ def _unpack_array(fmt, buff, offset, count): if len(fmt) == 1: output = list(itertools.chain.from_iterable(output)) return output, offset + + +def unpack_varint_from(buff, offset): + return len(69), 69 + + +NOARG_STRUCT_FMTS = re.compile(r'[^xcbB\?hHiIlLqQfdspP]') + + +def pack_into(fmt, buff, offset, *args): + if 'V' in fmt: + fmt_parts = fmt.split("V") + for fmt_part in fmt_parts: + if fmt_part: + args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) + part_args = [args.pop(0) for _ in range(len(args_only_fmt))] + struct.pack_into(fmt_part, buff, offset, part_args) + offset += struct.calsize(fmt_part) + pack_varint_into(buff, offset, args.pop(0)) + else: + return struct.pack_into(fmt, buff, offset, *args) + + +def pack_varint_into(buff, offset, val): + pass From 51ecd6395d177c72cc6b3d37a4ec76c0bc6793e4 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 22:06:42 +0000 Subject: [PATCH 
03/18] simplify format string splitting and add some basic tests for varint --- pykafka/utils/struct_helpers.py | 18 ++++++++++-------- tests/pykafka/utils/test_struct_helpers.py | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index 6599001e8..72d28718a 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -125,7 +125,7 @@ def _unpack_array(fmt, buff, offset, count): def unpack_varint_from(buff, offset): - return len(69), 69 + return -1, 100 NOARG_STRUCT_FMTS = re.compile(r'[^xcbB\?hHiIlLqQfdspP]') @@ -133,17 +133,19 @@ def unpack_varint_from(buff, offset): def pack_into(fmt, buff, offset, *args): if 'V' in fmt: - fmt_parts = fmt.split("V") - for fmt_part in fmt_parts: - if fmt_part: + args = list(args) + parts = [p for p in re.split('(V)', fmt) if p] + for fmt_part in parts: + if fmt_part != "V": args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) part_args = [args.pop(0) for _ in range(len(args_only_fmt))] - struct.pack_into(fmt_part, buff, offset, part_args) - offset += struct.calsize(fmt_part) - pack_varint_into(buff, offset, args.pop(0)) + struct.pack_into(fmt_part, buff, offset, *part_args) + offset += struct.calcsize(fmt_part) + else: + offset += pack_varint_into(buff, offset, args.pop(0)) else: return struct.pack_into(fmt, buff, offset, *args) def pack_varint_into(buff, offset, val): - pass + return 1 diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index 95a46f805..b61da66a4 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -27,5 +27,24 @@ def test_array_unpacking(self): # A 1-length tuple with a 4-length tuple as the element self.assertEqual(output, [1, 2, 3, 4]) + def test_varint_simple(self): + buff = bytearray(4) + offset = 0 + val = 69 + fmt = 'V' + struct_helpers.pack_into(fmt, buff, offset, val) + output = struct_helpers.unpack_from(fmt, buff) + self.assertEqual(output, (val,)) + + def test_varint_advanced(self): + buff = bytearray(20) + offset = 0 + val = 69 + fmt = 'qVi' + struct_helpers.pack_into(fmt, buff, offset, 68, val, 420) + output = struct_helpers.unpack_from(fmt, buff) + self.assertEqual(output[1], val) + + if __name__ == '__main__': unittest2.main() From 3440d54c6ca753446bbb34edc51ce396ce3785f1 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 23:06:54 +0000 Subject: [PATCH 04/18] first pass at actual encode/decode varint impl --- pykafka/utils/struct_helpers.py | 37 +++++++++++++++++++--- tests/pykafka/utils/test_struct_helpers.py | 16 +++++++--- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index 72d28718a..5ae9278ba 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -125,7 +125,18 @@ def _unpack_array(fmt, buff, offset, count): def unpack_varint_from(buff, offset): - return -1, 100 + size = 0 + shift = 0 + result = 0 + while True: + size += 1 + i = ord(buff[offset:offset + 1]) + offset += 1 + result |= (i & 0x7f) << shift + shift += 7 + if not (i & 0x80): + break + return size, result NOARG_STRUCT_FMTS = re.compile(r'[^xcbB\?hHiIlLqQfdspP]') @@ -133,6 +144,7 @@ def unpack_varint_from(buff, offset): def pack_into(fmt, buff, offset, *args): if 'V' in fmt: + size = 0 args = list(args) parts = [p for p in re.split('(V)', fmt) if p] for fmt_part in parts: @@ -140,12 +152,29 @@ def 
pack_into(fmt, buff, offset, *args): args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) part_args = [args.pop(0) for _ in range(len(args_only_fmt))] struct.pack_into(fmt_part, buff, offset, *part_args) - offset += struct.calcsize(fmt_part) + fmtsize = struct.calcsize(fmt_part) + offset += fmtsize + size += fmtsize else: - offset += pack_varint_into(buff, offset, args.pop(0)) + fmtsize = pack_varint_into(buff, offset, args.pop(0)) + offset += fmtsize + size += fmtsize + return size else: return struct.pack_into(fmt, buff, offset, *args) def pack_varint_into(buff, offset, val): - return 1 + size = 0 + while True: + towrite = val & 0x7f + val >>= 7 + size += 1 + if val: + struct.pack_into('c', buff, offset, chr(towrite | 0x80)) + offset += 1 + else: + struct.pack_into('c', buff, offset, chr(towrite)) + offset += 1 + break + return size diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index b61da66a4..b20b6c717 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -27,6 +27,14 @@ def test_array_unpacking(self): # A 1-length tuple with a 4-length tuple as the element self.assertEqual(output, [1, 2, 3, 4]) + def test_varint_encode(self): + buff = bytearray(2) + val = 300 + expected = b'\xac\x02' + size = struct_helpers.pack_into("V", buff, 0, val) + self.assertEqual(size, 2) + self.assertEqual(buff, expected) + def test_varint_simple(self): buff = bytearray(4) offset = 0 @@ -39,11 +47,11 @@ def test_varint_simple(self): def test_varint_advanced(self): buff = bytearray(20) offset = 0 - val = 69 - fmt = 'qVi' - struct_helpers.pack_into(fmt, buff, offset, 68, val, 420) + vals = [68, 69, 420, 30001] + fmt = 'qViV' + struct_helpers.pack_into(fmt, buff, offset, *vals) output = struct_helpers.unpack_from(fmt, buff) - self.assertEqual(output[1], val) + self.assertEqual(output, vals) if __name__ == '__main__': From 57cacf7a465fc4912b32bd1b4f471c64943c686c Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 23:08:46 +0000 Subject: [PATCH 05/18] fix endianness bug --- pykafka/utils/struct_helpers.py | 4 ++-- tests/pykafka/utils/test_struct_helpers.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index 5ae9278ba..a31508969 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -151,8 +151,8 @@ def pack_into(fmt, buff, offset, *args): if fmt_part != "V": args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) part_args = [args.pop(0) for _ in range(len(args_only_fmt))] - struct.pack_into(fmt_part, buff, offset, *part_args) - fmtsize = struct.calcsize(fmt_part) + struct.pack_into("!" + fmt_part, buff, offset, *part_args) + fmtsize = struct.calcsize("!" 
+ fmt_part) offset += fmtsize size += fmtsize else: diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index b20b6c717..de81d368e 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -47,7 +47,7 @@ def test_varint_simple(self): def test_varint_advanced(self): buff = bytearray(20) offset = 0 - vals = [68, 69, 420, 30001] + vals = (68, 69, 420, 30001) fmt = 'qViV' struct_helpers.pack_into(fmt, buff, offset, *vals) output = struct_helpers.unpack_from(fmt, buff) From d79f01dd2e29e38e81a4ffd3fa9e637c86d0e78c Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 23:10:01 +0000 Subject: [PATCH 06/18] complicate test case --- tests/pykafka/utils/test_struct_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index de81d368e..34c789056 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -47,8 +47,8 @@ def test_varint_simple(self): def test_varint_advanced(self): buff = bytearray(20) offset = 0 - vals = (68, 69, 420, 30001) - fmt = 'qViV' + vals = (68, 69, 420, 30001, 3987533) + fmt = 'qViVV' struct_helpers.pack_into(fmt, buff, offset, *vals) output = struct_helpers.unpack_from(fmt, buff) self.assertEqual(output, vals) From aa898e97090a743dae69fa79455e9683bec6ff7e Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 26 Jul 2018 23:14:26 +0000 Subject: [PATCH 07/18] test for endianness prefix --- pykafka/utils/struct_helpers.py | 7 ++++--- tests/pykafka/utils/test_struct_helpers.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index a31508969..0e4f95560 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -147,12 +147,13 @@ def pack_into(fmt, buff, offset, *args): size = 0 args = list(args) parts = [p for p in re.split('(V)', fmt) if p] - for fmt_part in parts: + for i, fmt_part in enumerate(parts): if fmt_part != "V": args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) part_args = [args.pop(0) for _ in range(len(args_only_fmt))] - struct.pack_into("!" + fmt_part, buff, offset, *part_args) - fmtsize = struct.calcsize("!" + fmt_part) + prefixed = "!" 
+ fmt_part if fmt.startswith("!") and i != 0 else fmt_part + struct.pack_into(prefixed, buff, offset, *part_args) + fmtsize = struct.calcsize(prefixed) offset += fmtsize size += fmtsize else: diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index 34c789056..1d84ec629 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -48,7 +48,7 @@ def test_varint_advanced(self): buff = bytearray(20) offset = 0 vals = (68, 69, 420, 30001, 3987533) - fmt = 'qViVV' + fmt = '!qViVV' struct_helpers.pack_into(fmt, buff, offset, *vals) output = struct_helpers.unpack_from(fmt, buff) self.assertEqual(output, vals) From 277120de2c1c749f7422f32fc72055a428a73bdb Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 30 Jul 2018 23:20:28 +0000 Subject: [PATCH 08/18] start implementing Record.pack_into --- pykafka/protocol/message.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index 25dfec4b7..cce438ad8 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -207,14 +207,25 @@ def __init__(self, self.headers = headers def __len__(self): - size = 4 + 1 + 1 + 4 + 4 - if self.value is not None: - size += len(self.value) - if self.partition_key is not None: - size += len(self.partition_key) - if self.protocol_version > 0 and self.timestamp: - size += 8 - return size + return 0 + + def pack_into(self, buff, offset, base_timestamp=0, base_offset=0): + total_size = 0 + fmt = '!cVVV%dsV%dsi' % (len(self.partition_key), len(self.value)) + args = (0, self.timestamp - base_timestamp, self.offset - base_offset, + len(self.partition_key), self.partition_key, len(self.value), + self.value, len(self.headers)) + # XXX offset needs to be moved up here to make room in front for Length + size = struct_helpers.pack_into(fmt, buff, offset, *args) + total_size += size + offset += size + for hkey, hval in self.headers: + fmt = '!V%dsV%ds' % (len(hkey), len(hval)) + args = (len(hkey), hkey, len(hval), hval) + size = struct_helpers.pack_into(fmt, buff, offset, *args) + total_size += size + offset += size + # TODO calculate and pack Length to beginning of message class MessageSet(Serializable): From 98c252223ad5b4143cf08741571971650bfc48d6 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 31 Jul 2018 21:07:45 +0000 Subject: [PATCH 09/18] split calcsize functionality out from pack_into for varints --- pykafka/protocol/message.py | 3 +- pykafka/utils/struct_helpers.py | 50 ++++++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index cce438ad8..91067f899 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -1,6 +1,7 @@ # - coding: utf-8 - -from datetime import datetime +import math import struct +from datetime import datetime from pkg_resources import parse_version from six import integer_types from zlib import crc32 diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index 0e4f95560..7caa2676b 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -18,6 +18,7 @@ """ __all__ = ["unpack_from"] import itertools +import math import re import struct from .compat import range @@ -139,30 +140,51 @@ def unpack_varint_from(buff, offset): return size, result +def _split_struct_fmt(fmt, args): + if 'V' in fmt: + for i, fmt_part in 
enumerate([p for p in re.split('(V)', fmt) if p]): + if fmt_part != "V": + args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) + part_args = [args.pop(0) for _ in range(len(args_only_fmt))] + prefixed = "!" + fmt_part if fmt.startswith("!") and i != 0 else fmt_part + yield prefixed, part_args + else: + yield fmt_part, [args.pop(0)] + else: + yield fmt, args + + NOARG_STRUCT_FMTS = re.compile(r'[^xcbB\?hHiIlLqQfdspP]') def pack_into(fmt, buff, offset, *args): + if 'V' in fmt: + args = list(args) + for part_fmt, part_args in _split_struct_fmt(fmt, args): + if part_fmt != "V": + struct.pack_into(part_fmt, buff, offset, *part_args) + fmtsize = struct.calcsize(part_fmt) + else: + pack_varint_into(buff, offset, *part_args) + fmtsize = calcsize(part_fmt, *part_args) + offset += fmtsize + else: + return struct.pack_into(fmt, buff, offset, *args) + + +def calcsize(fmt, *args): if 'V' in fmt: size = 0 args = list(args) - parts = [p for p in re.split('(V)', fmt) if p] - for i, fmt_part in enumerate(parts): - if fmt_part != "V": - args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part) - part_args = [args.pop(0) for _ in range(len(args_only_fmt))] - prefixed = "!" + fmt_part if fmt.startswith("!") and i != 0 else fmt_part - struct.pack_into(prefixed, buff, offset, *part_args) - fmtsize = struct.calcsize(prefixed) - offset += fmtsize - size += fmtsize + for part_fmt, part_args in _split_struct_fmt(fmt, args): + if part_fmt != "V": + size += struct.calcsize(part_fmt) else: - fmtsize = pack_varint_into(buff, offset, args.pop(0)) - offset += fmtsize - size += fmtsize + for arg in part_args: + size += math.ceil(math.log(arg, 128)) return size else: - return struct.pack_into(fmt, buff, offset, *args) + return struct.calcsize(fmt) def pack_varint_into(buff, offset, val): From 98a8c71227edb423208d2fbf34fff54774bc01c6 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 31 Jul 2018 21:40:16 +0000 Subject: [PATCH 10/18] flesh out Record.__len__ and use it in Record.pack_into --- pykafka/protocol/message.py | 43 +++++++++++++++++++++++---------- pykafka/utils/struct_helpers.py | 6 ++++- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index 91067f899..51f67dbdc 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -1,5 +1,4 @@ # - coding: utf-8 - -import math import struct from datetime import datetime from pkg_resources import parse_version @@ -207,26 +206,44 @@ def __init__(self, delivery_report_q=delivery_report_q) self.headers = headers + @property + def timestamp_delta(self, base_timestamp=0): + return self.timestamp - base_timestamp + + @property + def offset_delta(self, base_offset=0): + return self.offset - base_offset + def __len__(self): - return 0 + size = 1 + size += struct_helpers.get_varint_size(self.timestamp_delta) + size += struct_helpers.get_varint_size(self.offset_delta) + if self.partition_key is not None: + size += struct_helpers.get_varint_size(len(self.partition_key)) + size += len(self.partition_key) + if self.value is not None: + size += struct_helpers.get_varint_size(len(self.value)) + size += len(self.value) + size += 4 + for hkey, hval in self.headers: + size += struct_helpers.get_varint_size(len(hkey)) + size += len(hkey) + size += struct_helpers.get_varint_size(len(hval)) + size += len(hval) + return size def pack_into(self, buff, offset, base_timestamp=0, base_offset=0): - total_size = 0 - fmt = '!cVVV%dsV%dsi' % (len(self.partition_key), len(self.value)) - args = (0, 
self.timestamp - base_timestamp, self.offset - base_offset,
+        fmt = '!VcVVV%dsV%dsi' % (len(self.partition_key), len(self.value))
+        args = (len(self), 0, self.timestamp_delta, self.offset_delta,
                 len(self.partition_key), self.partition_key, len(self.value),
                 self.value, len(self.headers))
-        # XXX offset needs to be moved up here to make room in front for Length
-        size = struct_helpers.pack_into(fmt, buff, offset, *args)
-        total_size += size
-        offset += size
+        struct_helpers.pack_into(fmt, buff, offset, *args)
+        offset += struct_helpers.calcsize(fmt, *args)
         for hkey, hval in self.headers:
             fmt = '!V%dsV%ds' % (len(hkey), len(hval))
             args = (len(hkey), hkey, len(hval), hval)
-            size = struct_helpers.pack_into(fmt, buff, offset, *args)
-            total_size += size
-            offset += size
+            struct_helpers.pack_into(fmt, buff, offset, *args)
+            offset += struct_helpers.calcsize(fmt, *args)
 
 
 class MessageSet(Serializable):

diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py
index 7caa2676b..6effb628e 100644
--- a/pykafka/utils/struct_helpers.py
+++ b/pykafka/utils/struct_helpers.py
@@ -181,12 +181,16 @@ def calcsize(fmt, *args):
                 size += struct.calcsize(part_fmt)
             else:
                 for arg in part_args:
-                    size += math.ceil(math.log(arg, 128))
+                    size += get_varint_size(arg)
         return size
     else:
         return struct.calcsize(fmt)
 
 
+def get_varint_size(val):
+    return max(1, int(math.ceil(val.bit_length() / 7.0)))
+
+
 def pack_varint_into(buff, offset, val):

From 0f6e1d2207973e745ae32cd5ed15a1b3e93f5d78 Mon Sep 17 00:00:00 2001
From: Emmett Butler
Date: Tue, 31 Jul 2018 22:12:36 +0000
Subject: [PATCH 11/18] stub out Record.decode()

---
 pykafka/protocol/message.py | 35 +++++++++++++++++++++++------------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py
index 51f67dbdc..6403c8775 100644
--- a/pykafka/protocol/message.py
+++ b/pykafka/protocol/message.py
@@ -186,6 +186,8 @@ class Record(Message):
         Value => data
         Headers => [Header]
     """
+    __slots__ = Message.__slots__ + ["headers"]
+
     def __init__(self,
                  value,
                  partition_key=None,
@@ -206,11 +208,9 @@ def __init__(self,
                                      delivery_report_q=delivery_report_q)
         self.headers = headers
 
-    @property
     def timestamp_delta(self, base_timestamp=0):
         return self.timestamp - base_timestamp
 
-    @property
     def offset_delta(self, base_offset=0):
         return self.offset - base_offset
 
@@ -222,21 +222,32 @@ def __len__(self):
             size += struct_helpers.get_varint_size(len(self.partition_key))
             size += len(self.partition_key)
         if self.value is not None:
-            size += struct_helpers.get_varint_size(len(self.value))
-            size += len(self.value)
+            size += struct_helpers.get_varint_size(len(self.value)) + len(self.value)
         size += 4
         for hkey, hval in self.headers:
-            size += struct_helpers.get_varint_size(len(hkey))
-            size += len(hkey)
-            size += struct_helpers.get_varint_size(len(hval))
-            size += len(hval)
+            size += struct_helpers.get_varint_size(len(hkey)) + len(hkey)
+            size += struct_helpers.get_varint_size(len(hval)) + len(hval)
         return size
 
+    @classmethod
+    def decode(self, buff, base_timestamp=0, base_offset=0, partition_id=-1,
+               compression_type=CompressionType.NONE):
+        (length, attr, timestamp_delta,
+         offset_delta, partition_key, value,
+         headers) = struct_helpers.unpack_from('VBVVGG [GG]', buff, 0)
+        return Record(value,
+                      partition_key=partition_key,
+                      compression_type=compression_type,
+                      offset=base_offset + offset_delta,
+                      protocol_version=1,  # XXX
+                      timestamp=base_timestamp + timestamp_delta,
+                      partition_id=partition_id)
+
     def pack_into(self, buff, offset, base_timestamp=0, base_offset=0):
-        fmt = '!VcVVV%dsV%dsi' % (len(self.partition_key), len(self.value))
-        args = (len(self), 0, self.timestamp_delta, self.offset_delta,
-                len(self.partition_key), self.partition_key, len(self.value),
-                self.value, len(self.headers))
+        fmt = '!VBVVV%dsV%dsi' % (len(self.partition_key), len(self.value))
+        args = (len(self), 0, self.timestamp_delta(base_timestamp),
+                self.offset_delta(base_offset), len(self.partition_key),
+                self.partition_key, len(self.value), self.value, len(self.headers))
         struct_helpers.pack_into(fmt, buff, offset, *args)
         offset += struct_helpers.calcsize(fmt, *args)
         for hkey, hval in self.headers:
             fmt = '!V%dsV%ds' % (len(hkey), len(hval))
             args = (len(hkey), hkey, len(hval), hval)

From 478e22ea4ff54e1a75a37ebb32805afa9be25365 Mon Sep 17 00:00:00 2001
From: Emmett Butler
Date: Tue, 31 Jul 2018 22:12:50 +0000
Subject: [PATCH 12/18] handle G as a varint-prefixed string format character

---
 pykafka/utils/struct_helpers.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py
index 6effb628e..4ca551d70 100644
--- a/pykafka/utils/struct_helpers.py
+++ b/pykafka/utils/struct_helpers.py
@@ -57,6 +57,9 @@ def unpack_from(fmt, buff, offset=0):
     return output
 
 
+BYTES_PREFIXES = {'S': '!h', 'Y': '!i', 'G': 'V'}
+
+
 def _unpack(fmt, buff, offset, count=1):
     """Recursive call for unpacking
 
@@ -91,10 +94,10 @@ def _unpack(fmt, buff, offset, count=1):
             items.append(unpacked)
             offset += len_
         else:
-            if ch in 'SY':
-                len_fmt = '!h' if ch == 'S' else '!i'
-                len_ = struct.unpack_from(len_fmt, buff, offset)[0]
-                offset += struct.calcsize(len_fmt)
+            if ch in 'SYG':
+                len_fmt = BYTES_PREFIXES[ch]
+                len_ = unpack_from(len_fmt, buff, offset)[0]
+                offset += calcsize(len_fmt, len_)
                 if len_ == -1:
                     items.append(None)
                     continue

From dde6419da764f87a26fb92ca57935a1ce42809da Mon Sep 17 00:00:00 2001
From: Emmett Butler
Date: Tue, 31 Jul 2018 22:13:06 +0000
Subject: [PATCH 13/18] use B instead of c for single-byte packing

---
 pykafka/utils/struct_helpers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py
index 4ca551d70..00407e1ba 100644
--- a/pykafka/utils/struct_helpers.py
+++ b/pykafka/utils/struct_helpers.py
@@ -201,10 +201,10 @@ def pack_varint_into(buff, offset, val):
         val >>= 7
         size += 1
         if val:
-            struct.pack_into('c', buff, offset, chr(towrite | 0x80))
+            struct.pack_into('B', buff, offset, towrite | 0x80)
             offset += 1
         else:
-            struct.pack_into('c', buff, offset, chr(towrite))
+            struct.pack_into('B', buff, offset, towrite)
             offset += 1
             break
     return size

From 5dbc6e6c4e6753ff946c173057b8b143cf2711d8 Mon Sep 17 00:00:00 2001
From: Emmett Butler
Date: Tue, 31 Jul 2018 22:57:42 +0000
Subject: [PATCH 14/18] stub RecordBatch and the pack_into method

---
 pykafka/protocol/message.py | 56 +++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py
index 6403c8775..b4e49abfc 100644
--- a/pykafka/protocol/message.py
+++ b/pykafka/protocol/message.py
@@ -383,3 +383,59 @@ def pack_into(self, buff, offset):
             offset += 12
             message.pack_into(buff, offset)
             offset += mlen
+
+
+class RecordBatch(Serializable):
+    """Representation of a Kafka RecordBatch
+
+    Specification::
+
+        RecordBatch =>
+          FirstOffset => int64
+          Length => int32
+          PartitionLeaderEpoch => int32
+          Magic => int8
+          CRC => int32
+          Attributes => int16
+          LastOffsetDelta 
=> int32 + FirstTimestamp => int64 + MaxTimestamp => int64 + ProducerId => int64 + ProducerEpoch => int16 + FirstSequence => int32 + Records => [Record] + """ + def __init__(self, records=None, compression_type=CompressionType.NONE): + self.compression_type = compression_type + self._records = records or [] + + def __len__(self): + pass + + @property + def records(self): + self._compressed = None + return self._records + + def _get_compressed(self): + pass + + @classmethod + def decode(cls, buff, partition_id=-1): + pass + + def pack_into(self, buff, offset): + if self.compression_type == CompressionType.NONE: + records = self._records + else: + raise NotImplementedError() + offset = 0 + fmt = '!qiiBihiqqqhii' + # NB these -1s are for currently unsupported fields introduced in 0.11 + # XXX "magicbyte" aka protocol version (2) should be a class attribute + args = (-1, len(self), -1, 2, crc, attr, -1, -1, -1, -1, -1, -1, len(records)) + struct.pack_into(fmt, buff, offset, *args) + offset += struct.calcsize(fmt) + for record in records: + record.pack_into(buff, offset) + offset += len(record) From c312f67c54b050e685763752457b45f50211eceb Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 9 Aug 2018 22:07:07 +0000 Subject: [PATCH 15/18] handle the addition of messages to a RecordBatch --- pykafka/protocol/message.py | 42 +++++++++++++++++++++++++++++-------- pykafka/protocol/produce.py | 2 ++ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index b4e49abfc..d1e79a0d2 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -7,7 +7,7 @@ from ..common import CompressionType, Message from ..exceptions import MessageSetDecodeFailure -from ..utils import Serializable, struct_helpers, compression +from ..utils import Serializable, struct_helpers, compression, msg_protocol_version from ..utils.compat import buffer @@ -195,7 +195,7 @@ def __init__(self, offset=-1, partition_id=-1, produce_attempt=0, - protocol_version=0, + protocol_version=2, timestamp=None, delivery_report_q=None, headers=None): @@ -311,6 +311,9 @@ def messages(self): self._compressed = None return self._messages + def add_message(self, message): + self.messages.append(message) + def _get_compressed(self): """Get a compressed representation of all current messages. 
@@ -385,7 +388,7 @@ def pack_into(self, buff, offset): offset += mlen -class RecordBatch(Serializable): +class RecordBatch(MessageSet): """Representation of a Kafka RecordBatch Specification:: @@ -405,9 +408,16 @@ class RecordBatch(Serializable): FirstSequence => int32 Records => [Record] """ - def __init__(self, records=None, compression_type=CompressionType.NONE): - self.compression_type = compression_type - self._records = records or [] + def __init__(self, records=None, compression_type=CompressionType.NONE, + broker_version='0.9.0'): + super(RecordBatch, self).__init__(messages=records, + broker_version=broker_version, + compression_type=compression_type) + self.protocol_version = msg_protocol_version(broker_version) + self.first_offset = -1 + self.last_offset_delta = -1 + self.first_timestamp = -1 + self.max_timestamp = -1 def __len__(self): pass @@ -415,7 +425,19 @@ def __len__(self): @property def records(self): self._compressed = None - return self._records + return self._messages + + def add_message(self, message): + self._messages.append(message) + if message.offset < self.first_offset: + self.first_offset = message.offset + msg_offset_delta = message.offset - self.first_offset + if msg_offset_delta > self.last_offset_delta: + self.last_offset_delta = msg_offset_delta + if message.timestamp < self.first_timestamp: + self.first_timestamp = message.timestamp + if message.timestamp > self.max_timestamp: + self.max_timestamp = message.timestamp def _get_compressed(self): pass @@ -429,11 +451,13 @@ def pack_into(self, buff, offset): records = self._records else: raise NotImplementedError() + attr = self.compression_type offset = 0 fmt = '!qiiBihiqqqhii' # NB these -1s are for currently unsupported fields introduced in 0.11 - # XXX "magicbyte" aka protocol version (2) should be a class attribute - args = (-1, len(self), -1, 2, crc, attr, -1, -1, -1, -1, -1, -1, len(records)) + args = (self.first_offset, len(self), -1, self.protocol_version, crc, attr, + self.last_offset_delta, self.first_timestamp, self.max_timestamp, -1, -1, + -1, len(records)) struct.pack_into(fmt, buff, offset, *args) offset += struct.calcsize(fmt) for record in records: diff --git a/pykafka/protocol/produce.py b/pykafka/protocol/produce.py index 3ea49715f..de08904ac 100644 --- a/pykafka/protocol/produce.py +++ b/pykafka/protocol/produce.py @@ -75,6 +75,8 @@ def add_message(self, message, topic_name, partition_id): :param topic_name: the name of the topic to publish to :param partition_id: the partition to publish to """ + # TODO this needs to use a method on MessageSet + # self.msets[topic_name][partition_id].add_message(message) self.msets[topic_name][partition_id].messages.append(message) self._message_count += 1 From e1a45296cf4e5f6412e0bd5cac402554bef581b8 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 9 Aug 2018 22:23:10 +0000 Subject: [PATCH 16/18] first attempt at length and crc calculations for recordbatch --- pykafka/protocol/message.py | 31 +++++++++++++++++++++++++------ pykafka/protocol/produce.py | 4 +--- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index d1e79a0d2..c6dd436d1 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -420,7 +420,16 @@ def __init__(self, records=None, compression_type=CompressionType.NONE, self.max_timestamp = -1 def __len__(self): - pass + if self.compression_type == CompressionType.NONE: + messages = self._messages + else: + if self._compressed is None: + 
self._compressed = self._get_compressed() + messages = [self._compressed] + length = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + # XXX does this logic still apply to RecordBatch? + length += sum(len(m) for m in messages) + return length @property def records(self): @@ -453,13 +462,23 @@ def pack_into(self, buff, offset): raise NotImplementedError() attr = self.compression_type offset = 0 - fmt = '!qiiBihiqqqhii' - # NB these -1s are for currently unsupported fields introduced in 0.11 - args = (self.first_offset, len(self), -1, self.protocol_version, crc, attr, - self.last_offset_delta, self.first_timestamp, self.max_timestamp, -1, -1, - -1, len(records)) + fmt = '!qiiB' + args = (self.first_offset, len(self), -1, self.protocol_version) struct.pack_into(fmt, buff, offset, *args) offset += struct.calcsize(fmt) + + crc_offset = offset + fmt = '!hiqqqhii' + args = (attr, self.last_offset_delta, self.first_timestamp, self.max_timestamp, + # NB these -1s are for currently unsupported fields introduced in 0.11 + -1, -1, -1, len(records)) + struct.pack_into(fmt, buff, offset + 4, *args) + offset += struct.calcsize(fmt) + 4 for record in records: record.pack_into(buff, offset) offset += len(record) + end_offset = offset + + data = buffer(buff[(crc_offset + 4):end_offset]) + crc = crc32(data) & 0xffffffff + struct.pack_into('!I', buff, crc_offset, crc) diff --git a/pykafka/protocol/produce.py b/pykafka/protocol/produce.py index de08904ac..30d845ea0 100644 --- a/pykafka/protocol/produce.py +++ b/pykafka/protocol/produce.py @@ -75,9 +75,7 @@ def add_message(self, message, topic_name, partition_id): :param topic_name: the name of the topic to publish to :param partition_id: the partition to publish to """ - # TODO this needs to use a method on MessageSet - # self.msets[topic_name][partition_id].add_message(message) - self.msets[topic_name][partition_id].messages.append(message) + self.msets[topic_name][partition_id].add_message(message) self._message_count += 1 def get_bytes(self): From 32f5237b4f0462aa75c911612afb89a1bb2c16e2 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 9 Aug 2018 22:54:14 +0000 Subject: [PATCH 17/18] first implementation of RecordBatch.decode --- pykafka/protocol/message.py | 43 ++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index c6dd436d1..5a6e19f84 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -408,16 +408,20 @@ class RecordBatch(MessageSet): FirstSequence => int32 Records => [Record] """ - def __init__(self, records=None, compression_type=CompressionType.NONE, - broker_version='0.9.0'): - super(RecordBatch, self).__init__(messages=records, + def __init__(self, messages=None, compression_type=CompressionType.NONE, + broker_version='0.9.0', first_offset=-1, last_offset_delta=-1, + first_timestamp=-1, max_timestamp=-1, protocol_version=None): + super(RecordBatch, self).__init__(messages=messages, broker_version=broker_version, compression_type=compression_type) - self.protocol_version = msg_protocol_version(broker_version) - self.first_offset = -1 - self.last_offset_delta = -1 - self.first_timestamp = -1 - self.max_timestamp = -1 + if protocol_version is not None: + self.protocol_version = protocol_version + else: + self.protocol_version = msg_protocol_version(broker_version) + self.first_offset = first_offset + self.last_offset_delta = last_offset_delta + self.first_timestamp = first_timestamp + self.max_timestamp = 
max_timestamp
 
     def __len__(self):
         if self.compression_type == CompressionType.NONE:
@@ -453,7 +457,25 @@ def _get_compressed(self):
 
     @classmethod
     def decode(cls, buff, partition_id=-1):
-        pass
+        offset = 0
+        fmt = '!qiiBihiqqqhii'
+        (first_offset, _, _, protocol_version, _, attr, last_offset_delta,
+         first_timestamp, max_timestamp, _, _, _,
+         records_count) = struct_helpers.unpack_from(fmt, buff, offset)
+        offset += struct.calcsize(fmt)
+
+        messages = []
+        while offset < len(buff):
+            size_len, size = struct_helpers.unpack_varint_from(buff, offset)
+            message = Record.decode(buff[offset:offset + size_len + size],
+                                    partition_id=partition_id)
+            messages.append(message)
+            offset += size_len + size
+
+        return RecordBatch(messages=messages, first_offset=first_offset,
+                           protocol_version=protocol_version, compression_type=attr,
+                           last_offset_delta=last_offset_delta,
+                           first_timestamp=first_timestamp, max_timestamp=max_timestamp)
 
     def pack_into(self, buff, offset):
         if self.compression_type == CompressionType.NONE:
             records = self._records
         else:
@@ -475,7 +497,8 @@ def pack_into(self, buff, offset):
         struct.pack_into(fmt, buff, offset + 4, *args)
         offset += struct.calcsize(fmt) + 4
         for record in records:
-            record.pack_into(buff, offset)
+            record.pack_into(buff, offset, base_timestamp=self.first_timestamp,
+                             base_offset=self.first_offset)
             offset += len(record)
         end_offset = offset

From 7370c9d03b017e7925cde747a3987a23ffbf321e Mon Sep 17 00:00:00 2001
From: Emmett Butler
Date: Thu, 9 Aug 2018 23:11:35 +0000
Subject: [PATCH 18/18] rough ideation for recordbatch compression

---
 pykafka/protocol/message.py | 29 +++++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py
index 5a6e19f84..262953cc4 100644
--- a/pykafka/protocol/message.py
+++ b/pykafka/protocol/message.py
@@ -453,7 +453,24 @@ def add_message(self, message):
             self.max_timestamp = message.timestamp
 
     def _get_compressed(self):
-        pass
+        assert self.compression_type != CompressionType.NONE
+        tmp_mset = RecordBatch(messages=self._messages)
+        uncompressed = bytearray(len(tmp_mset))
+        tmp_mset.pack_record_array_into(uncompressed, 0)
+        if self.compression_type == CompressionType.GZIP:
+            compressed = compression.encode_gzip(buffer(uncompressed))
+        elif self.compression_type == CompressionType.SNAPPY:
+            compressed = compression.encode_snappy(buffer(uncompressed))
+        elif self.compression_type == CompressionType.LZ4:
+            if parse_version(self._broker_version) >= parse_version('0.10.0'):
+                compressed = compression.encode_lz4(buffer(uncompressed))
+            else:
+                compressed = compression.encode_lz4_old_kafka(buffer(uncompressed))
+        else:
+            raise TypeError("Unknown compression: %s" % self.compression_type)
+        protocol_version = max((m.protocol_version for m in self._messages))
+        return Record(compressed, compression_type=self.compression_type,
+                      protocol_version=protocol_version)
 
     @classmethod
     def decode(cls, buff, partition_id=-1):
@@ -481,7 +498,9 @@ def pack_into(self, buff, offset):
         if self.compression_type == CompressionType.NONE:
             records = self._records
         else:
-            raise NotImplementedError()
+            if self._compressed is None:
+                self._compressed = self._get_compressed()
+            records = [self._compressed]
         attr = self.compression_type
         offset = 0
         fmt = '!qiiB'
         args = (self.first_offset, len(self), -1, self.protocol_version)
         struct.pack_into(fmt, buff, offset, *args)
         offset += struct.calcsize(fmt)
@@ -496,6 +515,8 @@ def pack_into(self, buff, offset):
         struct.pack_into(fmt, buff, offset + 4, *args)
         offset += struct.calcsize(fmt) + 4
+
+        # TODO replace this with call to pack_record_array_into
         for record in records:
             record.pack_into(buff, offset, 
base_timestamp=self.first_timestamp, base_offset=self.first_offset) @@ -505,3 +526,7 @@ def pack_into(self, buff, offset): data = buffer(buff[(crc_offset + 4):end_offset]) crc = crc32(data) & 0xffffffff struct.pack_into('!I', buff, crc_offset, crc) + + def pack_record_array_into(self, buff, offset): + """Pack only the array of Records, ignoring headers""" + pass
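
The varint primitive that patches 02 through 07 converge on is the standard base-128 encoding: seven payload bits per byte, least-significant group first, with the high bit of each byte marking that more bytes follow. A minimal standalone sketch follows; the names encode_varint and decode_varint are illustrative, not part of the series:

    def encode_varint(val):
        # Encode a non-negative int as a base-128 varint.
        out = bytearray()
        while True:
            byte = val & 0x7f
            val >>= 7
            if val:
                out.append(byte | 0x80)  # high bit set: more bytes follow
            else:
                out.append(byte)
                return bytes(out)

    def decode_varint(buff, offset=0):
        # Return (bytes consumed, decoded int); mirrors unpack_varint_from.
        result = shift = 0
        for size, byte in enumerate(bytearray(buff[offset:]), start=1):
            result |= (byte & 0x7f) << shift
            shift += 7
            if not byte & 0x80:
                return size, result
        raise ValueError("truncated varint")

    assert encode_varint(300) == b'\xac\x02'  # the pair exercised by test_varint_encode
    assert decode_varint(b'\xac\x02') == (2, 300)

An encoded value occupies max(1, ceil(val.bit_length() / 7)) bytes, which is the quantity get_varint_size must report. Note also that Kafka's v2 record format ZigZag-encodes signed fields (the timestamp and offset deltas) before varint-encoding them, a step this series does not yet apply.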
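
The Record layout from the class docstring can be exercised end to end with the same codec. This is a sketch of the byte layout only: field order follows the specification block, the header count is packed as a big-endian int32 ('i') as the patches currently do, and the deltas are assumed non-negative since no ZigZag step is applied:

    import struct

    def pack_record(key, value, ts_delta, offset_delta, headers):
        body = bytearray()
        body += struct.pack('!B', 0)              # Attributes (unused so far)
        body += encode_varint(ts_delta)           # TimestampDelta
        body += encode_varint(offset_delta)       # OffsetDelta
        for blob in (key, value):
            body += encode_varint(len(blob))      # KeyLen / ValueLen
            body += blob                          # Key / Value
        body += struct.pack('!i', len(headers))   # header count, int32 as in pack_into
        for hkey, hval in headers:
            body += encode_varint(len(hkey)) + hkey
            body += encode_varint(len(hval)) + hval
        return encode_varint(len(body)) + bytes(body)  # Length prefix comes first

    rec = pack_record(b'key', b'value', 5, 1, [(b'hk', b'hv')])
    prefix_len, length = decode_varint(rec)
    assert length == len(rec) - prefix_len  # Length counts everything after itself

Because the Length field is itself a varint, a serializer cannot know a record's total size until the body is built; packing the body first and prepending the prefix sidesteps the chicken-and-egg problem that the "XXX offset needs to be moved up" comment in patch 08 points at.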
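
Finally, the fixed-width portion of the RecordBatch header that decode and pack_into share maps onto the '!qiiBihiqqqhii' format string field by field, and its size can be sanity-checked directly:

    import struct

    BATCH_HEADER_FMT = '!qiiBihiqqqhii'
    # q FirstOffset, i Length, i PartitionLeaderEpoch, B Magic, i CRC,
    # h Attributes, i LastOffsetDelta, q FirstTimestamp, q MaxTimestamp,
    # q ProducerId, h ProducerEpoch, i FirstSequence, i record count
    assert struct.calcsize(BATCH_HEADER_FMT) == 61

The CRC covers everything after the CRC field itself, which is why patch 16 packs the attributes onward at offset + 4, checksums that region, and only then backfills the skipped 4-byte slot. (The released v2 format specifies CRC32C rather than zlib's CRC-32 for this field, which is worth verifying before interop testing.)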