@@ -857,14 +857,12 @@ def _disable_heartbeat_thread(self):
857857 self ._heartbeat_thread .disable ()
858858
859859 def _close_heartbeat_thread (self , timeout_ms = None ):
860- with self ._lock :
861- if self ._heartbeat_thread is not None :
862- heartbeat_log .info ('Stopping heartbeat thread' )
863- try :
864- self ._heartbeat_thread .close (timeout_ms = timeout_ms )
865- except ReferenceError :
866- pass
867- self ._heartbeat_thread = None
860+ if self ._heartbeat_thread is not None :
861+ try :
862+ self ._heartbeat_thread .close (timeout_ms = timeout_ms )
863+ except ReferenceError :
864+ pass
865+ self ._heartbeat_thread = None
868866
869867 def __del__ (self ):
870868 try :
@@ -1047,17 +1045,20 @@ def disable(self):
10471045 self .enabled = False
10481046
10491047 def close (self , timeout_ms = None ):
1050- if self .closed :
1051- return
1052- self . closed = True
1048+ with self .coordinator . _lock :
1049+ if self . closed :
1050+ return
10531051
1054- # Generally this should not happen - close() is triggered
1055- # by the coordinator. But in some cases GC may close the coordinator
1056- # from within the heartbeat thread.
1057- if threading .current_thread () == self :
1058- return
1052+ heartbeat_log .info ('Stopping heartbeat thread' )
1053+ self .closed = True
10591054
1060- with self .coordinator ._lock :
1055+ # Generally this should not happen - close() is triggered
1056+ # by the coordinator. But in some cases GC may close the coordinator
1057+ # from within the heartbeat thread.
1058+ if threading .current_thread () == self :
1059+ return
1060+
1061+ # Notify coordinator lock to wake thread from sleep/lock.wait
10611062 self .coordinator ._lock .notify ()
10621063
10631064 if self .is_alive ():
0 commit comments