@@ -101,7 +101,7 @@ class Cluster(AsyncFirst, LoggingConfigurable):
101101
@default("cluster_id")
def _default_cluster_id(self):
    """Generate the default value for ``cluster_id``.

    The id has the form ``<unix timestamp>-<suffix>``, where the timestamp
    is the current time truncated to whole seconds and the suffix is four
    characters picked at random from the module-level ``_suffix_chars``.
    """
    timestamp = int(time.time())
    # each of the four characters is drawn independently
    suffix = "".join(random.choice(_suffix_chars) for _ in range(4))
    return f"{timestamp}-{suffix}"
105105
106106 profile_dir = Unicode (
107107 help = """The profile directory.
@@ -511,6 +511,7 @@ def from_file(
def write_cluster_file(self):
    """Write cluster info to disk for later loading

    Serializes ``self.to_dict()`` as JSON into ``self.cluster_file``,
    creating the parent directory first when the path has one.
    """
    parent_dir = os.path.dirname(self.cluster_file)
    # A bare filename has no directory component: dirname() returns ''
    # and os.makedirs('') raises FileNotFoundError, so only create
    # the parent when there actually is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    self.log.debug(f"Updating {self.cluster_file}")
    # explicit encoding so the file does not depend on the locale default
    with open(self.cluster_file, "w", encoding="utf-8") as f:
        json.dump(self.to_dict(), f)
516517
@@ -523,6 +524,14 @@ def remove_cluster_file(self):
523524 else :
524525 self .log .debug (f"Removed cluster file: { self .cluster_file } " )
525526
def _is_running(self):
    """Report whether any component of this cluster is still active.

    A component counts as active while its ``state`` is anything other
    than ``'after'``. Returns True if the controller (when there is one)
    or any engine set in ``self.engines`` is active, False otherwise.
    """
    controller = self.controller
    controller_active = bool(controller) and controller.state != 'after'
    if controller_active:
        return True
    # the controller is absent or finished: look for a live engine set
    for engine_set in self.engines.values():
        if engine_set.state != 'after':
            return True
    return False
534+
526535 def update_cluster_file (self ):
527536 """Update my cluster file
528537
@@ -533,9 +542,7 @@ def update_cluster_file(self):
533542 # setting cluster_file='' disables saving to disk
534543 return
535544
536- if (not self .controller or self .controller .state == 'after' ) and not any (
537- es .state == 'after' for es in self .engines .values ()
538- ):
545+ if not self ._is_running ():
539546 self .remove_cluster_file ()
540547 else :
541548 self .write_cluster_file ()
@@ -604,14 +611,23 @@ def _controller_stopped(self, stop_data=None):
604611 self .log .info (f"Controller stopped: { stop_data } " )
605612 self .update_cluster_file ()
606613
def _new_engine_set_id(self):
    """Generate a new engine set id

    The id is the current unix time in whole seconds. If that key is
    already present in ``self.engines``, ``-1``, ``-2``, ... is appended
    until an unused key is found.
    """
    base = str(int(time.time()))
    candidate = base
    counter = 0
    while candidate in self.engines:
        counter += 1
        candidate = f"{base}-{counter}"
    return candidate
622+
607623 async def start_engines (self , n = None , engine_set_id = None , ** kwargs ):
608624 """Start an engine set
609625
610626 Returns an engine set id which can be used in stop_engines
611627 """
612628 # TODO: send engines connection info
613629 if engine_set_id is None :
614- engine_set_id = f" { int ( time . time ()) } - { '' . join ( random . choice ( _suffix_chars ) for i in range ( 4 )) } "
630+ engine_set_id = self . _new_engine_set_id ()
615631 engine_set = self .engines [engine_set_id ] = self .engine_launcher_class (
616632 work_dir = u'.' ,
617633 parent = self ,
@@ -853,10 +869,10 @@ def load_clusters(
853869 for key , cluster in list (self .clusters .items ()):
854870 # remove stopped clusters
855871 # but not *new* clusters that haven't started yet
856- if ( cluster .controller and cluster . controller . state == 'after' ) and all (
857- es . state == 'after' for es in cluster . engines . values ()
858- ):
859- self .log .info ("Removing stopped cluster {key}" )
872+ # if ` cluster.controller` is present
873+ # that means it was running at some point
874+ if cluster . controller and not cluster . _is_running ( ):
875+ self .log .info (f "Removing stopped cluster { key } " )
860876 self .clusters .pop (key )
861877
862878 if profile_dirs is None :
0 commit comments