Skip to content

Commit df12982

Browse files
committed
remove fetcher message_generator / iterator interface
1 parent 83fed59 commit df12982

File tree

2 files changed

+0
-86
lines changed

2 files changed

+0
-86
lines changed

kafka/consumer/fetcher.py

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -407,62 +407,6 @@ def _append(self, drained, part, max_records, update_offsets):
407407
part.discard()
408408
return 0
409409

410-
def _message_generator(self):
411-
"""Iterate over fetched_records"""
412-
while self._next_partition_records or self._completed_fetches:
413-
414-
if not self._next_partition_records:
415-
completion = self._completed_fetches.popleft()
416-
self._next_partition_records = self._parse_fetched_data(completion)
417-
continue
418-
419-
# Send additional FetchRequests when the internal queue is low
420-
# this should enable moderate pipelining
421-
if len(self._completed_fetches) <= self.config['iterator_refetch_records']:
422-
self.send_fetches()
423-
424-
tp = self._next_partition_records.topic_partition
425-
426-
# We can ignore any prior signal to drop pending record batches
427-
# because we are starting from a fresh one where fetch_offset == position
428-
# i.e., the user seek()'d to this position
429-
self._subscriptions.assignment[tp].drop_pending_record_batch = False
430-
431-
for msg in self._next_partition_records.take():
432-
433-
# Because we are in a generator, it is possible for
434-
# subscription state to change between yield calls
435-
# so we need to re-check on each loop
436-
# this should catch assignment changes, pauses
437-
# and resets via seek_to_beginning / seek_to_end
438-
if not self._subscriptions.is_fetchable(tp):
439-
log.debug("Not returning fetched records for partition %s"
440-
" since it is no longer fetchable", tp)
441-
self._next_partition_records = None
442-
break
443-
444-
# If there is a seek during message iteration,
445-
# we should stop unpacking this record batch and
446-
# wait for a new fetch response that aligns with the
447-
# new seek position
448-
elif self._subscriptions.assignment[tp].drop_pending_record_batch:
449-
log.debug("Skipping remainder of record batch for partition %s", tp)
450-
self._subscriptions.assignment[tp].drop_pending_record_batch = False
451-
self._next_partition_records = None
452-
break
453-
454-
# Compressed messagesets may include earlier messages
455-
elif msg.offset < self._subscriptions.assignment[tp].position.offset:
456-
log.debug("Skipping message offset: %s (expecting %s)",
457-
msg.offset,
458-
self._subscriptions.assignment[tp].position.offset)
459-
continue
460-
461-
self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, '', -1)
462-
yield msg
463-
464-
self._next_partition_records = None
465-
466410
def _unpack_records(self, tp, records):
467411
try:
468412
batch = records.next_batch()
@@ -514,18 +458,6 @@ def _unpack_records(self, tp, records):
514458
log.exception('StopIteration raised unpacking messageset')
515459
raise RuntimeError('StopIteration raised unpacking messageset')
516460

517-
def __iter__(self): # pylint: disable=non-iterator-returned
518-
return self
519-
520-
def __next__(self):
521-
if not self._iterator:
522-
self._iterator = self._message_generator()
523-
try:
524-
return next(self._iterator)
525-
except StopIteration:
526-
self._iterator = None
527-
raise
528-
529461
def _deserialize(self, f, topic, bytes_):
530462
if not f:
531463
return bytes_

test/test_fetcher.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -463,24 +463,6 @@ def test__unpack_records(fetcher):
463463
assert records[2].offset == 2
464464

465465

466-
def test__message_generator(fetcher, topic, mocker):
467-
fetcher.config['check_crcs'] = False
468-
tp = TopicPartition(topic, 0)
469-
msgs = []
470-
for i in range(10):
471-
msgs.append((None, b"foo", None))
472-
completed_fetch = CompletedFetch(
473-
tp, 0, 0, [0, 100, _build_record_batch(msgs)],
474-
mocker.MagicMock()
475-
)
476-
fetcher._completed_fetches.append(completed_fetch)
477-
for i in range(10):
478-
msg = next(fetcher)
479-
assert isinstance(msg, ConsumerRecord)
480-
assert msg.offset == i
481-
assert msg.value == b'foo'
482-
483-
484466
def test__parse_fetched_data(fetcher, topic, mocker):
485467
fetcher.config['check_crcs'] = False
486468
tp = TopicPartition(topic, 0)

0 commit comments

Comments (0)