1515"""
1616import asyncio
1717import logging
18+ import multiprocessing
1819import os
1920import signal
21+ import sys
2022from multiprocessing import Pipe
21- from multiprocessing import Process
23+ from multiprocessing . connection import Connection
2224from threading import Thread
2325
2426import psutil
25- import tornado .ioloop
2627import zmq
2728from jupyter_client .session import Session
29+ from tornado .ioloop import IOLoop
2830from traitlets .config import Config
2931from zmq .eventloop .zmqstream import ZMQStream
3032
3537class KernelNanny :
3638 """Object for monitoring
3739
38- Must be handling signal messages"""
40+ Must be child of engine
41+
42+ Handles signal messages and watches Engine process for exiting
43+ """
3944
4045 def __init__ (
4146 self ,
@@ -65,6 +70,7 @@ def __init__(
6570 self .control_handlers = {
6671 "signal_request" : self .signal_request ,
6772 }
73+ self ._finish_called = False
6874
6975 def wait_for_parent_thread (self ):
7076 """Wait for my parent to exit, then I'll notify the controller and shut down"""
@@ -79,6 +85,20 @@ def wait_for_parent_thread(self):
7985 self .log .critical (f"Parent { self .pid } exited with status { exit_code } ." )
8086 self .loop .add_callback (self .finish )
8187
def pipe_handler(self, fd, events):
    """Handle an event on the start pipe, then report the parent's state and shut down.

    ``start`` registers this callback on ``self.start_pipe`` (non-Windows
    only) for ``IOLoop.READ | IOLoop.ERROR``; an event there is treated as
    a sign that the parent process is going away.

    Parameters
    ----------
    fd
        The object the handler was registered with; it is unregistered
        from the loop here so the callback fires only once.
    events
        The event mask reported by the IOLoop (only logged).
    """
    self.log.debug(f"Pipe event {events} ")
    # One-shot handler: stop watching the pipe before doing anything else.
    self.loop.remove_handler(fd)
    try:
        # Zero timeout: poll for an exit status without blocking the loop.
        # NOTE(review): assumes self.parent_process is a psutil.Process - confirm.
        status = self.parent_process.wait(0)
    except psutil.TimeoutExpired:
        # Parent had not exited within the zero timeout; report its
        # current status string instead of an exit code.
        try:
            status = self.parent_process.status()
        except psutil.NoSuchProcess:
            # psutil's exception is ``NoSuchProcess``; the previous name,
            # ``NoSuchProcessError``, does not exist, so this clause itself
            # raised AttributeError when the parent vanished between the
            # two calls.
            status = "exited"

    self.log.critical(f"Pipe closed, parent {self.pid} has status: {status} ")
    self.finish()
101+
82102 def notify_exit (self ):
83103 """Notify the Hub that our parent has exited"""
84104 self .log .info ("Notifying Hub that our parent has shut down" )
@@ -91,6 +111,9 @@ def notify_exit(self):
91111
def finish(self):
    """Notify the Hub of the parent's exit and schedule the event loop to stop.

    Only the first call has any effect; ``_finish_called`` records that
    shutdown has already been started, so later calls do nothing.
    """
    if not self._finish_called:
        self._finish_called = True
        self.notify_exit()
        # Stop via a callback rather than calling loop.stop() directly.
        self.loop.add_callback(self.loop.stop)
96119
@@ -160,7 +183,7 @@ def start(self):
160183 # ignore SIGINT sent to parent
161184 signal .signal (signal .SIGINT , signal .SIG_IGN )
162185
163- self .loop = tornado . ioloop . IOLoop .current ()
186+ self .loop = IOLoop .current ()
164187 self .context = zmq .Context ()
165188
166189 # set up control socket (connection to Scheduler)
@@ -176,7 +199,15 @@ def start(self):
176199
177200 # now that we've bound, pass port to parent via AsyncResult
178201 self .start_pipe .send (f"tcp://127.0.0.1:{ port } " )
179- self .loop .add_timeout (self .loop .time () + 10 , self .start_pipe .close )
202+ if sys .platform .startswith ("win" ):
203+ # close the pipe on Windows
204+ self .loop .add_timeout (self .loop .time () + 10 , self .start_pipe .close )
205+ else :
206+ # otherwise, watch for the pipe to close
207+ # as a signal that our parent is shutting down
208+ self .loop .add_handler (
209+ self .start_pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
210+ )
180211 self .parent_stream = ZMQStream (self .parent_socket )
181212 self .parent_stream .on_recv_stream (self .dispatch_parent )
182213 try :
@@ -197,7 +228,7 @@ def main(cls, *args, **kwargs):
197228 """
198229 # start a new event loop for the forked process
199230 asyncio .set_event_loop (asyncio .new_event_loop ())
200- tornado . ioloop . IOLoop ().make_current ()
231+ IOLoop ().make_current ()
201232 self = cls (* args , ** kwargs )
202233 self .start ()
203234
@@ -214,12 +245,22 @@ def start_nanny(**kwargs):
214245 """
215246
216247 pipe_r , pipe_w = Pipe (duplex = False )
248+
217249 kwargs ['start_pipe' ] = pipe_w
218250 kwargs ['pid' ] = os .getpid ()
219- p = Process (target = KernelNanny .main , kwargs = kwargs , name = "KernelNanny" , daemon = True )
251+ # make sure to not use fork, which can be an issue for MPI
252+ p = multiprocessing .get_context ("spawn" ).Process (
253+ target = KernelNanny .main ,
254+ kwargs = kwargs ,
255+ name = "KernelNanny" ,
256+ daemon = True ,
257+ )
220258 p .start ()
221259 # close our copy of the write pipe
222260 pipe_w .close ()
223261 nanny_url = pipe_r .recv ()
224- pipe_r .close ()
225- return nanny_url
262+ if sys .platform .startswith ("win" ):
263+ pipe_r .close ()
264+ # return the handle on the read pipe
265+ # need to keep this open for the nanny
266+ return nanny_url , pipe_r
0 commit comments