Skip to content

Commit 585a111

Browse files
committed
MemoryRecordsBuilder: support arbitrary offset, skipping offsets
1 parent c763939 commit 585a111

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

kafka/record/memory_records.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class MemoryRecordsBuilder(object):
115115
__slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
116116
"_bytes_written")
117117

118-
def __init__(self, magic, compression_type, batch_size):
118+
def __init__(self, magic, compression_type, batch_size, offset=0):
119119
assert magic in [0, 1, 2], "Not supported magic"
120120
assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
121121
if magic >= 2:
@@ -130,10 +130,14 @@ def __init__(self, magic, compression_type, batch_size):
130130
self._batch_size = batch_size
131131
self._buffer = None
132132

133-
self._next_offset = 0
133+
self._next_offset = offset
134134
self._closed = False
135135
self._bytes_written = 0
136136

137+
def skip(self, offsets_to_skip):
138+
# Exposed for testing compacted records
139+
self._next_offset += offsets_to_skip
140+
137141
def append(self, timestamp, key, value, headers=[]):
138142
""" Append a message to the buffer.
139143

0 commit comments

Comments
 (0)