@@ -54,7 +54,7 @@ def __init__(self, client, subscription, metrics, **configs):
5454 auto_commit_interval_ms (int): milliseconds between automatic
5555 offset commits, if enable_auto_commit is True. Default: 5000.
5656 default_offset_commit_callback (callable): called as
57- callback(offsets, exception ) response will be either an Exception
57+ callback(offsets, response ) response will be either an Exception
5858 or None. This callback can be used to trigger custom actions when
5959 a commit request completes.
6060 assignors (list): List of objects to use to distribute partition
@@ -453,8 +453,8 @@ def close(self, autocommit=True, timeout_ms=None):
453453
454454 def _invoke_completed_offset_commit_callbacks (self ):
455455 while self .completed_offset_commits :
456- callback , offsets , exception = self .completed_offset_commits .popleft ()
457- callback (offsets , exception )
456+ callback , offsets , res_or_exc = self .completed_offset_commits .popleft ()
457+ callback (offsets , res_or_exc )
458458
459459 def commit_offsets_async (self , offsets , callback = None ):
460460 """Commit specific offsets asynchronously.
@@ -859,20 +859,19 @@ def _handle_offset_fetch_response(self, future, response):
859859 " %s" , self .group_id , tp )
860860 future .success (offsets )
861861
862- def _default_offset_commit_callback (self , offsets , exception ):
863- if exception is not None :
864- log .error ("Offset commit failed: %s" , exception )
865-
866- def _commit_offsets_async_on_complete (self , offsets , exception ):
867- if exception is not None :
862+ def _default_offset_commit_callback (self , offsets , res_or_exc ):
863+ if isinstance (res_or_exc , Exception ):
868864 log .warning ("Auto offset commit failed for group %s: %s" ,
869- self .group_id , exception )
870- if getattr (exception , 'retriable' , False ):
871- self .next_auto_commit_deadline = min (time .time () + self .config ['retry_backoff_ms' ] / 1000 , self .next_auto_commit_deadline )
865+ self .group_id , res_or_exc )
872866 else :
873867 log .debug ("Completed autocommit of offsets %s for group %s" ,
874868 offsets , self .group_id )
875869
870+ def _commit_offsets_async_on_complete (self , offsets , res_or_exc ):
871+ if isinstance (res_or_exc , Exception ) and getattr (res_or_exc , 'retriable' , False ):
872+ self .next_auto_commit_deadline = min (time .time () + self .config ['retry_backoff_ms' ] / 1000 , self .next_auto_commit_deadline )
873+ self .config ['default_offset_commit_callback' ](offsets , res_or_exc )
874+
876875 def _maybe_auto_commit_offsets_async (self ):
877876 if self .config ['enable_auto_commit' ]:
878877 if self .coordinator_unknown ():
0 commit comments