Skip to content

Commit 77971ac

Browse files
authored
Merge pull request #752 from dpkp/compacted_offsets
Fix consumer iteration on compacted topics
2 parents 58991c5 + 7b5ade1 commit 77971ac

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

kafka/consumer/fetcher.py

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -424,6 +424,12 @@ def _message_generator(self):
424424
elif fetch_offset == position:
425425
log.log(0, "Returning fetched records at offset %d for assigned"
426426
" partition %s", position, tp)
427+
428+
# We can ignore any prior signal to drop pending message sets
429+
# because we are starting from a fresh one where fetch_offset == position
430+
# i.e., the user seek()'d to this position
431+
self._subscriptions.assignment[tp].drop_pending_message_set = False
432+
427433
for msg in self._unpack_message_set(tp, messages):
428434

429435
# Because we are in a generator, it is possible for
@@ -436,9 +442,17 @@ def _message_generator(self):
436442
" since it is no longer fetchable", tp)
437443
break
438444

445+
# If there is a seek during message iteration,
446+
# we should stop unpacking this message set and
447+
# wait for a new fetch response that aligns with the
448+
# new seek position
449+
elif self._subscriptions.assignment[tp].drop_pending_message_set:
450+
log.debug("Skipping remainder of message set for partition %s", tp)
451+
self._subscriptions.assignment[tp].drop_pending_message_set = False
452+
break
453+
439454
# Compressed messagesets may include earlier messages
440-
# It is also possible that the user called seek()
441-
elif msg.offset != self._subscriptions.assignment[tp].position:
455+
elif msg.offset < self._subscriptions.assignment[tp].position:
442456
log.debug("Skipping message offset: %s (expecting %s)",
443457
msg.offset,
444458
self._subscriptions.assignment[tp].position)

kafka/consumer/subscription_state.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -350,6 +350,7 @@ def __init__(self):
350350
self.reset_strategy = None # the reset strategy if awaitingReset is set
351351
self._position = None # offset exposed to the user
352352
self.highwater = None
353+
self.drop_pending_message_set = False
353354

354355
def _set_position(self, offset):
355356
assert self.has_valid_position, 'Valid position required'
@@ -371,6 +372,7 @@ def seek(self, offset):
371372
self.awaiting_reset = False
372373
self.reset_strategy = None
373374
self.has_valid_position = True
375+
self.drop_pending_message_set = True
374376

375377
def pause(self):
376378
self.paused = True

0 commit comments

Comments
 (0)