Skip to content

Commit d9c2009

Browse files
authored
Do not reset_generation after RebalanceInProgressError; improve CommitFailed error messages (#2614)
1 parent 2f282eb commit d9c2009

File tree

2 files changed

+32
-22
lines changed

2 files changed

+32
-22
lines changed

kafka/coordinator/consumer.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -614,18 +614,19 @@ def _send_offset_commit_request(self, offsets):
614614
for tp, offset in six.iteritems(offsets):
615615
offset_data[tp.topic][tp.partition] = offset
616616

617-
if self._subscription.partitions_auto_assigned():
618-
generation = self.generation() or Generation.NO_GENERATION
617+
version = self._client.api_version(OffsetCommitRequest, max_version=6)
618+
if version > 1 and self._subscription.partitions_auto_assigned():
619+
generation = self.generation()
619620
else:
620621
generation = Generation.NO_GENERATION
621622

622623
# if the generation is None, we are not part of an active group
623624
# (and we expect to be). The only thing we can do is fail the commit
624625
# and let the user rejoin the group in poll()
625-
if self.config['api_version'] >= (0, 9) and generation is None:
626-
return Future().failure(Errors.CommitFailedError())
626+
if generation is None:
627+
log.info("Failing OffsetCommit request since the consumer is not part of an active group")
628+
return Future().failure(Errors.CommitFailedError('Group rebalance in progress'))
627629

628-
version = self._client.api_version(OffsetCommitRequest, max_version=6)
629630
if version == 0:
630631
request = OffsetCommitRequest[version](
631632
self.group_id,
@@ -747,13 +748,22 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
747748
self.coordinator_dead(error_type())
748749
future.failure(error_type(self.group_id))
749750
return
751+
elif error_type is Errors.RebalanceInProgressError:
752+
# Consumer never tries to commit offset in between join-group and sync-group,
753+
# and hence on broker-side it is not expected to see a commit offset request
754+
# during CompletingRebalance phase; if it ever happens then broker would return
755+
# this error. In this case we should just treat it as a fatal CommitFailed exception.
756+
# However, we do not need to reset generations and just request re-join, such that
757+
# if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
758+
self.request_rejoin()
759+
future.failure(Errors.CommitFailedError('Group rebalance in progress'))
760+
return
750761
elif error_type in (Errors.UnknownMemberIdError,
751-
Errors.IllegalGenerationError,
752-
Errors.RebalanceInProgressError):
753-
# need to re-join group
762+
Errors.IllegalGenerationError):
763+
# need to reset generation and re-join group
754764
error = error_type(self.group_id)
755-
log.debug("OffsetCommit for group %s failed: %s",
756-
self.group_id, error)
765+
log.warning("OffsetCommit for group %s failed: %s",
766+
self.group_id, error)
757767
self.reset_generation()
758768
future.failure(Errors.CommitFailedError())
759769
return

kafka/errors.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@ class Cancelled(KafkaError):
2121

2222

2323
class CommitFailedError(KafkaError):
24-
def __init__(self, *args, **kwargs):
25-
super(CommitFailedError, self).__init__(
26-
"""Commit cannot be completed since the group has already
27-
rebalanced and assigned the partitions to another member.
28-
This means that the time between subsequent calls to poll()
29-
was longer than the configured max_poll_interval_ms, which
30-
typically implies that the poll loop is spending too much
31-
time message processing. You can address this either by
32-
increasing the rebalance timeout with max_poll_interval_ms,
33-
or by reducing the maximum size of batches returned in poll()
34-
with max_poll_records.
35-
""", *args, **kwargs)
24+
def __init__(self, *args):
25+
if not args:
26+
args = ("Commit cannot be completed since the group has already"
27+
" rebalanced and assigned the partitions to another member."
28+
" This means that the time between subsequent calls to poll()"
29+
" was longer than the configured max_poll_interval_ms, which"
30+
" typically implies that the poll loop is spending too much"
31+
" time message processing. You can address this either by"
32+
" increasing the rebalance timeout with max_poll_interval_ms,"
33+
" or by reducing the maximum size of batches returned in poll()"
34+
" with max_poll_records.",)
35+
super(CommitFailedError, self).__init__(*args)
3636

3737

3838
class IllegalArgumentError(KafkaError):

0 commit comments

Comments
 (0)