@@ -776,12 +776,12 @@ def _disable_heartbeat_thread(self):
776776 if self ._heartbeat_thread is not None :
777777 self ._heartbeat_thread .disable ()
778778
779- def _close_heartbeat_thread (self ):
779+ def _close_heartbeat_thread (self , timeout_ms = None ):
780780 with self ._lock :
781781 if self ._heartbeat_thread is not None :
782782 log .info ('Stopping heartbeat thread' )
783783 try :
784- self ._heartbeat_thread .close ()
784+ self ._heartbeat_thread .close (timeout_ms = timeout_ms )
785785 except ReferenceError :
786786 pass
787787 self ._heartbeat_thread = None
@@ -790,13 +790,13 @@ def __del__(self):
790790 if hasattr (self , '_heartbeat_thread' ):
791791 self ._close_heartbeat_thread ()
792792
793- def close (self ):
793+ def close (self , timeout_ms = None ):
794794 """Close the coordinator, leave the current group,
795795 and reset local generation / member_id"""
796- self ._close_heartbeat_thread ()
797- self .maybe_leave_group ()
796+ self ._close_heartbeat_thread (timeout_ms = timeout_ms )
797+ self .maybe_leave_group (timeout_ms = timeout_ms )
798798
799- def maybe_leave_group (self ):
799+ def maybe_leave_group (self , timeout_ms = None ):
800800 """Leave the current group and reset local generation/memberId."""
801801 with self ._client ._lock , self ._lock :
802802 if (not self .coordinator_unknown ()
@@ -811,7 +811,7 @@ def maybe_leave_group(self):
811811 future = self ._client .send (self .coordinator_id , request )
812812 future .add_callback (self ._handle_leave_group_response )
813813 future .add_errback (log .error , "LeaveGroup request failed: %s" )
814- self ._client .poll (future = future )
814+ self ._client .poll (future = future , timeout_ms = timeout_ms )
815815
816816 self .reset_generation ()
817817
@@ -957,7 +957,7 @@ def disable(self):
957957 log .debug ('Disabling heartbeat thread' )
958958 self .enabled = False
959959
960- def close (self ):
960+ def close (self , timeout_ms = None ):
961961 if self .closed :
962962 return
963963 self .closed = True
@@ -972,7 +972,9 @@ def close(self):
972972 self .coordinator ._lock .notify ()
973973
974974 if self .is_alive ():
975- self .join (self .coordinator .config ['heartbeat_interval_ms' ] / 1000 )
975+ if timeout_ms is None :
976+ timeout_ms = self .coordinator .config ['heartbeat_interval_ms' ]
977+ self .join (timeout_ms / 1000 )
976978 if self .is_alive ():
977979 log .warning ("Heartbeat thread did not fully terminate during close" )
978980
0 commit comments