@@ -721,11 +721,15 @@ def init_hub(self):
721721 if self .restore_engines :
722722 self .hub ._load_engine_state ()
723723
724- def launch_python_scheduler (self , scheduler_args , children ):
724+ def launch_python_scheduler (self , name , scheduler_args , children ):
725725 if 'Process' in self .mq_class :
726726 # run the Python scheduler in a Process
727- q = Process (target = launch_scheduler , kwargs = scheduler_args )
728- q .daemon = True
727+ q = Process (
728+ target = launch_scheduler ,
729+ kwargs = scheduler_args ,
730+ name = name ,
731+ daemon = True ,
732+ )
729733 children .append (q )
730734 else :
731735 # single-threaded Controller
@@ -750,12 +754,16 @@ def get_python_scheduler_args(
750754 }
751755
752756 def launch_broadcast_schedulers (self , monitor_url , children ):
753- def launch_in_thread_or_process (scheduler_args ):
757+ def launch_in_thread_or_process (scheduler_args , depth , identity ):
754758
755759 if 'Process' in self .mq_class :
756760 # run the Python scheduler in a Process
757- q = Process (target = launch_broadcast_scheduler , kwargs = scheduler_args )
758- q .daemon = True
761+ q = Process (
762+ target = launch_broadcast_scheduler ,
763+ kwargs = scheduler_args ,
764+ name = f"BroadcastScheduler(depth={ depth } , id={ identity } )" ,
765+ daemon = True ,
766+ )
759767 children .append (q )
760768 else :
761769 # single-threaded Controller
@@ -805,7 +813,7 @@ def recursively_start_schedulers(identity, depth):
805813 self .client_url (BroadcastScheduler .port_name , outgoing_id2 ),
806814 ]
807815 )
808- launch_in_thread_or_process (scheduler_args )
816+ launch_in_thread_or_process (scheduler_args , depth = depth , identity = identity )
809817 if not is_leaf :
810818 recursively_start_schedulers (outgoing_id1 , depth + 1 )
811819 recursively_start_schedulers (outgoing_id2 , depth + 1 )
@@ -823,6 +831,7 @@ def init_schedulers(self):
823831 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
824832 # IOPub relay (in a Process)
825833 q = mq (zmq .PUB , zmq .SUB , zmq .PUB , b'N/A' , b'iopub' )
834+ q .name = "IOPubScheduler"
826835 q .bind_in (self .client_url ('iopub' ))
827836 q .setsockopt_in (zmq .IDENTITY , ident + b"_iopub" )
828837 q .bind_out (self .engine_url ('iopub' ))
@@ -833,6 +842,7 @@ def init_schedulers(self):
833842
834843 # Multiplexer Queue (in a Process)
835844 q = mq (zmq .ROUTER , zmq .ROUTER , zmq .PUB , b'in' , b'out' )
845+ q .name = "DirectScheduler"
836846
837847 q .bind_in (self .client_url ('mux' ))
838848 q .setsockopt_in (zmq .IDENTITY , b'mux_in' )
@@ -844,6 +854,7 @@ def init_schedulers(self):
844854
845855 # Control Queue (in a Process)
846856 q = mq (zmq .ROUTER , zmq .ROUTER , zmq .PUB , b'incontrol' , b'outcontrol' )
857+ q .name = "ControlScheduler"
847858 q .bind_in (self .client_url ('control' ))
848859 q .setsockopt_in (zmq .IDENTITY , b'control_in' )
849860 q .bind_out (self .engine_url ('control' ))
@@ -859,6 +870,7 @@ def init_schedulers(self):
859870 if scheme == 'pure' :
860871 self .log .warn ("task::using pure DEALER Task scheduler" )
861872 q = mq (zmq .ROUTER , zmq .DEALER , zmq .PUB , b'intask' , b'outtask' )
873+ q .name = "TaskScheduler(pure)"
862874 # q.setsockopt_out(zmq.HWM, hub.hwm)
863875 q .bind_in (self .client_url ('task' ))
864876 q .setsockopt_in (zmq .IDENTITY , b'task_in' )
@@ -868,11 +880,12 @@ def init_schedulers(self):
868880 q .daemon = True
869881 children .append (q )
870882 elif scheme == 'none' :
871- self .log .warn ("task::using no Task scheduler" )
883+ self .log .warning ("task::using no Task scheduler" )
872884
873885 else :
874886 self .log .info ("task::using Python %s Task scheduler" % scheme )
875887 self .launch_python_scheduler (
888+ 'TaskScheduler' ,
876889 self .get_python_scheduler_args ('task' , TaskScheduler , monitor_url ),
877890 children ,
878891 )
@@ -944,6 +957,14 @@ def start(self):
944957 # otherwise signal-handling will fire multiple times
945958 for child in self .children :
946959 child .start ()
960+ if hasattr (child , 'launcher' ):
961+ # apply name to actual process/thread for logging
962+ setattr (child .launcher , 'name' , child .name )
963+ if not self .use_threads :
964+ process = getattr (child , 'launcher' , child )
965+ self .log .debug (f"Started process { child .name } : { process .pid } " )
966+ else :
967+ self .log .debug (f"Started thread { child .name } " )
947968
948969 self .init_signal ()
949970
0 commit comments